Kafka

Collect observability data from Apache Kafka topics

status: stable role: aggregator delivery: at-least-once acknowledgements: yes egress: stream state: stateless output: log

Configuration

Example configurations

{
  "sources": {
    "my_source_id": {
      "type": "kafka",
      "bootstrap_servers": "10.14.22.123:9092,10.14.23.332:9092",
      "group_id": "consumer-group-name",
      "topics": [
        "^(prefix1|prefix2)-.+"
      ]
    }
  }
}
[sources.my_source_id]
type = "kafka"
bootstrap_servers = "10.14.22.123:9092,10.14.23.332:9092"
group_id = "consumer-group-name"
topics = [ "^(prefix1|prefix2)-.+" ]
sources:
  my_source_id:
    type: kafka
    bootstrap_servers: 10.14.22.123:9092,10.14.23.332:9092
    group_id: consumer-group-name
    topics:
      - ^(prefix1|prefix2)-.+
{
  "sources": {
    "my_source_id": {
      "type": "kafka",
      "auto_offset_reset": "largest",
      "bootstrap_servers": "10.14.22.123:9092,10.14.23.332:9092",
      "commit_interval_ms": 5000,
      "drain_timeout_ms": 2500,
      "fetch_wait_max_ms": 100,
      "group_id": "consumer-group-name",
      "headers_key": "headers",
      "key_field": "message_key",
      "librdkafka_options": {
        "client.id": "${ENV_VAR}",
        "fetch.error.backoff.ms": "1000",
        "socket.send.buffer.bytes": "100"
      },
      "offset_key": "offset",
      "partition_key": "partition",
      "session_timeout_ms": 10000,
      "socket_timeout_ms": 60000,
      "topic_key": "topic",
      "topics": [
        "^(prefix1|prefix2)-.+"
      ]
    }
  }
}
[sources.my_source_id]
type = "kafka"
auto_offset_reset = "largest"
bootstrap_servers = "10.14.22.123:9092,10.14.23.332:9092"
commit_interval_ms = 5_000
drain_timeout_ms = 2_500
fetch_wait_max_ms = 100
group_id = "consumer-group-name"
headers_key = "headers"
key_field = "message_key"
offset_key = "offset"
partition_key = "partition"
session_timeout_ms = 10_000
socket_timeout_ms = 60_000
topic_key = "topic"
topics = [ "^(prefix1|prefix2)-.+" ]

  [sources.my_source_id.librdkafka_options]
  "client.id" = "${ENV_VAR}"
  "fetch.error.backoff.ms" = "1000"
  "socket.send.buffer.bytes" = "100"
sources:
  my_source_id:
    type: kafka
    auto_offset_reset: largest
    bootstrap_servers: 10.14.22.123:9092,10.14.23.332:9092
    commit_interval_ms: 5000
    drain_timeout_ms: 2500
    fetch_wait_max_ms: 100
    group_id: consumer-group-name
    headers_key: headers
    key_field: message_key
    librdkafka_options:
      client.id: ${ENV_VAR}
      fetch.error.backoff.ms: "1000"
      socket.send.buffer.bytes: "100"
    offset_key: offset
    partition_key: partition
    session_timeout_ms: 10000
    socket_timeout_ms: 60000
    topic_key: topic
    topics:
      - ^(prefix1|prefix2)-.+

acknowledgements

optional object

Deprecated

This field is deprecated.

Controls how acknowledgements are handled by this source.

This setting is deprecated in favor of enabling acknowledgements at the global or sink level.

Enabling or disabling acknowledgements at the source level has no effect on acknowledgement behavior.

See End-to-end Acknowledgements for more information on how event acknowledgement is handled.

Whether or not end-to-end acknowledgements are enabled for this source.

auto_offset_reset

optional string literal

If offsets for consumer group do not exist, set them using this strategy.

See the librdkafka documentation for the auto.offset.reset option for further clarification.

Examples
"smallest"
"earliest"
"beginning"
"largest"
"latest"
"end"
"error"
default: largest

bootstrap_servers

required string literal

A comma-separated list of Kafka bootstrap servers.

These are the servers in a Kafka cluster that a client should use to bootstrap its connection to the cluster, allowing discovery of all the other hosts in the cluster.

