AMQP

Send events to AMQP 0.9.1 compatible brokers like RabbitMQ

status: beta delivery: at-least-once acknowledgements: no egress: dynamic state: stateless

Configuration

Example configurations

{
  "sinks": {
    "my_sink_id": {
      "type": "amqp",
      "inputs": [
        "my-source-or-transform-id"
      ],
      "connection_string": "amqp://user:password@127.0.0.1:5672/%2f?timeout=10",
      "encoding": null,
      "exchange": null
    }
  }
}
[sinks.my_sink_id]
type = "amqp"
inputs = [ "my-source-or-transform-id" ]
connection_string = "amqp://user:password@127.0.0.1:5672/%2f?timeout=10"
---
sinks:
  my_sink_id:
    type: amqp
    inputs:
      - my-source-or-transform-id
    connection_string: amqp://user:password@127.0.0.1:5672/%2f?timeout=10
    encoding: null
    exchange: null
{
  "sinks": {
    "my_sink_id": {
      "type": "amqp",
      "inputs": [
        "my-source-or-transform-id"
      ],
      "acknowledgements": null,
      "connection_string": "amqp://user:password@127.0.0.1:5672/%2f?timeout=10",
      "encoding": null,
      "exchange": null,
      "routing_key": null,
      "buffer": null,
      "tls": null,
      "healthcheck": null
    }
  }
}
[sinks.my_sink_id]
type = "amqp"
inputs = [ "my-source-or-transform-id" ]
connection_string = "amqp://user:password@127.0.0.1:5672/%2f?timeout=10"
---
sinks:
  my_sink_id:
    type: amqp
    inputs:
      - my-source-or-transform-id
    acknowledgements: null
    connection_string: amqp://user:password@127.0.0.1:5672/%2f?timeout=10
    encoding: null
    exchange: null
    routing_key: null
    buffer: null
    tls: null
    healthcheck: null

acknowledgements

optional object

Controls how acknowledgements are handled for this sink.

See End-to-end Acknowledgements for more information on how event acknowledgement is handled.

Whether or not end-to-end acknowledgements are enabled.

When enabled for a sink, any source connected to that sink, where the source supports end-to-end acknowledgements as well, will wait for events to be acknowledged by the sink before acknowledging them at the source.

Enabling or disabling acknowledgements at the sink level takes precedence over any global acknowledgements configuration.

buffer

optional object

Configures the buffering behavior for this sink.

More information about the individual buffer types, and buffer behavior, can be found in the Buffering Model section.

buffer.max_events

optional uint
The maximum number of events allowed in the buffer.
Relevant when: type = "memory"
default: 500

buffer.max_size

required uint

The maximum size of the buffer on disk.

Must be at least ~256 megabytes (268435488 bytes).

Relevant when: type = "disk_v1" or type = "disk"

buffer.type

optional string literal enum
The type of buffer to use.
Enum options
OptionDescription
disk

Events are buffered on disk. (version 2)

This is less performant, but more durable. Data that has been synchronized to disk will not be lost if Vector is restarted forcefully or crashes.

Data is synchronized to disk every 500ms.

disk_v1

Events are buffered on disk. (version 1)

This is less performant, but more durable. Data that has been synchronized to disk will not be lost if Vector is restarted forcefully or crashes.

memory

Events are buffered in memory.

This is more performant, but less durable. Data will be lost if Vector is restarted forcefully or crashes.

default: memory

buffer.when_full

optional string literal enum
Event handling behavior when a buffer is full.
Enum options
OptionDescription
block

Wait for free space in the buffer.

This applies backpressure up the topology, signalling that sources should slow down the acceptance/consumption of events. This means that while no data is lost, data will pile up at the edge.

drop_newest

Drops the event instead of waiting for free space in buffer.

The event will be intentionally dropped. This mode is typically used when performance is the highest priority, and it is preferable to temporarily lose events rather than cause a slowdown in the acceptance/consumption of events.

default: block

connection_string

required string literal

URI for the AMQP server.

The URI has the format of amqp://<user>:<password>@<host>:<port>/<vhost>?timeout=<seconds>.

The default vhost can be specified by using a value of %2f.

In order to connect over TLS, a scheme of amqps can be specified instead i.e. amqps://.... Additional TLS settings, such as client certificate verification, etc, can be configured under the tls section.

Examples
"amqp://user:password@127.0.0.1:5672/%2f?timeout=10"

encoding

required object
Configures how events are encoded into raw bytes.

encoding.avro

required object
Apache Avro-specific encoder options.
Relevant when: codec = "avro"
encoding.avro.schema
required string literal
The Avro schema.
Examples
"{ \"type\": \"record\", \"name\": \"log\", \"fields\": [{ \"name\": \"message\", \"type\": \"string\" }] }"

encoding.codec

