Kafka

Collect logs from Kafka

status: stable
role: aggregator
delivery: at-least-once
egress: stream
state: stateless
output: log

Configuration

Example configurations

Common (JSON)

{
  "sources": {
    "my_source_id": {
      "type": "kafka",
      "acknowledgements": null,
      "bootstrap_servers": "10.14.22.123:9092,10.14.23.332:9092",
      "group_id": "consumer-group-name",
      "key_field": "message_key",
      "topics": [
        "^(prefix1|prefix2)-.+"
      ]
    }
  }
}

Common (TOML)

[sources.my_source_id]
type = "kafka"
bootstrap_servers = "10.14.22.123:9092,10.14.23.332:9092"
group_id = "consumer-group-name"
key_field = "message_key"
topics = [ "^(prefix1|prefix2)-.+" ]

Common (YAML)

---
sources:
  my_source_id:
    type: kafka
    acknowledgements: null
    bootstrap_servers: 10.14.22.123:9092,10.14.23.332:9092
    group_id: consumer-group-name
    key_field: message_key
    topics:
      - ^(prefix1|prefix2)-.+

Advanced (JSON)

{
  "sources": {
    "my_source_id": {
      "type": "kafka",
      "acknowledgements": null,
      "auto_offset_reset": "largest",
      "bootstrap_servers": "10.14.22.123:9092,10.14.23.332:9092",
      "commit_interval_ms": 5000,
      "fetch_wait_max_ms": 100,
      "group_id": "consumer-group-name",
      "key_field": "message_key",
      "topic_key": "topic",
      "partition_key": "partition",
      "offset_key": "offset",
      "headers_key": "headers",
      "session_timeout_ms": 10000,
      "socket_timeout_ms": 60000,
      "topics": [
        "^(prefix1|prefix2)-.+"
      ]
    }
  }
}

Advanced (TOML)

[sources.my_source_id]
type = "kafka"
auto_offset_reset = "largest"
bootstrap_servers = "10.14.22.123:9092,10.14.23.332:9092"
commit_interval_ms = 5_000
fetch_wait_max_ms = 100
group_id = "consumer-group-name"
key_field = "message_key"
topic_key = "topic"
partition_key = "partition"
offset_key = "offset"
headers_key = "headers"
session_timeout_ms = 10_000
socket_timeout_ms = 60_000
topics = [ "^(prefix1|prefix2)-.+" ]

Advanced (YAML)

---
sources:
  my_source_id:
    type: kafka
    acknowledgements: null
    auto_offset_reset: largest
    bootstrap_servers: 10.14.22.123:9092,10.14.23.332:9092
    commit_interval_ms: 5000
    fetch_wait_max_ms: 100
    group_id: consumer-group-name
    key_field: message_key
    topic_key: topic
    partition_key: partition
    offset_key: offset
    headers_key: headers
    librdkafka_options: null
    sasl: null
    session_timeout_ms: 10000
    socket_timeout_ms: 60000
    framing: null
    tls: null
    topics:
      - ^(prefix1|prefix2)-.+
    decoding: null

acknowledgements

common optional bool
Controls whether the source waits for destination sinks to deliver the events before acknowledging receipt.
default: false
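A minimal sketch of turning acknowledgements on, following the "optional bool" form documented above. The broker address and group come from the examples; topic-1 is a placeholder topic name.

```toml
[sources.my_source_id]
type = "kafka"
bootstrap_servers = "10.14.22.123:9092"
group_id = "consumer-group-name"
topics = ["topic-1"]
# Wait for destination sinks to deliver events before acknowledging receipt
acknowledgements = true
```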

auto_offset_reset

optional string literal
If offsets for the consumer group do not exist, set them using this strategy. See the librdkafka documentation for the auto.offset.reset option for details.
Examples
"smallest"
"earliest"
"beginning"
"largest"
"latest"
"end"
"error"
default: largest

bootstrap_servers

required string literal
A comma-separated list of host and port pairs giving the addresses of the Kafka brokers in a “bootstrap” Kafka cluster, which the Kafka client connects to initially to bootstrap itself.
Examples
"10.14.22.123:9092,10.14.23.332:9092"

commit_interval_ms

optional uint
How often consumer offsets are committed (written) to offset storage.
Examples
5000
10000
default: 5000 (milliseconds)
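The two offset settings above are often tuned together. A sketch, assuming a new consumer group that should start from the oldest retained messages; the values are the documented example values, not recommendations.

```toml
[sources.my_source_id]
type = "kafka"
bootstrap_servers = "10.14.22.123:9092"
group_id = "consumer-group-name"
topics = ["topic-1"]
# With no committed offsets for the group, start at the smallest (earliest) offset
auto_offset_reset = "earliest"
# Commit consumer offsets every 10 seconds instead of the 5-second default
commit_interval_ms = 10_000
```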

decoding

optional object
Configures how frames are decoded into events.

decoding.codec

optional string literal enum
The decoding method.
Enum options
bytes: Events containing the byte frame as-is.
json: Events being parsed from a JSON string.
syslog: Events being parsed from a Syslog message.
default: bytes
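A minimal sketch that switches the codec from the default bytes to json. The table path assumes the my_source_id source from the examples; only the documented codec option is set.

```toml
[sources.my_source_id.decoding]
# Parse each frame as JSON instead of passing the raw bytes through
codec = "json"
```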

fetch_wait_max_ms

optional uint
Maximum time the broker may wait to fill the response.
Examples
50
100
default: 100 (milliseconds)

framing

optional object
Configures how incoming byte sequences are split into byte frames.
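
framing.character_delimited

optional object
Options for character_delimited framing.
Relevant when: method = `character_delimited`

framing.character_delimited.delimiter

The character used to separate frames.
Examples
"\n"
"\t"

framing.character_delimited.max_length

optional uint
The maximum frame length limit. Any frames longer than max_length bytes will be discarded entirely.
Examples
65535
102400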

framing.method

optional string literal enum
The framing method.
Enum options
bytes: Byte frames are passed through as-is according to the underlying I/O boundaries (e.g. split between messages or stream segments).
character_delimited: Byte frames which are delimited by a chosen character.
length_delimited: Byte frames whose length is encoded in a header.
newline_delimited: Byte frames which are delimited by a newline character.
octet_counting: Byte frames according to the octet counting format.
default: bytes

framing.newline_delimited

optional object
Options for newline_delimited framing.
Relevant when: method = `newline_delimited`

framing.newline_delimited.max_length

optional uint
The maximum frame length limit. Any frames longer than max_length bytes will be discarded entirely.
Examples
65535
102400

framing.octet_counting

optional object
Options for octet_counting framing.
Relevant when: method = `octet_counting`

framing.octet_counting.max_length

optional uint
The maximum frame length limit. Any frames longer than max_length bytes will be discarded entirely.
Examples
65535
102400
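For example, to split each Kafka message payload on tab characters and drop oversized frames, a sketch using the framing.method and character_delimited options documented above. The delimiter and limit are the documented example values.

```toml
[sources.my_source_id.framing]
method = "character_delimited"

[sources.my_source_id.framing.character_delimited]
# Split incoming byte sequences on tab characters
delimiter = "\t"
# Discard any frame longer than 65535 bytes
max_length = 65535
```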

group_id

required string literal
The consumer group name to be used to consume events from Kafka.
Examples
"consumer-group-name"

headers_key

optional string literal
The log field name to use for the Kafka headers.
Examples
"headers"
default: headers

key_field

common optional string literal
The log field name to use for the Kafka message key.
Examples
"message_key"
default: message_key

librdkafka_options

optional object
Advanced options passed through to librdkafka. See the librdkafka configuration documentation for the available keys and their meaning.
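A sketch of passing librdkafka settings through. client.id and fetch.error.backoff.ms are standard librdkafka properties chosen here only for illustration; the values are quoted because librdkafka configuration values are set as strings.

```toml
[sources.my_source_id.librdkafka_options]
# Identifier reported to the brokers for this client (illustrative value)
"client.id" = "vector-kafka-source"
# Wait 1 second before retrying a failed fetch (illustrative value)
"fetch.error.backoff.ms" = "1000"
```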

offset_key

optional string literal
The log field name to use for the Kafka offset.
Examples
"offset"
default: offset

partition_key

optional string literal
The log field name to use for the Kafka partition.
Examples
"partition"
default: partition

sasl

optional object
Options for SASL/SCRAM authentication support.

sasl.enabled

optional bool
Enable SASL/SCRAM authentication to the remote (not supported on Windows at this time).

sasl.mechanism

optional string literal
The SASL/SCRAM mechanism to use.
Examples
"SCRAM-SHA-256"
"SCRAM-SHA-512"

sasl.password

optional string literal
The Kafka SASL/SCRAM authentication password.
Examples
"password"

sasl.username

optional string literal
The Kafka SASL/SCRAM authentication username.
Examples
"username"

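A sketch of enabling SASL/SCRAM authentication with the sasl.* options above. KAFKA_SASL_PASSWORD is a hypothetical environment variable, referenced with the same ${...} syntax as the tls.key_pass example.

```toml
[sources.my_source_id.sasl]
enabled = true
mechanism = "SCRAM-SHA-512"
username = "username"
# Hypothetical variable: read the password from the environment, not the file
password = "${KAFKA_SASL_PASSWORD}"
```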
session_timeout_ms

optional uint
The Kafka session timeout in milliseconds.
Examples
5000
10000
default: 10000 (milliseconds)

socket_timeout_ms

optional uint
Default timeout for network requests.
Examples
30000
60000
default: 60000 (milliseconds)
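The three millisecond timing options can be set together. This sketch simply uses the lower documented example values, for instance to trade some request headroom for faster failure detection; whether that helps depends on your brokers and network.

```toml
[sources.my_source_id]
type = "kafka"
bootstrap_servers = "10.14.22.123:9092"
group_id = "consumer-group-name"
topics = ["topic-1"]
# Maximum time the broker may wait to fill a fetch response
fetch_wait_max_ms = 50
# Kafka session timeout
session_timeout_ms = 5_000
# Default timeout for network requests
socket_timeout_ms = 30_000
```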

tls

optional object
Configures the TLS options for connections to the Kafka brokers.

tls.ca_file

optional string literal
Absolute path to an additional CA certificate file, in DER or PEM format (X.509), or an inline CA certificate in PEM format.
Examples
"/path/to/certificate_authority.crt"

tls.crt_file

optional string literal
Absolute path to a certificate file used to identify this connection, in DER or PEM format (X.509) or PKCS#12, or an inline certificate in PEM format. If this is set and is not a PKCS#12 archive, key_file must also be set.
Examples
"/path/to/host_certificate.crt"

tls.enabled

optional bool
Enable TLS during connections to the remote.
default: false

tls.key_file

optional string literal
Absolute path to a private key file used to identify this connection, in DER or PEM format (PKCS#8), or an inline private key in PEM format. If this is set, crt_file must also be set.
Examples
"/path/to/host_certificate.key"

tls.key_pass

optional string literal
Passphrase used to unlock the encrypted key file. This has no effect unless key_file is set.
Examples
"${KEY_PASS_ENV_VAR}"
"PassWord1"

tls.verify_certificate

optional bool
If true (the default), Vector will validate the TLS certificate of the remote host.
default: true

topic_key

optional string literal
The log field name to use for the Kafka topic.
Examples
"topic"
default: topic

topics

required [string]
The Kafka topic names to read events from. A topic name that begins with ^ is treated as a regular expression.
Array string literal
Examples
[
  "^(prefix1|prefix2)-.+",
  "topic-1",
  "topic-2"
]
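A sketch mixing a regular-expression subscription with a literal topic name, using the documented examples.

```toml
[sources.my_source_id]
type = "kafka"
bootstrap_servers = "10.14.22.123:9092"
group_id = "consumer-group-name"
topics = [
  # Begins with ^, so it is treated as a regular expression
  "^(prefix1|prefix2)-.+",
  # No leading ^, so it names a single topic
  "topic-1",
]
```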

Output

Logs

Record

An individual Kafka record
Fields
message required string literal
The raw line from the Kafka record.
Examples
53.126.150.246 - - [01/Oct/2020:11:25:58 -0400] "GET /disintermediate HTTP/2.0" 401 20308
offset required uint
The Kafka offset at the time the record was retrieved.
Examples
100
partition required string literal
The Kafka partition that the record came from.
Examples
partition
timestamp required timestamp
The timestamp encoded in the Kafka message or the current time if it cannot be fetched.
Examples
2020-10-10T17:07:36.452332Z
topic required string literal
The Kafka topic that the record came from.
Examples
topic

Telemetry

Metrics


component_received_events_total

counter
The number of events accepted by this component either from tagged origins like file and uri, or cumulatively from other origins.
component_id required
The Vector component ID.
component_kind required
The Vector component kind.
component_name required
Deprecated, use component_id instead. The value is the same as component_id.
component_type required
The Vector component type.
container_name optional
The name of the container from which the event originates.
file optional
The file from which the event originates.
host required
The hostname of the system Vector is running on.
mode optional
The connection mode used by the component.
peer_addr optional
The IP from which the event originates.
peer_path optional
The pathname from which the event originates.
pid required
The process ID of the Vector instance.
pod_name optional
The name of the pod from which the event originates.
uri optional
The sanitized URI from which the event originates.

component_sent_event_bytes_total

counter
The total number of event bytes emitted by this component.
component_id required
The Vector component ID.
component_kind required
The Vector component kind.
component_name required
Deprecated, use component_id instead. The value is the same as component_id.
component_type required
The Vector component type.
host required
The hostname of the system Vector is running on.
pid required
The process ID of the Vector instance.

component_sent_events_total

counter
The total number of events emitted by this component.
component_id required
The Vector component ID.
component_kind required
The Vector component kind.
component_name required
Deprecated, use component_id instead. The value is the same as component_id.
component_type required
The Vector component type.
host required
The hostname of the system Vector is running on.
pid required
The process ID of the Vector instance.

consumer_offset_updates_failed_total

counter
The total number of failures to update a Kafka consumer offset.
component_id required
The Vector component ID.
component_kind required
The Vector component kind.
component_name required
Deprecated, use component_id instead. The value is the same as component_id.
component_type required
The Vector component type.
host required
The hostname of the system Vector is running on.
pid required
The process ID of the Vector instance.

events_failed_total

counter
The total number of failures to read a Kafka message.
component_id required
The Vector component ID.
component_kind required
The Vector component kind.
component_name required
Deprecated, use component_id instead. The value is the same as component_id.
component_type required
The Vector component type.
host required
The hostname of the system Vector is running on.
pid required
The process ID of the Vector instance.

events_in_total

counter
The number of events accepted by this component either from tagged origins like file and uri, or cumulatively from other origins. This metric is deprecated and will be removed in a future version. Use component_received_events_total instead.
component_id required
The Vector component ID.
component_kind required
The Vector component kind.
component_name required
Deprecated, use component_id instead. The value is the same as component_id.
component_type required
The Vector component type.
container_name optional
The name of the container from which the event originates.
file optional
The file from which the event originates.
host required
The hostname of the system Vector is running on.
mode optional
The connection mode used by the component.
peer_addr optional
The IP from which the event originates.
peer_path optional
The pathname from which the event originates.
pid required
The process ID of the Vector instance.
pod_name optional
The name of the pod from which the event originates.
uri optional
The sanitized URI from which the event originates.

events_out_total

counter
The total number of events emitted by this component. This metric is deprecated and will be removed in a future version. Use component_sent_events_total instead.
component_id required
The Vector component ID.
component_kind required
The Vector component kind.
component_name required
Deprecated, use component_id instead. The value is the same as component_id.
component_type required
The Vector component type.
host required
The hostname of the system Vector is running on.
pid required
The process ID of the Vector instance.

kafka_consumed_messages_bytes_total

counter
Total number of message bytes (including framing) received from Kafka brokers.
component_id required
The Vector component ID.
component_kind required
The Vector component kind.
component_name required
Deprecated, use component_id instead. The value is the same as component_id.
component_type required
The Vector component type.
host required
The hostname of the system Vector is running on.
pid required
The process ID of the Vector instance.

kafka_consumed_messages_total

counter
Total number of messages consumed from Kafka brokers, not including ignored messages (due to offset, etc.).
component_id required
The Vector component ID.
component_kind required
The Vector component kind.
component_name required
Deprecated, use component_id instead. The value is the same as component_id.
component_type required
The Vector component type.
host required
The hostname of the system Vector is running on.
pid required
The process ID of the Vector instance.

kafka_produced_messages_bytes_total

counter
Total number of message bytes (including framing, such as per-Message framing and MessageSet/batch framing) transmitted to Kafka brokers.
component_id required
The Vector component ID.
component_kind required
The Vector component kind.
component_name required
Deprecated, use component_id instead. The value is the same as component_id.
component_type required
The Vector component type.
host required
The hostname of the system Vector is running on.
pid required
The process ID of the Vector instance.

kafka_produced_messages_total

counter
Total number of messages transmitted (produced) to Kafka brokers.
component_id required
The Vector component ID.
component_kind required
The Vector component kind.
component_name required
Deprecated, use component_id instead. The value is the same as component_id.
component_type required
The Vector component type.
host required
The hostname of the system Vector is running on.
pid required
The process ID of the Vector instance.

kafka_queue_messages

gauge
Current number of messages in producer queues.
component_id required
The Vector component ID.
component_kind required
The Vector component kind.
component_name required
Deprecated, use component_id instead. The value is the same as component_id.
component_type required
The Vector component type.
host required
The hostname of the system Vector is running on.
pid required
The process ID of the Vector instance.

kafka_queue_messages_bytes

gauge
Current total size of messages in producer queues.
component_id required
The Vector component ID.
component_kind required
The Vector component kind.
component_name required
Deprecated, use component_id instead. The value is the same as component_id.
component_type required
The Vector component type.
host required
The hostname of the system Vector is running on.
pid required
The process ID of the Vector instance.

kafka_requests_bytes_total

counter
Total number of bytes transmitted to Kafka brokers.
component_id required
The Vector component ID.
component_kind required
The Vector component kind.
component_name required
Deprecated, use component_id instead. The value is the same as component_id.
component_type required
The Vector component type.
host required
The hostname of the system Vector is running on.
pid required
The process ID of the Vector instance.

kafka_requests_total

counter
Total number of requests sent to Kafka brokers.
component_id required
The Vector component ID.
component_kind required
The Vector component kind.
component_name required
Deprecated, use component_id instead. The value is the same as component_id.
component_type required
The Vector component type.
host required
The hostname of the system Vector is running on.
pid required
The process ID of the Vector instance.

kafka_responses_bytes_total

counter
Total number of bytes received from Kafka brokers.
component_id required
The Vector component ID.
component_kind required
The Vector component kind.
component_name required
Deprecated, use component_id instead. The value is the same as component_id.
component_type required
The Vector component type.
host required
The hostname of the system Vector is running on.
pid required
The process ID of the Vector instance.

kafka_responses_total

counter
Total number of responses received from Kafka brokers.
component_id required
The Vector component ID.
component_kind required
The Vector component kind.
component_name required
Deprecated, use component_id instead. The value is the same as component_id.
component_type required
The Vector component type.
host required
The hostname of the system Vector is running on.
pid required
The process ID of the Vector instance.

processed_bytes_total

counter
The number of bytes processed by the component.
component_id required
The Vector component ID.
component_kind required
The Vector component kind.
component_name required
Deprecated, use component_id instead. The value is the same as component_id.
component_type required
The Vector component type.
container_name optional
The name of the container from which the bytes originate.
file optional
The file from which the bytes originate.
host required
The hostname of the system Vector is running on.
mode optional
The connection mode used by the component.
peer_addr optional
The IP from which the bytes originate.
peer_path optional
The pathname from which the bytes originate.
pid required
The process ID of the Vector instance.
pod_name optional
The name of the pod from which the bytes originate.
uri optional
The sanitized URI from which the bytes originate.

processed_events_total

counter
The total number of events processed by this component. This metric is deprecated in favor of the component_received_events_total and component_sent_events_total metrics.
component_id required
The Vector component ID.
component_kind required
The Vector component kind.
component_name required
Deprecated, use component_id instead. The value is the same as component_id.
component_type required
The Vector component type.
host required
The hostname of the system Vector is running on.
pid required
The process ID of the Vector instance.

How it works

Context

By default, the kafka source augments events with helpful context keys.
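The names of those context fields come from the key_field, topic_key, partition_key, offset_key, and headers_key options. A sketch that prefixes them with kafka_, for example to avoid clashes with fields decoded from the message body; the kafka_ names are illustrative, not defaults.

```toml
[sources.my_source_id]
type = "kafka"
bootstrap_servers = "10.14.22.123:9092"
group_id = "consumer-group-name"
topics = ["topic-1"]
# Illustrative field names for the Kafka context added to each event
key_field = "kafka_key"
topic_key = "kafka_topic"
partition_key = "kafka_partition"
offset_key = "kafka_offset"
headers_key = "kafka_headers"
```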

librdkafka

The kafka source uses librdkafka under the hood. This is a battle-tested, high-performance, and reliable library that facilitates communication with Kafka. Because Vector produces static MUSL builds, this dependency is packaged with Vector, so you do not need to install it.

State

This component is stateless, meaning its behavior is consistent across each input.

Transport Layer Security (TLS)

Vector uses OpenSSL for TLS protocols. You can adjust TLS behavior via the tls.* options.
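A sketch of a tls table built from the tls.* options and the example paths in the option reference; substitute your own files. key_pass is only needed when the key file is encrypted.

```toml
[sources.my_source_id.tls]
enabled = true
# Additional CA used to verify the brokers' certificates
ca_file = "/path/to/certificate_authority.crt"
# Client certificate and key identifying this connection
crt_file = "/path/to/host_certificate.crt"
key_file = "/path/to/host_certificate.key"
# Passphrase for the encrypted key file, read from the environment
key_pass = "${KEY_PASS_ENV_VAR}"
```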