Kafka

Publish observability event data to Apache Kafka topics

status: stable delivery: at-least-once acknowledgements: yes egress: dynamic state: stateless

Configuration

Example configurations

{
  "sinks": {
    "my_sink_id": {
      "type": "kafka",
      "inputs": [
        "my-source-or-transform-id"
      ],
      "bootstrap_servers": "10.14.22.123:9092,10.14.23.332:9092",
      "key_field": "user_id",
      "topic": "topic-1234",
      "acknowledgements": null,
      "batch": null,
      "compression": "none",
      "encoding": {
        "codec": "json"
      },
      "healthcheck": null
    }
  }
}
[sinks.my_sink_id]
type = "kafka"
inputs = [ "my-source-or-transform-id" ]
bootstrap_servers = "10.14.22.123:9092,10.14.23.332:9092"
key_field = "user_id"
topic = "topic-1234"
compression = "none"

  [sinks.my_sink_id.encoding]
  codec = "json"
---
sinks:
  my_sink_id:
    type: kafka
    inputs:
      - my-source-or-transform-id
    bootstrap_servers: 10.14.22.123:9092,10.14.23.332:9092
    key_field: user_id
    topic: topic-1234
    acknowledgements: null
    batch: null
    compression: none
    encoding:
      codec: json
    healthcheck: null
{
  "sinks": {
    "my_sink_id": {
      "type": "kafka",
      "inputs": [
        "my-source-or-transform-id"
      ],
      "bootstrap_servers": "10.14.22.123:9092,10.14.23.332:9092",
      "key_field": "user_id",
      "librdkafka_options": {
        "client.id": "${ENV_VAR}",
        "fetch.error.backoff.ms": "1000",
        "socket.send.buffer.bytes": "100"
      },
      "message_timeout_ms": 300000,
      "sasl": null,
      "socket_timeout_ms": 60000,
      "topic": "topic-1234",
      "buffer": null,
      "acknowledgements": null,
      "batch": null,
      "compression": "none",
      "encoding": {
        "codec": "json"
      },
      "healthcheck": null,
      "headers_key": "headers"
    }
  }
}
[sinks.my_sink_id]
type = "kafka"
inputs = [ "my-source-or-transform-id" ]
bootstrap_servers = "10.14.22.123:9092,10.14.23.332:9092"
key_field = "user_id"
message_timeout_ms = 300_000
socket_timeout_ms = 60_000
topic = "topic-1234"
compression = "none"
headers_key = "headers"

  [sinks.my_sink_id.librdkafka_options]
  "client.id" = "${ENV_VAR}"
  "fetch.error.backoff.ms" = "1000"
  "socket.send.buffer.bytes" = "100"

  [sinks.my_sink_id.encoding]
  codec = "json"
---
sinks:
  my_sink_id:
    type: kafka
    inputs:
      - my-source-or-transform-id
    bootstrap_servers: 10.14.22.123:9092,10.14.23.332:9092
    key_field: user_id
    librdkafka_options:
      client.id: ${ENV_VAR}
      fetch.error.backoff.ms: "1000"
      socket.send.buffer.bytes: "100"
    message_timeout_ms: 300000
    sasl: null
    socket_timeout_ms: 60000
    topic: topic-1234
    buffer: null
    acknowledgements: null
    batch: null
    compression: none
    encoding:
      codec: json
    healthcheck: null
    headers_key: headers

acknowledgements

common optional object
Controls how acknowledgements are handled by this sink. When enabled, all connected sources that support end-to-end acknowledgements will wait for the destination of this sink to acknowledge receipt of events before providing acknowledgement to the sending source. These settings override the global acknowledgement settings.

acknowledgements.enabled

common optional bool
Controls if all connected sources will wait for this sink to deliver the events before acknowledging receipt.
default: false

batch

common optional object
Configures the sink batching behavior.

batch.max_bytes

