Kafka
Collect logs from Kafka
Configuration
Example configurations
{
  "sources": {
    "my_source_id": {
      "type": "kafka",
      "acknowledgements": null,
      "bootstrap_servers": "10.14.22.123:9092,10.14.23.332:9092",
      "group_id": "consumer-group-name",
      "key_field": "message_key",
      "topics": [
        "^(prefix1|prefix2)-.+"
      ]
    }
  }
}
[sources.my_source_id]
type = "kafka"
bootstrap_servers = "10.14.22.123:9092,10.14.23.332:9092"
group_id = "consumer-group-name"
key_field = "message_key"
topics = [ "^(prefix1|prefix2)-.+" ]
---
sources:
  my_source_id:
    type: kafka
    acknowledgements: null
    bootstrap_servers: 10.14.22.123:9092,10.14.23.332:9092
    group_id: consumer-group-name
    key_field: message_key
    topics:
      - ^(prefix1|prefix2)-.+
{
  "sources": {
    "my_source_id": {
      "type": "kafka",
      "acknowledgements": null,
      "auto_offset_reset": "largest",
      "bootstrap_servers": "10.14.22.123:9092,10.14.23.332:9092",
      "commit_interval_ms": 5000,
      "fetch_wait_max_ms": 100,
      "group_id": "consumer-group-name",
      "key_field": "message_key",
      "topic_key": "topic",
      "partition_key": "partition",
      "offset_key": "offset",
      "headers_key": "headers",
      "librdkafka_options": {
        "client.id": "${ENV_VAR}",
        "fetch.error.backoff.ms": "1000",
        "socket.send.buffer.bytes": "100"
      },
      "sasl": null,
      "session_timeout_ms": 10000,
      "socket_timeout_ms": 60000,
      "framing": null,
      "tls": null,
      "topics": [
        "^(prefix1|prefix2)-.+"
      ],
      "decoding": null
    }
  }
}
[sources.my_source_id]
type = "kafka"
auto_offset_reset = "largest"
bootstrap_servers = "10.14.22.123:9092,10.14.23.332:9092"
commit_interval_ms = 5_000
fetch_wait_max_ms = 100
group_id = "consumer-group-name"
key_field = "message_key"
topic_key = "topic"
partition_key = "partition"
offset_key = "offset"
headers_key = "headers"
session_timeout_ms = 10_000
socket_timeout_ms = 60_000
topics = [ "^(prefix1|prefix2)-.+" ]
[sources.my_source_id.librdkafka_options]
"client.id" = "${ENV_VAR}"
"fetch.error.backoff.ms" = "1000"
"socket.send.buffer.bytes" = "100"
---
sources:
  my_source_id:
    type: kafka
    acknowledgements: null
    auto_offset_reset: largest
    bootstrap_servers: 10.14.22.123:9092,10.14.23.332:9092
    commit_interval_ms: 5000
    fetch_wait_max_ms: 100
    group_id: consumer-group-name
    key_field: message_key
    topic_key: topic
    partition_key: partition
    offset_key: offset
    headers_key: headers
    librdkafka_options:
      client.id: ${ENV_VAR}
      fetch.error.backoff.ms: "1000"
      socket.send.buffer.bytes: "100"
    sasl: null
    session_timeout_ms: 10000
    socket_timeout_ms: 60000
    framing: null
    tls: null
    topics:
      - ^(prefix1|prefix2)-.+
    decoding: null
acknowledgements (common, optional, object)
Acknowledgement settings. This setting is deprecated in favor of enabling acknowledgements in the destination sink.

acknowledgements.enabled (common, optional, bool, default: false)
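A minimal sketch of enabling acknowledgements on this source (server address, group, and topic are placeholders; since this setting is deprecated, prefer enabling acknowledgements on the destination sink instead):

```toml
[sources.my_source_id]
type = "kafka"
bootstrap_servers = "10.14.22.123:9092"
group_id = "consumer-group-name"
topics = [ "topic-1" ]
acknowledgements.enabled = true
```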
auto_offset_reset (optional, string literal, default: "largest")
See librdkafka's auto.offset.reset option for further clarification.

bootstrap_servers (required, string literal)

commit_interval_ms (optional, uint, default: 5000 milliseconds)

decoding (optional, object)

decoding.codec (common, optional, string literal enum, default: "bytes")

Option | Description
---|---
bytes | Uses the raw bytes as-is.
gelf | Decodes the raw bytes as a GELF message.
json | Decodes the raw bytes as JSON.
native | Decodes the raw bytes as Vector's native Protocol Buffers format. This codec is experimental.
native_json | Decodes the raw bytes as Vector's native JSON format. This codec is experimental.
syslog | Decodes the raw bytes as a Syslog message. Will decode either as the RFC 3164-style format ("old" style) or the more modern RFC 5424-style format ("new" style, includes structured data).
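For example, a sketch of selecting the JSON codec (assuming the topic carries JSON-encoded messages; the server, group, and topic names are placeholders):

```toml
[sources.my_source_id]
type = "kafka"
bootstrap_servers = "10.14.22.123:9092"
group_id = "consumer-group-name"
topics = [ "topic-1" ]
decoding.codec = "json"
```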
fetch_wait_max_ms (optional, uint, default: 100 milliseconds)

framing (optional, object)

framing.character_delimited (required, object)
Relevant when framing.method = `character_delimited`.

framing.character_delimited.delimiter (required, ascii_char)

framing.character_delimited.max_length (optional, uint)
Frames exceeding max_length bytes will be discarded entirely.

framing.method (common, optional, string literal enum, default: "bytes")

Option | Description
---|---
bytes | Byte frames are passed through as-is according to the underlying I/O boundaries (e.g. split between messages or stream segments).
character_delimited | Byte frames which are delimited by a chosen character.
length_delimited | Byte frames which are prefixed by an unsigned big-endian 32-bit integer indicating the length.
newline_delimited | Byte frames which are delimited by a newline character.
octet_counting | Byte frames according to the octet counting format.
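As a sketch, character-delimited framing might be configured as follows (the delimiter and max_length values are illustrative):

```toml
[sources.my_source_id]
type = "kafka"
bootstrap_servers = "10.14.22.123:9092"
group_id = "consumer-group-name"
topics = [ "topic-1" ]
framing.method = "character_delimited"
framing.character_delimited.delimiter = "|"
framing.character_delimited.max_length = 102400 # frames longer than this are discarded entirely
```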
framing.newline_delimited (optional, object)
Relevant when framing.method = `newline_delimited`.

framing.newline_delimited.max_length (optional, uint)
Frames exceeding max_length bytes will be discarded entirely.

framing.octet_counting (optional, object)
Relevant when framing.method = `octet_counting`.

framing.octet_counting.max_length (optional, uint)
Frames exceeding max_length bytes will be discarded entirely.

group_id (required, string literal)

key_field (common, optional, string literal, default: "message_key")

partition_key (optional, string literal, default: "partition")

sasl (optional, object)

sasl.enabled (common, optional, bool)

sasl.mechanism (common, optional, string literal)

sasl.password (common, optional, string literal)

sasl.username (common, optional, string literal)

session_timeout_ms (optional, uint, default: 10000 milliseconds)

tls (optional, object)
tls.alpn_protocols (optional, [string])
Sets the list of supported ALPN protocols. Declare the supported ALPN protocols, which are used during negotiation with the peer. Prioritized in the order they are defined.

tls.ca_file (optional, string literal)
Absolute path to an additional CA certificate file. The certificate must be in the DER or PEM (X.509) format. Additionally, the certificate can be provided as an inline string in PEM format.

tls.crt_file (common, optional, string literal)
Absolute path to a certificate file used to identify this server. The certificate must be in DER, PEM (X.509), or PKCS#12 format. Additionally, the certificate can be provided as an inline string in PEM format. If this is set, and is not a PKCS#12 archive, key_file must also be set.

tls.enabled (common, optional, bool, default: false)
Whether or not to require TLS for incoming/outgoing connections. When enabled and used for incoming connections, an identity certificate is also required. See tls.crt_file for more information.

tls.key_file (common, optional, string literal)
Absolute path to a private key file used to identify this server. The key must be in DER or PEM (PKCS#8) format. Additionally, the key can be provided as an inline string in PEM format.

tls.key_pass (optional, string literal)
Passphrase used to unlock the encrypted key file. This has no effect unless key_file is set.
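Putting the tls.* options together, a sketch of a TLS-enabled source (all file paths are illustrative):

```toml
[sources.my_source_id.tls]
enabled = true
ca_file = "/etc/ssl/certs/ca.crt"
crt_file = "/etc/ssl/certs/client.crt" # not a PKCS#12 archive, so key_file must also be set
key_file = "/etc/ssl/private/client.key"
key_pass = "${KEY_PASSPHRASE}"
```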
topics (required, [string])
The Kafka topics to read events from. Topic names prefixed with ^ are treated as regular expressions.

Outputs
<component_id>

Output Data
Logs
Record
An individual Kafka record. For example:
{
  "message": "53.126.150.246 - - [01/Oct/2020:11:25:58 -0400] \"GET /disintermediate HTTP/2.0\" 401 20308",
  "offset": 100,
  "partition": "partition",
  "source_type": "kafka",
  "timestamp": "2020-10-10T17:07:36.452332Z",
  "topic": "topic"
}
Telemetry
Metrics
component_discarded_events_total (counter)
component_errors_total (counter)
component_received_bytes_total (counter)
component_received_event_bytes_total (counter)
component_received_events_total (counter)
component_sent_event_bytes_total (counter)
component_sent_events_total (counter)
consumer_offset_updates_failed_total (counter)
events_failed_total (counter)
events_in_total (counter). Deprecated; use component_received_events_total instead.
events_out_total (counter). Deprecated; use component_sent_events_total instead.
kafka_consumed_messages_bytes_total (counter)
kafka_consumed_messages_total (counter)
kafka_produced_messages_bytes_total (counter)
kafka_produced_messages_total (counter)
kafka_queue_messages (gauge)
kafka_queue_messages_bytes (gauge)
kafka_requests_bytes_total (counter)
kafka_requests_total (counter)
kafka_responses_bytes_total (counter)
kafka_responses_total (counter)
processed_bytes_total (counter)
processed_events_total (counter). Deprecated; use the component_received_events_total and component_sent_events_total metrics instead.
source_lag_time_seconds (histogram)

Each of these metrics carries a component_id tag. The component_name tag is deprecated; use component_id instead (its value is the same as component_id).

How it works
Azure Event Hubs
It is possible to use the kafka source and sink with Azure Event Hubs for all tiers other than the Basic tier. More details can be found here. To configure the source and sink to connect to Azure Event Hubs, set the following options:

- bootstrap_servers - <namespace name>.servicebus.windows.net:9093
- group_id - The consumer group. Note that if the default group ($Default) is used, it must be specified as $$Default to escape the $ used for environment variables.
- topics - The event hub name.
- sasl.enabled - Set to true.
- sasl.mechanism - Set to PLAIN.
- sasl.username - Set to $$ConnectionString (note the double $$).
- sasl.password - Set to the connection string. See here.
- tls.enabled - Set to true.
- tls.ca_file - The certificate authority file.
- tls.verify_certificate - Set to true.
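The options above can be combined into a sketch like the following (the namespace, event hub name, CA path, and connection string are placeholders):

```toml
[sources.my_source_id]
type = "kafka"
bootstrap_servers = "my-namespace.servicebus.windows.net:9093"
group_id = "$$Default" # double $$ escapes the $ used for environment variables
topics = [ "my-event-hub" ]
sasl.enabled = true
sasl.mechanism = "PLAIN"
sasl.username = "$$ConnectionString"
sasl.password = "Endpoint=sb://my-namespace.servicebus.windows.net/;SharedAccessKeyName=..." # the Event Hubs connection string
tls.enabled = true
tls.ca_file = "/etc/ssl/certs/ca.crt"
tls.verify_certificate = true
```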
librdkafka
The kafka source and sink use librdkafka under the hood. This is a battle-tested, high-performance, and reliable library that facilitates communication with Kafka. As Vector produces static MUSL builds, this dependency is packaged with Vector, meaning you do not need to install it.

Transport Layer Security (TLS)
TLS between Vector and Kafka can be configured via the tls.* options.