Must be in the form of host:port, and comma-separated.

Examples
"10.14.22.123:9092,10.14.23.332:9092"

commit_interval_ms

optional uint
The frequency that the consumer offsets are committed (written) to offset storage.
Examples
5000
10000
default: 5000 (milliseconds)

decoding

optional object
Configures how events are decoded from raw bytes.

decoding.avro

required object
Apache Avro-specific encoder options.
Relevant when: codec = "avro"
decoding.avro.schema
required string literal

The Avro schema definition. Please note that the following [apache_avro::types::Value] variants are currently not supported:

  • Date
  • Decimal
  • Duration
  • Fixed
  • TimeMillis
Examples
"{ \"type\": \"record\", \"name\": \"log\", \"fields\": [{ \"name\": \"message\", \"type\": \"string\" }] }"
For Avro datum encoded in Kafka messages, the bytes are prefixed with the schema ID. Set this to true to strip the schema ID prefix. According to Confluent Kafka’s document.

decoding.codec

optional string literal enum
The codec to use for decoding events.
Enum options
OptionDescription
avroDecodes the raw bytes as as an Apache Avro message.
bytesUses the raw bytes as-is.
gelf

Decodes the raw bytes as a GELF message.

This codec is experimental for the following reason:

The GELF specification is more strict than the actual Graylog receiver. Vector’s decoder currently adheres more strictly to the GELF spec, with the exception that some characters such as @ are allowed in field names.

Other GELF codecs such as Loki’s, use a Go SDK that is maintained by Graylog, and is much more relaxed than the GELF spec.

Going forward, Vector will use that Go SDK as the reference implementation, which means the codec may continue to relax the enforcement of specification.

influxdbDecodes the raw bytes as an Influxdb Line Protocol message.
jsonDecodes the raw bytes as JSON.
native

Decodes the raw bytes as native Protocol Buffers format.

This codec is experimental.

native_json

Decodes the raw bytes as native JSON format.

This codec is experimental.

protobufDecodes the raw bytes as protobuf.
syslog

Decodes the raw bytes as a Syslog message.

Decodes either as the RFC 3164-style format (“old” style) or the RFC 5424-style format (“new” style, includes structured data).

vrlDecodes the raw bytes as a string and passes them as input to a VRL program.
default: bytes

decoding.gelf

optional object
GELF-specific decoding options.
Relevant when: codec = "gelf"
decoding.gelf.lossy
optional bool

Determines whether or not to replace invalid UTF-8 sequences instead of failing.

When true, invalid UTF-8 sequences are replaced with the U+FFFD REPLACEMENT CHARACTER.

default: true

decoding.influxdb

optional object
Influxdb-specific decoding options.
Relevant when: codec = "influxdb"

Determines whether or not to replace invalid UTF-8 sequences instead of failing.

When true, invalid UTF-8 sequences are replaced with the U+FFFD REPLACEMENT CHARACTER.

default: true

decoding.json

optional object
JSON-specific decoding options.
Relevant when: codec = "json"
decoding.json.lossy
optional bool

Determines whether or not to replace invalid UTF-8 sequences instead of failing.

When true, invalid UTF-8 sequences are replaced with the U+FFFD REPLACEMENT CHARACTER.

default: true

decoding.native_json

optional object
Vector’s native JSON-specific decoding options.
Relevant when: codec = "native_json"

Determines whether or not to replace invalid UTF-8 sequences instead of failing.

When true, invalid UTF-8 sequences are replaced with the U+FFFD REPLACEMENT CHARACTER.

default: true

decoding.protobuf

optional object
Protobuf-specific decoding options.
Relevant when: codec = "protobuf"
decoding.protobuf.desc_file
optional string literal

The path to the protobuf descriptor set file.

This file is the output of protoc -I <include path> -o <desc output path> <proto>

You can read more here.

decoding.protobuf.message_type
optional string literal
The name of the message type to use for serializing.
Examples
"package.Message"

decoding.syslog

optional object
Syslog-specific decoding options.
Relevant when: codec = "syslog"

Determines whether or not to replace invalid UTF-8 sequences instead of failing.

