Kafka Sink

The Vector kafka sink sends logs to Kafka.

Requirements

Configuration

[sinks.my_sink_id]
# General
type = "kafka" # required
inputs = ["my-source-or-transform-id"] # required
bootstrap_servers = "10.14.22.123:9092,10.14.23.332:9092" # required
compression = "none" # optional, default
healthcheck = true # optional, default
key_field = "user_id" # required
topic = "topic-1234" # required
# Batch
# Encoding
encoding.codec = "json" # required
  • commonoptionaltable

    batch

    Configures the sink batching behavior.

  • commonrequiredstring

    bootstrap_servers

    A comma-separated list of host and port pairs that are the addresses of the Kafka brokers in a "bootstrap" Kafka cluster that a Kafka client connects to initially to bootstrap itself.

    • View examples
  • optionaltable

    buffer

    Configures the sink specific buffer behavior.

    • commonoptionaluint

      max_events

      The maximum number of events allowed in the buffer. See Buffers & batches for more info.

      • Only relevant when: type = "memory"
      • Default: 500 (events)
    • commonrequired*uint

      max_size

      The maximum size of the buffer on the disk. See Buffers & batches for more info.

      • Only required when: type = "disk"
      • View examples
    • enumcommonoptionalstring

      type

      The buffer's type and storage mechanism.

      • Default: "memory"
      • Enum, must be one of: "memory" "disk"
      • View examples
    • enumoptionalstring

      when_full

      The behavior when the buffer becomes full.

      • Default: "block"
      • Enum, must be one of: "block" "drop_newest"
      • View examples
  • enumcommonoptionalstring

    compression

    The compression strategy used to compress the encoded event data before transmission.

    • Default: "none"
    • Enum, must be one of: "none" "gzip"
    • View examples
  • commonrequiredtable

    encoding

    Configures the encoding specific sink behavior.

    • commonrequiredstring

      codec

      The encoding codec used to serialize the events before outputting.

      • View examples
    • optional[string]

      except_fields

      Prevent the sink from encoding the specified labels.

      • View examples
    • optional[string]

      only_fields

      Prevent the sink from encoding the specified labels.

      • View examples
    • enumoptionalstring

      timestamp_format

      How to format event timestamps.

      • Default: "rfc3339"
      • Enum, must be one of: "rfc3339" "unix"
      • View examples
  • commonoptionalbool

    healthcheck

    Enables/disables the sink healthcheck upon Vector boot. See Health checks for more info.

    • Default: true
    • View examples
  • commonrequiredstring

    key_field

    The log field name to use for the topic key. If unspecified, the key will be randomly generated. If the field does not exist on the log, a blank value will be used.

    • View examples
  • optionaltable

    librdkafka_options

    Advanced options. See librdkafka documentation for details.

  • optionaluint

    message_timeout_ms

    Local message timeout.

    • Default: 300000
    • View examples
  • optionaltable

    sasl

    Options for SASL/SCRAM authentication support.

    • commonoptionalbool

      enabled

      Enable SASL/SCRAM authentication to the remote. (Not supported on Windows at this time.)

      • View examples
    • commonoptionalstring

      mechanism

      The Kafka SASL/SCRAM mechanisms.

      • View examples
    • commonoptionalstring

      password

      The Kafka SASL/SCRAM authentication password.

      • View examples
    • commonoptionalstring

      username

      The Kafka SASL/SCRAM authentication username.

      • View examples
  • optionaluint

    socket_timeout_ms

    Default timeout for network requests.

    • Default: 60000 (milliseconds)
    • View examples
  • optionaltable

    tls

    Configures the TLS options for incoming connections.

    • optionalstring

      ca_file

      Absolute path to an additional CA certificate file, in DER or PEM format (X.509), or an inline CA certificate in PEM format.

      • View examples
    • commonoptionalstring

      crt_file

      Absolute path to a certificate file used to identify this connection, in DER or PEM format (X.509) or PKCS#12, or an inline certificate in PEM format. If this is set and is not a PKCS#12 archive, key_file must also be set.

      • View examples
    • commonoptionalbool

      enabled

      Enable TLS during connections to the remote.

      • Default: false
      • View examples
    • commonoptionalstring

      key_file

      Absolute path to a private key file used to identify this connection, in DER or PEM format (PKCS#8), or an inline private key in PEM format. If this is set, crt_file must also be set.

      • View examples
    • optionalstring

      key_pass

      Pass phrase used to unlock the encrypted key file. This has no effect unless key_file is set.

      • View examples
    • optionalbool

      verify_certificate

      If true (the default), Vector will validate the TLS certificate of the remote host.

      • Default: true
      • View examples
  • commonrequiredstring

    topic

    The Kafka topic name to write events to.

    • View examples

Telemetry

This component provides the following metrics that can be retrieved through the internal_metrics source. See the metrics section in the monitoring page for more info.

  • counter

    processed_events_total

    The total number of events processed by this component. This metric includes the following tags:

    • component_kind - The Vector component kind.

    • component_name - The Vector component ID.

    • component_type - The Vector component type.

    • file - The file that produced the error

    • instance - The Vector instance identified by host and port.

    • job - The name of the job producing Vector metrics.

  • counter

    processed_bytes_total

    The total number of bytes processed by the component. This metric includes the following tags:

    • component_kind - The Vector component kind.

    • component_name - The Vector component ID.

    • component_type - The Vector component type.

    • instance - The Vector instance identified by host and port.

    • job - The name of the job producing Vector metrics.

How It Works

Buffers & batches

This component buffers & batches data as shown in the diagram above. You'll notice that Vector treats these concepts differently, instead of treating them as global concepts, Vector treats them as sink specific concepts. This isolates sinks, ensuring services disruptions are contained and delivery guarantees are honored.

Batches are flushed when 1 of 2 conditions are met:

  1. The batch age meets or exceeds the configured timeout_secs.
  2. The batch size meets or exceeds the configured <% if component.options.batch.children.respond_to?(:max_size) %>max_size<% else %>max_events<% end %>.

Buffers are controlled via the buffer.* options.

Health checks

Health checks ensure that the downstream service is accessible and ready to accept data. This check is performed upon sink initialization. If the health check fails an error will be logged and Vector will proceed to start.

Require health checks

If you'd like to exit immediately upon a health check failure, you can pass the --require-healthy flag:

vector --config /etc/vector/vector.toml --require-healthy

Disable health checks

If you'd like to disable health checks for this sink you can set the healthcheck option to false.

Transport Layer Security (TLS)

Vector uses Openssl for TLS protocols for it's maturity. You can enable and adjust TLS behavior via the tls.* options.

librdkafka

The kafka sink uses librdkafka under the hood. This is a battle-tested, high performance, and reliable library that facilitates communication with Kafka. As Vector produces static MUSL builds, this dependency is packaged with Vector, meaning you do not need to install it.