common optional uint
The maximum size of a batch, in bytes, before it is flushed.

batch.max_events

common optional uint
The maximum size of a batch, in events, before it is flushed.

batch.timeout_secs

common optional float
The maximum age of a batch before it is flushed.

bootstrap_servers

required string literal
A comma-separated list of host and port pairs that are the addresses of the Kafka brokers in a “bootstrap” Kafka cluster that a Kafka client connects to initially to bootstrap itself.
Examples
"10.14.22.123:9092,10.14.23.332:9092"

buffer

optional object
Configures the sink specific buffer behavior.

buffer.max_events

common optional uint
The maximum number of events allowed in the buffer.
Relevant when: type = "memory"
default: 500 (events)

buffer.max_size

required uint

The maximum size of the buffer on the disk. Must be at least 128 megabytes (134217728 bytes).

Note that during normal disk buffer operation, the disk buffer can create one additional 128 megabyte block so the minimum disk space required is actually 256 megabytes.

Relevant when: type = "disk"
Examples
104900000

buffer.type

common optional string literal enum
The buffer’s type and storage mechanism.
Enum options
OptionDescription
diskStores the sink’s buffer on disk. This is less performant, but durable. Data will not be lost between restarts. Will also hold data in memory to enhance performance. WARNING: This may stall the sink if disk performance isn’t on par with the throughput. For comparison, AWS gp2 volumes are usually too slow for common cases.
memoryStores the sink’s buffer in memory. This is more performant, but less durable. Data will be lost if Vector is restarted forcefully.
default: memory

buffer.when_full

optional string literal enum
The behavior when the buffer becomes full.
Enum options
OptionDescription
blockApplies back pressure when the buffer is full. This prevents data loss, but will cause data to pile up on the edge.
drop_newestDrops new data as it’s received. This data is lost. This should be used when performance is the highest priority.
default: block

compression

common optional string literal enum

The compression strategy used to compress the encoded event data before transmission.

Some cloud storage API clients and browsers will handle decompression transparently, so files may not always appear to be compressed depending how they are accessed.

Enum options string literal
OptionDescription
gzipGzip standard DEFLATE compression.
lz4lz4 compression.
noneNo compression.
snappySnappy compression.
zstdzstd compression.
default: none

encoding

required object

Configures the encoding specific sink behavior.

Note: When data in encoding is malformed, currently only a very generic error “data did not match any variant of untagged enum EncodingConfig” is reported. Follow this issue to track progress on improving these error messages.

encoding.codec

required string literal enum
The encoding codec used to serialize the events before outputting.
Enum options
OptionDescription
jsonJSON encoded event.
ndjsonNewline delimited list of JSON encoded events.
textThe message field from the event.
Examples
"json"
"text"
"ndjson"

encoding.except_fields

optional [string]
Prevent the sink from encoding the specified fields.

encoding.only_fields

optional [string]
Makes the sink encode only the specified fields.

encoding.timestamp_format

optional string literal enum
How to format event timestamps.
Enum options
OptionDescription
rfc3339Formats as a RFC3339 string
unixFormats as a unix timestamp
default: rfc3339

headers_key

optional string literal
The log field name to use for the Kafka headers. If omitted, no headers will be written.
Examples
"headers"

healthcheck

common optional object
Health check options for the sink.

healthcheck.enabled

common optional bool
Enables/disables the healthcheck upon Vector boot.
default: true

inputs

required [string]

A list of upstream source or transform IDs. Wildcards (*) are supported.

See configuration for more info.

Array string literal
Examples
[
  "my-source-or-transform-id",
  "prefix-*"
]

key_field

common optional string literal
The log field name or tags key to use for the topic key. If the field does not exist in the log or in tags, a blank value will be used. If unspecified, the key is not sent. Kafka uses a hash of the key to choose the partition or uses round-robin if the record has no key.
Examples
"user_id"

librdkafka_options

optional object
Advanced options. See librdkafka documentation for details.

