AWS Kinesis Firehose

Collect logs from AWS Kinesis Firehose

status: beta | role: aggregator | delivery: at-least-once | egress: batch | state: stateless | output: log

Requirements

AWS Kinesis Firehose can only deliver data over HTTPS, so the endpoint it sends to must be served over TLS. You can handle TLS termination either by fronting Vector with a load balancer or by configuring the tls.* options.

Configuration

Example configurations

{
  "sources": {
    "my_source_id": {
      "type": "aws_kinesis_firehose",
      "address": "0.0.0.0:443",
      "access_key": "A94A8FE5CCB19BA61C4C08",
      "record_compression": "auto"
    }
  }
}

[sources.my_source_id]
type = "aws_kinesis_firehose"
address = "0.0.0.0:443"
access_key = "A94A8FE5CCB19BA61C4C08"
record_compression = "auto"

---
sources:
  my_source_id:
    type: aws_kinesis_firehose
    address: 0.0.0.0:443
    access_key: A94A8FE5CCB19BA61C4C08
    acknowledgements: null
    tls: null
    record_compression: auto

access_key

common optional string literal
AWS Kinesis Firehose can be configured to pass along an access key to authenticate requests. If it is, access_key should be set to the same value. If access_key is not specified, Vector will treat all requests as authenticated.
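The check described above can be pictured with a minimal Python sketch. It is an illustration only, not Vector's implementation (Vector itself is written in Rust), and the function name is_authorized is hypothetical. It assumes the key arrives in the X-Amz-Firehose-Access-Key request header, which is where Firehose places it for HTTP endpoint destinations.

```python
import hmac
from typing import Mapping, Optional

# Header in which Firehose sends the configured access key.
ACCESS_KEY_HEADER = "x-amz-firehose-access-key"


def is_authorized(headers: Mapping[str, str], configured_key: Optional[str]) -> bool:
    """Return True if a request with these headers may be processed."""
    if configured_key is None:
        # No access_key configured: every request is treated as authenticated.
        return True
    # HTTP header names are case-insensitive, so normalize before the lookup.
    normalized = {name.lower(): value for name, value in headers.items()}
    presented = normalized.get(ACCESS_KEY_HEADER, "")
    # Constant-time comparison avoids leaking key contents through timing.
    return hmac.compare_digest(presented.encode(), configured_key.encode())
```

A request without the header is rejected as soon as a key is configured, which is why the same value has to be set on both the delivery stream and the source.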
Examples
"A94A8FE5CCB19BA61C4C08"

acknowledgements

common optional object
Controls how acknowledgements are handled by this source.

acknowledgements.enabled

optional bool
Controls whether the source waits for destination sinks to deliver the events before acknowledging receipt.
default: false

address

required string literal
The address to listen for connections on.
Examples
"0.0.0.0:443"
"localhost:443"

record_compression

common optional string literal enum

The compression of records within the Firehose message.

Some services, like AWS CloudWatch Logs, compress events with gzip before sending them to AWS Kinesis Firehose. This option can be used to automatically decompress them before forwarding them to the next component.

Note that this is different from the Content encoding option of the Firehose HTTP endpoint destination. That option controls the content encoding of the entire HTTP request.

Enum options string literal
Option: Description
auto: Vector will try to determine the compression format of the record by looking at its file signature, also known as magic bytes. Because checking magic bytes is not a perfect test, a record that fails to decompress with the detected format is forwarded as-is. If you know the records will always be gzip-encoded (for example, because they come from AWS CloudWatch Logs), prefer gzip here so that Vector rejects any record that is not gzipped.
gzip: GZIP format.
none: Uncompressed.
default: auto
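
The difference between the three options can be sketched in a few lines of Python. This is an illustration of the behavior described above under stated assumptions, not Vector's code; the function name decode_record is hypothetical, and the only signature checked is the two-byte gzip header.

```python
import gzip
import zlib

# Every gzip stream starts with these two bytes (the "magic bytes").
GZIP_MAGIC = b"\x1f\x8b"


def decode_record(raw: bytes, record_compression: str = "auto") -> bytes:
    """Decode one record payload according to a record_compression-style mode."""
    if record_compression == "none":
        return raw
    if record_compression == "gzip":
        # Strict mode: anything that is not valid gzip raises an error.
        return gzip.decompress(raw)
    if record_compression == "auto":
        if raw.startswith(GZIP_MAGIC):
            try:
                return gzip.decompress(raw)
            except (OSError, EOFError, zlib.error):
                # The signature matched but the payload is not gzip: forward as-is.
                return raw
        return raw
    raise ValueError(f"unknown record_compression: {record_compression!r}")
```

In auto mode a payload that merely happens to start with the gzip signature passes through unchanged, while the strict gzip mode surfaces it as an error. That is the trade-off behind the recommendation to set gzip when the input is known to be gzip-encoded.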

tls

optional object
Configures the TLS options for incoming connections.

tls.ca_file

optional string literal
Absolute path to an additional CA certificate file, in DER or PEM format (X.509), or an in-line CA certificate in PEM format.
Examples
"/path/to/certificate_authority.crt"

tls.crt_file

optional string literal
Absolute path to a certificate file used to identify this server, in DER or PEM format (X.509) or PKCS#12, or an in-line certificate in PEM format. If this is set, and is not a PKCS#12 archive, key_file must also be set. This is required if enabled is set to true.
Examples
"/path/to/host_certificate.crt"

tls.enabled

optional bool
Require TLS for incoming connections. If this is set, an identity certificate is also required.
default: false

tls.key_file

optional string literal
Absolute path to a private key file used to identify this server, in DER or PEM format (PKCS#8), or an in-line private key in PEM format.
Examples
"/path/to/host_certificate.key"

tls.key_pass

optional string literal
Pass phrase used to unlock the encrypted key file. This has no effect unless key_file is set.
Examples
"${KEY_PASS_ENV_VAR}"
"PassWord1"

tls.verify_certificate

optional bool
If true, Vector will require a TLS certificate from the connecting host and terminate the connection if the certificate is not valid. If false (the default), Vector will not request a certificate from the client.
default: false

Output

Logs

Line

One event will be published per incoming AWS Kinesis Firehose record.
Fields
message required string literal
The raw record from the incoming payload.
Examples
Started GET / for 127.0.0.1 at 2012-03-10 14:28:14 +0100
request_id required string literal
The AWS Kinesis Firehose request ID, value of the X-Amz-Firehose-Request-Id header.
Examples
ed1d787c-b9e2-4631-92dc-8e7c9d26d804
source_arn required string literal
The AWS Kinesis Firehose delivery stream that issued the request, value of the X-Amz-Firehose-Source-Arn header.
Examples
arn:aws:firehose:us-east-1:111111111111:deliverystream/test
timestamp required timestamp
The exact time the event was ingested into Vector.
Examples
2020-10-10T17:07:36.452332Z
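
For local testing it can help to build a request that carries the headers these fields are read from. The Python sketch below is a hedged illustration: the helper name build_firehose_request and the URL in the usage note are hypothetical, and a real delivery stream sends additional headers that are omitted here, so whether a given Vector version accepts this reduced request is an assumption to verify.

```python
import base64
import json
import time
import urllib.request
import uuid
from typing import Iterable, Optional


def build_firehose_request(
    url: str,
    source_arn: str,
    records: Iterable[bytes],
    access_key: Optional[str] = None,
) -> urllib.request.Request:
    """Build (but do not send) a Firehose-style POST request."""
    request_id = str(uuid.uuid4())
    body = {
        "requestId": request_id,
        # Firehose timestamps are milliseconds since the Unix epoch.
        "timestamp": int(time.time() * 1000),
        # Each record's payload is base64-encoded in the data field.
        "records": [{"data": base64.b64encode(r).decode("ascii")} for r in records],
    }
    headers = {
        "Content-Type": "application/json",
        "X-Amz-Firehose-Request-Id": request_id,
        "X-Amz-Firehose-Source-Arn": source_arn,
    }
    if access_key is not None:
        headers["X-Amz-Firehose-Access-Key"] = access_key
    return urllib.request.Request(
        url, data=json.dumps(body).encode("utf-8"), headers=headers, method="POST"
    )
```

Passing the returned object to urllib.request.urlopen sends it, for example to a source listening on http://127.0.0.1:9000. Plain HTTP is only suitable for such local checks, since Firehose itself requires a TLS endpoint.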

Telemetry

Metrics

component_received_events_total

counter
The number of events accepted by this component either from tagged origins like file and uri, or cumulatively from other origins.
component_id required
The Vector component ID.
component_kind required
The Vector component kind.
component_name required
Deprecated, use component_id instead. The value is the same as component_id.
component_type required
The Vector component type.
container_name optional
The name of the container from which the data originated.
file optional
The file from which the data originated.
host required
The hostname of the system Vector is running on.
mode optional
The connection mode used by the component.
peer_addr optional
The IP from which the data originated.
peer_path optional
The pathname from which the data originated.
pid required
The process ID of the Vector instance.
pod_name optional
The name of the pod from which the data originated.
uri optional
The sanitized URI from which the data originated.

component_sent_event_bytes_total

counter
The total number of event bytes emitted by this component.
component_id required
The Vector component ID.
component_kind required
The Vector component kind.
component_name required
Deprecated, use component_id instead. The value is the same as component_id.
component_type required
The Vector component type.
host required
The hostname of the system Vector is running on.
pid required
The process ID of the Vector instance.

component_sent_events_total

counter
The total number of events emitted by this component.
component_id required
The Vector component ID.
component_kind required
The Vector component kind.
component_name required
Deprecated, use component_id instead. The value is the same as component_id.
component_type required
The Vector component type.
host required
The hostname of the system Vector is running on.
pid required
The process ID of the Vector instance.

events_in_total

counter
The number of events accepted by this component either from tagged origins like file and uri, or cumulatively from other origins. This metric is deprecated and will be removed in a future version. Use component_received_events_total instead.
component_id required
The Vector component ID.
component_kind required
The Vector component kind.
component_name required
Deprecated, use component_id instead. The value is the same as component_id.
component_type required
The Vector component type.
container_name optional
The name of the container from which the data originated.
file optional
The file from which the data originated.
host required
The hostname of the system Vector is running on.
mode optional
The connection mode used by the component.
peer_addr optional
The IP from which the data originated.
peer_path optional
The pathname from which the data originated.
pid required
The process ID of the Vector instance.
pod_name optional
The name of the pod from which the data originated.
uri optional
The sanitized URI from which the data originated.

events_out_total

counter
The total number of events emitted by this component. This metric is deprecated and will be removed in a future version. Use component_sent_events_total instead.
component_id required
The Vector component ID.
component_kind required
The Vector component kind.
component_name required
Deprecated, use component_id instead. The value is the same as component_id.
component_type required
The Vector component type.
host required
The hostname of the system Vector is running on.
pid required
The process ID of the Vector instance.

processed_bytes_total

counter
The number of bytes processed by the component.
component_id required
The Vector component ID.
component_kind required
The Vector component kind.
component_name required
Deprecated, use component_id instead. The value is the same as component_id.
component_type required
The Vector component type.
container_name optional
The name of the container from which the bytes originate.
file optional
The file from which the bytes originate.
host required
The hostname of the system Vector is running on.
mode optional
The connection mode used by the component.
peer_addr optional
The IP from which the bytes originate.
peer_path optional
The pathname from which the bytes originate.
pid required
The process ID of the Vector instance.
pod_name optional
The name of the pod from which the bytes originate.
uri optional
The sanitized URI from which the bytes originate.

request_automatic_decode_errors_total

counter
The total number of request errors for this component when it attempted to automatically discover and handle the content-encoding of incoming request data.
component_id required
The Vector component ID.
component_kind required
The Vector component kind.
component_name required
Deprecated, use component_id instead. The value is the same as component_id.
component_type required
The Vector component type.
host required
The hostname of the system Vector is running on.
pid required
The process ID of the Vector instance.

request_read_errors_total

counter
The total number of request read errors for this component.
component_id required
The Vector component ID.
component_kind required
The Vector component kind.
component_name required
Deprecated, use component_id instead. The value is the same as component_id.
component_type required
The Vector component type.
host required
The hostname of the system Vector is running on.
pid required
The process ID of the Vector instance.

requests_received_total

counter
The total number of requests received by this component.
component_id required
The Vector component ID.
component_kind required
The Vector component kind.
component_name required
Deprecated, use component_id instead. The value is the same as component_id.
component_type required
The Vector component type.
host required
The hostname of the system Vector is running on.
pid required
The process ID of the Vector instance.

Examples

AWS CloudWatch Subscription message

Given this event...
{
  "requestId": "ed1d787c-b9e2-4631-92dc-8e7c9d26d804",
  "timestamp": 1600110760138,
  "records": [
	{
	  "data": "H4sIABk1bV8AA52TzW7bMBCE734KQ2db/JdI3QzETS8FAtg91UGgyOuEqCQq5Mqua+TdS8lu0hYNUpQHAdoZDcn9tKfJdJo0EEL5AOtjB0kxTa4W68Xdp+VqtbheJrPB4A4t+EFiv6yzVLuHa+/6blARAr5UV+ihbH4vh/4+VN52aF37wdYIPkTDlyhF8SrabFsOWhIrtz+Dlnto8dV3Gp9RstshXKhMi0xpqk3GpNJccpFRKYw0WvCM5kIbzrVWipm4VK55rrSk44HGHLTx/lg2wxVYRiljVGWGCvPiuPRn2O60Se6P8UKbpOBZrulsk2xLhCEjljYJk2QFHeGU04KxQqpCsumcSko3SfQ+uoBnn8pTJmjKWZYyI0axAXx021G++bweS5136CpXj8WP6/UNYek5ycMOPPhReETsQkHI4XBIO2/bynZlXXkXwryrS9w536TWkab0XwED6e/tU2/R9eGS9NTD5VgEvnWwtQikcu0e/AO0FYyu4HpfwR3Gf2R0Btza9qxgiUNUISiLr30AP7fbyMzu7OWA803ynIzdfJ69B1EZpoVhsWMRZ8a5UVJoRoUyUlDNspxzZWiEnOXiXYiSvQOR5TnN/xsiNalmKZcy5Yr/yfB6+RZD/gbDC0IbOx8wQrMhxGGYx4lBW5X1wJBLkpO981jWf6EXogvIrm+rYYrKOn4Hgbg4b439/s8cFeVvcNwBtHBkOdWvQIdRnTxPfgCXvyEgSQQAAA=="
	}
  ]
}
...and this configuration...
[sources.my_source_id]
type = "aws_kinesis_firehose"
address = "0.0.0.0:443"
---
sources:
  my_source_id:
    type: aws_kinesis_firehose
    address: 0.0.0.0:443
{
  "sources": {
    "my_source_id": {
      "type": "aws_kinesis_firehose",
      "address": "0.0.0.0:443"
    }
  }
}
...this Vector event is produced:
[{"log":{"message":"{\"messageType\":\"DATA_MESSAGE\",\"owner\":\"111111111111\",\"logGroup\":\"test\",\"logStream\":\"test\",\"subscriptionFilters\":[\"Destination\"],\"logEvents\":[{\"id\":\"35683658089614582423604394983260738922885519999578275840\",\"timestamp\":1600110569039,\"message\":\"{\\\"bytes\\\":26780,\\\"datetime\\\":\\\"14/Sep/2020:11:45:41 -0400\\\",\\\"host\\\":\\\"157.130.216.193\\\",\\\"method\\\":\\\"PUT\\\",\\\"protocol\\\":\\\"HTTP/1.0\\\",\\\"referer\\\":\\\"https://www.principalcross-platform.io/markets/ubiquitous\\\",\\\"request\\\":\\\"/expedite/convergence\\\",\\\"source_type\\\":\\\"stdin\\\",\\\"status\\\":301,\\\"user-identifier\\\":\\\"-\\\"}\"},{\"id\":\"35683658089659183914001456229543810359430816722590236673\",\"timestamp\":1600110569041,\"message\":\"{\\\"bytes\\\":17707,\\\"datetime\\\":\\\"14/Sep/2020:11:45:41 -0400\\\",\\\"host\\\":\\\"109.81.244.252\\\",\\\"method\\\":\\\"GET\\\",\\\"protocol\\\":\\\"HTTP/2.0\\\",\\\"referer\\\":\\\"http://www.investormission-critical.io/24/7/vortals\\\",\\\"request\\\":\\\"/scale/functionalities/optimize\\\",\\\"source_type\\\":\\\"stdin\\\",\\\"status\\\":502,\\\"user-identifier\\\":\\\"feeney1708\\\"}\"}]}","request_id":"ed1d787c-b9e2-4631-92dc-8e7c9d26d804","source_arn":"arn:aws:firehose:us-east-1:111111111111:deliverystream/test","timestamp":"2020-09-14T19:12:40.138Z"}}]
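
The mapping from the request to the event can be traced with a short Python sketch. It is an approximation for illustration, not Vector's implementation, and the function names are hypothetical. Two observations from the example guide it: the data value starts with H4sI, which is how the gzip signature looks after base64 encoding, and the event timestamp equals the request's timestamp field (milliseconds since the Unix epoch) rendered in UTC.

```python
import base64
import gzip
import json
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Mapping


def millis_to_rfc3339(ms: int) -> str:
    """Render epoch milliseconds as a UTC timestamp with a Z suffix."""
    # Integer arithmetic avoids float rounding of the millisecond part.
    moment = datetime.fromtimestamp(ms // 1000, tz=timezone.utc) + timedelta(
        milliseconds=ms % 1000
    )
    return moment.isoformat(timespec="milliseconds").replace("+00:00", "Z")


def request_to_events(headers: Mapping[str, str], body: str) -> List[Dict[str, str]]:
    """Produce one event per record in a Firehose-style request body."""
    envelope = json.loads(body)
    lowered = {name.lower(): value for name, value in headers.items()}
    events = []
    for record in envelope["records"]:
        raw = base64.b64decode(record["data"])
        if raw[:2] == b"\x1f\x8b":  # gzip signature
            raw = gzip.decompress(raw)
        events.append(
            {
                "message": raw.decode("utf-8"),
                "request_id": lowered["x-amz-firehose-request-id"],
                "source_arn": lowered["x-amz-firehose-source-arn"],
                "timestamp": millis_to_rfc3339(envelope["timestamp"]),
            }
        )
    return events
```

Calling millis_to_rfc3339(1600110760138) returns 2020-09-14T19:12:40.138Z, the timestamp of the event shown above.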

How it works

Context

By default, the aws_kinesis_firehose source augments events with helpful context keys.

State

This component is stateless, meaning its behavior is consistent across each input.

Forwarding CloudWatch Log events

This source is the recommended way to ingest logs from AWS CloudWatch Logs via AWS CloudWatch Logs subscriptions. To set this up:

  1. Deploy Vector with a publicly exposed HTTP endpoint using this source. You will likely also want to use the aws_cloudwatch_logs_subscription_parser transform to extract the log events. Make sure to set the access_key to secure this endpoint. Your configuration might look something like:

     [sources.firehose]
     # General
     type = "aws_kinesis_firehose"
     address = "127.0.0.1:9000"
     access_key = "secret"
    
     [transforms.cloudwatch]
     type = "aws_cloudwatch_logs_subscription_parser"
     inputs = ["firehose"]
    
     [sinks.console]
     type = "console"
     inputs = ["cloudwatch"]
     encoding.codec = "json"
    
  2. Create a Kinesis Firehose delivery stream in the region where the CloudWatch Logs groups that you want to ingest exist.

  3. Set the stream to forward to your Vector instance via its HTTP Endpoint destination. Make sure to configure the same access_key you set earlier.

  4. Set up a CloudWatch Logs subscription to forward the events to your delivery stream.
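
To see why a parsing step is useful after this source, recall the message shape from the example earlier: one Firehose record holds a JSON document with messageType, owner, logGroup, logStream, subscriptionFilters and a logEvents array. The Python sketch below splits such a document into one dictionary per log event. It only illustrates the idea; it is not the aws_cloudwatch_logs_subscription_parser transform, and the function name and output field names are hypothetical.

```python
import json
from typing import Any, Dict, List


def split_subscription_message(message: str) -> List[Dict[str, Any]]:
    """Split one CloudWatch Logs subscription message into per-event dicts."""
    payload = json.loads(message)
    # Assumption: only DATA_MESSAGE payloads carry log events worth forwarding.
    if payload.get("messageType") != "DATA_MESSAGE":
        return []
    shared = {
        "owner": payload["owner"],
        "log_group": payload["logGroup"],
        "log_stream": payload["logStream"],
        "subscription_filters": payload["subscriptionFilters"],
    }
    # Each log event keeps its own id, timestamp and message,
    # and inherits the fields shared by the whole batch.
    return [
        {**shared, "id": e["id"], "timestamp": e["timestamp"], "message": e["message"]}
        for e in payload["logEvents"]
    ]
```

Applied to the message field of the example event, this would yield two entries, one for each element of logEvents.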

Transport Layer Security (TLS)

Vector uses OpenSSL for TLS protocols. You can adjust TLS behavior via the tls.* options.