required string literal enum
The codec to use for encoding events.
Enum options
OptionDescription
avroEncodes an event as an Apache Avro message.
gelfEncodes an event as a GELF message.
jsonEncodes an event as JSON.
logfmtEncodes an event as a logfmt message.
native

Encodes an event in Vector’s native Protocol Buffers format.

This codec is experimental.

native_json

Encodes an event in Vector’s native JSON format.

This codec is experimental.

raw_message

No encoding.

This “encoding” simply uses the message field of a log event.

Users should take care if they’re modifying their log events (such as by using a remap transform, etc) and removing the message field while doing additional parsing on it, as this could lead to the encoding emitting empty strings for the given event.

text

Plain text encoding.

This “encoding” simply uses the message field of a log event. For metrics, it uses an encoding that resembles the Prometheus export format.

Users should take care if they’re modifying their log events (such as by using a remap transform, etc) and removing the message field while doing additional parsing on it, as this could lead to the encoding emitting empty strings for the given event.

Examples
"avro"
"gelf"
"json"
"logfmt"
"native"
"native_json"
"raw_message"
"text"

encoding.except_fields

optional [string]
List of fields that will be excluded from the encoded event.

encoding.metric_tag_values

optional string literal enum

Controls how metric tag values are encoded.

When set to single, only the last non-bare value of tags will be displayed with the metric. When set to full, all metric tags will be exposed as separate assignments.

Relevant when: codec = "json" or codec = "text"
Enum options
OptionDescription
fullAll tags will be exposed as arrays of either string or null values.
singleTag values will be exposed as single strings, the same as they were before this config option. Tags with multiple values will show the last assigned value, and null values will be ignored.
default: single

encoding.only_fields

optional [string]
List of fields that will be included in the encoded event.

encoding.timestamp_format

optional string literal enum
Format used for timestamp fields.
Enum options
OptionDescription
rfc3339Represent the timestamp as a RFC 3339 timestamp.
unixRepresent the timestamp as a Unix timestamp.

exchange

required string template
The exchange to publish messages to.
Note: This parameter supports Vector's template syntax, which enables you to use dynamic per-event values.

healthcheck

optional object
Healthcheck configuration.

healthcheck.enabled

optional bool
Whether or not to check the health of the sink when Vector starts up.
default: true

inputs

required [string]

A list of upstream source or transform IDs.

Wildcards (*) are supported.

See configuration for more info.

Array string literal
Examples
[
  "my-source-or-transform-id",
  "prefix-*"
]

routing_key

optional string template
Template used to generate a routing key which corresponds to a queue binding.
Note: This parameter supports Vector's template syntax, which enables you to use dynamic per-event values.

tls

optional object
TLS configuration.

tls.alpn_protocols

optional [string]

Sets the list of supported ALPN protocols.

Declare the supported ALPN protocols, which are used during negotiation with peer. Prioritized in the order they are defined.

tls.ca_file

optional string literal

Absolute path to an additional CA certificate file.

The certificate must be in the DER or PEM (X.509) format. Additionally, the certificate can be provided as an inline string in PEM format.

Examples
"/path/to/certificate_authority.crt"

tls.crt_file

optional string literal

Absolute path to a certificate file used to identify this server.

The certificate must be in DER, PEM (X.509), or PKCS#12 format. Additionally, the certificate can be provided as an inline string in PEM format.

If this is set, and is not a PKCS#12 archive, key_file must also be set.

Examples
"/path/to/host_certificate.crt"

tls.key_file

optional string literal

Absolute path to a private key file used to identify this server.