message_timeout_ms

optional uint
Local message timeout.
Examples
150000
450000
default: 300000

sasl

optional object
Options for SASL/SCRAM authentication support.

sasl.enabled

common optional bool
Enable SASL/SCRAM authentication to the remote. (Not supported on Windows at this time.)

sasl.mechanism

common optional string literal
The Kafka SASL/SCRAM mechanisms.
Examples
"SCRAM-SHA-256"
"SCRAM-SHA-512"

sasl.password

common optional string literal
The Kafka SASL/SCRAM authentication password.
Examples
"password"

sasl.username

common optional string literal
The Kafka SASL/SCRAM authentication username.
Examples
"username"

socket_timeout_ms

optional uint
Default timeout for network requests.
Examples
30000
60000
default: 60000 (milliseconds)

topic

required string template
The Kafka topic name to write events to.
Note: This parameter supports Vector's template syntax, which enables you to use dynamic per-event values.
Examples
"topic-1234"
"logs-{{unit}}-%Y-%m-%d"

Telemetry

Metrics

link

buffer_byte_size

gauge
The number of bytes current in the buffer.
component_id required
The Vector component ID.
component_kind required
The Vector component kind.
component_name required
Deprecated, use component_id instead. The value is the same as component_id.
component_type required
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

buffer_discarded_events_total

counter
The number of events dropped by this non-blocking buffer.
component_id required
The Vector component ID.
component_kind required
The Vector component kind.
component_name required
Deprecated, use component_id instead. The value is the same as component_id.
component_type required
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

buffer_events

gauge
The number of events currently in the buffer.
component_id required
The Vector component ID.
component_kind required
The Vector component kind.
component_name required
Deprecated, use component_id instead. The value is the same as component_id.
component_type required
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

buffer_received_event_bytes_total

counter
The number of bytes received by this buffer.
component_id required
The Vector component ID.
component_kind required
The Vector component kind.
component_name required
Deprecated, use component_id instead. The value is the same as component_id.
component_type required
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

buffer_received_events_total

counter
The number of events received by this buffer.
component_id required
The Vector component ID.
component_kind required
The Vector component kind.
component_name required
Deprecated, use component_id instead. The value is the same as component_id.
component_type required
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

buffer_sent_event_bytes_total

counter
The number of bytes sent by this buffer.
component_id required
The Vector component ID.
component_kind required
The Vector component kind.
component_name required
Deprecated, use component_id instead. The value is the same as component_id.
component_type required
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

buffer_sent_events_total

counter
The number of events sent by this buffer.
component_id required
The Vector component ID.
component_kind required
The Vector component kind.
component_name required
Deprecated, use component_id instead. The value is the same as component_id.
component_type required
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

component_received_event_bytes_total

counter
The number of event bytes accepted by this component either from tagged origins like file and uri, or cumulatively from other origins.
component_id required
The Vector component ID.
component_kind required
The Vector component kind.
component_name required
Deprecated, use component_id instead. The value is the same as component_id.
component_type required
The Vector component type.
container_name optional
The name of the container from which the data originated.
file optional
The file from which the data originated.
host optional
The hostname of the system Vector is running on.
mode optional
The connection mode used by the component.
peer_addr optional
The IP from which the data originated.
peer_path optional
The pathname from which the data originated.
pid optional
The process ID of the Vector instance.
pod_name optional
The name of the pod from which the data originated.
uri optional
The sanitized URI from which the data originated.

component_received_events_count

histogram
A histogram of Vector the number of events passed in each internal batch in Vector’s internal topology. Note that this is separate than sink-level batching. It is mostly useful for low level debugging performance issues in Vector due to small internal batches.
component_id required
The Vector component ID.
component_kind required
The Vector component kind.
component_name required
Deprecated, use component_id instead. The value is the same as component_id.
component_type required
The Vector component type.
container_name optional
The name of the container from which the data originated.
file optional
The file from which the data originated.
host optional
The hostname of the system Vector is running on.
mode optional
The connection mode used by the component.
peer_addr optional
The IP from which the data originated.
peer_path optional
The pathname from which the data originated.
pid optional
The process ID of the Vector instance.
pod_name optional
The name of the pod from which the data originated.
uri optional
The sanitized URI from which the data originated.