When true, invalid UTF-8 sequences are replaced with the U+FFFD REPLACEMENT CHARACTER.

default: true

decoding.vrl

required object
VRL-specific decoding options.
Relevant when: codec = "vrl"
decoding.vrl.source
required string literal
The Vector Remap Language (VRL) program to execute for each event. Note that the final contents of the . target will be used as the decoding result. Compilation error or use of ‘abort’ in a program will result in a decoding error.
decoding.vrl.timezone
optional string literal

The name of the timezone to apply to timestamp conversions that do not contain an explicit time zone. The time zone name may be any name in the TZ database, or local to indicate system local time.

If not set, local will be used.

Examples
"local"
"America/New_York"
"EST5EDT"

drain_timeout_ms

optional uint

Timeout to drain pending acknowledgements during shutdown or a Kafka consumer group rebalance.

When Vector shuts down or the Kafka consumer group revokes partitions from this consumer, wait a maximum of drain_timeout_ms for the source to process pending acknowledgements. Must be less than session_timeout_ms to ensure the consumer is not excluded from the group during a rebalance.

Default value is half of session_timeout_ms.

Examples
2500
5000

fetch_wait_max_ms

optional uint
Maximum time the broker may wait to fill the response.
Examples
50
100
default: 100 (milliseconds)

framing

optional object

Framing configuration.

Framing handles how events are separated when encoded in a raw byte form, where each event is a frame that must be prefixed, or delimited, in a way that marks where an event begins and ends within the byte stream.

Options for the character delimited decoder.
Relevant when: method = "character_delimited"
The character that delimits byte sequences.

The maximum length of the byte buffer.

This length does not include the trailing delimiter.

By default, there is no maximum length enforced. If events are malformed, this can lead to additional resource usage as events continue to be buffered in memory, and can potentially lead to memory exhaustion in extreme cases.

If there is a risk of processing malformed data, such as logs with user-controlled input, consider setting the maximum length to a reasonably large value as a safety net. This ensures that processing is not actually unbounded.

framing.chunked_gelf

optional object
Options for the chunked GELF decoder.
Relevant when: method = "chunked_gelf"
framing.chunked_gelf.decompression
optional string literal enum
Decompression configuration for GELF messages.
Enum options
OptionDescription
AutoAutomatically detect the decompression method based on the magic bytes of the message.
GzipUse Gzip decompression.
NoneDo not decompress the message.
ZlibUse Zlib decompression.
default: Auto

The maximum length of a single GELF message, in bytes. Messages longer than this length will be dropped. If this option is not set, the decoder does not limit the length of messages and the per-message memory is unbounded.

Note that a message can be composed of multiple chunks and this limit is applied to the whole message, not to individual chunks.

This limit takes only into account the message’s payload and the GELF header bytes are excluded from the calculation. The message’s payload is the concatenation of all the chunks’ payloads.

The maximum number of pending incomplete messages. If this limit is reached, the decoder starts dropping chunks of new messages, ensuring the memory usage of the decoder’s state is bounded. If this option is not set, the decoder does not limit the number of pending messages and the memory usage of its messages buffer can grow unbounded. This matches Graylog Server’s behavior.
The timeout, in seconds, for a message to be fully received. If the timeout is reached, the decoder drops all the received chunks of the timed out message.
default: 5
Options for the length delimited decoder.
Relevant when: method = "length_delimited"
Length field byte order (little or big endian)
default: true
Number of bytes representing the field length
default: 4
Number of bytes in the header before the length field
Maximum frame length
default: 8.388608e+06

framing.method

optional string literal enum
The framing method.
Enum options
OptionDescription
bytesByte frames are passed through as-is according to the underlying I/O boundaries (for example, split between messages or stream segments).
character_delimitedByte frames which are delimited by a chosen character.
chunked_gelfByte frames which are chunked GELF messages.
length_delimitedByte frames which are prefixed by an unsigned big-endian 32-bit integer indicating the length.
newline_delimitedByte frames which are delimited by a newline character.
octet_countingByte frames according to the octet counting format.
default: bytes
Options for the newline delimited decoder.
Relevant when: method = "newline_delimited"

The maximum length of the byte buffer.

This length does not include the trailing delimiter.

