Kafka Source

The Vector kafka source ingests data through Kafka and outputs log events.

Requirements

Configuration

vector.toml
[sources.my_source_id]
type = "kafka" # required
bootstrap_servers = "10.14.22.123:9092,10.14.23.332:9092" # required
group_id = "consumer-group-name" # required
key_field = "message_key" # optional, no default
topics = ["^(prefix1|prefix2)-.+", "topic-1", "topic-2"] # required
  • stringoptional

    auto_offset_reset

    If offsets for consumer group do not exist, set them using this strategy. librdkafka documentation for auto.offset.reset option for explanation.

    • Default: "largest"
    • View examples
  • stringcommonrequired

    bootstrap_servers

    A comma-separated list of host and port pairs that are the addresses of the Kafka brokers in a "bootstrap" Kafka cluster that a Kafka client connects to initially to bootstrap itself.

    • No default
    • View examples
  • uint (milliseconds)optional

    commit_interval_ms

    The frequency that the consumer offsets are committed (written) to offset storage.

    • Default: 5000 (milliseconds)
    • View examples
  • uint (milliseconds)optional

    fetch_wait_max_ms

    Maximum time the broker may wait to fill the response.

    • Default: 100 (milliseconds)
    • View examples
  • stringcommonrequired

    group_id

    The consumer group name to be used to consume events from Kafka.

    • No default
    • View examples
  • stringcommonoptional

    key_field

    The log field name to use for the Kafka message key. If unspecified, the key would not be added to the log event. If the message has null key, then this field would not be added to the log event.

    • No default
    • View examples
  • tableoptional

    librdkafka_options

    Advanced options. See librdkafka documentation for details.

    • stringoptional

      [field-name]

      The options and their values. Accepts string values.

      • No default
      • View examples
  • tableoptional

    sasl

    Options for SASL/SCRAM authentication support.

    • boolcommonoptional

      enabled

      Enable SASL/SCRAM authentication to the remote. (Not supported on Windows at this time.)

      • No default
      • View examples
    • stringcommonoptional

      mechanism

      The Kafka SASL/SCRAM mechanisms.

      • No default
      • View examples
    • stringcommonoptional

      password

      The Kafka SASL/SCRAM authentication password.

      • No default
      • View examples
    • stringcommonoptional

      username

      The Kafka SASL/SCRAM authentication username.

      • No default
      • View examples
  • uint (milliseconds)optional

    session_timeout_ms

    The Kafka session timeout in milliseconds.

    • Default: 10000 (milliseconds)
    • View examples
  • uint (milliseconds)optional

    socket_timeout_ms

    Default timeout for network requests.

    • Default: 60000 (milliseconds)
    • View examples
  • tableoptional

    tls

    Configures the TLS options for connections from this sink.

    • stringoptional

      ca_file

      Absolute path to an additional CA certificate file, in DER or PEM format (X.509), or an inline CA certificate in PEM format.

      • No default
      • View examples
    • stringcommonoptional

      crt_file

      Absolute path to a certificate file used to identify this connection, in DER or PEM format (X.509) or PKCS#12, or an inline certificate in PEM format. If this is set and is not a PKCS#12 archive, key_file must also be set.

      • No default
      • View examples
    • boolcommonoptional

      enabled

      Enable TLS during connections to the remote.

      • Default: false
      • View examples
    • stringcommonoptional

      key_file

      Absolute path to a private key file used to identify this connection, in DER or PEM format (PKCS#8), or an inline private key in PEM format. If this is set, crt_file must also be set.

      • No default
      • View examples
    • stringoptional

      key_pass

      Pass phrase used to unlock the encrypted key file. This has no effect unless key_file is set.

      • No default
      • View examples
  • [string]commonrequired

    topics

    The Kafka topics names to read events from. Regex is supported if the topic begins with ^.

    • No default
    • View examples

Fields

example log event
{
// ...
"message": "Started GET / for 127.0.0.1 at 2012-03-10 14:28:14 +0100",
"timestamp": "2019-11-01T21:15:47+00:00"
// ...
}
  • stringcommonrequired

    message

    The raw event message, unaltered.

    • No default
    • View examples
  • timestampcommonrequired

    timestamp

    Timestamp extracted from the event, or, if not present, the exact time the event was ingested.

    • No default
    • View examples

How It Works

Environment Variables

Environment variables are supported through all of Vector's configuration. Simply add ${MY_ENV_VAR} in your Vector configuration file and the variable will be replaced before being evaluated.

You can learn more in the Environment Variables section.

TLS

Vector uses Openssl for TLS protocols for it's battle-tested and reliable security. You can enable and adjust TLS behavior via the tls.* options.

librdkafka

The kafka source uses librdkafka under the hood. This is a battle tested, performant, and reliabile library that facilitates communication with Kafka. And because Vector produces static MUSL builds, this dependency is packaged with Vector, meaning you do not need to install it.