AMQP
Send events to AMQP 0.9.1 compatible brokers like RabbitMQ
Configuration
Example configurations
{
  "sinks": {
    "my_sink_id": {
      "type": "amqp",
      "inputs": [
        "my-source-or-transform-id"
      ],
      "connection_string": "amqp://user:password@127.0.0.1:5672/%2f?timeout=10"
    }
  }
}

[sinks.my_sink_id]
type = "amqp"
inputs = [ "my-source-or-transform-id" ]
connection_string = "amqp://user:password@127.0.0.1:5672/%2f?timeout=10"

sinks:
  my_sink_id:
    type: amqp
    inputs:
      - my-source-or-transform-id
    connection_string: amqp://user:password@127.0.0.1:5672/%2f?timeout=10
acknowledgements
optional object
Controls how acknowledgements are handled for this sink.
See End-to-end Acknowledgements for more information on how event acknowledgement is handled.
acknowledgements.enabled
optional bool
Whether or not end-to-end acknowledgements are enabled.
When enabled for a sink, any source connected to that sink that also supports end-to-end acknowledgements waits for events to be acknowledged by all connected sinks before acknowledging them at the source.
Enabling or disabling acknowledgements at the sink level takes precedence over any global acknowledgements configuration.
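As a minimal sketch, the fragment below turns on end-to-end acknowledgements for one sink. The exchange name `my-exchange` and the choice of the `json` codec are placeholders for illustration, not values taken from this page.

```yaml
sinks:
  my_sink_id:
    type: amqp
    inputs:
      - my-source-or-transform-id
    connection_string: "amqp://user:password@127.0.0.1:5672/%2f?timeout=10"
    exchange: my-exchange        # hypothetical exchange name
    encoding:
      codec: json                # placeholder codec choice
    acknowledgements:
      enabled: true              # sink-level value takes precedence over any global setting
```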
buffer
optional object
Configures the buffering behavior for this sink.
More information about the individual buffer types, and buffer behavior, can be found in the Buffering Model section.
buffer.max_events
optional uint
Relevant when: type = "memory"
Default: 500
buffer.max_size
required uint
The maximum size of the buffer on disk.
Must be at least ~256 megabytes (268435488 bytes).
Relevant when: type = "disk"
buffer.type
optional string literal enum
Option | Description |
---|---|
disk | Events are buffered on disk. This is less performant, but more durable. Data that has been synchronized to disk will not be lost if Vector is restarted forcefully or crashes. Data is synchronized to disk every 500ms. |
memory | Events are buffered in memory. This is more performant, but less durable. Data will be lost if Vector is restarted forcefully or crashes. |
Default: memory
buffer.when_full
optional string literal enum
Option | Description |
---|---|
block | Wait for free space in the buffer. This applies backpressure up the topology, signalling that sources should slow down the acceptance/consumption of events. This means that while no data is lost, data will pile up at the edge. |
drop_newest | Drops the event instead of waiting for free space in the buffer. The event will be intentionally dropped. This mode is typically used when performance is the highest priority, and it is preferable to temporarily lose events rather than cause a slowdown in the acceptance/consumption of events. |
Default: block
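To illustrate how the buffer options fit together, here is a sketch of a disk buffer that applies backpressure when full. It uses the minimum `max_size` quoted above; the other required sink options are left out for brevity.

```yaml
sinks:
  my_sink_id:
    type: amqp
    # inputs, connection_string, exchange, and encoding omitted for brevity
    buffer:
      type: disk            # more durable, less performant than memory
      max_size: 268435488   # bytes; the documented minimum for a disk buffer
      when_full: block      # wait for free space instead of dropping events
```

With `type: memory`, `max_size` would be replaced by `max_events`.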
connection_string
required string literal
URI for the AMQP server.
The URI has the format amqp://<user>:<password>@<host>:<port>/<vhost>?timeout=<seconds>.
The default vhost can be specified by using a value of %2f.
To connect over TLS, a scheme of amqps can be specified instead (for example, amqps://...). Additional TLS settings, such as client certificate verification, can be configured under the tls section.
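A sketch of a TLS connection might look like the following. The host name, port, and CA path are assumptions for illustration: 5671 is the port conventionally used for AMQP over TLS, but use whatever port your broker actually listens on.

```yaml
sinks:
  my_sink_id:
    type: amqp
    # inputs, exchange, and encoding omitted for brevity
    # amqps scheme selects TLS; %2f is the default vhost
    connection_string: "amqps://user:password@rabbitmq.example.com:5671/%2f?timeout=10"
    tls:
      ca_file: /etc/vector/certs/ca.pem   # hypothetical path to an additional CA certificate
```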
encoding
required object
encoding.avro
required object
Relevant when: codec = "avro"
encoding.avro.schema
required string literal
encoding.codec
required string literal enum
Option | Description |
---|---|
avro | Encodes an event as an Apache Avro message. |
csv | Encodes an event as a CSV message. This codec must be configured with fields to encode. |
gelf | Encodes an event as a GELF message. This codec is experimental for the following reason: the GELF specification is more strict than the actual Graylog receiver. Vector’s encoder currently adheres more strictly to the GELF spec, with exceptions for some characters. Other GELF codecs, such as Loki’s, use a Go SDK that is maintained by Graylog and is much more relaxed than the GELF spec. Going forward, Vector will use that Go SDK as the reference implementation, which means the codec may continue to relax its enforcement of the specification. |
json | Encodes an event as JSON. |
logfmt | Encodes an event as a logfmt message. |
native | Encodes an event in the native Protocol Buffers format. This codec is experimental. |
native_json | Encodes an event in the native JSON format. This codec is experimental. |
protobuf | Encodes an event as a Protobuf message. |
raw_message | No encoding. This encoding uses the Be careful if you are modifying your log events (for example, by using a |
text | Plain text encoding. This encoding uses the Be careful if you are modifying your log events (for example, by using a |
encoding.csv
required object
Relevant when: codec = "csv"
encoding.csv.capacity
optional uint
Default: 8192
encoding.csv.double_quote
optional bool
Enable double quote escapes.
This is enabled by default, but it may be disabled. When disabled, quotes in field data are escaped instead of doubled.
Default: true
encoding.csv.escape
optional ascii_char
The escape character to use when writing CSV.
In some variants of CSV, quotes are escaped using a special escape character like \ (instead of escaping quotes by doubling them).
To use this, double_quote needs to be disabled as well; otherwise it is ignored.
Default: "
encoding.csv.fields
required [string]
Configures the fields that will be encoded, as well as the order in which they appear in the output.
If a field is not present in the event, the output will be an empty string.
Values of type Array, Object, and Regex are not supported and the output will be an empty string.
encoding.csv.quote_style
optional string literal enum
Option | Description |
---|---|
always | Always puts quotes around every field. |
necessary | Puts quotes around fields only when necessary. They are necessary when fields contain a quote, delimiter, or record terminator. Quotes are also necessary when writing an empty record (which is indistinguishable from a record with one empty field). |
never | Never writes quotes, even if it produces invalid CSV data. |
non_numeric | Puts quotes around all fields that are non-numeric. Namely, when writing a field that does not parse as a valid float or integer, then quotes are used even if they aren’t strictly necessary. |
Default: necessary
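The CSV options above could be combined roughly as follows. The field names `timestamp`, `host`, and `message` are assumptions about what the events contain; any field missing from an event is written as an empty string.

```yaml
sinks:
  my_sink_id:
    type: amqp
    # inputs, connection_string, and exchange omitted for brevity
    encoding:
      codec: csv
      csv:
        fields:              # required; also fixes the column order
          - timestamp        # hypothetical event fields
          - host
          - message
        quote_style: necessary   # quote only when a field needs it (the default)
        double_quote: true       # escape quotes by doubling them (the default)
```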
encoding.except_fields
optional [string]
encoding.metric_tag_values
optional string literal enum
Controls how metric tag values are encoded.
When set to single, only the last non-bare value of each tag is displayed with the metric. When set to full, all metric tags are exposed as separate assignments.
Relevant when: codec = "json" or codec = "text"
Option | Description |
---|---|
full | All tags are exposed as arrays of either string or null values. |
single | Tag values are exposed as single strings, the same as they were before this config option. Tags with multiple values show the last assigned value, and null values are ignored. |
Default: single
encoding.only_fields
optional [string]
encoding.protobuf
required object
Relevant when: codec = "protobuf"
encoding.protobuf.desc_file
required string literal
The path to the protobuf descriptor set file.
This file is the output of protoc -o <path> ...
encoding.protobuf.message_type
required string literal
encoding.timestamp_format
optional string literal enum
Option | Description |
---|---|
rfc3339 | Represent the timestamp as an RFC 3339 timestamp. |
unix | Represent the timestamp as a Unix timestamp. |
unix_float | Represent the timestamp as a Unix timestamp in floating point. |
unix_ms | Represent the timestamp as a Unix timestamp in milliseconds. |
unix_ns | Represent the timestamp as a Unix timestamp in nanoseconds. |
unix_us | Represent the timestamp as a Unix timestamp in microseconds. |
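As an illustration of the general encoding options, the sketch below pairs the `json` codec with a non-default tag and timestamp representation. The specific values chosen here are examples only.

```yaml
sinks:
  my_sink_id:
    type: amqp
    # inputs, connection_string, and exchange omitted for brevity
    encoding:
      codec: json
      metric_tag_values: full    # expose every tag value as an array; applies to json and text codecs
      timestamp_format: unix_ms  # Unix timestamp in milliseconds
```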
exchange
required string template
healthcheck
optional object
healthcheck.enabled
optional bool
Default: true
inputs
required [string]
A list of upstream source or transform IDs.
Wildcards (*) are supported.
See configuration for more info.
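For instance, a wildcard can stand in for several upstream components at once. The component IDs implied here (anything starting with `app-`) are hypothetical.

```yaml
sinks:
  my_sink_id:
    type: amqp
    inputs:
      - "app-*"                      # hypothetical: every source or transform whose ID starts with app-
      - my-source-or-transform-id    # explicit IDs can be listed alongside wildcards
    # connection_string, exchange, and encoding omitted for brevity
```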
properties
optional object
Configure the AMQP message properties.
properties.content_encoding
optional string literal
properties.content_type
optional string literal
properties.expiration_ms
optional uint
routing_key
optional string template
tls
optional object
tls.alpn_protocols
optional [string]
Sets the list of supported ALPN protocols.
Declare the supported ALPN protocols, which are used during negotiation with the peer. They are prioritized in the order that they are defined.
tls.ca_file
optional string literal
Absolute path to an additional CA certificate file.
The certificate must be in the DER or PEM (X.509) format. Additionally, the certificate can be provided as an inline string in PEM format.
tls.crt_file
optional string literal
Absolute path to a certificate file used to identify this server.
The certificate must be in DER, PEM (X.509), or PKCS#12 format. Additionally, the certificate can be provided as an inline string in PEM format.
If this is set, and is not a PKCS#12 archive, key_file must also be set.
tls.key_file
optional string literal
Absolute path to a private key file used to identify this server.
The key must be in DER or PEM (PKCS#8) format. Additionally, the key can be provided as an inline string in PEM format.
tls.key_pass
optional string literal
Passphrase used to unlock the encrypted key file.
This has no effect unless key_file is set.
tls.server_name
optional string literal
Server name to use when using Server Name Indication (SNI).
Only relevant for outgoing connections.
tls.verify_certificate
optional bool
Enables certificate verification. For components that create a server, this requires that the client connections have a valid client certificate. For components that initiate requests, this validates that the upstream has a valid certificate.
If enabled, certificates must not be expired and must be issued by a trusted issuer. This verification operates in a hierarchical manner, checking that the leaf certificate (the certificate presented by the client/server) is not only valid, but that the issuer of that certificate is also valid, and so on until the verification process reaches a root certificate.
Do NOT set this to false unless you understand the risks of not verifying the validity of certificates.
tls.verify_hostname
optional bool
Enables hostname verification.
If enabled, the hostname used to connect to the remote host must be present in the TLS certificate presented by the remote host, either as the Common Name or as an entry in the Subject Alternative Name extension.
Only relevant for outgoing connections.
Do NOT set this to false unless you understand the risks of not verifying the remote hostname.
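Pulling the options in this reference together, a fuller sink definition might look like the sketch below. Every concrete value (host, port, exchange, routing key, content type, expiration, and file paths) is an assumption made for illustration; only the option names come from this page.

```yaml
sinks:
  my_sink_id:
    type: amqp
    inputs:
      - my-source-or-transform-id
    connection_string: "amqps://user:password@rabbitmq.example.com:5671/%2f?timeout=10"  # hypothetical host and port
    exchange: my-exchange            # hypothetical exchange name
    routing_key: logs.app            # hypothetical static routing key
    encoding:
      codec: json
    properties:
      content_type: application/json # assumed to match the json codec
      expiration_ms: 60000           # illustrative message expiration
    tls:
      ca_file: /etc/vector/certs/ca.pem       # hypothetical paths
      crt_file: /etc/vector/certs/client.pem  # not PKCS#12, so key_file is required too
      key_file: /etc/vector/certs/client.key
      verify_certificate: true
      verify_hostname: true
```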
Telemetry
Metrics
buffer_byte_size
gauge
buffer_discarded_events_total
counter
buffer_events
gauge
buffer_received_event_bytes_total
counter
buffer_received_events_total
counter
buffer_sent_event_bytes_total
counter
buffer_sent_events_total
counter
component_discarded_events_total
counter
filter transform, or false if due to an error.
component_errors_total
counter
component_received_event_bytes_total
counter
component_received_events_count
histogram
A histogram of the number of events passed in each internal batch in Vector’s internal topology.
Note that this is separate from sink-level batching. It is mostly useful for low-level debugging of performance issues in Vector caused by small internal batches.
component_received_events_total
counter
component_sent_bytes_total
counter
component_sent_event_bytes_total
counter
component_sent_events_total
counter
utilization
gauge
How it works
Health checks
Require health checks
If you’d like to exit immediately upon a health check failure, you can pass the --require-healthy flag:
vector --config /etc/vector/vector.yaml --require-healthy
Disable health checks
To disable health checks for this sink, set the healthcheck option to false.
Lapin
The amqp source and sink use lapin under the hood. This is a reliable pure Rust library that facilitates communication with AMQP servers such as RabbitMQ.
Transport Layer Security (TLS)
TLS behavior can be adjusted via the tls.* options and/or via an OpenSSL configuration file. The file location defaults to /usr/local/ssl/openssl.cnf or can be specified with the OPENSSL_CONF environment variable.