GCP PubSub
Fetch observability events from GCP’s PubSub messaging system
Requirements
The GCP Pub/Sub source requires a Pub/Sub subscription.
Configuration
Example configurations
{
"sources": {
"my_source_id": {
"type": "gcp_pubsub"
}
}
}
[sources.my_source_id]
type = "gcp_pubsub"
---
sources:
my_source_id:
type: gcp_pubsub
{
"sources": {
"my_source_id": {
"type": "gcp_pubsub",
"ack_deadline_secs": 600,
"endpoint": "https://pubsub.googleapis.com",
"full_response_size": 100,
"keepalive_secs": 60,
"max_concurrency": 10,
"poll_time_seconds": 2,
"retry_delay_secs": 1
}
}
}
[sources.my_source_id]
type = "gcp_pubsub"
ack_deadline_secs = 600
endpoint = "https://pubsub.googleapis.com"
full_response_size = 100
keepalive_secs = 60
max_concurrency = 10
poll_time_seconds = 2
retry_delay_secs = 1
---
sources:
my_source_id:
type: gcp_pubsub
ack_deadline_secs: 600
endpoint: https://pubsub.googleapis.com
full_response_size: 100
keepalive_secs: 60
max_concurrency: 10
poll_time_seconds: 2
retry_delay_secs: 1
ack_deadline_seconds
optional uintThe acknowledgement deadline, in seconds, to use for this stream.
Messages that are not acknowledged when this deadline expires may be retransmitted.
ack_deadline_secs
optional uintThe acknowledgement deadline, in seconds, to use for this stream.
Messages that are not acknowledged when this deadline expires may be retransmitted.
600
(seconds)acknowledgements
optional objectControls how acknowledgements are handled by this source.
This setting is deprecated in favor of enabling acknowledgements
at the global or sink level.
Enabling or disabling acknowledgements at the source level has no effect on acknowledgement behavior.
See End-to-end Acknowledgements for more information on how event acknowledgement is handled.
acknowledgements.enabled
optional boolapi_key
optional string literalAn API key.
Either an API key or a path to a service account credentials JSON file can be specified.
If both are unset, the GOOGLE_APPLICATION_CREDENTIALS
environment variable is checked for a filename. If no
filename is named, an attempt is made to fetch an instance service account for the compute instance the program is
running on. If this is not on a GCE instance, then you must define it with an API key or service account
credentials JSON file.
credentials_path
optional string literalPath to a service account credentials JSON file.
Either an API key or a path to a service account credentials JSON file can be specified.
If both are unset, the GOOGLE_APPLICATION_CREDENTIALS
environment variable is checked for a filename. If no
filename is named, an attempt is made to fetch an instance service account for the compute instance the program is
running on. If this is not on a GCE instance, then you must define it with an API key or service account
credentials JSON file.
decoding
optional objectdecoding.codec
optional string literal enumOption | Description |
---|---|
bytes | Uses the raw bytes as-is. |
gelf | Decodes the raw bytes as a GELF message. |
json | Decodes the raw bytes as JSON. |
native | Decodes the raw bytes as Vector’s native Protocol Buffers format. This codec is experimental. |
native_json | Decodes the raw bytes as Vector’s native JSON format. This codec is experimental. |
syslog | Decodes the raw bytes as a Syslog message. Decodes either as the RFC 3164-style format (“old” style) or the RFC 5424-style format (“new” style, includes structured data). |
bytes
endpoint
optional string literalhttps://pubsub.googleapis.com
framing
optional objectFraming configuration.
Framing handles how events are separated when encoded in a raw byte form, where each event is a frame that must be prefixed, or delimited, in a way that marks where an event begins and ends within the byte stream.
framing.character_delimited
required objectmethod = "character_delimited"
framing.character_delimited.delimiter
required uintframing.character_delimited.max_length
optional uintThe maximum length of the byte buffer.
This length does not include the trailing delimiter.
By default, there is no maximum length enforced. If events are malformed, this can lead to additional resource usage as events continue to be buffered in memory, and can potentially lead to memory exhaustion in extreme cases.
If there is a risk of processing malformed data, such as logs with user-controlled input, consider setting the maximum length to a reasonably large value as a safety net. This ensures that processing is not actually unbounded.
framing.method
optional string literal enumOption | Description |
---|---|
bytes | Byte frames are passed through as-is according to the underlying I/O boundaries (for example, split between messages or stream segments). |
character_delimited | Byte frames which are delimited by a chosen character. |
length_delimited | Byte frames which are prefixed by an unsigned big-endian 32-bit integer indicating the length. |
newline_delimited | Byte frames which are delimited by a newline character. |
octet_counting | Byte frames according to the octet counting format. |
bytes
framing.newline_delimited
optional objectmethod = "newline_delimited"
framing.newline_delimited.max_length
optional uintThe maximum length of the byte buffer.
This length does not include the trailing delimiter.
By default, there is no maximum length enforced. If events are malformed, this can lead to additional resource usage as events continue to be buffered in memory, and can potentially lead to memory exhaustion in extreme cases.
If there is a risk of processing malformed data, such as logs with user-controlled input, consider setting the maximum length to a reasonably large value as a safety net. This ensures that processing is not actually unbounded.
framing.octet_counting
optional objectmethod = "octet_counting"
framing.octet_counting.max_length
optional uintfull_response_size
optional uintThe number of messages in a response to mark a stream as “busy”. This is used to determine if more streams should be started.
The GCP Pub/Sub servers send responses with 100 or more messages when the subscription is busy.
100
keepalive_secs
optional float60
, you may see periodic errors sent from the server.60
(seconds)max_concurrency
optional uint10
poll_time_seconds
optional float2
(seconds)project
required string literalproxy
optional objectProxy configuration.
Configure to proxy traffic through an HTTP(S) proxy when making external requests.
Similar to common proxy configuration convention, users can set different proxies to use based on the type of traffic being proxied, as well as set specific hosts that should not be proxied.
proxy.http
optional string literalProxy endpoint to use when proxying HTTP traffic.
Must be a valid URI string.
proxy.https
optional string literalProxy endpoint to use when proxying HTTPS traffic.
Must be a valid URI string.
proxy.no_proxy
optional [string]A list of hosts to avoid proxying.
Multiple patterns are allowed:
Pattern | Example match |
---|---|
Domain names | example.com matches requests to example.com |
Wildcard domains | .example.com matches requests to example.com and its subdomains |
IP addresses | 127.0.0.1 matches requests to 127.0.0.1 |
CIDR blocks | 192.168.0.0/16 matches requests to any IP addresses in this range |
Splat | * matches all hosts |
retry_delay_seconds
optional floatretry_delay_secs
optional float1
(seconds)subscription
required string literaltls
optional objecttls.alpn_protocols
optional [string]Sets the list of supported ALPN protocols.
Declare the supported ALPN protocols, which are used during negotiation with peer. They are prioritized in the order that they are defined.
tls.ca_file
optional string literalAbsolute path to an additional CA certificate file.
The certificate must be in the DER or PEM (X.509) format. Additionally, the certificate can be provided as an inline string in PEM format.
tls.crt_file
optional string literalAbsolute path to a certificate file used to identify this server.
The certificate must be in DER, PEM (X.509), or PKCS#12 format. Additionally, the certificate can be provided as an inline string in PEM format.
If this is set, and is not a PKCS#12 archive, key_file
must also be set.
tls.key_file
optional string literalAbsolute path to a private key file used to identify this server.
The key must be in DER or PEM (PKCS#8) format. Additionally, the key can be provided as an inline string in PEM format.
tls.key_pass
optional string literalPassphrase used to unlock the encrypted key file.
This has no effect unless key_file
is set.
tls.verify_certificate
optional boolEnables certificate verification.
If enabled, certificates must not be expired and must be issued by a trusted issuer. This verification operates in a hierarchical manner, checking that the leaf certificate (the certificate presented by the client/server) is not only valid, but that the issuer of that certificate is also valid, and so on until the verification process reaches a root certificate.
Relevant for both incoming and outgoing connections.
Do NOT set this to false
unless you understand the risks of not verifying the validity of certificates.
tls.verify_hostname
optional boolEnables hostname verification.
If enabled, the hostname used to connect to the remote host must be present in the TLS certificate presented by the remote host, either as the Common Name or as an entry in the Subject Alternative Name extension.
Only relevant for outgoing connections.
Do NOT set this to false
unless you understand the risks of not verifying the remote hostname.
Environment variables
HTTPS_PROXY
common optional string literalThe global URL to proxy HTTPS requests through.
If another HTTPS proxy is set in the configuration file or at a component level, this one will be overridden.
The lowercase variant has priority over the uppercase one.
HTTP_PROXY
common optional string literalThe global URL to proxy HTTP requests through.
If another HTTP proxy is set in the configuration file or at a component level, this one will be overridden.
The lowercase variant has priority over the uppercase one.
NO_PROXY
common optional string literalList of hosts to avoid proxying globally.
Allowed patterns here include:
Pattern | Example match |
---|---|
Domain names | example.com matches requests to example.com |
Wildcard domains | .example.come matches requests to example.com and its subdomains |
IP addresses | 127.0.0.1 matches requests to 127.0.0.1 |
CIDR blocks | 192.168.0.0./16 matches requests to any IP addresses in this range |
Splat | * matches all hosts |
If another no_proxy
value is set in the configuration file or at a component level, this
one is overridden.
The lowercase variant has priority over the uppercase one.
http_proxy
common optional string literalThe global URL to proxy HTTP requests through.
If another HTTP proxy is set in the configuration file or at a component level, this one will be overridden.
The lowercase variant has priority over the uppercase one.
https_proxy
common optional string literalThe global URL to proxy HTTPS requests through.
If another HTTPS proxy is set in the configuration file or at a component level, this one will be overridden.
The lowercase variant has priority over the uppercase one.
no_proxy
common optional string literalList of hosts to avoid proxying globally.
Allowed patterns here include:
Pattern | Example match |
---|---|
Domain names | example.com matches requests to example.com |
Wildcard domains | .example.come matches requests to example.com and its subdomains |
IP addresses | 127.0.0.1 matches requests to 127.0.0.1 |
CIDR blocks | 192.168.0.0./16 matches requests to any IP addresses in this range |
Splat | * matches all hosts |
If another no_proxy
value is set in the configuration file or at a component level, this
one is overridden.
The lowercase variant has priority over the uppercase one.
Outputs
<component_id>
Output Data
Logs
Record
{
"key": "value"
}
53.126.150.246 - - [01/Oct/2020:11:25:58 -0400] "GET /disintermediate HTTP/2.0" 401 20308
2345
gcp_pubsub
2020-10-10T17:07:36.452332Z
Telemetry
Metrics
linkcomponent_received_bytes_total
countercomponent_id
instead. The value is the same as component_id
.component_received_event_bytes_total
countercomponent_id
instead. The value is the same as component_id
.component_received_events_total
countercomponent_id
instead. The value is the same as component_id
.component_sent_event_bytes_total
countercomponent_id
instead. The value is the same as component_id
.component_sent_events_total
countercomponent_id
instead. The value is the same as component_id
.events_out_total
countercomponent_sent_events_total
instead.component_id
instead. The value is the same as component_id
.source_lag_time_seconds
histogramcomponent_id
instead. The value is the same as component_id
.How it works
Automatic Concurrency Management
The `gcp_pubsub` source automatically manages the number of concurrent active streams by
monitoring the traffic flowing over the streams.
When a stream receives full responses (as determined by the `full_response_size` setting),
it marks itself as being "busy".
Periodically, the source will poll all the active connections and will start a new stream
if all the active streams are marked as busy and fewer than `max_concurrency` streams are
active.
Conversely, when a stream passes an idle interval (configured by the
`idle_timeout_seconds` setting) with no traffic and no outstanding acknowledgements,
it will drop the connection unless there are no other streams active.
This combination of actions allows this source to respond dynamically to high load levels
without opening up extra connections at startup.
GCP Pub/Sub
gcp_pubsub
source streams messages from GCP Pub/Sub.
This is a highly scalable / durable queueing system with at-least-once queuing semantics.
Messages are received in a stream and are either acknowledged immediately after receiving
or after it has been fully processed by the sink(s), depending on if any of the sink(s)
have the acknowledgements
setting enabled.Transport Layer Security (TLS)
tls.*
options.