AWS S3
Collect logs from AWS S3
Requirements
Configuration
Example configurations
{
"sources": {
"my_source_id": {
"type": "aws_s3"
}
}
}
[sources.my_source_id]
type = "aws_s3"
sources:
my_source_id:
type: aws_s3
{
"sources": {
"my_source_id": {
"type": "aws_s3",
"compression": "auto",
"endpoint": "http://127.0.0.0:5000/path/to/service",
"region": "us-east-1"
}
}
}
[sources.my_source_id]
type = "aws_s3"
compression = "auto"
endpoint = "http://127.0.0.0:5000/path/to/service"
region = "us-east-1"
sources:
my_source_id:
type: aws_s3
compression: auto
endpoint: http://127.0.0.0:5000/path/to/service
region: us-east-1
acknowledgements
optional objectControls how acknowledgements are handled by this source.
This setting is deprecated in favor of enabling acknowledgements
at the global or sink level.
Enabling or disabling acknowledgements at the source level has no effect on acknowledgement behavior.
See End-to-end Acknowledgements for more information on how event acknowledgement is handled.
acknowledgements.enabled
optional boolauth
optional objectauth.assume_role
required string literalauth.credentials_file
required string literalauth.external_id
optional string literalauth.imds
optional objectauth.imds.max_attempts
optional uint4
auth.load_timeout_secs
optional uintTimeout for successfully loading any credentials, in seconds.
Relevant when the default credentials chain or assume_role
is used.
auth.profile
optional string literalThe credentials profile to use.
Used to select AWS credentials from a provided credentials file.
default
auth.region
optional string literalThe AWS region to send STS requests to.
If not set, this defaults to the configured region for the service itself.
auth.secret_access_key
required string literalcompression
optional string literal enumOption | Description |
---|---|
auto | Automatically attempt to determine the compression scheme. The compression scheme of the object is determined from its It is set to |
gzip | GZIP. |
none | Uncompressed. |
zstd | ZSTD. |
auto
decoding
optional objectdecoding.avro
required objectcodec = "avro"
decoding.avro.schema
required string literalThe Avro schema definition.
Please note that the following [apache_avro::types::Value
] variants are currently not supported:
Date
Decimal
Duration
Fixed
TimeMillis
decoding.avro.strip_schema_id_prefix
required booldecoding.codec
optional string literal enumOption | Description |
---|---|
avro | Decodes the raw bytes as as an Apache Avro message. |
bytes | Uses the raw bytes as-is. |
gelf | Decodes the raw bytes as a GELF message. This codec is experimental for the following reason: The GELF specification is more strict than the actual Graylog receiver.
Vector’s decoder currently adheres more strictly to the GELF spec, with
the exception that some characters such as Other GELF codecs such as Loki’s, use a Go SDK that is maintained by Graylog, and is much more relaxed than the GELF spec. Going forward, Vector will use that Go SDK as the reference implementation, which means the codec may continue to relax the enforcement of specification. |
influxdb | Decodes the raw bytes as an Influxdb Line Protocol message. |
json | Decodes the raw bytes as JSON. |
native | Decodes the raw bytes as native Protocol Buffers format. This codec is experimental. |
native_json | Decodes the raw bytes as native JSON format. This codec is experimental. |
protobuf | Decodes the raw bytes as protobuf. |
syslog | Decodes the raw bytes as a Syslog message. Decodes either as the RFC 3164-style format (“old” style) or the RFC 5424-style format (“new” style, includes structured data). |
vrl | Decodes the raw bytes as a string and passes them as input to a VRL program. |
bytes
decoding.gelf
optional objectcodec = "gelf"
decoding.gelf.lossy
optional boolDetermines whether or not to replace invalid UTF-8 sequences instead of failing.
When true, invalid UTF-8 sequences are replaced with the U+FFFD REPLACEMENT CHARACTER
.
true
decoding.influxdb
optional objectcodec = "influxdb"
decoding.influxdb.lossy
optional boolDetermines whether or not to replace invalid UTF-8 sequences instead of failing.
When true, invalid UTF-8 sequences are replaced with the U+FFFD REPLACEMENT CHARACTER
.
true
decoding.json
optional objectcodec = "json"
decoding.json.lossy
optional boolDetermines whether or not to replace invalid UTF-8 sequences instead of failing.
When true, invalid UTF-8 sequences are replaced with the U+FFFD REPLACEMENT CHARACTER
.
true
decoding.native_json
optional objectcodec = "native_json"
decoding.native_json.lossy
optional boolDetermines whether or not to replace invalid UTF-8 sequences instead of failing.
When true, invalid UTF-8 sequences are replaced with the U+FFFD REPLACEMENT CHARACTER
.
true
decoding.protobuf
optional objectcodec = "protobuf"
decoding.protobuf.desc_file
optional string literaldecoding.protobuf.message_type
optional string literaldecoding.syslog
optional objectcodec = "syslog"
decoding.syslog.lossy
optional boolDetermines whether or not to replace invalid UTF-8 sequences instead of failing.
When true, invalid UTF-8 sequences are replaced with the U+FFFD REPLACEMENT CHARACTER
.
true
decoding.vrl
required objectcodec = "vrl"
decoding.vrl.source
required string literal.
target will be used as the decoding result.
Compilation error or use of ‘abort’ in a program will result in a decoding error.decoding.vrl.timezone
optional string literalThe name of the timezone to apply to timestamp conversions that do not contain an explicit
time zone. The time zone name may be any name in the TZ database, or local
to indicate system local time.
If not set, local
will be used.
endpoint
optional string literalframing
optional objectFraming configuration.
Framing handles how events are separated when encoded in a raw byte form, where each event is a frame that must be prefixed, or delimited, in a way that marks where an event begins and ends within the byte stream.
framing.character_delimited
required objectmethod = "character_delimited"
framing.character_delimited.delimiter
required ascii_charframing.character_delimited.max_length
optional uintThe maximum length of the byte buffer.
This length does not include the trailing delimiter.
By default, there is no maximum length enforced. If events are malformed, this can lead to additional resource usage as events continue to be buffered in memory, and can potentially lead to memory exhaustion in extreme cases.
If there is a risk of processing malformed data, such as logs with user-controlled input, consider setting the maximum length to a reasonably large value as a safety net. This ensures that processing is not actually unbounded.
framing.chunked_gelf
optional objectmethod = "chunked_gelf"
framing.chunked_gelf.decompression
optional string literal enumOption | Description |
---|---|
Auto | Automatically detect the decompression method based on the magic bytes of the message. |
Gzip | Use Gzip decompression. |
None | Do not decompress the message. |
Zlib | Use Zlib decompression. |
Auto
framing.chunked_gelf.max_length
optional uintThe maximum length of a single GELF message, in bytes. Messages longer than this length will be dropped. If this option is not set, the decoder does not limit the length of messages and the per-message memory is unbounded.
Note that a message can be composed of multiple chunks and this limit is applied to the whole message, not to individual chunks.
This limit takes only into account the message’s payload and the GELF header bytes are excluded from the calculation. The message’s payload is the concatenation of all the chunks' payloads.
framing.chunked_gelf.pending_messages_limit
optional uintframing.chunked_gelf.timeout_secs
optional float5
framing.length_delimited
required objectmethod = "length_delimited"
framing.length_delimited.length_field_is_big_endian
optional booltrue
framing.length_delimited.length_field_length
optional uint4
framing.length_delimited.length_field_offset
optional uintframing.method
optional string literal enumOption | Description |
---|---|
bytes | Byte frames are passed through as-is according to the underlying I/O boundaries (for example, split between messages or stream segments). |
character_delimited | Byte frames which are delimited by a chosen character. |
chunked_gelf | Byte frames which are chunked GELF messages. |
length_delimited | Byte frames which are prefixed by an unsigned big-endian 32-bit integer indicating the length. |
newline_delimited | Byte frames which are delimited by a newline character. |
octet_counting | Byte frames according to the octet counting format. |
newline_delimited
framing.newline_delimited
optional objectmethod = "newline_delimited"
framing.newline_delimited.max_length
optional uintThe maximum length of the byte buffer.
This length does not include the trailing delimiter.
By default, there is no maximum length enforced. If events are malformed, this can lead to additional resource usage as events continue to be buffered in memory, and can potentially lead to memory exhaustion in extreme cases.
If there is a risk of processing malformed data, such as logs with user-controlled input, consider setting the maximum length to a reasonably large value as a safety net. This ensures that processing is not actually unbounded.
framing.octet_counting
optional objectmethod = "octet_counting"
framing.octet_counting.max_length
optional uintmultiline
optional objectMultiline aggregation configuration.
If not specified, multiline aggregation is disabled.
multiline.condition_pattern
required string literalRegular expression pattern that is used to determine whether or not more lines should be read.
This setting must be configured in conjunction with mode
.
multiline.mode
required string literal enumAggregation mode.
This setting must be configured in conjunction with condition_pattern
.
Option | Description |
---|---|
continue_past | All consecutive lines matching this pattern, plus one additional line, are included in the group. This is useful in cases where a log message ends with a continuation marker, such as a backslash, indicating that the following line is part of the same message. |
continue_through | All consecutive lines matching this pattern are included in the group. The first line (the line that matched the start pattern) does not need to match the This is useful in cases such as a Java stack trace, where some indicator in the line (such as a leading whitespace) indicates that it is an extension of the proceeding line. |
halt_before | All consecutive lines not matching this pattern are included in the group. This is useful where a log line contains a marker indicating that it begins a new message. |
halt_with | All consecutive lines, up to and including the first line matching this pattern, are included in the group. This is useful where a log line ends with a termination marker, such as a semicolon. |
multiline.start_pattern
required string literalmultiline.timeout_ms
required uintThe maximum amount of time to wait for the next additional line, in milliseconds.
Once this timeout is reached, the buffered message is guaranteed to be flushed, even if incomplete.
proxy
optional objectProxy configuration.
Configure to proxy traffic through an HTTP(S) proxy when making external requests.
Similar to common proxy configuration convention, you can set different proxies to use based on the type of traffic being proxied. You can also set specific hosts that should not be proxied.
proxy.http
optional string literalProxy endpoint to use when proxying HTTP traffic.
Must be a valid URI string.
proxy.https
optional string literalProxy endpoint to use when proxying HTTPS traffic.
Must be a valid URI string.
proxy.no_proxy
optional [string]A list of hosts to avoid proxying.
Multiple patterns are allowed:
Pattern | Example match |
---|---|
Domain names | example.com matches requests to example.com |
Wildcard domains | .example.com matches requests to example.com and its subdomains |
IP addresses | 127.0.0.1 matches requests to 127.0.0.1 |
CIDR blocks | 192.168.0.0/16 matches requests to any IP addresses in this range |
Splat | * matches all hosts |
sqs
optional objectsqs.client_concurrency
optional uintNumber of concurrent tasks to create for polling the queue for messages.
Defaults to the number of available CPUs on the system.
Should not typically need to be changed, but it can sometimes be beneficial to raise this value when there is a high rate of messages being pushed into the queue and the objects being fetched are small. In these cases, system resources may not be fully utilized without fetching more messages per second, as the SQS message consumption rate affects the S3 object retrieval rate.
sqs.connect_timeout_seconds
optional uintThe connection timeout for AWS requests
Limits the amount of time allowed to initiate a socket connection.
sqs.delete_failed_message
optional boolWhether to delete non-retryable messages.
If a message is rejected by the sink and not retryable, it is deleted from the queue.
true
sqs.delete_message
optional boolWhether to delete the message once it is processed.
It can be useful to set this to false
for debugging or during the initial setup.
true
sqs.max_number_of_messages
optional uintMaximum number of messages to poll from SQS in a batch
Defaults to 10
Should be set to a smaller value when the files are large to help prevent the ingestion of one file from causing the other files to exceed the visibility_timeout. Valid values are 1 - 10
10
sqs.operation_timeout_seconds
optional uintThe operation timeout for AWS requests
Limits the amount of time allowed for an operation to be fully serviced; an
operation represents the full request/response lifecycle of a call to a service.
Take care when configuring this settings to allow enough time for the polling
interval configured in poll_secs
sqs.poll_secs
optional uintHow long to wait while polling the queue for new messages, in seconds.
Generally, this should not be changed unless instructed to do so, as if messages are available,
they are always consumed, regardless of the value of poll_secs
.
15
(seconds)sqs.queue_url
required string literalsqs.read_timeout_seconds
optional uintThe read timeout for AWS requests
Limits the amount of time allowed to read the first byte of a response from the
time the request is initiated. Take care when configuring this settings to allow
enough time for the polling interval configured in poll_secs
sqs.tls_options
optional objectsqs.tls_options.alpn_protocols
optional [string]Sets the list of supported ALPN protocols.
Declare the supported ALPN protocols, which are used during negotiation with peer. They are prioritized in the order that they are defined.
sqs.tls_options.ca_file
optional string literalAbsolute path to an additional CA certificate file.
The certificate must be in the DER or PEM (X.509) format. Additionally, the certificate can be provided as an inline string in PEM format.
sqs.tls_options.crt_file
optional string literalAbsolute path to a certificate file used to identify this server.
The certificate must be in DER, PEM (X.509), or PKCS#12 format. Additionally, the certificate can be provided as an inline string in PEM format.
If this is set, and is not a PKCS#12 archive, key_file
must also be set.
sqs.tls_options.key_file
optional string literalAbsolute path to a private key file used to identify this server.
The key must be in DER or PEM (PKCS#8) format. Additionally, the key can be provided as an inline string in PEM format.
sqs.tls_options.key_pass
optional string literalPassphrase used to unlock the encrypted key file.
This has no effect unless key_file
is set.
sqs.tls_options.server_name
optional string literalServer name to use when using Server Name Indication (SNI).
Only relevant for outgoing connections.
sqs.tls_options.verify_certificate
optional boolEnables certificate verification. For components that create a server, this requires that the client connections have a valid client certificate. For components that initiate requests, this validates that the upstream has a valid certificate.
If enabled, certificates must not be expired and must be issued by a trusted issuer. This verification operates in a hierarchical manner, checking that the leaf certificate (the certificate presented by the client/server) is not only valid, but that the issuer of that certificate is also valid, and so on until the verification process reaches a root certificate.
Do NOT set this to false
unless you understand the risks of not verifying the validity of certificates.
sqs.tls_options.verify_hostname
optional boolEnables hostname verification.
If enabled, the hostname used to connect to the remote host must be present in the TLS certificate presented by the remote host, either as the Common Name or as an entry in the Subject Alternative Name extension.
Only relevant for outgoing connections.
Do NOT set this to false
unless you understand the risks of not verifying the remote hostname.
sqs.visibility_timeout_secs
optional uintThe visibility timeout to use for messages, in seconds.
This controls how long a message is left unavailable after it is received. If a message is received, and
takes longer than visibility_timeout_secs
to process and delete the message from the queue, it is made available again for another consumer.
This can happen if there is an issue between consuming a message and deleting it.
300
(seconds)tls_options
optional objecttls_options.alpn_protocols
optional [string]Sets the list of supported ALPN protocols.
Declare the supported ALPN protocols, which are used during negotiation with peer. They are prioritized in the order that they are defined.
tls_options.ca_file
optional string literalAbsolute path to an additional CA certificate file.
The certificate must be in the DER or PEM (X.509) format. Additionally, the certificate can be provided as an inline string in PEM format.
tls_options.crt_file
optional string literalAbsolute path to a certificate file used to identify this server.
The certificate must be in DER, PEM (X.509), or PKCS#12 format. Additionally, the certificate can be provided as an inline string in PEM format.
If this is set, and is not a PKCS#12 archive, key_file
must also be set.
tls_options.key_file
optional string literalAbsolute path to a private key file used to identify this server.
The key must be in DER or PEM (PKCS#8) format. Additionally, the key can be provided as an inline string in PEM format.
tls_options.key_pass
optional string literalPassphrase used to unlock the encrypted key file.
This has no effect unless key_file
is set.
tls_options.server_name
optional string literalServer name to use when using Server Name Indication (SNI).
Only relevant for outgoing connections.
tls_options.verify_certificate
optional boolEnables certificate verification. For components that create a server, this requires that the client connections have a valid client certificate. For components that initiate requests, this validates that the upstream has a valid certificate.
If enabled, certificates must not be expired and must be issued by a trusted issuer. This verification operates in a hierarchical manner, checking that the leaf certificate (the certificate presented by the client/server) is not only valid, but that the issuer of that certificate is also valid, and so on until the verification process reaches a root certificate.
Do NOT set this to false
unless you understand the risks of not verifying the validity of certificates.
tls_options.verify_hostname
optional boolEnables hostname verification.
If enabled, the hostname used to connect to the remote host must be present in the TLS certificate presented by the remote host, either as the Common Name or as an entry in the Subject Alternative Name extension.
Only relevant for outgoing connections.
Do NOT set this to false
unless you understand the risks of not verifying the remote hostname.
Environment variables
AWS_ACCESS_KEY_ID
common optional string literalAWS_CONFIG_FILE
common optional string literal~/.aws/config
AWS_DEFAULT_REGION
common optional string literalAWS_PROFILE
common optional string literaldefault
AWS_ROLE_SESSION_NAME
common optional string literalAWS_SECRET_ACCESS_KEY
common optional string literalAWS_SESSION_TOKEN
common optional string literalAWS_SHARED_CREDENTIALS_FILE
common optional string literal~/.aws/credentials
HTTPS_PROXY
common optional string literalThe global URL to proxy HTTPS requests through.
If another HTTPS proxy is set in the configuration file or at a component level, this one will be overridden.
The lowercase variant has priority over the uppercase one.
HTTP_PROXY
common optional string literalThe global URL to proxy HTTP requests through.
If another HTTP proxy is set in the configuration file or at a component level, this one will be overridden.
The lowercase variant has priority over the uppercase one.
NO_PROXY
common optional string literalList of hosts to avoid proxying globally.
Allowed patterns here include:
Pattern | Example match |
---|---|
Domain names | example.com matches requests to example.com |
Wildcard domains | .example.come matches requests to example.com and its subdomains |
IP addresses | 127.0.0.1 matches requests to 127.0.0.1 |
CIDR blocks | 192.168.0.0./16 matches requests to any IP addresses in this range |
Splat | * matches all hosts |
If another no_proxy
value is set in the configuration file or at a component level, this
one is overridden.
The lowercase variant has priority over the uppercase one.
http_proxy
common optional string literalThe global URL to proxy HTTP requests through.
If another HTTP proxy is set in the configuration file or at a component level, this one will be overridden.
The lowercase variant has priority over the uppercase one.
https_proxy
common optional string literalThe global URL to proxy HTTPS requests through.
If another HTTPS proxy is set in the configuration file or at a component level, this one will be overridden.
The lowercase variant has priority over the uppercase one.
no_proxy
common optional string literalList of hosts to avoid proxying globally.
Allowed patterns here include:
Pattern | Example match |
---|---|
Domain names | example.com matches requests to example.com |
Wildcard domains | .example.come matches requests to example.com and its subdomains |
IP addresses | 127.0.0.1 matches requests to 127.0.0.1 |
CIDR blocks | 192.168.0.0./16 matches requests to any IP addresses in this range |
Splat | * matches all hosts |
If another no_proxy
value is set in the configuration file or at a component level, this
one is overridden.
The lowercase variant has priority over the uppercase one.
Outputs
<component_id>
Output Data
Logs
Warning
Object
my-bucket
53.126.150.246 - - [01/Oct/2020:11:25:58 -0400] "GET /disintermediate HTTP/2.0" 401 20308
AWSLogs/111111111111/vpcflowlogs/us-east-1/2020/10/26/111111111111_vpcflowlogs_us-east-1_fl-0c5605d9f1baf680d_20201026T1950Z_b1ea4a7a.log.gz
us-east-1
aws_s3
2020-10-10T17:07:36.452332Z
Telemetry
Metrics
linkcomponent_discarded_events_total
counterfilter
transform, or false if due to an error.component_errors_total
countercomponent_received_bytes_total
countercomponent_received_event_bytes_total
countercomponent_received_events_count
histogramA histogram of the number of events passed in each internal batch in Vector’s internal topology.
Note that this is separate than sink-level batching. It is mostly useful for low level debugging performance issues in Vector due to small internal batches.
component_received_events_total
countercomponent_sent_event_bytes_total
countercomponent_sent_events_total
countersource_lag_time_seconds
histogramsqs_message_delete_succeeded_total
countersqs_message_processing_succeeded_total
countersqs_message_receive_succeeded_total
countersqs_message_received_messages_total
countersqs_s3_event_record_ignored_total
counterObjectCreated
).Permissions
Policy | Required for | Required when |
---|---|---|
s3:GetObject |
|
Policy | Required for | Required when |
---|---|---|
sqs:ReceiveMessage |
| |
sqs:DeleteMessage |
| delete_message is set to true |
How it works
AWS authentication
Vector checks for AWS credentials in the following order:
- The
auth.access_key_id
andauth.secret_access_key
options. - The
AWS_ACCESS_KEY_ID
andAWS_SECRET_ACCESS_KEY
environment variables. - In Web Identity Token credentials from the environment or container (including EKS). These credentials will automatically refresh when expired.
- ECS credentials (IAM roles for tasks). These credentials will automatically refresh when expired.
- As entries in the credentials file in the
.aws
directory in your home directory (~/.aws/credentials
on Linux, OS X, and Unix;%userprofile%\.aws\credentials
on Microsoft Windows). - Using a named profile specified in the credentials file via the AWS_PROFILE environment variable.
- The IAM instance profile (only works if running on an EC2 instance with an instance profile/role). Requires IMDSv2 to be enabled. For EKS, you may need to increase the metadata token response hop limit to 2. These credentials will automatically refresh when expired.
If no credentials are found, Vector’s health check fails and an error is logged.
Obtaining an access key
auth.access_key_id
and auth.secret_access_key
options.Assuming roles
auth.assume_role
option. This is an
optional setting that is helpful for a variety of use cases, such as cross
account access.Handling events from the aws_s3
source
This source behaves very similarly to the file
source in that
it will output one event per line (unless the multiline
configuration option is used).
You will commonly want to use transforms to
parse the data. For example, to parse VPC flow logs sent to S3 you can
chain the remap
transform:
[transforms.flow_logs]
type = "remap" # required
inputs = ["s3"]
drop_on_error = false
source = '''
. = parse_aws_vpc_flow_log!(string!(.message))
'''
To parse AWS load balancer logs, the remap
transform can be used:
[transforms.elasticloadbalancing_fields_parsed]
type = "remap" # required
inputs = ["s3"]
drop_on_error = false
source = '''
. = parse_aws_alb_log!(string!(.message))
.request_url_parts = parse_url!(.request_url)
'''
Transport Layer Security (TLS)
tls.*
options and/or via an
OpenSSL configuration file. The file location defaults to
/usr/local/ssl/openssl.cnf
or can be specified with the OPENSSL_CONF
environment variable.