GCP PubSub Sink
The Vector gcp_pubsub
sink
sends logs to GCP PubSub.
Configuration
- Common
- Advanced
- vector.toml
- vector.yaml
- vector.json
[sinks.my_sink_id]# Generaltype = "gcp_pubsub" # requiredinputs = ["my-source-or-transform-id"] # requiredcredentials_path = "/path/to/credentials.json" # optional, no defaultproject = "vector-123456" # requiredtopic = "this-is-a-topic" # required# Encoding# Healthcheckhealthcheck.enabled = true # optional, default
- optionalstring
api_key
A Google Cloud API key used to authenticate access the pubsub project and topic. Either this or
credentials_path
must be set. See GCP Authentication for more info.- View examples
- optionaltable
batch
Configures the sink batching behavior.
- optionaluint
max_bytes
The maximum size of a batch, in bytes, before it is flushed.
- Default:
10485760
(bytes)
- Default:
- optionaluint
max_events
The maximum size of a batch, in events, before it is flushed. See Buffers & batches for more info.
- Default:
1000
(events)
- Default:
- optionaluint
timeout_secs
The maximum age of a batch before it is flushed. See Buffers & batches for more info.
- Default:
1
(seconds)
- Default:
- optionaltable
buffer
Configures the sink specific buffer behavior.
- optionaluint
max_events
The maximum number of events allowed in the buffer. See Buffers & batches for more info.
- Only relevant when: type = "memory"
- Default:
500
(events)
- required*uint
max_size
The maximum size of the buffer on the disk. See Buffers & batches for more info.
- Only required when: type = "disk"
- View examples
- optionalstring
type
The buffer's type and storage mechanism.
- Default:
"memory"
- Enum, must be one of:
"memory"
"disk"
- View examples
- Default:
- optionalstring
when_full
The behavior when the buffer becomes full.
- Default:
"block"
- Enum, must be one of:
"block"
"drop_newest"
- View examples
- Default:
- optionalstring
credentials_path
The filename for a Google Cloud service account credentials JSON file used to authenticate access to the pubsub project and topic. If this is unset, Vector checks the
GOOGLE_APPLICATION_CREDENTIALS
environment variable for a filename.If no filename is named, Vector will attempt to fetch an instance service account for the compute instance the program is running on. If Vector is not running on a GCE instance, you must define a credentials file as above. See GCP Authentication for more info.
- View examples
- requiredtable
encoding
Configures the encoding specific sink behavior.
- optional[string]
except_fields
Prevent the sink from encoding the specified labels.
- View examples
- optional[string]
only_fields
Prevent the sink from encoding the specified labels.
- View examples
- optionalstring
timestamp_format
How to format event timestamps.
- Default:
"rfc3339"
- Enum, must be one of:
"rfc3339"
"unix"
- View examples
- Default:
- optionalstring
endpoint
The endpoint to which to send data.
- Default:
"https://pubsub.googleapis.com"
- View examples
- Default:
- optionaltable
healthcheck
Health check options for the sink. See Health checks for more info.
- optionalbool
enabled
Enables/disables the healthcheck upon Vector boot.
- Default:
true
- View examples
- Default:
- requiredstring
project
The project name to which to publish logs.
- View examples
- optionaltable
request
Configures the sink request behavior.
- optionaltable
adaptive_concurrency
Configure the adaptive concurrency algorithms. These values have been tuned by optimizing simulated results. In general you should not need to adjust these.
- optionalfloat
decrease_ratio
The fraction of the current value to set the new concurrency limit when decreasing the limit. Valid values are greater than 0 and less than 1. Smaller values cause the algorithm to scale back rapidly when latency increases. Note that the new limit is rounded down after applying this ratio.
- Default:
0.9
- Default:
- optionalfloat
ewma_alpha
The adaptive concurrency algorithm uses an exponentially weighted moving average (EWMA) of past RTT measurements as a reference to compare with the current RTT. This value controls how heavily new measurements are weighted compared to older ones. Valid values are greater than 0 and less than 1. Smaller values cause this reference to adjust more slowly, which may be useful if a service has unusually high response variability.
- Default:
0.7
- Default:
- optionalfloat
rtt_threshold_ratio
When comparing the past RTT average to the current measurements, we ignore changes that are less than this ratio higher than the past RTT. Valid values are greater than or equal to 0. Larger values cause the algorithm to ignore larger increases in the RTT.
- Default:
0.05
- Default:
- optionaluint
concurrency
The maximum number of in-flight requests allowed at any given time, or "auto" to allow Vector to automatically set the limit based on current network and service conditions.
- Default:
5
(requests)
- Default:
- optionaluint
rate_limit_duration_secs
The time window, in seconds, used for the
rate_limit_num
option.- Default:
1
(seconds)
- Default:
- optionaluint
rate_limit_num
The maximum number of requests allowed within the
rate_limit_duration_secs
time window.- Default:
100
- Default:
- optionaluint
retry_attempts
The maximum number of retries to make for failed requests. The default, for all intents and purposes, represents an infinite number of retries.
- Default:
18446744073709552000
- Default:
- optionaluint
retry_initial_backoff_secs
The amount of time to wait before attempting the first retry for a failed request. Once, the first retry has failed the fibonacci sequence will be used to select future backoffs.
- Default:
1
(seconds)
- Default:
- optionaluint
retry_max_duration_secs
The maximum amount of time, in seconds, to wait between retries.
- Default:
10
(seconds)
- Default:
- optionaluint
timeout_secs
The maximum time a request can take before being aborted. It is highly recommended that you do not lower this value below the service's internal timeout, as this could create orphaned requests, pile on retries, and result in duplicate data downstream. See Buffers & batches for more info.
- Default:
60
(seconds)
- Default:
- optionaltable
tls
Configures the TLS options for incoming connections.
- optionalstring
ca_file
Absolute path to an additional CA certificate file, in DER or PEM format (X.509), or an inline CA certificate in PEM format.
- View examples
- optionalstring
crt_file
Absolute path to a certificate file used to identify this connection, in DER or PEM format (X.509) or PKCS#12, or an inline certificate in PEM format. If this is set and is not a PKCS#12 archive,
key_file
must also be set.- View examples
- optionalstring
key_file
Absolute path to a private key file used to identify this connection, in DER or PEM format (PKCS#8), or an inline private key in PEM format. If this is set,
crt_file
must also be set.- View examples
- optionalstring
key_pass
Pass phrase used to unlock the encrypted key file. This has no effect unless
key_file
is set.- View examples
- optionalbool
verify_hostname
If
true
(the default), Vector will validate the configured remote host name against the remote host's TLS certificate. Do NOT set this tofalse
unless you understand the risks of not verifying the remote hostname.- Default:
true
- View examples
- Default:
- requiredstring
topic
The topic within the project to which to publish logs.
- View examples
Env Vars
- optionalstring
GOOGLE_APPLICATION_CREDENTIALS
The filename for a Google Cloud service account credentials JSON file used for authentication. See GCP Authentication for more info.
- Only relevant when: endpoint = null
- View examples
Telemetry
This component provides the following metrics that can be retrieved through
the internal_metrics
source. See the
metrics section in the
monitoring page for more info.
- counter
processed_events_total
The total number of events processed by this component. This metric includes the following tags:
component_kind
- The Vector component kind.component_name
- The Vector component ID.component_type
- The Vector component type.file
- The file that produced the errorinstance
- The Vector instance identified by host and port.job
- The name of the job producing Vector metrics.
- counter
processed_bytes_total
The total number of bytes processed by the component. This metric includes the following tags:
component_kind
- The Vector component kind.component_name
- The Vector component ID.component_type
- The Vector component type.instance
- The Vector instance identified by host and port.job
- The name of the job producing Vector metrics.
How It Works
Buffers & batches
This component buffers & batches data as shown in the diagram above. You'll notice that Vector treats these concepts differently, instead of treating them as global concepts, Vector treats them as sink specific concepts. This isolates sinks, ensuring services disruptions are contained and delivery guarantees are honored.
Batches are flushed when 1 of 2 conditions are met:
- The batch age meets or exceeds the configured
timeout_secs
. - The batch size meets or exceeds the configured <% if component.options.batch.children.respond_to?(:max_size) %>
max_size
<% else %>max_events
<% end %>.
Buffers are controlled via the buffer.*
options.
GCP Authentication
GCP offers a variety of authentication methods and Vector is concerned with the server to server methods and will find credentials in the following order:
- If the
credentials_path
option is set. - If the
api_key
option is set. - If the
GOOGLE_APPLICATION_CREDENTIALS
envrionment variable is set. - Finally, Vector will check for an instance service account.
If credentials are not found the healtcheck will fail and an error will be logged.
Health checks
Health checks ensure that the downstream service is accessible and ready to accept data. This check is performed upon sink initialization. If the health check fails an error will be logged and Vector will proceed to start.
Require health checks
If you'd like to exit immediately upon a health
check failure, you can pass the
--require-healthy
flag:
vector --config /etc/vector/vector.toml --require-healthy
Disable health checks
If you'd like to disable health checks for this
sink you can set the healthcheck
option to
false
.
Partitioning
Vector supports dynamic configuration values through a simple template syntax. If an option supports templating, it will be noted with a badge and you can use event fields to create dynamic values. For example:
[sinks.my-sink]dynamic_option = "application={{ application_id }}"
In the above example, the application_id
for each event will be
used to partition outgoing data.
Rate limits & adapative concurrency
Adaptive Request Concurrency (ARC)
Adaptive Requst Concurrency is a feature of Vector that does away with static rate limits and automatically optimizes HTTP concurrency limits based on downstream service responses. The underlying mechanism is a feedback loop inspired by TCP congestion control algorithms. Checkout the announcement blog post,
We highly recommend enabling this feature as it improves performance and reliability of Vector and the systems it communicates with.
To enable, set the request.concurrency
option to adaptive
:
[sinks.my-sink]request.concurrency = "adaptive"
Static rate limits
If Adaptive Request Concurrency is not for you, you can manually
set static rate limits with the request.rate_limit_duration_secs
,
request.rate_limit_num
, and request.concurrency
options:
[sinks.my-sink]request.rate_limit_duration_secs = 1request.rate_limit_num = 10request.concurrency = 10
Retry policy
Vector will retry failed requests (status == 429, >= 500, and != 501).
Other responses will not be retried. You can control the number of
retry attempts and backoff rate with the request.retry_attempts
and
request.retry_backoff_secs
options.
Transport Layer Security (TLS)
Vector uses Openssl for TLS protocols for it's
maturity. You can enable and adjust TLS behavior via the tls.*
options.