AWS Kinesis Data Streams Sink

The Vector aws_kinesis_streams sink batches log events and sends them to Amazon Web Services' Kinesis Data Streams service via the PutRecords API endpoint.

Configuration

vector.toml
[sinks.my_sink_id]
# General
type = "aws_kinesis_streams" # required
inputs = ["my-source-or-transform-id"] # required
compression = "none" # optional, default
partition_key_field = "user_id" # optional, no default
region = "us-east-1" # required when endpoint is not set
stream_name = "my-stream" # required
# Encoding
encoding.codec = "json" # required
  • string, optional

    assume_role

    The ARN of an IAM role to assume at startup. See AWS Authentication for more info.

    • No default
  • table, optional

    batch

    Configures the sink batching behavior.

    • uint (events), common, optional

      max_events

      The maximum size of a batch, in events, before it is flushed. See Buffers & Batches for more info.

      • Default: 500 (events)
    • uint (seconds), common, optional

      timeout_secs

      The maximum age of a batch before it is flushed. See Buffers & Batches for more info.

      • Default: 1 (seconds)
  • table, optional

    buffer

    Configures the sink specific buffer behavior.

    • uint (events), common, optional

      max_events

      The maximum number of events allowed in the buffer. See Buffers & Batches for more info.

      • Only relevant when: type = "memory"
      • Default: 500 (events)
    • uint (bytes), required*

      max_size

      The maximum size of the buffer on the disk.

      • Only required when: type = "disk"
      • No default
    • string, enum, common, optional

      type

      The buffer's type and storage mechanism.

      • Default: "memory"
      • Enum, must be one of: "memory" "disk"
    • string, enum, optional

      when_full

      The behavior when the buffer becomes full.

      • Default: "block"
      • Enum, must be one of: "block" "drop_newest"
  • string, enum, common, optional

    compression

    The compression strategy used to compress the encoded event data before transmission.

    • Default: "none"
    • Enum, must be one of: "none" "gzip"
  • table, common, required

    encoding

    Configures the encoding specific sink behavior.

    • string, enum, common, required

      codec

      The encoding codec used to serialize the events before outputting.

      • No default
      • Enum, must be one of: "json" "text"
    • [string], optional

      except_fields

      Prevent the sink from encoding the specified fields.

      • No default
    • [string], optional

      only_fields

      Limit the sink to encoding only the specified fields.

      • No default
    • string, enum, optional

      timestamp_format

      How to format event timestamps.

      • Default: "rfc3339"
      • Enum, must be one of: "rfc3339" "unix"
  • string, optional

    endpoint

    Custom endpoint for use with AWS-compatible services. Setting this option makes the region option unnecessary.

    • Only relevant when: region = null
    • No default
  • string, common, optional

    partition_key_field

    The log field used as the Kinesis record's partition key value. See Partitioning for more info.

    • No default
  • string, common, required*

    region

    The AWS region of the target service. If endpoint is provided it will override this value since the endpoint includes the region.

    • Only required when: endpoint = null
    • No default
  • table, optional

    request

    Configures the sink request behavior.

    • uint (requests), common, optional

      in_flight_limit

      The maximum number of in-flight requests allowed at any given time. See Rate Limits for more info.

      • Default: 5 (requests)
    • uint (seconds), common, optional

      rate_limit_duration_secs

      The time window, in seconds, used for the rate_limit_num option. See Rate Limits for more info.

      • Default: 1 (seconds)
    • uint, common, optional

      rate_limit_num

      The maximum number of requests allowed within the rate_limit_duration_secs time window. See Rate Limits for more info.

      • Default: 5
    • uint, optional

      retry_attempts

      The maximum number of retries to make for failed requests. The default, for all intents and purposes, represents an infinite number of retries. See Retry Policy for more info.

      • Default: 18446744073709552000
    • uint (seconds), optional

      retry_initial_backoff_secs

      The amount of time to wait before attempting the first retry for a failed request. Once the first retry has failed, the Fibonacci sequence is used to select subsequent backoffs.

      • Default: 1 (seconds)
    • uint (seconds), optional

      retry_max_duration_secs

      The maximum amount of time, in seconds, to wait between retries.

      • Default: 10 (seconds)
    • uint (seconds), common, optional

      timeout_secs

      The maximum time a request can take before being aborted. It is highly recommended that you do not lower this value below the service's internal timeout, as this could create orphaned requests, pile on retries, and result in duplicate data downstream. See Buffers & Batches for more info.

      • Default: 30 (seconds)
  • string, common, required

    stream_name

    The name of the target Kinesis data stream.

    • No default

Env Vars

  • string, optional

    AWS_ACCESS_KEY_ID

    Used for AWS authentication when communicating with AWS services. See AWS Authentication for more info.

    • No default
  • string, optional

    AWS_SECRET_ACCESS_KEY

    Used for AWS authentication when communicating with AWS services. See AWS Authentication for more info.

    • No default

Examples

POST / HTTP/1.1
Host: kinesis.<region>.<domain>
Content-Length: <byte_size>
Content-Type: application/x-amz-json-1.1
Connection: Keep-Alive
X-Amz-Target: Kinesis_20131202.PutRecords

{
  "Records": [
    {
      "Data": "<json_encoded_log>",
      "PartitionKey": "<partition_key>"
    },
    {
      "Data": "<json_encoded_log>",
      "PartitionKey": "<partition_key>"
    },
    {
      "Data": "<json_encoded_log>",
      "PartitionKey": "<partition_key>"
    }
  ],
  "StreamName": "<stream_name>"
}

How It Works

AWS Authentication

Vector checks for AWS credentials in the following order:

  1. Environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY.
  2. The credential_process command in the AWS config file (usually located at ~/.aws/config).
  3. The AWS credentials file (usually located at ~/.aws/credentials).
  4. The IAM instance profile (only works when running on an EC2 instance with an instance profile/role).

If credentials are not found, the health check will fail and an error will be logged.

Obtaining an access key

In general, we recommend using instance profiles/roles whenever possible. In cases where this is not possible you can generate an AWS access key for any user within your AWS account. AWS provides a detailed guide on how to do this.

Assuming Roles

Vector can assume an AWS IAM role via the assume_role option. This is an optional setting that is helpful for a variety of use cases, such as cross-account access.
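As a minimal sketch, the option can be added to the sink definition from the Configuration section. The role ARN below is a hypothetical placeholder, not a value taken from this document; substitute a role that your base credentials are permitted to assume.

```toml
[sinks.my_sink_id]
type = "aws_kinesis_streams"
inputs = ["my-source-or-transform-id"]
region = "us-east-1"
stream_name = "my-stream"
encoding.codec = "json"

# Hypothetical ARN, for illustration only.
assume_role = "arn:aws:iam::123456789012:role/my-kinesis-writer"
```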

Buffers & Batches

The aws_kinesis_streams sink buffers & batches data as shown in the diagram above. Rather than treating buffers and batches as global concepts, Vector treats them as sink-specific concepts. This isolates sinks, ensuring that service disruptions are contained and delivery guarantees are honored.

Batches are flushed when either of two conditions is met:

  1. The batch age meets or exceeds the configured batch.timeout_secs.
  2. The batch size meets or exceeds the configured batch.max_events.

Buffers are controlled via the buffer.* options.
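The following sketch shows how the batch.* and buffer.* options described in the Configuration section might be combined. The batch values are the documented defaults; the disk buffer size is a hypothetical figure chosen only for illustration.

```toml
[sinks.my_sink_id]
type = "aws_kinesis_streams"
inputs = ["my-source-or-transform-id"]
region = "us-east-1"
stream_name = "my-stream"
encoding.codec = "json"

# Flush a batch once it holds 500 events or is 1 second old, whichever comes first.
batch.max_events = 500
batch.timeout_secs = 1

# Disk-backed buffer. max_size is required when type = "disk".
buffer.type = "disk"
buffer.max_size = 104900000 # bytes, hypothetical value
buffer.when_full = "block"  # apply back-pressure instead of dropping events
```

With when_full = "block", a full buffer slows upstream components down rather than discarding data; "drop_newest" trades that guarantee for throughput.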

Environment Variables

Environment variables are supported throughout Vector's configuration. Simply add ${MY_ENV_VAR} to your Vector configuration file and the variable will be replaced before the configuration is evaluated.

You can learn more in the Environment Variables section.
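For instance, a sink definition could read its region and stream name from the environment. KINESIS_REGION and KINESIS_STREAM_NAME are hypothetical variable names used only for this sketch.

```toml
[sinks.my_sink_id]
type = "aws_kinesis_streams"
inputs = ["my-source-or-transform-id"]
encoding.codec = "json"

# Hypothetical variable names; both are substituted before the file is evaluated.
region = "${KINESIS_REGION}"
stream_name = "${KINESIS_STREAM_NAME}"
```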

Health Checks

Health checks ensure that the downstream service is accessible and ready to accept data. This check is performed upon sink initialization. If the health check fails, an error will be logged and Vector will still proceed to start.

Require Health Checks

If you'd like to exit immediately upon a health check failure, you can pass the --require-healthy flag:

vector --config /etc/vector/vector.toml --require-healthy

Disable Health Checks

If you'd like to disable health checks for this sink you can set the healthcheck option to false.
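A minimal sketch of this setting, assuming the healthcheck option sits at the top level of the sink table alongside the other general options:

```toml
[sinks.my_sink_id]
type = "aws_kinesis_streams"
inputs = ["my-source-or-transform-id"]
region = "us-east-1"
stream_name = "my-stream"
encoding.codec = "json"

# Skip the startup health check for this sink only.
healthcheck = false
```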

Partitioning

By default, Vector issues a random 16-byte value for each Kinesis record's partition key, evenly distributing records across your Kinesis shards. Depending on your use case, this might not be sufficient, since random distribution does not preserve order. To override this, supply the partition_key_field option, which names a field on your event to use as the partition key value instead. This is useful if your events already carry a suitable field, and it also pairs nicely with the add_fields transform.
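One way the pairing with add_fields might look is sketched below. The transform's fields.* syntax is an assumption that should be checked against that transform's own reference, and the partition_key field name and HOSTNAME variable are hypothetical choices for illustration.

```toml
# Assumed add_fields syntax; verify against the add_fields transform reference.
[transforms.add_partition_key]
type = "add_fields"
inputs = ["my-source-or-transform-id"]
fields.partition_key = "${HOSTNAME}" # hypothetical field name and variable

[sinks.my_sink_id]
type = "aws_kinesis_streams"
inputs = ["add_partition_key"]
region = "us-east-1"
stream_name = "my-stream"
encoding.codec = "json"

# Use the field added above instead of a random partition key.
partition_key_field = "partition_key"
```

Keying by host keeps each host's records in order on a single shard, at the cost of uneven load if some hosts are much busier than others.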

Missing keys or blank values

Kinesis requires a value for the partition key. Therefore, if the key is missing or the value is blank, the event will be dropped and a warning-level log event will be logged. As such, the field specified in the partition_key_field option should always contain a value.

Values that exceed 256 characters

If the value provided exceeds the maximum allowed length of 256 characters, Vector will truncate the value and use the first 256 characters.

Non-string values

Vector will coerce the value into a string.

Provisioning & capacity planning

This is generally outside the scope of Vector but worth touching on. When you supply your own partition key, you open up the possibility of "hot spots", so you should be aware of the data distribution for the key you provide. Kinesis provides the ability to manually split shards to accommodate this. If the key you're using is dynamic and unpredictable, we highly recommend reconsidering your ordering policy to allow for even and random distribution.

Rate Limits

Vector offers a few levers to control the rate and volume of requests to the downstream service. Start with the rate_limit_duration_secs and rate_limit_num options to ensure Vector does not exceed the specified number of requests in the specified window. You can further control the pace at which this window is saturated with the in_flight_limit option, which will guarantee no more than the specified number of requests are in-flight at any given time.

Please note that Vector's defaults are carefully chosen, and it should be rare that you need to adjust them. If you find a good reason to do so, please share it with the Vector team by opening an issue.
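For reference, the sketch below spells out the three rate-limiting options with their documented default values, which makes it easier to see what would change if you did tune them.

```toml
[sinks.my_sink_id]
type = "aws_kinesis_streams"
inputs = ["my-source-or-transform-id"]
region = "us-east-1"
stream_name = "my-stream"
encoding.codec = "json"

# At most 5 requests per 1-second window...
request.rate_limit_duration_secs = 1
request.rate_limit_num = 5
# ...and never more than 5 requests in flight at once.
request.in_flight_limit = 5
```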

Retry Policy

Vector will retry failed requests (status == 429, >= 500, and != 501). Other responses will not be retried. You can control the number of retry attempts and the backoff rate with the retry_attempts, retry_initial_backoff_secs, and retry_max_duration_secs options.
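A minimal sketch of a bounded retry policy, using the retry options listed under request in the Configuration section. The retry_attempts value of 10 is a hypothetical choice; the two backoff values are the documented defaults.

```toml
[sinks.my_sink_id]
type = "aws_kinesis_streams"
inputs = ["my-source-or-transform-id"]
region = "us-east-1"
stream_name = "my-stream"
encoding.codec = "json"

# Give up after 10 retries instead of retrying (effectively) forever.
request.retry_attempts = 10 # hypothetical value
# Wait 1 second before the first retry; later waits follow the Fibonacci sequence.
request.retry_initial_backoff_secs = 1
# Never wait more than 10 seconds between retries.
request.retry_max_duration_secs = 10
```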