Kafka
Collect observability data from Apache Kafka topics
Configuration
Example configurations
{
"sources": {
"my_source_id": {
"type": "kafka",
"bootstrap_servers": "10.14.22.123:9092,10.14.23.332:9092",
"group_id": "consumer-group-name",
"topics": [
"^(prefix1|prefix2)-.+"
]
}
}
}
[sources.my_source_id]
type = "kafka"
bootstrap_servers = "10.14.22.123:9092,10.14.23.332:9092"
group_id = "consumer-group-name"
topics = [ "^(prefix1|prefix2)-.+" ]
sources:
my_source_id:
type: kafka
bootstrap_servers: 10.14.22.123:9092,10.14.23.332:9092
group_id: consumer-group-name
topics:
- ^(prefix1|prefix2)-.+
{
"sources": {
"my_source_id": {
"type": "kafka",
"auto_offset_reset": "largest",
"bootstrap_servers": "10.14.22.123:9092,10.14.23.332:9092",
"commit_interval_ms": 5000,
"drain_timeout_ms": 2500,
"fetch_wait_max_ms": 100,
"group_id": "consumer-group-name",
"headers_key": "headers",
"key_field": "message_key",
"librdkafka_options": {
"client.id": "${ENV_VAR}",
"fetch.error.backoff.ms": "1000",
"socket.send.buffer.bytes": "100"
},
"offset_key": "offset",
"partition_key": "partition",
"session_timeout_ms": 10000,
"socket_timeout_ms": 60000,
"topic_key": "topic",
"topics": [
"^(prefix1|prefix2)-.+"
]
}
}
}
[sources.my_source_id]
type = "kafka"
auto_offset_reset = "largest"
bootstrap_servers = "10.14.22.123:9092,10.14.23.332:9092"
commit_interval_ms = 5_000
drain_timeout_ms = 2_500
fetch_wait_max_ms = 100
group_id = "consumer-group-name"
headers_key = "headers"
key_field = "message_key"
offset_key = "offset"
partition_key = "partition"
session_timeout_ms = 10_000
socket_timeout_ms = 60_000
topic_key = "topic"
topics = [ "^(prefix1|prefix2)-.+" ]
[sources.my_source_id.librdkafka_options]
"client.id" = "${ENV_VAR}"
"fetch.error.backoff.ms" = "1000"
"socket.send.buffer.bytes" = "100"
sources:
my_source_id:
type: kafka
auto_offset_reset: largest
bootstrap_servers: 10.14.22.123:9092,10.14.23.332:9092
commit_interval_ms: 5000
drain_timeout_ms: 2500
fetch_wait_max_ms: 100
group_id: consumer-group-name
headers_key: headers
key_field: message_key
librdkafka_options:
client.id: ${ENV_VAR}
fetch.error.backoff.ms: "1000"
socket.send.buffer.bytes: "100"
offset_key: offset
partition_key: partition
session_timeout_ms: 10000
socket_timeout_ms: 60000
topic_key: topic
topics:
- ^(prefix1|prefix2)-.+
acknowledgements
optional object
Controls how acknowledgements are handled by this source.
This setting is deprecated in favor of enabling acknowledgements
at the global or sink level.
Enabling or disabling acknowledgements at the source level has no effect on acknowledgement behavior.
See End-to-end Acknowledgements for more information on how event acknowledgement is handled.
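Because the source-level setting is deprecated, acknowledgements are typically enabled on the consuming sink (or globally) instead. A minimal TOML sketch, assuming a hypothetical downstream sink named my_sink_id; the sink type shown is only illustrative:
[sinks.my_sink_id]
type = "console"                  # illustrative sink; any acknowledgement-capable sink works
inputs = [ "my_source_id" ]
encoding.codec = "json"
# End-to-end acknowledgements are enabled at the sink (or global) level,
# not on the kafka source itself.
acknowledgements.enabled = true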
acknowledgements.enabled
optional bool
auto_offset_reset
optional string literal
If offsets for the consumer group do not exist, set them using this strategy.
See the librdkafka documentation for the auto.offset.reset
option for further clarification.
largest
bootstrap_servers
required string literal
A comma-separated list of Kafka bootstrap servers.
These are the servers in a Kafka cluster that a client should use to bootstrap its connection to the cluster, allowing discovery of all the other hosts in the cluster.
Must be in the form of host:port, and comma-separated.
commit_interval_ms
optional uint
5000 (milliseconds)
decoding
optional object
decoding.avro
required object
codec = "avro"
decoding.avro.schema
required string literal
The Avro schema definition.
Please note that the following apache_avro::types::Value variants are currently not supported:
Date
Decimal
Duration
Fixed
TimeMillis
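As a hedged sketch, the Avro codec can be selected on this source roughly as follows; the inline schema and topic name are hypothetical, and strip_schema_id_prefix (described next) is shown with an assumed value:
[sources.my_source_id]
type = "kafka"
bootstrap_servers = "10.14.22.123:9092"
group_id = "consumer-group-name"
topics = [ "avro-topic" ]                      # hypothetical topic
decoding.codec = "avro"
decoding.avro.strip_schema_id_prefix = true    # assumed: messages carry a schema-id prefix
decoding.avro.schema = '''
{
  "type": "record",
  "name": "Log",
  "fields": [ { "name": "message", "type": "string" } ]
}
'''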
decoding.avro.strip_schema_id_prefix
required bool
decoding.codec
optional string literal enum
Option | Description |
---|---|
avro | Decodes the raw bytes as an Apache Avro message. |
bytes | Uses the raw bytes as-is. |
gelf | Decodes the raw bytes as a GELF message. This codec is experimental for the following reason: The GELF specification is more strict than the actual Graylog receiver. Vector’s decoder currently adheres more strictly to the GELF spec, with the exception of how some characters in field names are handled. Other GELF codecs, such as Loki’s, use a Go SDK that is maintained by Graylog and is much more relaxed than the GELF spec. Going forward, Vector will use that Go SDK as the reference implementation, which means the codec may continue to relax the enforcement of the specification. |
influxdb | Decodes the raw bytes as an InfluxDB Line Protocol message. |
json | Decodes the raw bytes as JSON. |
native | Decodes the raw bytes as native Protocol Buffers format. This codec is experimental. |
native_json | Decodes the raw bytes as native JSON format. This codec is experimental. |
protobuf | Decodes the raw bytes as protobuf. |
syslog | Decodes the raw bytes as a Syslog message. Decodes either as the RFC 3164-style format (“old” style) or the RFC 5424-style format (“new” style, includes structured data). |
vrl | Decodes the raw bytes as a string and passes them as input to a VRL program. |
bytes
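For example, a minimal sketch (assuming the messages are JSON-encoded, with a hypothetical topic name) that overrides the default bytes codec:
[sources.my_source_id]
type = "kafka"
bootstrap_servers = "10.14.22.123:9092"
group_id = "consumer-group-name"
topics = [ "json-topic" ]        # hypothetical topic
decoding.codec = "json"
decoding.json.lossy = true       # replace invalid UTF-8 with U+FFFD instead of failing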
decoding.gelf
optional object
codec = "gelf"
decoding.gelf.lossy
optional bool
Determines whether or not to replace invalid UTF-8 sequences instead of failing.
When true, invalid UTF-8 sequences are replaced with the U+FFFD REPLACEMENT CHARACTER.
true
decoding.influxdb
optional object
codec = "influxdb"
decoding.influxdb.lossy
optional bool
Determines whether or not to replace invalid UTF-8 sequences instead of failing.
When true, invalid UTF-8 sequences are replaced with the U+FFFD REPLACEMENT CHARACTER.
true
decoding.json
optional object
codec = "json"
decoding.json.lossy
optional bool
Determines whether or not to replace invalid UTF-8 sequences instead of failing.
When true, invalid UTF-8 sequences are replaced with the U+FFFD REPLACEMENT CHARACTER.
true
decoding.native_json
optional object
codec = "native_json"
decoding.native_json.lossy
optional bool
Determines whether or not to replace invalid UTF-8 sequences instead of failing.
When true, invalid UTF-8 sequences are replaced with the U+FFFD REPLACEMENT CHARACTER.
true
decoding.protobuf
optional object
codec = "protobuf"
decoding.protobuf.desc_file
optional string literal
decoding.protobuf.message_type
optional string literal
decoding.syslog
optional object
codec = "syslog"
decoding.syslog.lossy
optional bool
Determines whether or not to replace invalid UTF-8 sequences instead of failing.
When true, invalid UTF-8 sequences are replaced with the U+FFFD REPLACEMENT CHARACTER.
true
decoding.vrl
required object
codec = "vrl"
decoding.vrl.source
required string literal
The Vector Remap Language (VRL) program to execute for each event. The final contents of the . target will be used as the decoding result. Compilation error or use of ‘abort’ in a program will result in a decoding error.
decoding.vrl.timezone
optional string literal
The name of the timezone to apply to timestamp conversions that do not contain an explicit
time zone. The time zone name may be any name in the TZ database, or local
to indicate system local time.
If not set, local
will be used.
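A hedged sketch of the vrl codec; the VRL program is illustrative and assumes the raw payload string is exposed as .message before the program runs:
[sources.my_source_id]
type = "kafka"
bootstrap_servers = "10.14.22.123:9092"
group_id = "consumer-group-name"
topics = [ "raw-topic" ]                 # hypothetical topic
decoding.codec = "vrl"
decoding.vrl.timezone = "UTC"
decoding.vrl.source = '''
# assumption: the raw payload string is available as .message
. = parse_json!(string!(.message))
.decoded_at = now()
'''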
drain_timeout_ms
optional uint
Timeout to drain pending acknowledgements during shutdown or a Kafka consumer group rebalance.
When Vector shuts down or the Kafka consumer group revokes partitions from this
consumer, wait a maximum of drain_timeout_ms
for the source to
process pending acknowledgements. Must be less than session_timeout_ms
to ensure the consumer is not excluded from the group during a rebalance.
Default value is half of session_timeout_ms
.
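For instance, a sketch that keeps the drain window safely below the session timeout (the values here are arbitrary examples):
[sources.my_source_id]
type = "kafka"
bootstrap_servers = "10.14.22.123:9092"
group_id = "consumer-group-name"
topics = [ "^(prefix1|prefix2)-.+" ]
session_timeout_ms = 10000   # consumer group session timeout
drain_timeout_ms = 5000      # half of session_timeout_ms, mirroring the default behavior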
fetch_wait_max_ms
optional uint
100 (milliseconds)
framing
optional object
Framing configuration.
Framing handles how events are separated when encoded in a raw byte form, where each event is a frame that must be prefixed, or delimited, in a way that marks where an event begins and ends within the byte stream.
framing.character_delimited
required object
method = "character_delimited"
framing.character_delimited.delimiter
required ascii_char
framing.character_delimited.max_length
optional uint
The maximum length of the byte buffer.
This length does not include the trailing delimiter.
By default, there is no maximum length enforced. If events are malformed, this can lead to additional resource usage as events continue to be buffered in memory, and can potentially lead to memory exhaustion in extreme cases.
If there is a risk of processing malformed data, such as logs with user-controlled input, consider setting the maximum length to a reasonably large value as a safety net. This ensures that processing is not actually unbounded.
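A hedged sketch of character-delimited framing, assuming each Kafka message carries several events separated by a | character (topic name and limit are hypothetical):
[sources.my_source_id]
type = "kafka"
bootstrap_servers = "10.14.22.123:9092"
group_id = "consumer-group-name"
topics = [ "batched-topic" ]                        # hypothetical topic
framing.method = "character_delimited"
framing.character_delimited.delimiter = "|"
framing.character_delimited.max_length = 102400     # safety net against unbounded buffering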
framing.chunked_gelf
optional object
method = "chunked_gelf"
framing.chunked_gelf.decompression
optional string literal enum
Option | Description |
---|---|
Auto | Automatically detect the decompression method based on the magic bytes of the message. |
Gzip | Use Gzip decompression. |
None | Do not decompress the message. |
Zlib | Use Zlib decompression. |
Auto
framing.chunked_gelf.max_length
optional uint
The maximum length of a single GELF message, in bytes. Messages longer than this length will be dropped. If this option is not set, the decoder does not limit the length of messages and the per-message memory is unbounded.
Note that a message can be composed of multiple chunks and this limit is applied to the whole message, not to individual chunks.
This limit only takes into account the message’s payload; the GELF header bytes are excluded from the calculation. The message’s payload is the concatenation of all the chunks’ payloads.
framing.chunked_gelf.pending_messages_limit
optional uint
framing.chunked_gelf.timeout_secs
optional float
5
framing.length_delimited
required object
method = "length_delimited"
framing.length_delimited.length_field_is_big_endian
optional bool
true
framing.length_delimited.length_field_length
optional uint
4
framing.length_delimited.length_field_offset
optional uint
framing.method
optional string literal enum
Option | Description |
---|---|
bytes | Byte frames are passed through as-is according to the underlying I/O boundaries (for example, split between messages or stream segments). |
character_delimited | Byte frames which are delimited by a chosen character. |
chunked_gelf | Byte frames which are chunked GELF messages. |
length_delimited | Byte frames which are prefixed by an unsigned big-endian 32-bit integer indicating the length. |
newline_delimited | Byte frames which are delimited by a newline character. |
octet_counting | Byte frames according to the octet counting format. |
bytes
framing.newline_delimited
optional object
method = "newline_delimited"
framing.newline_delimited.max_length
optional uint
The maximum length of the byte buffer.
This length does not include the trailing delimiter.
By default, there is no maximum length enforced. If events are malformed, this can lead to additional resource usage as events continue to be buffered in memory, and can potentially lead to memory exhaustion in extreme cases.
If there is a risk of processing malformed data, such as logs with user-controlled input, consider setting the maximum length to a reasonably large value as a safety net. This ensures that processing is not actually unbounded.
framing.octet_counting
optional objectmethod = "octet_counting"
framing.octet_counting.max_length
optional uint
group_id
required string literal
The consumer group name to be used to consume events from Kafka.
headers_key
optional string literal
Overrides the name of the log field used to add the headers to each event.
The value is the headers of the Kafka message itself.
By default, "headers"
is used.
headers
key_field
optional string literal
Overrides the name of the log field used to add the message key to each event.
The value is the message key of the Kafka message itself.
By default, "message_key"
is used.
message_key
librdkafka_options
optional object
Advanced options set directly on the underlying librdkafka client.
See the librdkafka documentation for details.
librdkafka_options.*
required string literal
metrics
optional object
metrics.topic_lag_metric
optional bool
Expose the topic lag metric kafka_consumer_lag.
false
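A minimal sketch that turns on the topic lag gauge:
[sources.my_source_id]
type = "kafka"
bootstrap_servers = "10.14.22.123:9092"
group_id = "consumer-group-name"
topics = [ "^(prefix1|prefix2)-.+" ]
metrics.topic_lag_metric = true   # emit the kafka_consumer_lag gauge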
offset_key
optional string literal
Overrides the name of the log field used to add the offset to each event.
The value is the offset of the Kafka message itself.
By default, "offset"
is used.
offset
partition_key
optional string literal
Overrides the name of the log field used to add the partition to each event.
The value is the partition from which the Kafka message was consumed.
By default, "partition"
is used.
partition
sasl
optional object
sasl.enabled
optional bool
Enables SASL authentication.
Only PLAIN
- and SCRAM
-based mechanisms are supported when configuring SASL authentication using sasl.*
. For
other mechanisms, librdkafka_options.*
must be used directly to configure other librdkafka
-specific values.
If using sasl.kerberos.*
as an example, where *
is service.name
, principal
, kinit.cmd
, etc., then
librdkafka_options.*
as a result becomes librdkafka_options.sasl.kerberos.service.name
,
librdkafka_options.sasl.kerberos.principal
, etc.
See the librdkafka documentation for details.
SASL authentication is not supported on Windows.
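A hedged sketch of SCRAM authentication via the sasl.* options, together with sasl.mechanism (described next) and the sasl.username / sasl.password options referenced in the Azure Event Hubs section below; the credentials are placeholders:
[sources.my_source_id]
type = "kafka"
bootstrap_servers = "10.14.22.123:9093"
group_id = "consumer-group-name"
topics = [ "^(prefix1|prefix2)-.+" ]
sasl.enabled = true
sasl.mechanism = "SCRAM-SHA-256"
sasl.username = "vector"                  # placeholder
sasl.password = "${KAFKA_SASL_PASSWORD}"  # placeholder, read from the environment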
sasl.mechanism
optional string literal
tls
optional object
tls.alpn_protocols
optional [string]
Sets the list of supported ALPN protocols.
Declare the supported ALPN protocols, which are used during negotiation with peer. They are prioritized in the order that they are defined.
tls.ca_file
optional string literal
Absolute path to an additional CA certificate file.
The certificate must be in the DER or PEM (X.509) format. Additionally, the certificate can be provided as an inline string in PEM format.
tls.crt_file
optional string literal
Absolute path to a certificate file used to identify this server.
The certificate must be in DER, PEM (X.509), or PKCS#12 format. Additionally, the certificate can be provided as an inline string in PEM format.
If this is set, and is not a PKCS#12 archive, key_file
must also be set.
tls.enabled
optional bool
Whether or not to require TLS for incoming or outgoing connections.
When enabled and used for incoming connections, an identity certificate is also required. See tls.crt_file
for
more information.
tls.key_file
optional string literal
Absolute path to a private key file used to identify this server.
The key must be in DER or PEM (PKCS#8) format. Additionally, the key can be provided as an inline string in PEM format.
tls.key_pass
optional string literal
Passphrase used to unlock the encrypted key file.
This has no effect unless key_file
is set.
tls.server_name
optional string literal
Server name to use when using Server Name Indication (SNI).
Only relevant for outgoing connections.
tls.verify_certificate
optional bool
Enables certificate verification. For components that create a server, this requires that the client connections have a valid client certificate. For components that initiate requests, this validates that the upstream has a valid certificate.
If enabled, certificates must not be expired and must be issued by a trusted issuer. This verification operates in a hierarchical manner, checking that the leaf certificate (the certificate presented by the client/server) is not only valid, but that the issuer of that certificate is also valid, and so on until the verification process reaches a root certificate.
Do NOT set this to false
unless you understand the risks of not verifying the validity of certificates.
tls.verify_hostname
optional bool
Enables hostname verification.
If enabled, the hostname used to connect to the remote host must be present in the TLS certificate presented by the remote host, either as the Common Name or as an entry in the Subject Alternative Name extension.
Only relevant for outgoing connections.
Do NOT set this to false
unless you understand the risks of not verifying the remote hostname.
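A hedged sketch of TLS with a private CA and a client (mutual TLS) identity; all file paths are placeholders:
[sources.my_source_id]
type = "kafka"
bootstrap_servers = "10.14.22.123:9093"
group_id = "consumer-group-name"
topics = [ "^(prefix1|prefix2)-.+" ]
tls.enabled = true
tls.ca_file = "/etc/vector/certs/ca.pem"          # placeholder path
tls.crt_file = "/etc/vector/certs/client.pem"     # client certificate for mTLS
tls.key_file = "/etc/vector/certs/client-key.pem"
tls.verify_certificate = true
tls.verify_hostname = true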
topic_key
optional string literal
Overrides the name of the log field used to add the topic to each event.
The value is the topic from which the Kafka message was consumed.
By default, "topic"
is used.
topic
topics
required [string]
The Kafka topic names to read events from.
Regular expression syntax is supported if the topic begins with ^
.
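For example, literal names and regular expressions can be mixed (the topic names here are hypothetical):
[sources.my_source_id]
type = "kafka"
bootstrap_servers = "10.14.22.123:9092"
group_id = "consumer-group-name"
topics = [ "orders", "payments", "^logs-.+" ]   # entries starting with ^ are regexes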
Outputs
<component_id>
Output Data
Logs
Record
An individual Kafka record, for example:
message: 53.126.150.246 - - [01/Oct/2020:11:25:58 -0400] "GET /disintermediate HTTP/2.0" 401 20308
offset: 100
partition: partition
source_type: kafka
timestamp: 2020-10-10T17:07:36.452332Z
topic: topic
Telemetry
Metrics
component_discarded_events_total
counter
The number of events dropped by this component. The intentional tag is true if the event was dropped intentionally, such as by a filter transform, or false if due to an error.
component_errors_total
counter
component_received_bytes_total
counter
component_received_event_bytes_total
counter
component_received_events_count
histogram
A histogram of the number of events passed in each internal batch in Vector’s internal topology.
Note that this is separate from sink-level batching. It is mostly useful for low-level debugging of performance issues in Vector due to small internal batches.
component_received_events_total
counter
component_sent_event_bytes_total
counter
component_sent_events_total
counter
kafka_consumed_messages_bytes_total
counter
kafka_consumed_messages_total
counter
kafka_consumer_lag
gauge
kafka_produced_messages_bytes_total
counter
kafka_produced_messages_total
counter
kafka_queue_messages
gauge
kafka_queue_messages_bytes
gauge
kafka_requests_bytes_total
counter
kafka_requests_total
counter
kafka_responses_bytes_total
counter
kafka_responses_total
counter
source_lag_time_seconds
histogram
How it works
Azure Event Hubs
It is possible to use the kafka source and sink with Azure Event Hubs for all tiers other than the Basic tier. More details can be found here. To configure the source and sink to connect to Azure Event Hubs, set the following options:
bootstrap_servers - <namespace name>.servicebus.windows.net:9093
group_id - The consumer group. Note that if the default group ($Default) is used, it must be specified as $$Default to escape the $ used for environment variables.
topics - The event hub name.
sasl.enabled - Set to true.
sasl.mechanism - Set to PLAIN.
sasl.username - Set to $$ConnectionString (note the double $$).
sasl.password - Set to the connection string. See here.
tls.enabled - Set to true.
tls.ca_file - The certificate authority file.
tls.verify_certificate - Set to true.
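Putting those options together, a hedged sketch of a source reading from an Event Hub; the namespace, event hub name, connection string, and CA bundle path are placeholders:
[sources.my_event_hub]
type = "kafka"
bootstrap_servers = "my-namespace.servicebus.windows.net:9093"
group_id = "$$Default"                 # $$ escapes the $ used for environment variables
topics = [ "my-event-hub" ]            # the event hub name
sasl.enabled = true
sasl.mechanism = "PLAIN"
sasl.username = "$$ConnectionString"   # note the double $$
sasl.password = "Endpoint=sb://my-namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=..."  # placeholder connection string
tls.enabled = true
tls.ca_file = "/etc/ssl/certs/ca-certificates.crt"   # placeholder CA bundle path
tls.verify_certificate = true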
librdkafka
The kafka source and sink use librdkafka under the hood. This is a battle-tested, high performance, and reliable library that facilitates communication with Kafka. As Vector produces static MUSL builds, this dependency is packaged with Vector, meaning you do not need to install it.
Transport Layer Security (TLS)
Vector uses OpenSSL for TLS protocols. You can adjust TLS behavior via the tls.* options and/or via an OpenSSL configuration file. The file location defaults to /usr/local/ssl/openssl.cnf or can be specified with the OPENSSL_CONF environment variable.