By default, there is no maximum length enforced. If events are malformed, this can lead to additional resource usage as events continue to be buffered in memory, and can potentially lead to memory exhaustion in extreme cases.

If there is a risk of processing malformed data, such as logs with user-controlled input, consider setting the maximum length to a reasonably large value as a safety net. This ensures that processing is not actually unbounded.

framing.octet_counting

optional object
Options for the octet counting decoder.
Relevant when: method = "octet_counting"
The maximum length of the byte buffer.

group_id

required string literal
The consumer group name to be used to consume events from Kafka.
Examples
"consumer-group-name"

headers_key

optional string literal

Overrides the name of the log field used to add the headers to each event.

The value is the headers of the Kafka message itself.

By default, "headers" is used.

Examples
"headers"
default: headers

key_field

optional string literal

Overrides the name of the log field used to add the message key to each event.

The value is the message key of the Kafka message itself.

By default, "message_key" is used.

Examples
"message_key"
default: message_key

librdkafka_options

optional object

Advanced options set directly on the underlying librdkafka client.

See the librdkafka documentation for details.

librdkafka_options.*

required string literal
A librdkafka configuration option.

metrics

optional object
Metrics (beta) configuration.
Expose topic lag metrics for all topics and partitions. Metric names are kafka_consumer_lag.
default: false

offset_key

optional string literal

Overrides the name of the log field used to add the offset to each event.

The value is the offset of the Kafka message itself.

By default, "offset" is used.

Examples
"offset"
default: offset

partition_key

optional string literal

Overrides the name of the log field used to add the partition to each event.

The value is the partition from which the Kafka message was consumed from.

By default, "partition" is used.

Examples
"partition"
default: partition

sasl

optional object
Configuration for SASL authentication when interacting with Kafka.

sasl.enabled

optional bool

Enables SASL authentication.

Only PLAIN- and SCRAM-based mechanisms are supported when configuring SASL authentication using sasl.*. For other mechanisms, librdkafka_options.* must be used directly to configure other librdkafka-specific values. If using sasl.kerberos.* as an example, where * is service.name, principal, kinit.md, etc., then librdkafka_options.* as a result becomes librdkafka_options.sasl.kerberos.service.name, librdkafka_options.sasl.kerberos.principal, etc.

See the librdkafka documentation for details.

SASL authentication is not supported on Windows.

sasl.mechanism

optional string literal
The SASL mechanism to use.
Examples
"SCRAM-SHA-256"
"SCRAM-SHA-512"

sasl.password

optional string literal
The SASL password.
Examples
"password"

sasl.username

optional string literal
The SASL username.
Examples
"username"

session_timeout_ms

optional uint
The Kafka session timeout.
Examples
5000
10000
default: 10000 (milliseconds)

socket_timeout_ms

optional uint
Timeout for network requests.
Examples
30000
60000
default: 60000 (milliseconds)

tls

optional object
Configures the TLS options for incoming/outgoing connections.

tls.alpn_protocols

optional [string]

Sets the list of supported ALPN protocols.

Declare the supported ALPN protocols, which are used during negotiation with peer. They are prioritized in the order that they are defined.

tls.ca_file

optional string literal

Absolute path to an additional CA certificate file.

The certificate must be in the DER or PEM (X.509) format. Additionally, the certificate can be provided as an inline string in PEM format.

Examples
"/path/to/certificate_authority.crt"

tls.crt_file

optional string literal

Absolute path to a certificate file used to identify this server.

The certificate must be in DER, PEM (X.509), or PKCS#12 format. Additionally, the certificate can be provided as an inline string in PEM format.

If this is set, and is not a PKCS#12 archive, key_file must also be set.

Examples
"/path/to/host_certificate.crt"

tls.enabled

optional bool

Whether or not to require TLS for incoming or outgoing connections.

When enabled and used for incoming connections, an identity certificate is also required. See tls.crt_file for more information.

tls.key_file

optional string literal

Absolute path to a private key file used to identify this server.