The key must be in DER or PEM (PKCS#8) format. Additionally, the key can be provided as an inline string in PEM format.

Examples
"/path/to/host_certificate.key"

tls.key_pass

optional string literal

Passphrase used to unlock the encrypted key file.

This has no effect unless key_file is set.

Examples
"${KEY_PASS_ENV_VAR}"
"PassWord1"

Enables certificate verification.

If enabled, certificates must be valid in terms of not being expired, as well as being issued by a trusted issuer. This verification operates in a hierarchical manner, checking that not only the leaf certificate (the certificate presented by the client/server) is valid, but also that the issuer of that certificate is valid, and so on until reaching a root certificate.

Relevant for both incoming and outgoing connections.

Do NOT set this to false unless you understand the risks of not verifying the validity of certificates.

tls.verify_hostname

optional bool

Enables hostname verification.

If enabled, the hostname used to connect to the remote host must be present in the TLS certificate presented by the remote host, either as the Common Name or as an entry in the Subject Alternative Name extension.

Only relevant for outgoing connections.

Do NOT set this to false unless you understand the risks of not verifying the remote hostname.

Telemetry

Metrics

link

buffer_byte_size

gauge
The number of bytes current in the buffer.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_name
Deprecated, use component_id instead. The value is the same as component_id.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

buffer_discarded_events_total

counter
The number of events dropped by this non-blocking buffer.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_name
Deprecated, use component_id instead. The value is the same as component_id.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

buffer_events

gauge
The number of events currently in the buffer.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_name
Deprecated, use component_id instead. The value is the same as component_id.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

buffer_received_event_bytes_total

counter
The number of bytes received by this buffer.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_name
Deprecated, use component_id instead. The value is the same as component_id.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

buffer_received_events_total

counter
The number of events received by this buffer.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_name
Deprecated, use component_id instead. The value is the same as component_id.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

buffer_sent_event_bytes_total

counter
The number of bytes sent by this buffer.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_name
Deprecated, use component_id instead. The value is the same as component_id.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

buffer_sent_events_total

counter
The number of events sent by this buffer.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_name
Deprecated, use component_id instead. The value is the same as component_id.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

component_received_event_bytes_total

counter
The number of event bytes accepted by this component either from tagged origins like file and uri, or cumulatively from other origins.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_name
Deprecated, use component_id instead. The value is the same as component_id.
component_type
The Vector component type.
container_name optional
The name of the container from which the data originated.
file optional
The file from which the data originated.
host optional
The hostname of the system Vector is running on.
mode optional
The connection mode used by the component.
peer_addr optional
The IP from which the data originated.
peer_path optional
The pathname from which the data originated.
pid optional
The process ID of the Vector instance.
pod_name optional
The name of the pod from which the data originated.
uri optional
The sanitized URI from which the data originated.

component_received_events_count

histogram

A histogram of the number of events passed in each internal batch in Vector’s internal topology.

Note that this is separate than sink-level batching. It is mostly useful for low level debugging performance issues in Vector due to small internal batches.

component_id
The Vector component ID.
component_kind
The Vector component kind.
component_name
Deprecated, use component_id instead. The value is the same as component_id.
component_type
The Vector component type.
container_name optional
The name of the container from which the data originated.
file optional
The file from which the data originated.
host optional
The hostname of the system Vector is running on.
mode optional
The connection mode used by the component.
peer_addr optional
The IP from which the data originated.
peer_path optional
The pathname from which the data originated.
pid optional
The process ID of the Vector instance.
pod_name optional
The name of the pod from which the data originated.
uri optional
The sanitized URI from which the data originated.

component_received_events_total

counter
The number of events accepted by this component either from tagged origins like file and uri, or cumulatively from other origins.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_name
Deprecated, use component_id instead. The value is the same as component_id.
component_type
The Vector component type.
container_name optional
The name of the container from which the data originated.
file optional
The file from which the data originated.
host optional
The hostname of the system Vector is running on.
mode optional
The connection mode used by the component.
peer_addr optional
The IP from which the data originated.
peer_path optional
The pathname from which the data originated.
pid optional
The process ID of the Vector instance.
pod_name optional
The name of the pod from which the data originated.
uri optional
The sanitized URI from which the data originated.

events_discarded_total

counter
The total number of events discarded by this component.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.
reason
The type of the error

events_in_total

counter
The number of events accepted by this component either from tagged origins like file and uri, or cumulatively from other origins. This metric is deprecated and will be removed in a future version. Use component_received_events_total instead.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_name
Deprecated, use component_id instead. The value is the same as component_id.
component_type
The Vector component type.
container_name optional
The name of the container from which the data originated.
file optional
The file from which the data originated.
host optional
The hostname of the system Vector is running on.
mode optional
The connection mode used by the component.
peer_addr optional
The IP from which the data originated.
peer_path optional
The pathname from which the data originated.
pid optional
The process ID of the Vector instance.
pod_name optional
The name of the pod from which the data originated.
uri optional
The sanitized URI from which the data originated.

processing_errors_total

counter
The total number of processing errors encountered by this component. This metric is deprecated in favor of component_errors_total.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_name
Deprecated, use component_id instead. The value is the same as component_id.
component_type
The Vector component type.
error_type
The type of the error
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

utilization

gauge
A ratio from 0 to 1 of the load on a component. A value of 0 would indicate a completely idle component that is simply waiting for input. A value of 1 would indicate a that is never idle. This value is updated every 5 seconds.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_name
Deprecated, use component_id instead. The value is the same as component_id.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

How it works

Health checks

Health checks ensure that the downstream service is accessible and ready to accept data. This check is performed upon sink initialization. If the health check fails an error will be logged and Vector will proceed to start.

Require health checks

If you’d like to exit immediately upon a health check failure, you can pass the --require-healthy flag:

vector --config /etc/vector/vector.toml --require-healthy

Disable health checks

If you’d like to disable health checks for this sink you can set the healthcheck option to false.

Lapin

The amqp source and sink uses lapin under the hood. This is a reliable pure rust library that facilitates communication with AMPQ servers such as RabbitMQ.

State

This component is stateless, meaning its behavior is consistent across each input.

Transport Layer Security (TLS)

Vector uses OpenSSL for TLS protocols due to OpenSSL’s maturity. You can enable and adjust TLS behavior using the tls.* options.