Kafka Sink

The Vector kafka sink streams log events to Apache Kafka via the Kafka protocol.

Configuration

vector.toml
[sinks.my_sink_id]
# REQUIRED - General
type = "kafka" # must be: "kafka"
inputs = ["my-source-id"] # example
bootstrap_servers = "10.14.22.123:9092,10.14.23.232:9092" # example
key_field = "user_id" # example
topic = "topic-1234" # example
# REQUIRED - requests
encoding = "json" # example, enum
# OPTIONAL - General
healthcheck = true # default

Options

bootstrap_servers (string, common, required)

A comma-delimited list of host and port pairs that the Kafka client should contact to bootstrap its cluster metadata.

No default.
buffer (table, optional)

Configures the sink-specific buffer behavior.

buffer.max_events (int (events), common, required)

The maximum number of events allowed in the buffer.

Default: 500 (events). Only relevant when buffer.type = "memory".
buffer.max_size (int (bytes), common, required)

The maximum size of the buffer on disk.

No default. Only relevant when buffer.type = "disk".
buffer.type (string, enum, common, required)

The buffer's type and storage mechanism.

Default: "memory". Must be one of: "memory", "disk".
buffer.when_full (string, enum, optional)

The behavior when the buffer becomes full.

Default: "block". Must be one of: "block", "drop_newest".
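As a sketch, a disk-backed buffer for this sink might be configured as follows (the size value is illustrative):

```toml
[sinks.my_sink_id.buffer]
type = "disk"
max_size = 104900000   # bytes; only relevant for the disk buffer
when_full = "block"    # apply backpressure rather than dropping events
```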
encoding (string, enum, common, required)

The encoding format used to serialize the events before outputting.

No default. Must be one of: "json", "text".
healthcheck (bool, common, optional)

Enables/disables the sink health check upon start. See Health Checks for more info.

Default: true.
key_field (string, common, required)

The log field name to use for the topic key. If unspecified, the key will be randomly generated. If the field does not exist on the log, a blank value will be used.

No default.
librdkafka_options (table, optional)

Advanced producer options, passed through to librdkafka. See the librdkafka documentation for details.

librdkafka_options.[field-name] (string, optional)

The options and their values. Accepts string values.

No default.
message_timeout_ms (int, optional)

Local message timeout.

Default: 300000.
socket_timeout_ms (int, optional)

Default timeout for network requests.

Default: 60000.
tls (table, optional)

Configures the TLS options for connections from this sink.

tls.enabled (bool, optional)

Enable TLS during connections to the remote.

No default.
tls.ca_path (string, optional)

Absolute path to an additional CA certificate file, in DER or PEM format (X.509).

No default.
tls.crt_path (string, optional)

Absolute path to a certificate file used to identify this connection, in DER or PEM format (X.509) or PKCS#12. If this is set and is not a PKCS#12 archive, key_path must also be set.

No default.
tls.key_pass (string, optional)

Pass phrase used to unlock the encrypted key file. This has no effect unless key_path is set.

No default.
tls.key_path (string, optional)

Absolute path to a certificate key file used to identify this connection, in DER or PEM format (PKCS#8). If this is set, crt_path must also be set.

No default.
topic (string, common, required)

The Kafka topic name to write events to.

No default.

How It Works

AWS Authentication

Vector checks for AWS credentials in the following order:

  1. The AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables.
  2. The credential_process command in the AWS config file (usually located at ~/.aws/config).
  3. The AWS credentials file (usually located at ~/.aws/credentials).
  4. The IAM instance profile (only works when running on an EC2 instance with an instance profile/role attached).

If credentials are not found, the health check will fail and an error will be logged.

Obtaining an access key

In general, we recommend using instance profiles/roles whenever possible. In cases where this is not possible you can generate an AWS access key for any user within your AWS account. AWS provides a detailed guide on how to do this.

Buffers

The kafka sink buffers events before writing them to Kafka. This helps to smooth out data processing when the downstream service applies backpressure. Buffers are controlled via the buffer.* options.

Environment Variables

Environment variables are supported through all of Vector's configuration. Simply add ${MY_ENV_VAR} in your Vector configuration file and the variable will be replaced before being evaluated.

You can learn more in the Environment Variables section.
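For instance, a hypothetical configuration that reads the broker list from the environment (the variable name is illustrative):

```toml
[sinks.my_sink_id]
type = "kafka"
bootstrap_servers = "${KAFKA_BOOTSTRAP_SERVERS}" # replaced before the config is evaluated
```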

Health Checks

Health checks ensure that the downstream service is accessible and ready to accept data. This check is performed upon sink initialization. If the health check fails, an error will be logged and Vector will still proceed to start.

Require Health Checks

If you'd like to exit immediately upon a health check failure, you can pass the --require-healthy flag:

vector --config /etc/vector/vector.toml --require-healthy

Disable Health Checks

If you'd like to disable health checks for this sink you can set the healthcheck option to false.
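For example, to disable the health check for this sink:

```toml
[sinks.my_sink_id]
type = "kafka"
healthcheck = false
```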

Streaming

The kafka sink streams data on a real-time event-by-event basis. It does not batch data.

librdkafka

The kafka sink uses librdkafka under the hood. This is a battle-tested, performant, and reliable library that facilitates communication with Kafka. Because Vector produces static MUSL builds, this dependency is packaged with Vector, meaning you do not need to install it yourself.