The key must be in DER or PEM (PKCS#8) format. Additionally, the key can be provided as an inline string in PEM format.

Examples
"/path/to/host_certificate.key"

tls.key_pass

optional string literal

Passphrase used to unlock the encrypted key file.

This has no effect unless key_file is set.

Examples
"${KEY_PASS_ENV_VAR}"
"PassWord1"

tls.server_name

optional string literal

Server name to use when using Server Name Indication (SNI).

Only relevant for outgoing connections.

Examples
"www.example.com"

Enables certificate verification. For components that create a server, this requires that the client connections have a valid client certificate. For components that initiate requests, this validates that the upstream has a valid certificate.

If enabled, certificates must not be expired and must be issued by a trusted issuer. This verification operates in a hierarchical manner, checking that the leaf certificate (the certificate presented by the client/server) is not only valid, but that the issuer of that certificate is also valid, and so on until the verification process reaches a root certificate.

Do NOT set this to false unless you understand the risks of not verifying the validity of certificates.

tls.verify_hostname

optional bool

Enables hostname verification.

If enabled, the hostname used to connect to the remote host must be present in the TLS certificate presented by the remote host, either as the Common Name or as an entry in the Subject Alternative Name extension.

Only relevant for outgoing connections.

Do NOT set this to false unless you understand the risks of not verifying the remote hostname.

topic_key

optional string literal

Overrides the name of the log field used to add the topic to each event.

The value is the topic from which the Kafka message was consumed from.

By default, "topic" is used.

Examples
"topic"
default: topic

topics

required [string]

The Kafka topics names to read events from.

Regular expression syntax is supported if the topic begins with ^.

Array string literal
Examples
[
  "^(prefix1|prefix2)-.+",
  "topic-1",
  "topic-2"
]

Outputs

<component_id>

Default output stream of the component. Use this component’s ID as an input to downstream transforms and sinks.

Output Data

Logs

Warning

The fields shown below will be different if log namespacing is enabled. See Log Namespacing for more details

Record

An individual Kafka record
Fields
message required string literal
The raw line from the Kafka record.
Examples
53.126.150.246 - - [01/Oct/2020:11:25:58 -0400] "GET /disintermediate HTTP/2.0" 401 20308
offset required uint
The Kafka offset at the time the record was retrieved.
Examples
100
partition required string literal
The Kafka partition that the record came from.
Examples
partition
source_type required string literal
The name of the source type.
Examples
kafka
timestamp required timestamp
The timestamp encoded in the Kafka message or the current time if it cannot be fetched.
Examples
2020-10-10T17:07:36.452332Z
topic required string literal
The Kafka topic that the record came from.
Examples
topic

Telemetry

Metrics

link

component_discarded_events_total

counter
The number of events dropped by this component.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
intentional
True if the events were discarded intentionally, like a filter transform, or false if due to an error.
pid optional
The process ID of the Vector instance.

component_errors_total

counter
The total number of errors encountered by this component.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_type
The Vector component type.
error_type
The type of the error
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.
stage
The stage within the component at which the error occurred.

component_received_bytes_total

counter
The number of raw bytes accepted by this component from source origins.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_type
The Vector component type.
container_name optional
The name of the container from which the data originated.
file optional
The file from which the data originated.
host optional
The hostname of the system Vector is running on.
mode optional
The connection mode used by the component.
peer_addr optional
The IP from which the data originated.
peer_path optional
The pathname from which the data originated.
pid optional
The process ID of the Vector instance.
pod_name optional
The name of the pod from which the data originated.
uri optional
The sanitized URI from which the data originated.

component_received_event_bytes_total

counter
The number of event bytes accepted by this component either from tagged origins like file and uri, or cumulatively from other origins.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_type
The Vector component type.
container_name optional
The name of the container from which the data originated.
file optional
The file from which the data originated.
host optional
The hostname of the system Vector is running on.
mode optional
The connection mode used by the component.
peer_addr optional
The IP from which the data originated.
peer_path optional
The pathname from which the data originated.
pid optional
The process ID of the Vector instance.
pod_name optional
The name of the pod from which the data originated.
uri optional
The sanitized URI from which the data originated.

component_received_events_count

histogram

A histogram of the number of events passed in each internal batch in Vector’s internal topology.

Note that this is separate than sink-level batching. It is mostly useful for low level debugging performance issues in Vector due to small internal batches.

component_id
The Vector component ID.
component_kind
The Vector component kind.
component_type
The Vector component type.
container_name optional
The name of the container from which the data originated.
file optional
The file from which the data originated.
host optional
The hostname of the system Vector is running on.
mode optional
The connection mode used by the component.
peer_addr optional
The IP from which the data originated.
peer_path optional
The pathname from which the data originated.
pid optional
The process ID of the Vector instance.
pod_name optional
The name of the pod from which the data originated.
uri optional
The sanitized URI from which the data originated.

component_received_events_total

counter
The number of events accepted by this component either from tagged origins like file and uri, or cumulatively from other origins.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_type
The Vector component type.
container_name optional
The name of the container from which the data originated.
file optional
The file from which the data originated.
host optional
The hostname of the system Vector is running on.
mode optional
The connection mode used by the component.
peer_addr optional
The IP from which the data originated.
peer_path optional
The pathname from which the data originated.
pid optional
The process ID of the Vector instance.
pod_name optional
The name of the pod from which the data originated.
uri optional
The sanitized URI from which the data originated.

component_sent_event_bytes_total

counter
The total number of event bytes emitted by this component.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
output optional
The specific output of the component.
pid optional
The process ID of the Vector instance.

component_sent_events_total

counter
The total number of events emitted by this component.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
output optional
The specific output of the component.
pid optional
The process ID of the Vector instance.

kafka_consumed_messages_bytes_total

counter
Total number of message bytes (including framing) received from Kafka brokers.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

kafka_consumed_messages_total

counter
Total number of messages consumed, not including ignored messages (due to offset, etc), from Kafka brokers.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

kafka_consumer_lag

gauge
The Kafka consumer lag.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
partition_id
The Kafka partition id.
pid optional
The process ID of the Vector instance.
topic_id
The Kafka topic id.

kafka_produced_messages_bytes_total

counter
Total number of message bytes (including framing, such as per-Message framing and MessageSet/batch framing) transmitted to Kafka brokers.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

kafka_produced_messages_total

counter
Total number of messages transmitted (produced) to Kafka brokers.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

kafka_queue_messages

gauge
Current number of messages in producer queues.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

kafka_queue_messages_bytes

gauge
Current total size of messages in producer queues.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

kafka_requests_bytes_total

counter
Total number of bytes transmitted to Kafka brokers.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

kafka_requests_total

counter
Total number of requests sent to Kafka brokers.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

kafka_responses_bytes_total

counter
Total number of bytes received from Kafka brokers.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

kafka_responses_total

counter
Total number of responses received from Kafka brokers.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

source_lag_time_seconds

histogram
The difference between the timestamp recorded in each event and the time when it was ingested, expressed as fractional seconds.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

How it works

Azure Event Hubs

It is possible to use the kafka source and sink with Azure Event Hubs for all tiers other than the Basic tier. More details can be found here. To configure the source and sink to connect to Azure Event Hubs set the following options:

  • bootstrap_servers - <namespace name>.servicebus.windows.net:9093
  • group_id - The consumer group. Note that if the default group ($Default) is used it must be specified as $$Default to escape the $ used for environment variables.
  • topics - The event hub name.
  • sasl.enabled - Set to true.
  • sasl.mechanism - Set to PLAIN.
  • sasl.username - Set to $$ConnectionString (note the double $$).
  • sasl.password - Set to the connection string. See here.
  • tls.enabled - Set to true.
  • tls.ca_file - The certificate authority file.
  • tls.verify_certificate - Set to true.

Context

By default, the kafka source augments events with helpful context keys.

librdkafka

The kafka source and sink use librdkafka under the hood. This is a battle-tested, high performance, and reliable library that facilitates communication with Kafka. As Vector produces static MUSL builds, this dependency is packaged with Vector, meaning you do not need to install it.

State

This component is stateless, meaning its behavior is consistent across each input.

Transport Layer Security (TLS)

Vector uses OpenSSL for TLS protocols due to OpenSSL’s maturity. You can enable and adjust TLS behavior via the tls.* options and/or via an OpenSSL configuration file. The file location defaults to /usr/local/ssl/openssl.cnf or can be specified with the OPENSSL_CONF environment variable.