Kafka Sink

The Vector kafka sink sends logs and metrics to Kafka.

Requirements

Configuration

[sinks.my_sink_id]
# General
type = "kafka" # required
inputs = ["my-source-or-transform-id", "prefix-*"] # required
bootstrap_servers = "10.14.22.123:9092,10.14.23.332:9092" # required
compression = "none" # optional, default
key_field = "user_id" # required
topic = "topic-1234" # required
# Batch
# Encoding
encoding.codec = "json" # required
# Healthcheck
healthcheck.enabled = true # optional, default
  • commonoptionaltable

    batch

    Configures the sink batching behavior.

  • commonrequiredstring

    bootstrap_servers

    A comma-separated list of host and port pairs that are the addresses of the Kafka brokers in a "bootstrap" Kafka cluster that a Kafka client connects to initially to bootstrap itself.

    • Syntax: literal
  • optionaltable

    buffer

    Configures the sink specific buffer behavior.

    • commonoptionaluint

      max_events

      The maximum number of events allowed in the buffer. See Buffers & batches for more info.

      • Only relevant when: type = "memory"
      • Default: 500 (events)
    • commonrequired*uint

      max_size

      The maximum size of the buffer on the disk. See Buffers & batches for more info.

      • Only required when: type = "disk"
    • enumcommonoptionalstring

      type

      The buffer's type and storage mechanism.

      • Syntax: literal
      • Default: "memory"
      • Enum, must be one of: "memory" "disk"
    • enumoptionalstring

      when_full

      The behavior when the buffer becomes full.

      • Syntax: literal
      • Default: "block"
      • Enum, must be one of: "block" "drop_newest"
  • enumcommonoptionalstring

    compression

    The compression strategy used to compress the encoded event data before transmission.

    Some cloud storage API clients and browsers will handle decompression transparently, so files may not always appear to be compressed depending how they are accessed.

    • Syntax: literal
    • Default: "none"
    • Enum, must be one of: "none" "gzip" "syntax"
  • commonrequiredtable

    encoding

    Configures the encoding specific sink behavior.

    • commonrequiredstring

      codec

      The encoding codec used to serialize the events before outputting.

      • Syntax: literal
    • optional[string]

      except_fields

      Prevent the sink from encoding the specified labels.

    • optional[string]

      only_fields

      Prevent the sink from encoding the specified labels.

    • enumoptionalstring

      timestamp_format

      How to format event timestamps.

      • Syntax: literal
      • Default: "rfc3339"
      • Enum, must be one of: "rfc3339" "unix"
  • commonoptionaltable

    healthcheck

    Health check options for the sink. See Health checks for more info.

    • commonoptionalbool

      enabled

      Enables/disables the healthcheck upon Vector boot.

      • Default: true
  • commonrequiredstring

    key_field

    The log field name or tags key to use for the topic key. If unspecified, the key will be randomly generated. If the field does not exist on the log or in tags, a blank value will be used.

    • Syntax: literal
  • optionaltable

    librdkafka_options

    Advanced options. See librdkafka documentation for details.

  • optionaluint

    message_timeout_ms

    Local message timeout.

    • Default: 300000
  • optionaltable

    sasl

    Options for SASL/SCRAM authentication support.

    • commonoptionalbool

      enabled

      Enable SASL/SCRAM authentication to the remote. (Not supported on Windows at this time.)

    • commonoptionalstring

      mechanism

      The Kafka SASL/SCRAM mechanisms.

      • Syntax: literal
    • commonoptionalstring

      password

      The Kafka SASL/SCRAM authentication password.

      • Syntax: literal
    • commonoptionalstring

      username

      The Kafka SASL/SCRAM authentication username.

      • Syntax: literal
  • optionaluint

    socket_timeout_ms

    Default timeout for network requests.

    • Default: 60000 (milliseconds)
  • optionaltable

    tls

    Configures the TLS options for incoming connections.

    • optionalstring

      ca_file

      Absolute path to an additional CA certificate file, in DER or PEM format (X.509), or an inline CA certificate in PEM format.

      • Syntax: literal
    • commonoptionalstring

      crt_file

      Absolute path to a certificate file used to identify this connection, in DER or PEM format (X.509) or PKCS#12, or an inline certificate in PEM format. If this is set and is not a PKCS#12 archive, key_file must also be set.

      • Syntax: literal
    • commonoptionalbool

      enabled

      Enable TLS during connections to the remote.

      • Default: false
    • commonoptionalstring

      key_file

      Absolute path to a private key file used to identify this connection, in DER or PEM format (PKCS#8), or an inline private key in PEM format. If this is set, crt_file must also be set.

      • Syntax: literal
    • optionalstring

      key_pass

      Pass phrase used to unlock the encrypted key file. This has no effect unless key_file is set.

      • Syntax: literal
    • optionalbool

      verify_certificate

      If true (the default), Vector will validate the TLS certificate of the remote host.

      • Default: true
  • commonrequiredstring

    topic

    The Kafka topic name to write events to.

    • Syntax: literal

Telemetry

This component provides the following metrics that can be retrieved through the internal_metrics source. See the metrics section in the monitoring page for more info.

  • counter

    events_discarded_total

    The total number of events discarded by this component. This metric includes the following tags:

    • instance - The Vector instance identified by host and port.

    • job - The name of the job producing Vector metrics.

  • counter

    events_in_total

    The total number of events accepted by this component. This metric includes the following tags:

    • component_kind - The Vector component kind.

    • component_name - The Vector component ID.

    • component_type - The Vector component type.

    • instance - The Vector instance identified by host and port.

    • job - The name of the job producing Vector metrics.

  • counter

    processing_errors_total

    The total number of processing errors encountered by this component. This metric includes the following tags:

    • component_kind - The Vector component kind.

    • component_name - The Vector component ID.

    • component_type - The Vector component type.

    • error_type - The type of the error

    • instance - The Vector instance identified by host and port.

    • job - The name of the job producing Vector metrics.

  • counter

    events_out_total

    The total number of events emitted by this component. This metric includes the following tags:

    • component_kind - The Vector component kind.

    • component_name - The Vector component ID.

    • component_type - The Vector component type.

    • instance - The Vector instance identified by host and port.

    • job - The name of the job producing Vector metrics.

How It Works

Buffers & batches

Buffers and Batches

This component buffers & batches data as shown in the diagram above. You'll notice that Vector treats these concepts differently, instead of treating them as global concepts, Vector treats them as sink specific concepts. This isolates sinks, ensuring services disruptions are contained and delivery guarantees are honored.

Batches are flushed when 1 of 2 conditions are met:

  1. The batch age meets or exceeds the configured timeout_secs.
  2. The batch size meets or exceeds the configured <% if component.options.batch.children.respond_to?(:max_size) %>max_size<% else %>max_events<% end %>.

Buffers are controlled via the buffer.* options.

Health checks

Health checks ensure that the downstream service is accessible and ready to accept data. This check is performed upon sink initialization. If the health check fails an error will be logged and Vector will proceed to start.

Require health checks

If you'd like to exit immediately upon a health check failure, you can pass the --require-healthy flag:

vector --config /etc/vector/vector.toml --require-healthy

Disable health checks

If you'd like to disable health checks for this sink you can set the healthcheck option to false.

State

This component is stateless, meaning its behavior is consistent across each input.

Transport Layer Security (TLS)

Vector uses Openssl for TLS protocols for it's maturity. You can enable and adjust TLS behavior via the tls.* options.

librdkafka

The kafka sink uses librdkafka under the hood. This is a battle-tested, high performance, and reliable library that facilitates communication with Kafka. As Vector produces static MUSL builds, this dependency is packaged with Vector, meaning you do not need to install it.