Kafka Source

The Vector kafka source collects logs from Kafka.

Requirements

Configuration

[sources.my_source_id]
type = "kafka" # required
bootstrap_servers = "10.14.22.123:9092,10.14.23.332:9092" # required
group_id = "consumer-group-name" # required
key_field = "message_key" # optional, no default
topics = ["^(prefix1|prefix2)-.+", "topic-1", "topic-2"] # required
  • optional, string

    auto_offset_reset

    If offsets for the consumer group do not exist, set them using this strategy. See the librdkafka documentation for the auto.offset.reset option for an explanation.

    • Default: "largest"
  • common, required, string

    bootstrap_servers

    A comma-separated list of host and port pairs that are the addresses of the Kafka brokers in a "bootstrap" Kafka cluster that a Kafka client connects to initially to bootstrap itself.

  • optional, uint

    commit_interval_ms

    How often the consumer offsets are committed (written) to offset storage.

    • Default: 5000 (milliseconds)
  • optional, uint

    fetch_wait_max_ms

    Maximum time the broker may wait to fill the response.

    • Default: 100 (milliseconds)
  • common, required, string

    group_id

    The consumer group name to be used to consume events from Kafka.

  • common, optional, string

    key_field

    The log field name to use for the Kafka message key. If unspecified, the key is not added to the log event. If the message has a null key, this field is not added to the log event.

  • optional, table

    librdkafka_options

    Advanced options. See librdkafka documentation for details.

  • optional, string

    offset_key

    The log field name to use for the Kafka offset. If unspecified, the offset is not added to the log event.

  • optional, string

    partition_key

    The log field name to use for the Kafka partition. If unspecified, the partition is not added to the log event.

  • optional, table

    sasl

    Options for SASL/SCRAM authentication support.

    • common, optional, bool

      enabled

      Enable SASL/SCRAM authentication to the remote. (Not supported on Windows at this time.)

    • common, optional, string

      mechanism

      The Kafka SASL/SCRAM mechanism to use.

    • common, optional, string

      password

      The Kafka SASL/SCRAM authentication password.

    • common, optional, string

      username

      The Kafka SASL/SCRAM authentication username.

  • optional, uint

    session_timeout_ms

    The Kafka session timeout in milliseconds.

    • Default: 10000 (milliseconds)
  • optional, uint

    socket_timeout_ms

    Default timeout for network requests.

    • Default: 60000 (milliseconds)
  • optional, table

    tls

    Configures the TLS options for connections to the Kafka brokers.

    • optional, string

      ca_file

      Absolute path to an additional CA certificate file, in DER or PEM format (X.509), or an inline CA certificate in PEM format.

    • common, optional, string

      crt_file

      Absolute path to a certificate file used to identify this connection, in DER or PEM format (X.509) or PKCS#12, or an inline certificate in PEM format. If this is set and is not a PKCS#12 archive, key_file must also be set.

    • common, optional, bool

      enabled

      Enable TLS during connections to the remote.

      • Default: false
    • common, optional, string

      key_file

      Absolute path to a private key file used to identify this connection, in DER or PEM format (PKCS#8), or an inline private key in PEM format. If this is set, crt_file must also be set.

    • optional, string

      key_pass

      Pass phrase used to unlock the encrypted key file. This has no effect unless key_file is set.

    • optional, bool

      verify_certificate

      If true (the default), Vector will validate the TLS certificate of the remote host.

      • Default: true
  • optional, string

    topic_key

    The log field name to use for the Kafka topic. If unspecified, the topic is not added to the log event.

  • common, required, [string]

    topics

    The Kafka topic names to read events from. A topic that begins with ^ is treated as a regular expression.
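
To illustrate how the options above combine, here is a minimal sketch of a source that reads one literal topic and one regex-matched set of topics, starts a new consumer group from the oldest available offset, and authenticates with SASL/SCRAM. The broker addresses, group name, and credentials are placeholders. The values "earliest" and "SCRAM-SHA-512" are assumptions taken from the librdkafka and Kafka documentation, not from this page, so check them against the versions you run.

```toml
[sources.my_source_id]
type = "kafka"
bootstrap_servers = "broker-1.example.com:9092,broker-2.example.com:9092"  # placeholder brokers
group_id = "consumer-group-name"
# "topic-1" is matched literally; the entry beginning with ^ is treated as a regex.
topics = ["topic-1", "^(prefix1|prefix2)-.+"]
# Assumed value: one of the settings librdkafka accepts for auto.offset.reset.
auto_offset_reset = "earliest"
commit_interval_ms = 5000   # same as the documented default
session_timeout_ms = 10000  # same as the documented default

# SASL/SCRAM authentication (placeholder credentials).
sasl.enabled = true
sasl.mechanism = "SCRAM-SHA-512"  # assumed mechanism name
sasl.username = "username"
sasl.password = "password"
```

The timing options are written out only to show where they go; omitting them should give the same behavior, since they match the defaults listed above.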

Output

This component outputs log events with the following fields:

{
  "message": "53.126.150.246 - - [01/Oct/2020:11:25:58 -0400] \"GET /disintermediate HTTP/2.0\" 401 20308",
  "offset": 100,
  "partition": "partition",
  "timestamp": "2020-10-10T17:07:36+00:00",
  "topic": "topic"
}
  • common, required, string

    message

    The raw line from the Kafka record.

  • common, required, uint

    offset

    The Kafka offset at the time the record was retrieved.

  • common, required, string

    partition

    The Kafka partition that the record came from.

  • common, required, timestamp

    timestamp

    The timestamp associated with the Kafka record.

  • common, required, string

    topic

    The Kafka topic that the record came from.

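The field names in the example event can be tied back to the key_field, topic_key, partition_key, and offset_key options. The following is a sketch only, assuming you want the record metadata stored under the names used in the example event. The broker address is a placeholder, and nothing here is a statement about what the source does when these options are left unset.

```toml
[sources.my_source_id]
type = "kafka"
bootstrap_servers = "broker-1.example.com:9092"  # placeholder address
group_id = "consumer-group-name"
topics = ["topic-1"]

# Field names chosen to match the example event above.
key_field = "message_key"    # not added when the record has a null key
topic_key = "topic"
partition_key = "partition"
offset_key = "offset"
```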

Telemetry

This component provides the following metrics that can be retrieved through the internal_metrics source. See the metrics section in the monitoring page for more info.

  • counter

    processed_events_total

    The total number of events processed by this component. This metric includes the following tags:

    • component_kind - The Vector component kind.

    • component_name - The Vector component ID.

    • component_type - The Vector component type.

    • file - The file that produced the error.

    • instance - The Vector instance identified by host and port.

    • job - The name of the job producing Vector metrics.

  • counter

    processed_bytes_total

    The total number of bytes processed by the component. This metric includes the following tags:

    • component_kind - The Vector component kind.

    • component_name - The Vector component ID.

    • component_type - The Vector component type.

    • instance - The Vector instance identified by host and port.

    • job - The name of the job producing Vector metrics.
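
As a rough illustration of retrieving these metrics, the sketch below wires an internal_metrics source to a console sink so that the counters are printed to standard output. The component IDs vector_metrics and metrics_console are hypothetical, and the exact encoding option syntax is an assumption that may differ between Vector versions.

```toml
# Hypothetical component IDs; only the "type" values name Vector components.
[sources.vector_metrics]
type = "internal_metrics"

[sinks.metrics_console]
type = "console"
inputs = ["vector_metrics"]  # read from the internal metrics source above
encoding.codec = "json"      # assumed syntax; check the console sink reference for your version
```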

How It Works

Context

By default, the kafka source will augment events with helpful context keys as shown in the "Output" section.

Transport Layer Security (TLS)

Vector uses OpenSSL for TLS protocols. You can adjust TLS behavior via the tls.* options.
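
A minimal sketch of the tls.* options in use, assuming a broker that requires client certificates. The broker address, file paths, and pass phrase are placeholders; only the option names come from the configuration reference above.

```toml
[sources.my_source_id]
type = "kafka"
bootstrap_servers = "broker-1.example.com:9093"  # placeholder address
group_id = "consumer-group-name"
topics = ["topic-1"]

tls.enabled = true
tls.ca_file = "/path/to/ca.pem"          # additional CA certificate
tls.crt_file = "/path/to/client.pem"     # client certificate; requires key_file unless PKCS#12
tls.key_file = "/path/to/client-key.pem" # private key for crt_file
tls.key_pass = "key-pass-phrase"         # only needed if the key file is encrypted
tls.verify_certificate = true            # the documented default, shown for clarity
```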

librdkafka

The kafka source uses librdkafka under the hood. This is a battle-tested, high-performance, and reliable library that facilitates communication with Kafka. As Vector produces static MUSL builds, this dependency is packaged with Vector, meaning you do not need to install it.
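
Settings that Vector does not expose directly can be passed through the librdkafka_options table. The sketch below is an assumption-laden example: client.id and fetch.error.backoff.ms are librdkafka configuration properties, but the chosen values are arbitrary, and writing them as strings is an assumption about how Vector expects the table to be typed.

```toml
[sources.my_source_id]
type = "kafka"
bootstrap_servers = "broker-1.example.com:9092"  # placeholder address
group_id = "consumer-group-name"
topics = ["topic-1"]

# Passed through to librdkafka; keys contain dots, so they must be quoted.
[sources.my_source_id.librdkafka_options]
"client.id" = "vector"               # arbitrary example value
"fetch.error.backoff.ms" = "1000"    # arbitrary example value, written as a string
```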