component_received_events_total

counter
The number of events accepted by this component either from tagged origins like file and uri, or cumulatively from other origins.
component_id required
The Vector component ID.
component_kind required
The Vector component kind.
component_name required
Deprecated, use component_id instead. The value is the same as component_id.
component_type required
The Vector component type.
container_name optional
The name of the container from which the data originated.
file optional
The file from which the data originated.
host optional
The hostname of the system Vector is running on.
mode optional
The connection mode used by the component.
peer_addr optional
The IP from which the data originated.
peer_path optional
The pathname from which the data originated.
pid optional
The process ID of the Vector instance.
pod_name optional
The name of the pod from which the data originated.
uri optional
The sanitized URI from which the data originated.

component_sent_bytes_total

counter
The number of raw bytes sent by this component to destination sinks.
component_id required
The Vector component ID.
component_kind required
The Vector component kind.
component_name required
Deprecated, use component_id instead. The value is the same as component_id.
component_type required
The Vector component type.
endpoint optional
The endpoint to which the bytes were sent. For HTTP, this will be the host and path only, excluding the query string.
file optional
The absolute path of the destination file.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.
protocol required
The protocol used to send the bytes.
region optional
The AWS region name to which the bytes were sent. In some configurations, this may be a literal hostname.

component_sent_event_bytes_total

counter
The total number of event bytes emitted by this component.
component_id required
The Vector component ID.
component_kind required
The Vector component kind.
component_name required
Deprecated, use component_id instead. The value is the same as component_id.
component_type required
The Vector component type.
host optional
The hostname of the system Vector is running on.
output optional
The specific output of the component.
pid optional
The process ID of the Vector instance.

component_sent_events_total

counter
The total number of events emitted by this component.
component_id required
The Vector component ID.
component_kind required
The Vector component kind.
component_name required
Deprecated, use component_id instead. The value is the same as component_id.
component_type required
The Vector component type.
host optional
The hostname of the system Vector is running on.
output optional
The specific output of the component.
pid optional
The process ID of the Vector instance.

events_discarded_total

counter
The total number of events discarded by this component.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.
reason required
The type of the error

events_in_total

counter
The number of events accepted by this component either from tagged origins like file and uri, or cumulatively from other origins. This metric is deprecated and will be removed in a future version. Use component_received_events_total instead.
component_id required
The Vector component ID.
component_kind required
The Vector component kind.
component_name required
Deprecated, use component_id instead. The value is the same as component_id.
component_type required
The Vector component type.
container_name optional
The name of the container from which the data originated.
file optional
The file from which the data originated.
host optional
The hostname of the system Vector is running on.
mode optional
The connection mode used by the component.
peer_addr optional
The IP from which the data originated.
peer_path optional
The pathname from which the data originated.
pid optional
The process ID of the Vector instance.
pod_name optional
The name of the pod from which the data originated.
uri optional
The sanitized URI from which the data originated.

kafka_consumed_messages_bytes_total

counter
Total number of message bytes (including framing) received from Kafka brokers.
component_id required
The Vector component ID.
component_kind required
The Vector component kind.
component_name required
Deprecated, use component_id instead. The value is the same as component_id.
component_type required
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

kafka_consumed_messages_total

counter
Total number of messages consumed, not including ignored messages (due to offset, etc), from Kafka brokers.
component_id required
The Vector component ID.
component_kind required
The Vector component kind.
component_name required
Deprecated, use component_id instead. The value is the same as component_id.
component_type required
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

kafka_produced_messages_bytes_total

