Kafka Sink

The Vector kafka sink streams log events to Apache Kafka via the Kafka protocol.

Configuration

vector.toml
[sinks.my_sink_id]
type = "kafka" # must be: "kafka"
inputs = ["my-source-id"] # example
bootstrap_servers = "10.14.22.123:9092,10.14.23.32:9092" # example
key_field = "user_id" # example
topic = "topic-1234" # example

Options

bootstrap_servers (string, required, common)

A comma-delimited list of host and port pairs that the Kafka client should contact to bootstrap its cluster metadata.

No default

buffer (table, optional)

Configures the sink buffer behavior.
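
As a rough illustration of how the buffer sub-options documented below fit together, the following sketch configures a disk buffer. The sink ID, source ID, and broker address are reused from the example at the top of this page, and the max_size value is an arbitrary illustrative number, not a recommendation.

```toml
[sinks.my_sink_id]
type = "kafka"
inputs = ["my-source-id"]
bootstrap_servers = "10.14.22.123:9092"
key_field = "user_id"
topic = "topic-1234"

  # Buffer sub-options live in a nested table under the sink.
  [sinks.my_sink_id.buffer]
  type = "disk"          # persistent; retained between restarts
  max_size = 104900000   # bytes; illustrative value, only relevant for type = "disk"
  when_full = "block"    # or "drop_newest"
```

With type = "memory" (the default), max_events would be set in place of max_size.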

buffer.max_events (int, events, optional)

The maximum number of events allowed in the buffer.

Default: 500
Only relevant when: type = "memory"

buffer.max_size (int, bytes, optional)

The maximum size of the buffer on the disk.

No default
Only relevant when: type = "disk"

buffer.type (string, enum, optional)

The buffer's type and storage location. A disk buffer is persistent and is retained between restarts.

Default: "memory"
Enum, must be one of: "memory" "disk"

buffer.when_full (string, enum, optional)

The behavior when the buffer becomes full.

Default: "block"
Enum, must be one of: "block" "drop_newest"

encoding (string, enum, optional)

The encoding format used to serialize the events before outputting.

No default
Enum, must be one of: "json" "text"

endpoint (string, optional)

Custom endpoint for use with AWS-compatible services. Providing a value for this option will make region moot.

No default

healthcheck (bool, optional)

Enables or disables the sink health check on startup. See Health Checks for more info.

Default: true

key_field (string, required, common)

The log field name to use for the topic key. If unspecified, the key will be randomly generated. If the field does not exist on the log, a blank value will be used.

No default

region (string, optional)

The AWS region of the target service. If endpoint is provided, it overrides this value, since the endpoint includes the region.

No default

tls (table, optional)

Configures the TLS options for connections from this sink.
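
A minimal sketch of a tls table using the sub-options documented below. All file paths and the KAFKA_KEY_PASS variable name are hypothetical placeholders; substitute the locations of your own certificate files.

```toml
# Nested under the sink defined as [sinks.my_sink_id].
[sinks.my_sink_id.tls]
enabled = true
ca_path = "/path/to/certificate_authority.crt"  # hypothetical path
crt_path = "/path/to/host_certificate.crt"      # hypothetical path; not PKCS#12, so key_path is also set
key_path = "/path/to/host_certificate.key"      # hypothetical path
key_pass = "${KAFKA_KEY_PASS}"                  # hypothetical environment variable holding the pass phrase
```

Reading the pass phrase from an environment variable keeps it out of the configuration file itself.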

tls.ca_path (string, optional)

Absolute path to an additional CA certificate file, in DER or PEM format (X.509).

No default

tls.crt_path (string, optional)

Absolute path to a certificate file used to identify this connection, in DER or PEM format (X.509) or PKCS#12. If this is set and is not a PKCS#12 archive, key_path must also be set.

No default

tls.enabled (bool, optional)

Enable TLS during connections to the remote.

No default

tls.key_pass (string, optional)

Pass phrase used to unlock the encrypted key file. This has no effect unless key_path is set.

No default

tls.key_path (string, optional)

Absolute path to a certificate key file used to identify this connection, in DER or PEM format (PKCS#8). If this is set, crt_path must also be set.

No default

topic (string, required, common)

The Kafka topic name to write events to.

No default

Env Vars

AWS_ACCESS_KEY_ID (string, optional)

Used for AWS authentication when communicating with AWS services. See AWS Authentication and the relevant AWS components for more info.

No default

AWS_SECRET_ACCESS_KEY (string, optional)

Used for AWS authentication when communicating with AWS services. See AWS Authentication and the relevant AWS components for more info.

No default

How It Works

AWS Authentication

Vector checks for AWS credentials in the following order:

  1. Environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY.
  2. The credential_process command in the AWS config file (usually located at ~/.aws/config).
  3. The AWS credentials file (usually located at ~/.aws/credentials).
  4. The IAM instance profile (only works when running on an EC2 instance with an instance profile/role).

If credentials are not found, the health check will fail and an error will be logged.

Obtaining an access key

In general, we recommend using instance profiles/roles whenever possible. Where this is not possible, you can generate an AWS access key for any user within your AWS account. AWS provides a detailed guide on how to do this.

Environment Variables

Environment variables are supported throughout Vector's configuration. Add ${MY_ENV_VAR} to your Vector configuration file and the variable will be replaced with its value before the configuration is evaluated.

You can learn more in the Environment Variables section.
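
For instance, the ${...} substitution described above could be used to keep broker addresses and topic names out of the file. This is only a sketch: KAFKA_BOOTSTRAP_SERVERS and KAFKA_TOPIC are hypothetical variable names chosen for illustration and must be set in Vector's environment before it starts.

```toml
[sinks.my_sink_id]
type = "kafka"
inputs = ["my-source-id"]
bootstrap_servers = "${KAFKA_BOOTSTRAP_SERVERS}"  # hypothetical variable, e.g. "10.14.22.123:9092"
key_field = "user_id"
topic = "${KAFKA_TOPIC}"                          # hypothetical variable, e.g. "topic-1234"
```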

Health Checks

Health checks ensure that the downstream service is accessible and ready to accept data. This check is performed upon sink initialization. If the health check fails, an error will be logged and Vector will proceed to start.

Require Health Checks

If you'd like to exit immediately upon a health check failure, you can pass the --require-healthy flag:

vector --config /etc/vector/vector.toml --require-healthy

Disable Health Checks

If you'd like to disable health checks for this sink you can set the healthcheck option to false.
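
A minimal sketch of a sink with its health check turned off, reusing the sink ID and example values from the top of this page:

```toml
[sinks.my_sink_id]
type = "kafka"
inputs = ["my-source-id"]
bootstrap_servers = "10.14.22.123:9092"
key_field = "user_id"
topic = "topic-1234"
healthcheck = false  # skip the health check for this sink only
```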

Streaming

The kafka sink streams data on a real-time event-by-event basis. It does not batch data.