GCP PubSub
Fetch observability events from GCP’s PubSub messaging system
Requirements
The GCP Pub/Sub source requires a Pub/Sub subscription.
Configuration
Example configurations
{
  "sources": {
    "my_source_id": {
      "type": "gcp_pubsub",
      "acknowledgements": null,
      "credentials_path": "/path/to/credentials.json",
      "project": "vector-123456",
      "subscription": "vector-123456"
    }
  }
}
[sources.my_source_id]
type = "gcp_pubsub"
credentials_path = "/path/to/credentials.json"
project = "vector-123456"
subscription = "vector-123456"
---
sources:
  my_source_id:
    type: gcp_pubsub
    acknowledgements: null
    credentials_path: /path/to/credentials.json
    project: vector-123456
    subscription: vector-123456
{
  "sources": {
    "my_source_id": {
      "type": "gcp_pubsub",
      "acknowledgements": null,
      "ack_deadline_seconds": 600,
      "ack_deadline_secs": 600,
      "api_key": "${GCP_API_KEY}",
      "credentials_path": "/path/to/credentials.json",
      "endpoint": "https://pubsub.googleapis.com",
      "full_response_size": 100,
      "keepalive_secs": 60,
      "max_concurrency": 5,
      "poll_time_seconds": 2,
      "project": "vector-123456",
      "retry_delay_seconds": 1,
      "retry_delay_secs": 1,
      "framing": null,
      "proxy": null,
      "tls": null,
      "subscription": "vector-123456",
      "decoding": null
    }
  }
}
[sources.my_source_id]
type = "gcp_pubsub"
ack_deadline_seconds = 600
ack_deadline_secs = 600
api_key = "${GCP_API_KEY}"
credentials_path = "/path/to/credentials.json"
endpoint = "https://pubsub.googleapis.com"
full_response_size = 100
keepalive_secs = 60
max_concurrency = 5
poll_time_seconds = 2
project = "vector-123456"
retry_delay_seconds = 1
retry_delay_secs = 1
subscription = "vector-123456"
---
sources:
  my_source_id:
    type: gcp_pubsub
    acknowledgements: null
    ack_deadline_seconds: 600
    ack_deadline_secs: 600
    api_key: ${GCP_API_KEY}
    credentials_path: /path/to/credentials.json
    endpoint: https://pubsub.googleapis.com
    full_response_size: 100
    keepalive_secs: 60
    max_concurrency: 5
    poll_time_seconds: 2
    project: vector-123456
    retry_delay_seconds: 1
    retry_delay_secs: 1
    framing: null
    proxy: null
    tls: null
    subscription: vector-123456
    decoding: null
ack_deadline_seconds
optional uint
Default: 600 (seconds)
ack_deadline_secs
optional uint
Default: 600 (seconds)
acknowledgements
common optional object
… acknowledgement settings. This setting is deprecated in favor of enabling acknowledgements in the destination sink.
acknowledgements.enabled
common optional bool
Default: false
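Because the source-level `acknowledgements` setting is deprecated in favor of the sink-level one, a configuration along the following lines may be closer to what is intended. This is only a sketch: the sink ID `my_sink_id`, the choice of a `console` sink, and its `encoding` block are assumptions made for illustration and do not come from this page.

```yaml
sources:
  my_source_id:
    type: gcp_pubsub
    project: vector-123456
    subscription: vector-123456

sinks:
  # Hypothetical sink, used here only to show where the setting would live.
  my_sink_id:
    type: console
    inputs:
      - my_source_id
    encoding:
      codec: json
    # Acknowledgements enabled on the destination sink rather than the source.
    acknowledgements:
      enabled: true
```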
api_key
optional string literal
… `credentials_path` must be set.
credentials_path
common optional string literal
The filename for a Google Cloud service account credentials JSON file used to authenticate access to the Pub/Sub project and topic. If this is unset, Vector checks the `GOOGLE_APPLICATION_CREDENTIALS` environment variable for a filename.
If no filename is named, Vector will attempt to fetch an instance service account for the compute instance the program is running on. If Vector is not running on a GCE instance, you must define a credentials file as above.
decoding
optional object
decoding.codec
common optional string literal enum
Option | Description |
---|---|
bytes | Uses the raw bytes as-is. |
gelf | Decodes the raw bytes as a GELF message. |
json | Decodes the raw bytes as JSON. |
native | Decodes the raw bytes as Vector’s native Protocol Buffers format. This codec is experimental. |
native_json | Decodes the raw bytes as Vector’s native JSON format. This codec is experimental. |
syslog | Decodes the raw bytes as a Syslog message. Will decode either as the RFC 3164-style format (“old” style) or the more modern RFC 5424-style format (“new” style, includes structured data). |
Default: bytes
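As an illustration of the codec table, the fragment below sketches a source whose message payloads are assumed to be JSON documents. The project and subscription values are the placeholder values used elsewhere on this page.

```yaml
sources:
  my_source_id:
    type: gcp_pubsub
    project: vector-123456
    subscription: vector-123456
    decoding:
      # Parse each frame as JSON instead of passing the raw bytes through.
      codec: json
```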
endpoint
optional string literal
Default: https://pubsub.googleapis.com
framing
optional object
framing.character_delimited
required object
… `character_delimited` …
Relevant when: framing.method = `character_delimited`
framing.character_delimited.delimiter
required ascii_char
framing.character_delimited.max_length
optional uint
… `max_length` bytes will be discarded entirely.
framing.method
common optional string literal enum
Option | Description |
---|---|
bytes | Byte frames are passed through as-is according to the underlying I/O boundaries (e.g. split between messages or stream segments). |
character_delimited | Byte frames which are delimited by a chosen character. |
length_delimited | Byte frames which are prefixed by an unsigned big-endian 32-bit integer indicating the length. |
newline_delimited | Byte frames which are delimited by a newline character. |
octet_counting | Byte frames according to the octet counting format. |
Default: bytes
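To show how the framing options fit together, here is a minimal sketch that splits each message payload on a comma. The delimiter and the `max_length` value are arbitrary illustrative choices, not recommendations.

```yaml
sources:
  my_source_id:
    type: gcp_pubsub
    project: vector-123456
    subscription: vector-123456
    framing:
      method: character_delimited
      character_delimited:
        # Single ASCII character that separates frames (illustrative choice).
        delimiter: ","
        # Illustrative upper bound, in bytes, on the length of a single frame.
        max_length: 8192
```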
framing.newline_delimited
optional object
… `newline_delimited` …
Relevant when: framing.method = `newline_delimited`
framing.newline_delimited.max_length
optional uint
… `max_length` bytes will be discarded entirely.
framing.octet_counting
optional object
… `octet_counting` …
Relevant when: framing.method = `octet_counting`
framing.octet_counting.max_length
optional uint
… `max_length` bytes will be discarded entirely.
full_response_size
optional uint
The number of messages in a response to mark a stream as "busy". This is used to determine if more streams should be started.
The GCP Pub/Sub servers send responses with 100 or more messages when the subscription is busy.
Default: 100
keepalive_secs
optional float
… 60, you may see periodic errors sent from the server.
Default: 60
max_concurrency
optional uint
Default: 5 (concurrency)
poll_time_seconds
optional float
Default: 2 (seconds)
proxy
optional object
proxy.http
optional string literal
proxy.https
optional string literal
proxy.no_proxy
optional [string]
A list of hosts to avoid proxying. Allowed patterns here include:
Pattern | Example match |
---|---|
Domain names | example.com matches requests to example.com |
Wildcard domains | .example.com matches requests to example.com and its subdomains |
IP addresses | 127.0.0.1 matches requests to 127.0.0.1 |
CIDR blocks | 192.168.0.0/16 matches requests to any IP addresses in this range |
Splat | * matches all hosts |
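A component-level proxy block using the options and patterns above might look like the following sketch. The proxy URL and the excluded hosts are hypothetical placeholders.

```yaml
sources:
  my_source_id:
    type: gcp_pubsub
    project: vector-123456
    subscription: vector-123456
    proxy:
      # Hypothetical proxy endpoint; replace with your own.
      http: http://proxy.internal.example:3128
      https: http://proxy.internal.example:3128
      no_proxy:
        - localhost        # domain name
        - .example.com     # example.com and its subdomains
        - 127.0.0.1        # single IP address
        - 192.168.0.0/16   # CIDR block
```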
retry_delay_seconds
optional float
Default: 1 (seconds)
retry_delay_secs
optional float
Default: 1 (seconds)
subscription
required string literal
tls
optional object
tls.alpn_protocols
optional [string]
Sets the list of supported ALPN protocols.
Declare the supported ALPN protocols, which are used during negotiation with the peer. They are prioritized in the order in which they are defined.
tls.ca_file
optional string literal
Absolute path to an additional CA certificate file.
The certificate must be in the DER or PEM (X.509) format. Additionally, the certificate can be provided as an inline string in PEM format.
tls.crt_file
common optional string literal
Absolute path to a certificate file used to identify this server.
The certificate must be in DER, PEM (X.509), or PKCS#12 format. Additionally, the certificate can be provided as an inline string in PEM format.
If this is set, and is not a PKCS#12 archive, `key_file` must also be set.
tls.key_file
common optional string literal
Absolute path to a private key file used to identify this server.
The key must be in DER or PEM (PKCS#8) format. Additionally, the key can be provided as an inline string in PEM format.
tls.key_pass
optional string literal
Passphrase used to unlock the encrypted key file.
This has no effect unless `key_file` is set.
tls.verify_certificate
optional bool
Enables certificate verification.
If enabled, certificates must be valid in terms of not being expired, as well as being issued by a trusted issuer. This verification operates in a hierarchical manner, checking that not only the leaf certificate (the certificate presented by the client/server) is valid, but also that the issuer of that certificate is valid, and so on until reaching a root certificate.
Relevant for both incoming and outgoing connections.
Do NOT set this to `false` unless you understand the risks of not verifying the validity of certificates.
Default: true
tls.verify_hostname
optional bool
Enables hostname verification.
If enabled, the hostname used to connect to the remote host must be present in the TLS certificate presented by the remote host, either as the Common Name or as an entry in the Subject Alternative Name extension.
Only relevant for outgoing connections.
Do NOT set this to `false` unless you understand the risks of not verifying the remote hostname.
Default: true
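The `tls.*` options above could be combined roughly as follows. This is a sketch only: all file paths are hypothetical, and both verification flags are left at their default of `true`.

```yaml
sources:
  my_source_id:
    type: gcp_pubsub
    project: vector-123456
    subscription: vector-123456
    tls:
      # Hypothetical paths to PEM-encoded files.
      ca_file: /path/to/ca.pem
      crt_file: /path/to/certificate.crt
      # Required because crt_file is set and is not a PKCS#12 archive.
      key_file: /path/to/private.key
      verify_certificate: true
      verify_hostname: true
```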
Environment variables
HTTPS_PROXY
common optional string literal
The global URL to proxy HTTPS requests through.
If another HTTPS proxy is set in the configuration file or at a component level, this one will be overridden.
The lowercase variant has priority over the uppercase one.
HTTP_PROXY
common optional string literal
The global URL to proxy HTTP requests through.
If another HTTP proxy is set in the configuration file or at a component level, this one will be overridden.
The lowercase variant has priority over the uppercase one.
NO_PROXY
common optional string literal
List of hosts to avoid proxying globally.
Allowed patterns here include:
Pattern | Example match |
---|---|
Domain names | example.com matches requests to example.com |
Wildcard domains | .example.com matches requests to example.com and its subdomains |
IP addresses | 127.0.0.1 matches requests to 127.0.0.1 |
CIDR blocks | 192.168.0.0/16 matches requests to any IP addresses in this range |
Splat | * matches all hosts |
If another `no_proxy` value is set in the configuration file or at a component level, this one is overridden.
The lowercase variant has priority over the uppercase one.
http_proxy
common optional string literal
The global URL to proxy HTTP requests through.
If another HTTP proxy is set in the configuration file or at a component level, this one will be overridden.
The lowercase variant has priority over the uppercase one.
https_proxy
common optional string literal
The global URL to proxy HTTPS requests through.
If another HTTPS proxy is set in the configuration file or at a component level, this one will be overridden.
The lowercase variant has priority over the uppercase one.
no_proxy
common optional string literal
List of hosts to avoid proxying globally.
Allowed patterns here include:
Pattern | Example match |
---|---|
Domain names | example.com matches requests to example.com |
Wildcard domains | .example.com matches requests to example.com and its subdomains |
IP addresses | 127.0.0.1 matches requests to 127.0.0.1 |
CIDR blocks | 192.168.0.0/16 matches requests to any IP addresses in this range |
Splat | * matches all hosts |
If another `no_proxy` value is set in the configuration file or at a component level, this one is overridden.
The lowercase variant has priority over the uppercase one.
Outputs
<component_id>
Output Data
Logs
Record
{
  "key": "value"
}
53.126.150.246 - - [01/Oct/2020:11:25:58 -0400] "GET /disintermediate HTTP/2.0" 401 20308
2345
gcp_pubsub
2020-10-10T17:07:36.452332Z
Telemetry
Metrics
component_received_bytes_total
counter
… `component_id` instead. The value is the same as `component_id`.
component_received_event_bytes_total
counter
… `component_id` instead. The value is the same as `component_id`.
component_received_events_total
counter
… `component_id` instead. The value is the same as `component_id`.
component_sent_event_bytes_total
counter
… `component_id` instead. The value is the same as `component_id`.
component_sent_events_total
counter
… `component_id` instead. The value is the same as `component_id`.
events_out_total
counter
… `component_sent_events_total` instead.
… `component_id` instead. The value is the same as `component_id`.
source_lag_time_seconds
histogram
… `component_id` instead. The value is the same as `component_id`.
How it works
Automatic Concurrency Management
The `gcp_pubsub` source automatically manages the number of concurrent active streams by
monitoring the traffic flowing over the streams.
When a stream receives full responses (as determined by the `full_response_size` setting),
it marks itself as being "busy".
Periodically, the source will poll all the active connections and will start a new stream
if all the active streams are marked as busy and fewer than `max_concurrency` streams are
active.
Conversely, when a stream passes an idle interval (configured by the
`idle_timeout_seconds` setting) with no traffic and no outstanding acknowledgements,
it will drop the connection unless there are no other streams active.
This combination of actions allows this source to respond dynamically to high load levels
without opening up extra connections at startup.
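The two settings named in this section that appear in the option reference can be tuned together. The sketch below raises the stream ceiling while keeping the default "busy" threshold; the value `10` is an arbitrary illustration, not a recommendation.

```yaml
sources:
  my_source_id:
    type: gcp_pubsub
    project: vector-123456
    subscription: vector-123456
    # Upper bound on concurrently active streams (default: 5).
    max_concurrency: 10
    # A response with at least this many messages marks its stream as "busy" (default: 100).
    full_response_size: 100
```

With these values, the source would still start with fewer streams and add more only while every active stream is marked busy, up to ten.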
GCP Pub/Sub
The `gcp_pubsub` source streams messages from GCP Pub/Sub. This is a highly scalable, durable queueing system with at-least-once queuing semantics. Messages are received in a stream and are acknowledged either immediately after receipt or after they have been fully processed by the sink(s), depending on whether any of the sink(s) have the `acknowledgements` setting enabled.
Transport Layer Security (TLS)
… `tls.*` options.