Kafka
Collect logs from Kafka
status: stable
role: aggregator
delivery: at-least-once
acknowledgements: yes
egress: stream
state: stateless
output: log
Configuration
Example configurations
{
  "sources": {
    "my_source_id": {
      "type": "kafka",
      "acknowledgements": null,
      "bootstrap_servers": "10.14.22.123:9092,10.14.23.332:9092",
      "group_id": "consumer-group-name",
      "key_field": "message_key",
      "topics": [
        "^(prefix1|prefix2)-.+"
      ]
    }
  }
}
[sources.my_source_id]
type = "kafka"
bootstrap_servers = "10.14.22.123:9092,10.14.23.332:9092"
group_id = "consumer-group-name"
key_field = "message_key"
topics = [ "^(prefix1|prefix2)-.+" ]
---
sources:
  my_source_id:
    type: kafka
    acknowledgements: null
    bootstrap_servers: 10.14.22.123:9092,10.14.23.332:9092
    group_id: consumer-group-name
    key_field: message_key
    topics:
      - ^(prefix1|prefix2)-.+
{
  "sources": {
    "my_source_id": {
      "type": "kafka",
      "acknowledgements": null,
      "auto_offset_reset": "largest",
      "bootstrap_servers": "10.14.22.123:9092,10.14.23.332:9092",
      "commit_interval_ms": 5000,
      "fetch_wait_max_ms": 100,
      "group_id": "consumer-group-name",
      "key_field": "message_key",
      "topic_key": "topic",
      "partition_key": "partition",
      "offset_key": "offset",
      "headers_key": "headers",
      "librdkafka_options": {
        "client.id": "${ENV_VAR}",
        "fetch.error.backoff.ms": "1000",
        "socket.send.buffer.bytes": "100"
      },
      "sasl": null,
      "session_timeout_ms": 10000,
      "socket_timeout_ms": 60000,
      "framing": null,
      "topics": [
        "^(prefix1|prefix2)-.+"
      ],
      "decoding": null
    }
  }
}
[sources.my_source_id]
type = "kafka"
auto_offset_reset = "largest"
bootstrap_servers = "10.14.22.123:9092,10.14.23.332:9092"
commit_interval_ms = 5_000
fetch_wait_max_ms = 100
group_id = "consumer-group-name"
key_field = "message_key"
topic_key = "topic"
partition_key = "partition"
offset_key = "offset"
headers_key = "headers"
session_timeout_ms = 10_000
socket_timeout_ms = 60_000
topics = [ "^(prefix1|prefix2)-.+" ]
[sources.my_source_id.librdkafka_options]
"client.id" = "${ENV_VAR}"
"fetch.error.backoff.ms" = "1000"
"socket.send.buffer.bytes" = "100"
---
sources:
  my_source_id:
    type: kafka
    acknowledgements: null
    auto_offset_reset: largest
    bootstrap_servers: 10.14.22.123:9092,10.14.23.332:9092
    commit_interval_ms: 5000
    fetch_wait_max_ms: 100
    group_id: consumer-group-name
    key_field: message_key
    topic_key: topic
    partition_key: partition
    offset_key: offset
    headers_key: headers
    librdkafka_options:
      client.id: ${ENV_VAR}
      fetch.error.backoff.ms: "1000"
      socket.send.buffer.bytes: "100"
    sasl: null
    session_timeout_ms: 10000
    socket_timeout_ms: 60000
    framing: null
    topics:
      - ^(prefix1|prefix2)-.+
    decoding: null
acknowledgements
common optional object
Controls how acknowledgements are handled by this source. These settings override the global acknowledgement settings. This setting is deprecated in favor of enabling acknowledgements in the destination sink.
acknowledgements.enabled
common optional bool
Controls whether the source waits for destination sinks to deliver the events before acknowledging receipt.
default: false
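The deprecation note above recommends enabling acknowledgements on the destination sink instead of on this source. The following is a minimal sketch of that sink-side setup. It assumes a console sink with the hypothetical ID my_sink_id, used only for illustration. With this in place, the source should acknowledge, and therefore commit offsets for, events only after the sink has delivered them. Check the acknowledgements documentation for your Vector version to confirm the exact semantics.

```toml
[sources.my_source_id]
type = "kafka"
bootstrap_servers = "10.14.22.123:9092"
group_id = "consumer-group-name"
topics = ["^(prefix1|prefix2)-.+"]

# Hypothetical sink, used only to illustrate sink-side acknowledgements
[sinks.my_sink_id]
type = "console"
inputs = ["my_source_id"]
encoding.codec = "json"
# Preferred over the deprecated source-level acknowledgements.enabled
acknowledgements.enabled = true
```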
auto_offset_reset
optional string literal
If no offsets exist for the consumer group, they are set using this strategy. See the librdkafka documentation for the auto.offset.reset option for further clarification.
default: largest
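Consider a consumer group that has never committed offsets; the group name below is hypothetical. By default it starts from the newest messages. The sketch below tells it to start from the oldest retained message instead. smallest is one of the values librdkafka accepts for auto.offset.reset. See the librdkafka documentation for the full list.

```toml
[sources.my_source_id]
type = "kafka"
bootstrap_servers = "10.14.22.123:9092"
group_id = "new-consumer-group"  # hypothetical group with no committed offsets yet
topics = ["^(prefix1|prefix2)-.+"]
# Only consulted when the group has no valid committed offset for a partition;
# "smallest" starts from the earliest available offset
auto_offset_reset = "smallest"
```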
bootstrap_servers
required string literal
A comma-separated list of host and port pairs that are the addresses of the Kafka brokers in a “bootstrap” Kafka cluster that a Kafka client connects to initially to bootstrap itself.
commit_interval_ms
optional uint
The frequency at which consumer offsets are committed (written) to offset storage.
default: 5000 (milliseconds)
decoding
optional object
Configures how frames are decoded into events.
decoding.codec
common optional string literal enum
The decoding method.
Enum options
Option | Description |
---|---|
bytes | Events containing the byte frame as-is. |
json | Events being parsed from a JSON string. |
native | Events being parsed from Vector’s native protobuf format (EXPERIMENTAL). |
native_json | Events being parsed from Vector’s native JSON format (EXPERIMENTAL). |
syslog | Events being parsed from a Syslog message. |
default: bytes
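The following minimal sketch assumes the topics carry JSON-encoded messages. Switching the codec from the default bytes to json parses each frame as JSON instead of keeping it as a raw message string. With the default bytes framing, each frame corresponds to one Kafka message.

```toml
[sources.my_source_id]
type = "kafka"
bootstrap_servers = "10.14.22.123:9092"
group_id = "consumer-group-name"
topics = ["^(prefix1|prefix2)-.+"]

# Parse each frame (one Kafka message under the default framing) as JSON
[sources.my_source_id.decoding]
codec = "json"
```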
fetch_wait_max_ms
optional uint
The maximum time the broker may wait to fill the response.
default: 100 (milliseconds)
framing
optional object
Configures how incoming byte sequences are split into byte frames.
framing.character_delimited
required object
Options for character_delimited framing.
Relevant when: method = `character_delimited`
framing.character_delimited.delimiter
required ascii_char
The character used to separate frames.
framing.character_delimited.max_length
optional uint
The maximum frame length limit. Any frames longer than max_length bytes will be discarded entirely.
framing.method
common optional string literal enum
The framing method.
Enum options
Option | Description |
---|---|
bytes | Byte frames are passed through as-is according to the underlying I/O boundaries (e.g. split between messages or stream segments). |
character_delimited | Byte frames which are delimited by a chosen character. |
length_delimited | Byte frames whose length is encoded in a header. |
newline_delimited | Byte frames which are delimited by a newline character. |
octet_counting | Byte frames according to the octet counting format. |
default: bytes
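Some Kafka messages pack several records separated by a single character. For those, character_delimited framing can split each message into separate frames before decoding. The sketch below assumes comma-separated records. The delimiter and the 1024-byte limit are illustrative values, not recommendations.

```toml
[sources.my_source_id]
type = "kafka"
bootstrap_servers = "10.14.22.123:9092"
group_id = "consumer-group-name"
topics = ["^(prefix1|prefix2)-.+"]

[sources.my_source_id.framing]
method = "character_delimited"

# Required when method = "character_delimited"
[sources.my_source_id.framing.character_delimited]
delimiter = ","
# Frames longer than 1024 bytes are discarded entirely
max_length = 1024
```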
framing.newline_delimited
optional object
Options for newline_delimited framing.
Relevant when: method = `newline_delimited`
framing.newline_delimited.max_length
optional uint
The maximum frame length limit. Any frames longer than max_length bytes will be discarded entirely.
framing.octet_counting
optional object
Options for octet_counting framing.
Relevant when: method = `octet_counting`
framing.octet_counting.max_length
optional uint
The maximum frame length limit. Any frames longer than max_length bytes will be discarded entirely.
group_id
required string literal
The consumer group name to be used to consume events from Kafka.
key_field
common optional string literal
The log field name to use for the Kafka message key.
default: message_key
partition_key
optional string literal
The log field name to use for the Kafka partition.
default: partition
sasl
optional object
Options for SASL/SCRAM authentication support.
sasl.enabled
common optional bool
Enables SASL/SCRAM authentication to the remote (not supported on Windows at this time).
sasl.mechanism
common optional string literal
The Kafka SASL/SCRAM mechanism.
sasl.password
common optional string literal
The Kafka SASL/SCRAM authentication password.
sasl.username
common optional string literal
The Kafka SASL/SCRAM authentication username.
session_timeout_ms
optional uint
The Kafka session timeout.
default: 10000 (milliseconds)
topics
required [string]
The Kafka topic names to read events from. Regex is supported if the topic name begins with ^.
Outputs
<component_id>
Default output stream of the component. Use this component’s ID as an input to downstream transforms and sinks.
Output Data
Logs
Record
An individual Kafka record.
message
required string literal
The raw line from the Kafka record.
Examples
53.126.150.246 - - [01/Oct/2020:11:25:58 -0400] "GET /disintermediate HTTP/2.0" 401 20308
offset
required uint
The Kafka offset at the time the record was retrieved.
Examples
100
partition
required string literal
The Kafka partition that the record came from.
Examples
partition
timestamp
required timestamp
The timestamp encoded in the Kafka message or the current time if it cannot be fetched.
Examples
2020-10-10T17:07:36.452332Z
topic
required string literal
The Kafka topic that the record came from.
Examples
topic
Telemetry
Metrics
component_discarded_events_total
counter
The number of events dropped by this component.
component_id
required
The Vector component ID.
component_kind
required
The Vector component kind.
component_name
required
Deprecated; use component_id instead. The value is the same as component_id.
component_type
required
The Vector component type.
host
optional
The hostname of the system Vector is running on.
pid
optional
The process ID of the Vector instance.
component_errors_total
counter
The total number of errors encountered by this component.
component_id
required
The Vector component ID.
component_kind
required
The Vector component kind.
component_name
required
Deprecated; use component_id instead. The value is the same as component_id.
component_type
required
The Vector component type.
error_type
required
The type of the error.
host
optional
The hostname of the system Vector is running on.
pid
optional
The process ID of the Vector instance.
stage
required
The stage within the component at which the error occurred.
component_received_bytes_total
counter
The number of raw bytes accepted by this component from source origins.
component_id
required
The Vector component ID.
component_kind
required
The Vector component kind.
component_name
required
Deprecated; use component_id instead. The value is the same as component_id.
component_type
required
The Vector component type.
container_name
optional
The name of the container from which the data originated.
file
optional
The file from which the data originated.
host
optional
The hostname of the system Vector is running on.
mode
optional
The connection mode used by the component.
peer_addr
optional
The IP from which the data originated.
peer_path
optional
The pathname from which the data originated.
pid
optional
The process ID of the Vector instance.
pod_name
optional
The name of the pod from which the data originated.
uri
optional
The sanitized URI from which the data originated.
component_received_event_bytes_total
counter
The number of event bytes accepted by this component either from tagged origins like file and uri, or cumulatively from other origins.
component_id
required
The Vector component ID.
component_kind
required
The Vector component kind.
component_name
required
Deprecated; use component_id instead. The value is the same as component_id.
component_type
required
The Vector component type.
container_name
optional
The name of the container from which the data originated.
file
optional
The file from which the data originated.
host
optional
The hostname of the system Vector is running on.
mode
optional
The connection mode used by the component.
peer_addr
optional
The IP from which the data originated.
peer_path
optional
The pathname from which the data originated.
pid
optional
The process ID of the Vector instance.
pod_name
optional
The name of the pod from which the data originated.
uri
optional
The sanitized URI from which the data originated.
component_received_events_total
counter
The number of events accepted by this component either from tagged origins like file and uri, or cumulatively from other origins.
component_id
required
The Vector component ID.
component_kind
required
The Vector component kind.
component_name
required
Deprecated; use component_id instead. The value is the same as component_id.
component_type
required
The Vector component type.
container_name
optional
The name of the container from which the data originated.
file
optional
The file from which the data originated.
host
optional
The hostname of the system Vector is running on.
mode
optional
The connection mode used by the component.
peer_addr
optional
The IP from which the data originated.
peer_path
optional
The pathname from which the data originated.
pid
optional
The process ID of the Vector instance.
pod_name
optional
The name of the pod from which the data originated.
uri
optional
The sanitized URI from which the data originated.
component_sent_event_bytes_total
counter
The total number of event bytes emitted by this component.
component_id
required
The Vector component ID.
component_kind
required
The Vector component kind.
component_name
required
Deprecated; use component_id instead. The value is the same as component_id.
component_type
required
The Vector component type.
host
optional
The hostname of the system Vector is running on.
output
optional
The specific output of the component.
pid
optional
The process ID of the Vector instance.
component_sent_events_total
counter
The total number of events emitted by this component.
component_id
required
The Vector component ID.
component_kind
required
The Vector component kind.
component_name
required
Deprecated; use component_id instead. The value is the same as component_id.
component_type
required
The Vector component type.
host
optional
The hostname of the system Vector is running on.
output
optional
The specific output of the component.
pid
optional
The process ID of the Vector instance.
consumer_offset_updates_failed_total
counter
The total number of failures to update a Kafka consumer offset.
component_id
required
The Vector component ID.
component_kind
required
The Vector component kind.
component_name
required
Deprecated; use component_id instead. The value is the same as component_id.
component_type
required
The Vector component type.
host
optional
The hostname of the system Vector is running on.
pid
optional
The process ID of the Vector instance.
events_failed_total
counter
The total number of failures to read a Kafka message.
component_id
required
The Vector component ID.
component_kind
required
The Vector component kind.
component_name
required
Deprecated; use component_id instead. The value is the same as component_id.
component_type
required
The Vector component type.
host
optional
The hostname of the system Vector is running on.
pid
optional
The process ID of the Vector instance.
events_in_total
counter
The number of events accepted by this component either from tagged origins like file and uri, or cumulatively from other origins. This metric is deprecated and will be removed in a future version; use component_received_events_total instead.
component_id
required
The Vector component ID.
component_kind
required
The Vector component kind.
component_name
required
Deprecated; use component_id instead. The value is the same as component_id.
component_type
required
The Vector component type.
container_name
optional
The name of the container from which the data originated.
file
optional
The file from which the data originated.
host
optional
The hostname of the system Vector is running on.
mode
optional
The connection mode used by the component.
peer_addr
optional
The IP from which the data originated.
peer_path
optional
The pathname from which the data originated.
pid
optional
The process ID of the Vector instance.
pod_name
optional
The name of the pod from which the data originated.
uri
optional
The sanitized URI from which the data originated.
events_out_total
counter
The total number of events emitted by this component. This metric is deprecated and will be removed in a future version; use component_sent_events_total instead.
component_id
required
The Vector component ID.
component_kind
required
The Vector component kind.
component_name
required
Deprecated; use component_id instead. The value is the same as component_id.
component_type
required
The Vector component type.
host
optional
The hostname of the system Vector is running on.
output
optional
The specific output of the component.
pid
optional
The process ID of the Vector instance.
kafka_consumed_messages_bytes_total
counter
Total number of message bytes (including framing) received from Kafka brokers.
component_id
required
The Vector component ID.
component_kind
required
The Vector component kind.
component_name
required
Deprecated; use component_id instead. The value is the same as component_id.
component_type
required
The Vector component type.
host
optional
The hostname of the system Vector is running on.
pid
optional
The process ID of the Vector instance.
kafka_consumed_messages_total
counter
Total number of messages consumed from Kafka brokers, not including ignored messages (due to offset, etc.).
component_id
required
The Vector component ID.
component_kind
required
The Vector component kind.
component_name
required
Deprecated; use component_id instead. The value is the same as component_id.
component_type
required
The Vector component type.
host
optional
The hostname of the system Vector is running on.
pid
optional
The process ID of the Vector instance.
kafka_produced_messages_bytes_total
counter
Total number of message bytes (including framing, such as per-Message framing and MessageSet/batch framing) transmitted to Kafka brokers.
component_id
required
The Vector component ID.
component_kind
required
The Vector component kind.
component_name
required
Deprecated; use component_id instead. The value is the same as component_id.
component_type
required
The Vector component type.
host
optional
The hostname of the system Vector is running on.
pid
optional
The process ID of the Vector instance.
kafka_produced_messages_total
counter
Total number of messages transmitted (produced) to Kafka brokers.
component_id
required
The Vector component ID.
component_kind
required
The Vector component kind.
component_name
required
Deprecated; use component_id instead. The value is the same as component_id.
component_type
required
The Vector component type.
host
optional
The hostname of the system Vector is running on.
pid
optional
The process ID of the Vector instance.
kafka_queue_messages
gauge
Current number of messages in producer queues.
component_id
required
The Vector component ID.
component_kind
required
The Vector component kind.
component_name
required
Deprecated; use component_id instead. The value is the same as component_id.
component_type
required
The Vector component type.
host
optional
The hostname of the system Vector is running on.
pid
optional
The process ID of the Vector instance.
kafka_queue_messages_bytes
gauge
Current total size of messages in producer queues.
component_id
required
The Vector component ID.
component_kind
required
The Vector component kind.
component_name
required
Deprecated; use component_id instead. The value is the same as component_id.
component_type
required
The Vector component type.
host
optional
The hostname of the system Vector is running on.
pid
optional
The process ID of the Vector instance.
kafka_requests_bytes_total
counter
Total number of bytes transmitted to Kafka brokers.
component_id
required
The Vector component ID.
component_kind
required
The Vector component kind.
component_name
required
Deprecated; use component_id instead. The value is the same as component_id.
component_type
required
The Vector component type.
host
optional
The hostname of the system Vector is running on.
pid
optional
The process ID of the Vector instance.
kafka_requests_total
counter
Total number of requests sent to Kafka brokers.
component_id
required
The Vector component ID.
component_kind
required
The Vector component kind.
component_name
required
Deprecated; use component_id instead. The value is the same as component_id.
component_type
required
The Vector component type.
host
optional
The hostname of the system Vector is running on.
pid
optional
The process ID of the Vector instance.
kafka_responses_bytes_total
counter
Total number of bytes received from Kafka brokers.
component_id
required
The Vector component ID.
component_kind
required
The Vector component kind.
component_name
required
Deprecated; use component_id instead. The value is the same as component_id.
component_type
required
The Vector component type.
host
optional
The hostname of the system Vector is running on.
pid
optional
The process ID of the Vector instance.
kafka_responses_total
counter
Total number of responses received from Kafka brokers.
component_id
required
The Vector component ID.
component_kind
required
The Vector component kind.
component_name
required
Deprecated; use component_id instead. The value is the same as component_id.
component_type
required
The Vector component type.
host
optional
The hostname of the system Vector is running on.
pid
optional
The process ID of the Vector instance.
processed_bytes_total
counter
The number of bytes processed by the component.
component_id
required
The Vector component ID.
component_kind
required
The Vector component kind.
component_name
required
Deprecated; use component_id instead. The value is the same as component_id.
component_type
required
The Vector component type.
container_name
optional
The name of the container from which the bytes originate.
file
optional
The file from which the bytes originate.
host
optional
The hostname of the system Vector is running on.
mode
optional
The connection mode used by the component.
peer_addr
optional
The IP from which the bytes originate.
peer_path
optional
The pathname from which the bytes originate.
pid
optional
The process ID of the Vector instance.
pod_name
optional
The name of the pod from which the bytes originate.
uri
optional
The sanitized URI from which the bytes originate.
processed_events_total
counter
The total number of events processed by this component. This metric is deprecated in favor of the component_received_events_total and component_sent_events_total metrics.
component_id
required
The Vector component ID.
component_kind
required
The Vector component kind.
component_name
required
Deprecated; use component_id instead. The value is the same as component_id.
component_type
required
The Vector component type.
host
optional
The hostname of the system Vector is running on.
pid
optional
The process ID of the Vector instance.
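The metrics above are part of Vector's internal telemetry. The sketch below shows one way to inspect them. It assumes you route Vector's own metrics through the internal_metrics source into a prometheus_exporter sink; both component IDs are hypothetical. Filtering on the component_id tag then isolates the series emitted by this Kafka source.

```toml
# Collect Vector's own metrics, including the kafka_* series listed above
[sources.vector_metrics]
type = "internal_metrics"

# Expose them on an HTTP endpoint for a Prometheus server to scrape
[sinks.prometheus]
type = "prometheus_exporter"
inputs = ["vector_metrics"]
address = "0.0.0.0:9598"
```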
How it works
librdkafka
The kafka source uses librdkafka under the hood. This is a battle-tested, high-performance, and reliable library that facilitates communication with Kafka. As Vector produces static MUSL builds, this dependency is packaged with Vector, meaning you do not need to install it.