Elasticsearch
Index observability events in Elasticsearch
Requirements
Elasticsearch's Data streams feature requires Vector to be configured with the create bulk.action. This is not enabled by default.
Configuration
Example configurations
{
"sinks": {
"my_sink_id": {
"type": "elasticsearch",
"inputs": [
"my-source-or-transform-id"
]
}
}
}
[sinks.my_sink_id]
type = "elasticsearch"
inputs = [ "my-source-or-transform-id" ]
sinks:
  my_sink_id:
    type: elasticsearch
    inputs:
      - my-source-or-transform-id
{
"sinks": {
"my_sink_id": {
"type": "elasticsearch",
"inputs": [
"my-source-or-transform-id"
],
"api_version": "auto",
"compression": "none",
"doc_type": "_doc",
"endpoints": [
"http://10.24.32.122:9000"
],
"id_key": "id",
"mode": "bulk",
"opensearch_service_type": "managed",
"pipeline": "pipeline-name",
"query": {
"X-Powered-By": "Vector"
}
}
}
}
[sinks.my_sink_id]
type = "elasticsearch"
inputs = [ "my-source-or-transform-id" ]
api_version = "auto"
compression = "none"
doc_type = "_doc"
endpoints = [ "http://10.24.32.122:9000" ]
id_key = "id"
mode = "bulk"
opensearch_service_type = "managed"
pipeline = "pipeline-name"
[sinks.my_sink_id.query]
X-Powered-By = "Vector"
sinks:
  my_sink_id:
    type: elasticsearch
    inputs:
      - my-source-or-transform-id
    api_version: auto
    compression: none
    doc_type: _doc
    endpoints:
      - http://10.24.32.122:9000
    id_key: id
    mode: bulk
    opensearch_service_type: managed
    pipeline: pipeline-name
    query:
      X-Powered-By: Vector
acknowledgements
optional object
Controls how acknowledgements are handled for this sink.
See End-to-end Acknowledgements for more information on how event acknowledgement is handled.
acknowledgements.enabled
optional bool
Whether or not end-to-end acknowledgements are enabled.
When enabled for a sink, any source connected to that sink, where the source supports end-to-end acknowledgements as well, waits for events to be acknowledged by all connected sinks before acknowledging them at the source.
Enabling or disabling acknowledgements at the sink level takes precedence over any global acknowledgements configuration.
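As a minimal sketch, end-to-end acknowledgements could be enabled for this sink as follows. The sink ID, input ID, and endpoint are the placeholder values from the example configurations above, not recommended settings.

```yaml
sinks:
  my_sink_id:
    type: elasticsearch
    inputs:
      - my-source-or-transform-id
    endpoints:
      - http://10.24.32.122:9000
    # Sink-level setting; takes precedence over any global acknowledgements configuration.
    acknowledgements:
      enabled: true
```

This only has an effect for connected sources that support end-to-end acknowledgements themselves.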
api_version
optional string literal enum
The API version of Elasticsearch.
Amazon OpenSearch Serverless requires this option to be set to auto (the default).
Option | Description |
---|---|
auto | Auto-detect the API version. If the cluster state version endpoint isn’t reachable, a warning is logged to stdout, and the version is assumed to be V6 if the |
v6 | Use the Elasticsearch 6.x API. |
v7 | Use the Elasticsearch 7.x API. |
v8 | Use the Elasticsearch 8.x API. |
auto
auth
optional object
auth.assume_role
required string literal
strategy = "aws"
auth.credentials_file
required string literal
strategy = "aws"
auth.external_id
optional string literal
strategy = "aws"
auth.imds
optional object
strategy = "aws"
auth.imds.max_attempts
optional uint
4
auth.load_timeout_secs
optional uint
Timeout for successfully loading any credentials, in seconds.
Relevant when the default credentials chain or assume_role is used.
strategy = "aws"
auth.profile
optional string literal
The credentials profile to use.
Used to select AWS credentials from a provided credentials file.
strategy = "aws"
default
auth.region
optional string literal
The AWS region to send STS requests to.
If not set, this defaults to the configured region for the service itself.
strategy = "aws"
auth.secret_access_key
required string literal
strategy = "aws"
auth.strategy
required string literal enum
The authentication strategy to use.
Amazon OpenSearch Serverless requires this option to be set to aws.
Option | Description |
---|---|
aws | Amazon OpenSearch Service-specific authentication. |
basic | HTTP Basic Authentication. |
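The following sketch combines the settings that this page says Amazon OpenSearch Serverless requires: auth.strategy set to aws and api_version set to auto. The endpoint URL is a hypothetical placeholder, and your environment may need further AWS settings that are not shown here.

```yaml
sinks:
  my_sink_id:
    type: elasticsearch
    inputs:
      - my-source-or-transform-id
    endpoints:
      # Hypothetical collection endpoint; replace with your own.
      - https://example-collection.us-east-1.aoss.amazonaws.com
    opensearch_service_type: serverless
    api_version: auto        # required for OpenSearch Serverless (also the default)
    auth:
      strategy: aws          # required for OpenSearch Serverless
```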
aws
optional object
aws.endpoint
optional string literal
batch
optional object
batch.max_bytes
optional uint
The maximum size of a batch that is processed by a sink.
This is based on the uncompressed size of the batched events, before they are serialized/compressed.
1e+07 (bytes)
batch.max_events
optional uint
batch.timeout_secs
optional float
1 (seconds)
buffer
optional object
Configures the buffering behavior for this sink.
More information about the individual buffer types, and buffer behavior, can be found in the Buffering Model section.
buffer.max_events
optional uint
type = "memory"
500
buffer.max_size
required uint
The maximum size of the buffer on disk.
Must be at least ~256 megabytes (268435488 bytes).
type = "disk"
buffer.type
optional string literal enum
Option | Description |
---|---|
disk | Events are buffered on disk. This is less performant, but more durable. Data that has been synchronized to disk will not be lost if Vector is restarted forcefully or crashes. Data is synchronized to disk every 500ms. |
memory | Events are buffered in memory. This is more performant, but less durable. Data will be lost if Vector is restarted forcefully or crashes. |
memory
buffer.when_full
optional string literal enum
Option | Description |
---|---|
block | Wait for free space in the buffer. This applies backpressure up the topology, signalling that sources should slow down the acceptance/consumption of events. This means that while no data is lost, data will pile up at the edge. |
drop_newest | Drops the event instead of waiting for free space in buffer. The event will be intentionally dropped. This mode is typically used when performance is the highest priority, and it is preferable to temporarily lose events rather than cause a slowdown in the acceptance/consumption of events. |
block
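As an illustration of the buffer options above, a durable disk buffer might be configured like this. The max_size value is simply the documented minimum, chosen for the example rather than as a recommendation.

```yaml
sinks:
  my_sink_id:
    type: elasticsearch
    inputs:
      - my-source-or-transform-id
    endpoints:
      - http://10.24.32.122:9000
    buffer:
      type: disk           # more durable, less performant than memory
      max_size: 268435488  # bytes; the documented minimum for disk buffers
      when_full: block     # apply backpressure instead of dropping events
```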
bulk
optional object
bulk.action
optional string template
Action to use when making requests to the Elasticsearch Bulk API.
Only index, create, and update actions are supported.
index
bulk.index
optional string template
vector-%Y.%m.%d
bulk.template_fallback_index
optional string literal
The index to use when the bulk.index template cannot be resolved.
bulk.version
optional string template
bulk.version_type
optional string literal enum
Version type.
Possible values are internal, external or external_gt, and external_gte.
Option | Description |
---|---|
external | The external or external_gt type. |
external_gte | The external_gte type. |
internal | The internal type. |
internal
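A short sketch of the bulk options in use, assuming events carry an id field that should become the Elasticsearch document ID. With the create action, an existing document with the same ID is not replaced.

```yaml
sinks:
  my_sink_id:
    type: elasticsearch
    inputs:
      - my-source-or-transform-id
    endpoints:
      - http://10.24.32.122:9000
    mode: bulk
    id_key: id                    # assumes each event has an "id" field
    bulk:
      action: create              # one of index, create, update
      index: "vector-%Y.%m.%d"    # the default index template
```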
compression
optional string literal enum
Compression configuration.
All compression algorithms use the default compression level unless otherwise specified.
none
data_stream
optional object
data_stream.auto_routing
optional bool
Automatically routes events by deriving the data stream name using specific event fields.
The format of the data stream name is <type>-<dataset>-<namespace>, where each value comes from the data_stream configuration field of the same name.
If enabled, the values of the data_stream.type, data_stream.dataset, and data_stream.namespace event fields are used if they are present. Otherwise, the values set in this configuration are used.
true
data_stream.dataset
optional string template
generic
data_stream.namespace
optional string template
default
data_stream.sync_fields
optional bool
Automatically adds and syncs the data_stream.* event fields if they are missing from the event.
This ensures that fields match the name of the data stream that is receiving events.
true
data_stream.type
optional string template
logs
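To make the naming rule concrete, the sketch below would write to a data stream named logs-nginx-production, following the <type>-<dataset>-<namespace> format, unless an event carries its own data_stream.* fields. The dataset and namespace values are hypothetical.

```yaml
sinks:
  my_sink_id:
    type: elasticsearch
    inputs:
      - my-source-or-transform-id
    endpoints:
      - http://10.24.32.122:9000
    mode: data_stream
    data_stream:
      type: logs
      dataset: nginx          # hypothetical dataset name
      namespace: production   # hypothetical namespace
      auto_routing: true      # prefer data_stream.* event fields when present
      sync_fields: true       # add missing data_stream.* fields to events
```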
distribution
optional object
distribution.retry_initial_backoff_secs
optional uint
1 (seconds)
distribution.retry_max_duration_secs
optional uint
3600 (seconds)
doc_type
optional string literal
The doc_type for your index data.
This is only relevant for Elasticsearch <= 6.X. If you are using >= 7.0 you do not need to set this option since Elasticsearch has removed it.
_doc
encoding
optional object
encoding.except_fields
optional [string]
encoding.only_fields
optional [string]
encoding.timestamp_format
optional string literal enum
Option | Description |
---|---|
rfc3339 | Represent the timestamp as an RFC 3339 timestamp. |
unix | Represent the timestamp as a Unix timestamp. |
unix_float | Represent the timestamp as a Unix timestamp in floating point. |
unix_ms | Represent the timestamp as a Unix timestamp in milliseconds. |
unix_ns | Represent the timestamp as a Unix timestamp in nanoseconds. |
unix_us | Represent the timestamp as a Unix timestamp in microseconds. |
endpoint
optional string literal
The Elasticsearch endpoint to send logs to.
The endpoint must contain an HTTP scheme, and may specify a hostname or IP address and port.
endpoints
optional [string]
A list of Elasticsearch endpoints to send logs to.
Each endpoint must contain an HTTP scheme, and may specify a hostname or IP address and port.
healthcheck
optional object
healthcheck.enabled
optional bool
true
id_key
optional string literal
The name of the event key that should map to Elasticsearch’s _id field.
By default, the _id field is not set, which allows Elasticsearch to set this automatically. Setting your own Elasticsearch IDs can hinder performance.
inputs
required [string]
A list of upstream source or transform IDs.
Wildcards (*) are supported.
See configuration for more info.
metrics
optional object
Configuration for the metric_to_log transform.
metrics.host_tag
optional string literal
Name of the tag in the metric to use for the source host.
If present, the value of the tag is set on the generated log event in the host field, where the field key uses the global host_key option.
metrics.metric_tag_values
optional string literal enum
Controls how metric tag values are encoded.
When set to single, only the last non-bare value of each tag is displayed with the metric. When set to full, all metric tags are exposed as separate assignments as described by the native_json codec.
Option | Description |
---|---|
full | All tags are exposed as arrays of either string or null values. |
single | Tag values are exposed as single strings, the same as they were before this config option. Tags with multiple values show the last assigned value, and null values are ignored. |
single
metrics.timezone
optional string literal
The name of the time zone to apply to timestamp conversions that do not contain an explicit time zone.
This overrides the global timezone option. The time zone name may be any name in the TZ database or local to indicate system local time.
mode
optional string literal enum
Option | Description |
---|---|
bulk | Ingests documents in bulk, using the bulk API index action. |
data_stream | Ingests documents in bulk, using the bulk API create action. Elasticsearch Data Streams only support the create action. |
bulk
opensearch_service_type
optional string literal enum
Option | Description |
---|---|
managed | Elasticsearch or OpenSearch Managed domain |
serverless | OpenSearch Serverless collection |
managed
proxy
optional object
Proxy configuration.
Configure to proxy traffic through an HTTP(S) proxy when making external requests.
Similar to common proxy configuration convention, you can set different proxies to use based on the type of traffic being proxied. You can also set specific hosts that should not be proxied.
proxy.http
optional string literal
Proxy endpoint to use when proxying HTTP traffic.
Must be a valid URI string.
proxy.https
optional string literal
Proxy endpoint to use when proxying HTTPS traffic.
Must be a valid URI string.
proxy.no_proxy
optional [string]
A list of hosts to avoid proxying.
Multiple patterns are allowed:
Pattern | Example match |
---|---|
Domain names | example.com matches requests to example.com |
Wildcard domains | .example.com matches requests to example.com and its subdomains |
IP addresses | 127.0.0.1 matches requests to 127.0.0.1 |
CIDR blocks | 192.168.0.0/16 matches requests to any IP addresses in this range |
Splat | * matches all hosts |
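A sketch of a proxy configuration that uses the pattern types from the table above. The proxy host and the excluded domain are hypothetical placeholders.

```yaml
sinks:
  my_sink_id:
    type: elasticsearch
    inputs:
      - my-source-or-transform-id
    endpoints:
      - http://10.24.32.122:9000
    proxy:
      http: http://proxy.example.com:3128    # hypothetical proxy for HTTP traffic
      https: http://proxy.example.com:3128   # hypothetical proxy for HTTPS traffic
      no_proxy:
        - 127.0.0.1                # IP address
        - 192.168.0.0/16           # CIDR block
        - .internal.example.com    # wildcard domain (hypothetical)
```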
query
optional object
query.*
required string literal
request
optional object
request.adaptive_concurrency
optional object
Configuration of adaptive concurrency parameters.
These parameters typically do not require changes from the default, and incorrect values can lead to meta-stable or unstable performance and sink behavior. Proceed with caution.
request.adaptive_concurrency.decrease_ratio
optional float
The fraction of the current value to set the new concurrency limit when decreasing the limit.
Valid values are greater than 0 and less than 1. Smaller values cause the algorithm to scale back rapidly when latency increases.
Note that the new limit is rounded down after applying this ratio.
0.9
request.adaptive_concurrency.ewma_alpha
optional float
The weighting of new measurements compared to older measurements.
Valid values are greater than 0 and less than 1.
ARC uses an exponentially weighted moving average (EWMA) of past RTT measurements as a reference to compare with the current RTT. Smaller values cause this reference to adjust more slowly, which may be useful if a service has unusually high response variability.
0.4
request.adaptive_concurrency.initial_concurrency
optional uint
The initial concurrency limit to use. If not specified, the initial limit will be 1 (no concurrency).
It is recommended to set this value to your service’s average limit if you’re seeing that it takes a long time to ramp up adaptive concurrency after a restart. You can find this value by looking at the adaptive_concurrency_limit metric.
1
request.adaptive_concurrency.max_concurrency_limit
optional uint
The maximum concurrency limit.
The adaptive request concurrency limit will not go above this bound. This is put in place as a safeguard.
200
request.adaptive_concurrency.rtt_deviation_scale
optional float
Scale of RTT deviations which are not considered anomalous.
Valid values are greater than or equal to 0, and we expect reasonable values to range from 1.0 to 3.0.
When calculating the past RTT average, we also compute a secondary “deviation” value that indicates how variable those values are. We use that deviation when comparing the past RTT average to the current measurements, so we can ignore increases in RTT that are within an expected range. This factor is used to scale up the deviation to an appropriate range. Larger values cause the algorithm to ignore larger increases in the RTT.
2.5
request.concurrency
optional string literal enum uint
Configuration for outbound request concurrency.
This can be set either to one of the below enum values or to a positive integer, which denotes a fixed concurrency limit.
Option | Description |
---|---|
adaptive | Concurrency will be managed by Vector’s Adaptive Request Concurrency feature. |
none | A fixed concurrency of 1. Only one request can be outstanding at any given time. |
adaptive
request.headers
optional object
request.headers.*
required string literal
request.rate_limit_duration_secs
optional uint
The time window used for the rate_limit_num option.
1 (seconds)
request.rate_limit_num
optional uint
The maximum number of requests allowed within the rate_limit_duration_secs time window.
9.223372036854776e+18 (requests)
request.retry_attempts
optional uint
9.223372036854776e+18 (retries)
request.retry_initial_backoff_secs
optional uint
The amount of time to wait before attempting the first retry for a failed request.
After the first retry has failed, the Fibonacci sequence is used to select future backoffs.
1 (seconds)
request.retry_jitter_mode
optional string literal enum
Option | Description |
---|---|
Full | Full jitter. The random delay is anywhere from 0 up to the maximum current delay calculated by the backoff strategy. Incorporating full jitter into your backoff strategy can greatly reduce the likelihood of creating accidental denial of service (DoS) conditions against your own systems when many clients are recovering from a failure state. |
None | No jitter. |
Full
request.retry_max_duration_secs
optional uint
30 (seconds)
request.timeout_secs
optional uint
The time a request can take before being aborted.
Datadog highly recommends that you do not lower this value below the service’s internal timeout, as this could create orphaned requests, pile on retries, and result in duplicate data downstream.
60 (seconds)
request_retry_partial
optional bool
Whether or not to retry successful requests containing partial failures.
To avoid duplicates in Elasticsearch, use the id_key option.
false
suppress_type_name
optional bool
Whether or not to send the type field to Elasticsearch.
The type field was deprecated in Elasticsearch 7.x and removed in Elasticsearch 8.x.
If enabled, the doc_type option is ignored.
false
tls
optional object
tls.alpn_protocols
optional [string]
Sets the list of supported ALPN protocols.
Declare the supported ALPN protocols, which are used during negotiation with peer. They are prioritized in the order that they are defined.
tls.ca_file
optional string literal
Absolute path to an additional CA certificate file.
The certificate must be in the DER or PEM (X.509) format. Additionally, the certificate can be provided as an inline string in PEM format.
tls.crt_file
optional string literal
Absolute path to a certificate file used to identify this server.
The certificate must be in DER, PEM (X.509), or PKCS#12 format. Additionally, the certificate can be provided as an inline string in PEM format.
If this is set, and is not a PKCS#12 archive, key_file must also be set.
tls.key_file
optional string literal
Absolute path to a private key file used to identify this server.
The key must be in DER or PEM (PKCS#8) format. Additionally, the key can be provided as an inline string in PEM format.
tls.key_pass
optional string literal
Passphrase used to unlock the encrypted key file.
This has no effect unless key_file is set.
tls.server_name
optional string literal
Server name to use when using Server Name Indication (SNI).
Only relevant for outgoing connections.
tls.verify_certificate
optional bool
Enables certificate verification. For components that create a server, this requires that the client connections have a valid client certificate. For components that initiate requests, this validates that the upstream has a valid certificate.
If enabled, certificates must not be expired and must be issued by a trusted issuer. This verification operates in a hierarchical manner, checking that the leaf certificate (the certificate presented by the client/server) is not only valid, but that the issuer of that certificate is also valid, and so on until the verification process reaches a root certificate.
Do NOT set this to false unless you understand the risks of not verifying the validity of certificates.
tls.verify_hostname
optional bool
Enables hostname verification.
If enabled, the hostname used to connect to the remote host must be present in the TLS certificate presented by the remote host, either as the Common Name or as an entry in the Subject Alternative Name extension.
Only relevant for outgoing connections.
Do NOT set this to false unless you understand the risks of not verifying the remote hostname.
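As a sketch of the tls options together, the configuration below connects to an HTTPS endpoint with an additional CA and a client certificate. The hostname and file paths are hypothetical. Because crt_file here is a PEM file rather than a PKCS#12 archive, key_file is set as well.

```yaml
sinks:
  my_sink_id:
    type: elasticsearch
    inputs:
      - my-source-or-transform-id
    endpoints:
      - https://es.example.com:9200           # hypothetical HTTPS endpoint
    tls:
      ca_file: /etc/vector/certs/ca.pem       # hypothetical path
      crt_file: /etc/vector/certs/client.pem  # hypothetical path
      key_file: /etc/vector/certs/client.key  # required because crt_file is not PKCS#12
      verify_certificate: true
      verify_hostname: true
```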
Telemetry
Metrics
buffer_byte_size
gauge
buffer_discarded_events_total
counter
buffer_events
gauge
buffer_received_event_bytes_total
counter
buffer_received_events_total
counter
buffer_sent_event_bytes_total
counter
buffer_sent_events_total
counter
component_discarded_events_total
counter
filter transform, or false if due to an error.
component_errors_total
counter
component_received_event_bytes_total
counter
component_received_events_count
histogram
A histogram of the number of events passed in each internal batch in Vector’s internal topology.
Note that this is separate from sink-level batching. It is mostly useful for low-level debugging of performance issues in Vector due to small internal batches.
component_received_events_total
counter
component_sent_bytes_total
counter
component_sent_event_bytes_total
counter
component_sent_events_total
counter
utilization
gauge
How it works
AWS authentication
Vector checks for AWS credentials in the following order:
- The auth.access_key_id and auth.secret_access_key options.
- The AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables.
- Web Identity Token credentials from the environment or container (including EKS). These credentials will automatically refresh when expired.
- ECS credentials (IAM roles for tasks). These credentials will automatically refresh when expired.
- Entries in the credentials file in the .aws directory in your home directory (~/.aws/credentials on Linux, OS X, and Unix; %userprofile%\.aws\credentials on Microsoft Windows).
- A named profile in the credentials file, selected via the AWS_PROFILE environment variable.
- The IAM instance profile (only works if running on an EC2 instance with an instance profile/role). Requires IMDSv2 to be enabled. For EKS, you may need to increase the metadata token response hop limit to 2. These credentials will automatically refresh when expired.
If no credentials are found, Vector’s health check fails and an error is logged.
Obtaining an access key
Access keys can be supplied via the auth.access_key_id and auth.secret_access_key options.
Assuming roles
Vector can assume an AWS IAM role via the auth.assume_role option. This is an optional setting that is helpful for a variety of use cases, such as cross-account access.
Buffers and batches
This component buffers and batches data. Rather than treating buffering and batching as global concepts, Vector treats them as sink-specific. This isolates sinks, ensuring service disruptions are contained and delivery guarantees are honored.
Batches are flushed when either of two conditions is met:
- The batch age meets or exceeds the configured timeout_secs.
- The batch size meets or exceeds the configured max_bytes or max_events.
Buffers are controlled via the buffer.* options.
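The two flush conditions map onto the batch.* options as in the sketch below. The max_events value is a hypothetical choice for illustration. The other two values are the defaults listed in the option reference.

```yaml
sinks:
  my_sink_id:
    type: elasticsearch
    inputs:
      - my-source-or-transform-id
    endpoints:
      - http://10.24.32.122:9000
    batch:
      timeout_secs: 1       # flush once the batch is 1 second old
      max_bytes: 10000000   # or once it reaches 10 MB uncompressed
      max_events: 1000      # or once it holds 1000 events (hypothetical value)
```

Whichever limit is reached first triggers the flush.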
Conflicts
Events are sent to Elasticsearch's _bulk API endpoint. By default, all events are inserted via the index action, which replaces documents if an existing one has the same id. If bulk.action is configured with create, Elasticsearch does not replace an existing document and instead returns a conflict error.
When bulk.action is set to update, the document is updated with several constraints. The message must be added in .doc, with .doc_as_upsert set to true. The update operation requires the id_key to be set, and the encoding field should specify doc and doc_as_upsert as values.
Data streams
By default, Vector uses the index action with Elasticsearch’s Bulk API. To use Data streams, set the mode to data_stream. Use the combination of data_stream.type, data_stream.dataset, and data_stream.namespace instead of index.
Distribution
If multiple endpoints are specified in the endpoints option, events are distributed among them according to their estimated load, with failover.
The rate limit is applied to the sink as a whole, while concurrency settings manage each endpoint individually.
The health of endpoints is actively monitored. If an endpoint is deemed unhealthy, Vector stops sending events to it until it is healthy again. This is managed by a circuit breaker that monitors responses and triggers after a sufficient streak of failures. Once triggered, it enters an exponential backoff loop and passes a single request in each iteration to test the endpoint. Once a successful response is received, the circuit breaker resets.
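A sketch of a multi-endpoint setup. The second endpoint address and the rate limit value are hypothetical. The distribution values shown are the defaults from the option reference.

```yaml
sinks:
  my_sink_id:
    type: elasticsearch
    inputs:
      - my-source-or-transform-id
    endpoints:
      - http://10.24.32.122:9000
      - http://10.24.32.123:9000        # hypothetical second endpoint
    distribution:
      retry_initial_backoff_secs: 1     # default
      retry_max_duration_secs: 3600     # default
    request:
      concurrency: adaptive             # managed per endpoint
      rate_limit_duration_secs: 1
      rate_limit_num: 100               # applies to the sink as a whole (hypothetical value)
```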
Health checks
Require health checks
If you’d like to exit immediately upon a health check failure, you can pass the --require-healthy flag:
vector --config /etc/vector/vector.yaml --require-healthy
Disable health checks
To disable health checks for this sink, set the healthcheck option to false.
Partial Failures
By default, Elasticsearch allows partial bulk ingestion failures. This is typically due to Elasticsearch index mapping errors, where data keys aren’t consistently typed. To change this behavior, refer to the Elasticsearch ignore_malformed setting.
By default, partial failures are not retried. To enable retries, set request_retry_partial to true. Once enabled, Vector retries the whole of any partially failed request, so it is advisable to set id_key to avoid duplicates.
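A minimal sketch of retrying partial failures while guarding against duplicates, assuming events carry an id field that can serve as the document ID:

```yaml
sinks:
  my_sink_id:
    type: elasticsearch
    inputs:
      - my-source-or-transform-id
    endpoints:
      - http://10.24.32.122:9000
    request_retry_partial: true   # retry whole requests that partially failed
    id_key: id                    # assumes an "id" event field; avoids duplicate documents
```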
Rate limits & adaptive concurrency
Adaptive Request Concurrency (ARC)
Adaptive Request Concurrency is a feature of Vector that does away with static concurrency limits and automatically optimizes HTTP concurrency based on downstream service responses. The underlying mechanism is a feedback loop inspired by TCP congestion control algorithms. Check out the announcement blog post for details.
We highly recommend enabling this feature as it improves performance and reliability of Vector and the systems it communicates with. As such, we have made it the default, and no further configuration is required.
Static concurrency
If Adaptive Request Concurrency is not for you, you can manually set static concurrency limits by specifying an integer for request.concurrency:
sinks:
  my-sink:
    request:
      concurrency: 10
Rate limits
In addition to limiting request concurrency, you can also limit the overall request throughput via the request.rate_limit_duration_secs and request.rate_limit_num options.
sinks:
  my-sink:
    request:
      rate_limit_duration_secs: 1
      rate_limit_num: 10
These will apply to both adaptive and fixed request.concurrency values.
Retry policy
Retries of failed requests can be tuned with the request.retry_attempts and request.retry_initial_backoff_secs options.
Transport Layer Security (TLS)
TLS behavior can be adjusted via the tls.* options and/or via an OpenSSL configuration file. The file location defaults to /usr/local/ssl/openssl.cnf or can be specified with the OPENSSL_CONF environment variable.