AWS Kinesis Data Firehose logs
Publish logs to AWS Kinesis Data Firehose delivery streams
Configuration
Example configurations
{
  "sinks": {
    "my_sink_id": {
      "type": "aws_kinesis_firehose",
      "inputs": [
        "my-source-or-transform-id"
      ],
      "stream_name": "my-stream",
      "acknowledgements": null,
      "compression": "none",
      "encoding": {
        "codec": "json"
      },
      "healthcheck": null,
      "region": "us-east-1"
    }
  }
}
[sinks.my_sink_id]
type = "aws_kinesis_firehose"
inputs = [ "my-source-or-transform-id" ]
stream_name = "my-stream"
compression = "none"
region = "us-east-1"
[sinks.my_sink_id.encoding]
codec = "json"
---
sinks:
  my_sink_id:
    type: aws_kinesis_firehose
    inputs:
      - my-source-or-transform-id
    stream_name: my-stream
    acknowledgements: null
    compression: none
    encoding:
      codec: json
    healthcheck: null
    region: us-east-1
{
  "sinks": {
    "my_sink_id": {
      "type": "aws_kinesis_firehose",
      "inputs": [
        "my-source-or-transform-id"
      ],
      "auth": null,
      "endpoint": "http://127.0.0.0:5000/path/to/service",
      "stream_name": "my-stream",
      "buffer": null,
      "acknowledgements": null,
      "batch": null,
      "compression": "none",
      "encoding": {
        "codec": "json"
      },
      "healthcheck": null,
      "request": null,
      "tls": null,
      "proxy": null,
      "region": "us-east-1"
    }
  }
}
[sinks.my_sink_id]
type = "aws_kinesis_firehose"
inputs = [ "my-source-or-transform-id" ]
endpoint = "http://127.0.0.0:5000/path/to/service"
stream_name = "my-stream"
compression = "none"
region = "us-east-1"
[sinks.my_sink_id.encoding]
codec = "json"
---
sinks:
  my_sink_id:
    type: aws_kinesis_firehose
    inputs:
      - my-source-or-transform-id
    auth: null
    endpoint: http://127.0.0.0:5000/path/to/service
    stream_name: my-stream
    buffer: null
    acknowledgements: null
    batch: null
    compression: none
    encoding:
      codec: json
    healthcheck: null
    request: null
    tls: null
    proxy: null
    region: us-east-1
acknowledgements
common optional object
acknowledgements.enabled
optional bool
Default: false
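As a minimal sketch, the fragment below turns on acknowledgements for a single sink. It assumes the sink ID `my_sink_id` from the example configurations above; only the `enabled` flag is taken from the option list.

```toml
# Assumes the sink defined in the example configuration (my_sink_id).
[sinks.my_sink_id.acknowledgements]
enabled = true  # the documented default is false
```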
auth
optional object
auth.access_key_id
optional string literal
auth.assume_role
optional string literal
auth.load_timeout_secs
optional uint
Default: 5 (seconds)
auth.profile
optional string literal
Default: default
auth.secret_access_key
optional string literal
batch
optional object
batch.max_bytes
optional uint
batch.max_events
optional uint
batch.timeout_secs
optional float
Default: 1 (seconds)
buffer
optional object
buffer.max_events
optional uint
Relevant when type = "memory".
Default: 500 (events)
buffer.type
optional string literal enum
Option | Description |
---|---|
disk | Stores the sink’s buffer on disk. This is less performant, but durable: data will not be lost between restarts. Data is also held in memory to improve performance. WARNING: This may stall the sink if disk performance isn’t on par with the throughput. For comparison, AWS gp2 volumes are usually too slow for common cases. |
memory | Stores the sink’s buffer in memory. This is more performant, but less durable. Data will be lost if Vector is restarted forcefully. |
Default: memory
buffer.when_full
optional string literal enum
Option | Description |
---|---|
block | Applies back pressure when the buffer is full. This prevents data loss, but will cause data to pile up on the edge. |
drop_newest | Drops new data as it’s received. This data is lost. This should be used when performance is the highest priority. |
Default: block
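The buffer options above could be combined as in the following sketch. The sink ID `my_sink_id` comes from the example configurations, and the `max_events` value is illustrative rather than a recommendation.

```toml
# In-memory buffer that drops new events instead of applying back pressure.
[sinks.my_sink_id.buffer]
type = "memory"            # the documented default
max_events = 1000          # illustrative value; the documented default is 500
when_full = "drop_newest"  # the documented default is "block"
```

With `when_full = "block"` instead, a full buffer applies back pressure upstream rather than discarding events.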
compression
common optional string literal enum
The compression strategy used to compress the encoded event data before transmission.
Some cloud storage API clients and browsers handle decompression transparently, so files may not always appear to be compressed, depending on how they are accessed.
Option | Description |
---|---|
gzip | Gzip standard DEFLATE compression. |
none | No compression. |
Default: none
encoding
required object
encoding.codec
optional string literal enum
Option | Description |
---|---|
json | JSON encoded event. |
ndjson | Newline delimited list of JSON encoded events. |
text | The message field from the event. |
encoding.except_fields
optional [string]
encoding.only_fields
optional [string]
encoding.timestamp_format
optional string literal enum
Option | Description |
---|---|
rfc3339 | Formats as an RFC 3339 string |
unix | Formats as a Unix timestamp |
Default: rfc3339
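A sketch of how the compression and encoding options might be set together, based on the common example configuration. The field names in `only_fields` are assumptions: `message` is the field named by the `text` codec description, while `timestamp` is a hypothetical field used only for illustration.

```toml
[sinks.my_sink_id]
type = "aws_kinesis_firehose"
inputs = [ "my-source-or-transform-id" ]
stream_name = "my-stream"
region = "us-east-1"
compression = "gzip"  # the documented default is "none"

[sinks.my_sink_id.encoding]
codec = "json"
only_fields = [ "message", "timestamp" ]  # hypothetical field selection
timestamp_format = "unix"                 # the documented default is "rfc3339"
```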
endpoint
optional string literal
inputs
required [string]
A list of upstream source or transform IDs. Wildcards (*) are supported.
See configuration for more info.
proxy
optional object
proxy.http
optional string literal
proxy.https
optional string literal
proxy.no_proxy
optional [string]
A list of hosts to avoid proxying. Allowed patterns include:
Pattern | Example match |
---|---|
Domain names | example.com matches requests to example.com |
Wildcard domains | .example.com matches requests to example.com and its subdomains |
IP addresses | 127.0.0.1 matches requests to 127.0.0.1 |
CIDR blocks | 192.168.0.0/16 matches requests to any IP address in this range |
Splat | * matches all hosts |
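The proxy options might be combined as in this sketch. The proxy URL is a hypothetical placeholder, and the `no_proxy` entries reuse the example patterns from the table above.

```toml
[sinks.my_sink_id.proxy]
http = "http://proxy.example.com:3128"   # hypothetical proxy URL
https = "http://proxy.example.com:3128"  # hypothetical proxy URL
# One entry per pattern type: wildcard domain, IP address, CIDR block.
no_proxy = [ ".example.com", "127.0.0.1", "192.168.0.0/16" ]
```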
request
optional object
request.adaptive_concurrency
optional object
request.adaptive_concurrency.decrease_ratio
optional float
Default: 0.9
request.adaptive_concurrency.ewma_alpha
optional float
Default: 0.7
request.adaptive_concurrency.rtt_deviation_scale
optional float
Default: 2
request.concurrency
optional uint
request.rate_limit_duration_secs
optional uint
The time window used for the rate_limit_num option.
Default: 1 (seconds)
request.rate_limit_num
optional uint
The maximum number of requests allowed within the rate_limit_duration_secs time window.
Default: 9.223372036854776e+18
request.retry_attempts
optional uint
Default: 1.8446744073709552e+19
request.retry_initial_backoff_secs
optional uint
Default: 1 (seconds)
request.retry_max_duration_secs
optional uint
Default: 3600 (seconds)
request.timeout_secs
optional uint
Default: 60 (seconds)
stream_name
required string literal
tls
optional object
tls.ca_file
optional string literal
tls.crt_file
optional string literal
If this is set, key_file must also be set.
tls.key_file
optional string literal
If this is set, crt_file must also be set.
tls.key_pass
optional string literal
Only relevant when key_file is set.
tls.verify_certificate
optional bool
If true (the default), Vector will validate the TLS certificate of the remote host.
Default: true
tls.verify_hostname
optional bool
If true (the default), Vector will validate the configured remote host name against the remote host’s TLS certificate. Do NOT set this to false unless you understand the risks of not verifying the remote hostname.
Default: true
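A sketch of a TLS block that respects the pairing rule between `crt_file` and `key_file`. All file paths are hypothetical placeholders.

```toml
[sinks.my_sink_id.tls]
ca_file = "/path/to/ca.crt"       # hypothetical path
crt_file = "/path/to/client.crt"  # hypothetical path; requires key_file
key_file = "/path/to/client.key"  # hypothetical path; requires crt_file
verify_certificate = true         # the documented default
verify_hostname = true            # the documented default
```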
Environment variables
AWS_ACCESS_KEY_ID
common optional string literal
AWS_CONFIG_FILE
common optional string literal
Default: ~/.aws/config
AWS_CREDENTIAL_EXPIRATION
common optional string literal
AWS_DEFAULT_REGION
common optional string literal
AWS_PROFILE
common optional string literal
Default: default
AWS_ROLE_SESSION_NAME
common optional string literal
AWS_SECRET_ACCESS_KEY
common optional string literal
AWS_SESSION_TOKEN
common optional string literal
AWS_SHARED_CREDENTIALS_FILE
common optional string literal
Default: ~/.aws/credentials
Telemetry
Metrics
Metric | Type |
---|---|
buffer_byte_size | gauge |
buffer_discarded_events_total | counter |
buffer_events | gauge |
buffer_received_event_bytes_total | counter |
buffer_received_events_total | counter |
buffer_sent_event_bytes_total | counter |
buffer_sent_events_total | counter |
component_received_event_bytes_total | counter |
component_received_events_total | counter |
component_sent_event_bytes_total | counter |
component_sent_events_total | counter |
events_in_total | counter |
utilization | gauge |
events_in_total is deprecated; use component_received_events_total instead.
Each metric also carries a deprecated tag whose value is the same as component_id; use component_id instead.
Permissions
Policy | Required for | Required when |
---|---|---|
firehose:DescribeDeliveryStream | | |
firehose:PutRecordBatch | | |
How it works
AWS authentication
Vector checks for AWS credentials in the following order:
- The access_key_id and secret_access_key options.
- The AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables.
- The AWS credentials file (usually located at ~/.aws/credentials).
- The IAM instance profile (only works if running on an EC2 instance with an instance profile/role). Requires IMDSv2 to be enabled. For EKS, you may need to increase the metadata token response hop limit to 2.
Note that use of credential_process in AWS credentials files is not supported, as the underlying AWS SDK currently lacks support.
If no credentials are found, Vector’s health check fails and an error is logged. If your AWS credentials expire, Vector will automatically search for up-to-date credentials in the places (and order) described above.
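As a sketch of the first step in that lookup order, credentials can be set on the sink itself. The key values below are the placeholder credentials used throughout AWS documentation, not real keys, and the role ARN in the commented-out line is hypothetical.

```toml
[sinks.my_sink_id.auth]
access_key_id = "AKIAIOSFODNN7EXAMPLE"                          # AWS documentation placeholder
secret_access_key = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"  # AWS documentation placeholder
# Alternatively, assume a role (hypothetical ARN):
# assume_role = "arn:aws:iam::123456789012:role/my-firehose-writer"
```

Leaving the `auth` table out entirely lets Vector fall through to the environment variables, the credentials file, and the instance profile, in that order.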
Obtaining an access key
An AWS access key can be provided through the access_key_id and secret_access_key options.
Assuming roles
Vector can assume an AWS IAM role via the assume_role option. This is an optional setting that is helpful for a variety of use cases, such as cross-account access.
Buffers and batches
This component buffers and batches data. Rather than treating buffers and batches as global concepts, Vector treats them as sink-specific concepts. This isolates sinks, ensuring that service disruptions are contained and delivery guarantees are honored.
Batches are flushed when either of two conditions is met:
- The batch age meets or exceeds the configured timeout_secs.
- The batch size meets or exceeds the configured max_bytes or max_events.
Buffers are controlled via the buffer.* options.
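The flush conditions above map onto the `batch.*` options roughly as in this sketch. The size limits are illustrative values chosen for the example, not documented defaults.

```toml
# A batch is flushed as soon as any one of these limits is reached.
[sinks.my_sink_id.batch]
max_events = 500     # illustrative value
max_bytes = 4000000  # illustrative value
timeout_secs = 1     # the documented default
```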
Health checks
Require health checks
If you’d like to exit immediately upon a health check failure, you can pass the --require-healthy flag:
vector --config /etc/vector/vector.toml --require-healthy
Disable health checks
To disable health checks, set the healthcheck option to false.
Partitioning
Vector supports dynamic configuration values through a simple template syntax. If an option supports templating, it will be noted with a badge and you can use event fields to create dynamic values. For example:
[sinks.my-sink]
dynamic_option = "application={{ application_id }}"
In the above example, the application_id for each event will be used to partition outgoing data.
Rate limits & adaptive concurrency
Adaptive Request Concurrency (ARC)
Adaptive Request Concurrency is a feature of Vector that does away with static concurrency limits and automatically optimizes HTTP concurrency based on downstream service responses. The underlying mechanism is a feedback loop inspired by TCP congestion control algorithms. Check out the announcement blog post for more details.
We highly recommend enabling this feature as it improves performance and reliability of Vector and the systems it communicates with. As such, we have made it the default, and no further configuration is required.
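Although no configuration is required, the feedback loop can be tuned through the `request.adaptive_concurrency.*` options listed earlier. The sketch below simply restates the documented defaults, so it should behave the same as omitting the table.

```toml
[sinks.my_sink_id.request.adaptive_concurrency]
decrease_ratio = 0.9       # documented default
ewma_alpha = 0.7           # documented default
rtt_deviation_scale = 2.0  # documented default
```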
Static concurrency
If Adaptive Request Concurrency is not for you, you can manually set static concurrency limits by specifying an integer for request.concurrency:
[sinks.my-sink]
request.concurrency = 10
Rate limits
In addition to limiting request concurrency, you can also limit the overall request throughput via the request.rate_limit_duration_secs and request.rate_limit_num options.
[sinks.my-sink]
request.rate_limit_duration_secs = 1
request.rate_limit_num = 10
These will apply to both adaptive and fixed request.concurrency values.