counter
Total number of message bytes (including framing, such as per-Message framing and MessageSet/batch framing) transmitted to Kafka brokers.
component_id required
The Vector component ID.
component_kind required
The Vector component kind.
component_name required
Deprecated, use component_id instead. The value is the same as component_id.
component_type required
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

kafka_produced_messages_total

counter
Total number of messages transmitted (produced) to Kafka brokers.
component_id required
The Vector component ID.
component_kind required
The Vector component kind.
component_name required
Deprecated, use component_id instead. The value is the same as component_id.
component_type required
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

kafka_queue_messages

gauge
Current number of messages in producer queues.
component_id required
The Vector component ID.
component_kind required
The Vector component kind.
component_name required
Deprecated, use component_id instead. The value is the same as component_id.
component_type required
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

kafka_queue_messages_bytes

gauge
Current total size of messages in producer queues.
component_id required
The Vector component ID.
component_kind required
The Vector component kind.
component_name required
Deprecated, use component_id instead. The value is the same as component_id.
component_type required
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

kafka_requests_bytes_total

counter
Total number of bytes transmitted to Kafka brokers.
component_id required
The Vector component ID.
component_kind required
The Vector component kind.
component_name required
Deprecated, use component_id instead. The value is the same as component_id.
component_type required
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

kafka_requests_total

counter
Total number of requests sent to Kafka brokers.
component_id required
The Vector component ID.
component_kind required
The Vector component kind.
component_name required
Deprecated, use component_id instead. The value is the same as component_id.
component_type required
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

kafka_responses_bytes_total

counter
Total number of bytes received from Kafka brokers.
component_id required
The Vector component ID.
component_kind required
The Vector component kind.
component_name required
Deprecated, use component_id instead. The value is the same as component_id.
component_type required
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

kafka_responses_total

counter
Total number of responses received from Kafka brokers.
component_id required
The Vector component ID.
component_kind required
The Vector component kind.
component_name required
Deprecated, use component_id instead. The value is the same as component_id.
component_type required
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

processing_errors_total

counter
The total number of processing errors encountered by this component.
component_id required
The Vector component ID.
component_kind required
The Vector component kind.
component_name required
Deprecated, use component_id instead. The value is the same as component_id.
component_type required
The Vector component type.
error_type required
The type of the error
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

utilization

gauge
A ratio from 0 to 1 of the load on a component. A value of 0 would indicate a completely idle component that is simply waiting for input. A value of 1 would indicate a that is never idle. This value is updated every 5 seconds.
component_id required
The Vector component ID.
component_kind required
The Vector component kind.
component_name required
Deprecated, use component_id instead. The value is the same as component_id.
component_type required
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

How it works

Buffers and batches

This component buffers & batches data as shown in the diagram above. You’ll notice that Vector treats these concepts differently, instead of treating them as global concepts, Vector treats them as sink specific concepts. This isolates sinks, ensuring services disruptions are contained and delivery guarantees are honored.

Batches are flushed when 1 of 2 conditions are met:

  1. The batch age meets or exceeds the configured timeout_secs.
  2. The batch size meets or exceeds the configured max_bytes or max_events.

Buffers are controlled via the buffer.* options.

Health checks

Health checks ensure that the downstream service is accessible and ready to accept data. This check is performed upon sink initialization. If the health check fails an error will be logged and Vector will proceed to start.

Require health checks

If you’d like to exit immediately upon a health check failure, you can pass the --require-healthy flag:

vector --config /etc/vector/vector.toml --require-healthy

Disable health checks

If you’d like to disable health checks for this sink you can set the healthcheck option to false.

librdkafka

The kafka sink uses librdkafka under the hood. This is a battle-tested, high performance, and reliable library that facilitates communication with Kafka. As Vector produces static MUSL builds, this dependency is packaged with Vector, meaning you do not need to install it.

State

This component is stateless, meaning its behavior is consistent across each input.