GCP PubSub

Fetch observability events from GCP’s PubSub messaging system

status: beta
role: aggregator
delivery: at-least-once
acknowledgements: yes
egress: stream
state: stateless
output: log

Requirements

The GCP Pub/Sub source requires a Pub/Sub subscription.

Configuration

Example configurations

Common (JSON)

{
  "sources": {
    "my_source_id": {
      "type": "gcp_pubsub",
      "acknowledgements": null,
      "credentials_path": "/path/to/credentials.json",
      "project": "vector-123456",
      "subscription": "vector-123456"
    }
  }
}

Common (TOML)

[sources.my_source_id]
type = "gcp_pubsub"
credentials_path = "/path/to/credentials.json"
project = "vector-123456"
subscription = "vector-123456"

Common (YAML)

---
sources:
  my_source_id:
    type: gcp_pubsub
    acknowledgements: null
    credentials_path: /path/to/credentials.json
    project: vector-123456
    subscription: vector-123456

Advanced (JSON)

{
  "sources": {
    "my_source_id": {
      "type": "gcp_pubsub",
      "acknowledgements": null,
      "ack_deadline_seconds": 600,
      "ack_deadline_secs": 600,
      "api_key": "${GCP_API_KEY}",
      "credentials_path": "/path/to/credentials.json",
      "endpoint": "https://pubsub.googleapis.com",
      "full_response_size": 100,
      "keepalive_secs": 60,
      "max_concurrency": 5,
      "poll_time_seconds": 2,
      "project": "vector-123456",
      "retry_delay_seconds": 1,
      "retry_delay_secs": 1,
      "framing": null,
      "proxy": null,
      "tls": null,
      "subscription": "vector-123456",
      "decoding": null
    }
  }
}

Advanced (TOML)

[sources.my_source_id]
type = "gcp_pubsub"
ack_deadline_seconds = 600
ack_deadline_secs = 600
api_key = "${GCP_API_KEY}"
credentials_path = "/path/to/credentials.json"
endpoint = "https://pubsub.googleapis.com"
full_response_size = 100
keepalive_secs = 60
max_concurrency = 5
poll_time_seconds = 2
project = "vector-123456"
retry_delay_seconds = 1
retry_delay_secs = 1
subscription = "vector-123456"

Advanced (YAML)

---
sources:
  my_source_id:
    type: gcp_pubsub
    acknowledgements: null
    ack_deadline_seconds: 600
    ack_deadline_secs: 600
    api_key: ${GCP_API_KEY}
    credentials_path: /path/to/credentials.json
    endpoint: https://pubsub.googleapis.com
    full_response_size: 100
    keepalive_secs: 60
    max_concurrency: 5
    poll_time_seconds: 2
    project: vector-123456
    retry_delay_seconds: 1
    retry_delay_secs: 1
    framing: null
    proxy: null
    tls: null
    subscription: vector-123456
    decoding: null

ack_deadline_seconds

optional uint
The acknowledgement deadline to use for this stream. Messages that are not acknowledged when this deadline expires may be retransmitted. This setting is deprecated and will be removed in a future version.
Examples
10
600
default: 600 (seconds)

ack_deadline_secs

optional uint
The acknowledgement deadline to use for this stream. Messages that are not acknowledged when this deadline expires may be retransmitted.
Examples
10
600
default: 600 (seconds)

acknowledgements

common optional object
Controls how acknowledgements are handled by this source. These settings override the global acknowledgement settings. This setting is deprecated in favor of enabling acknowledgements in the destination sink.

acknowledgements.enabled

common optional bool
Controls whether the source waits for destination sinks to deliver the events before acknowledging receipt.
default: false

api_key

optional string literal
A Google Cloud API key used to authenticate access to the Pub/Sub project and topic. Either this or credentials_path must be set.
Examples
"${GCP_API_KEY}"
"ef8d5de700e7989468166c40fc8a0ccd"

credentials_path

common optional string literal

The filename for a Google Cloud service account credentials JSON file used to authenticate access to the Pub/Sub project and topic. If this is unset, Vector checks the GOOGLE_APPLICATION_CREDENTIALS environment variable for a filename.

If no filename is specified, Vector will attempt to fetch an instance service account for the compute instance the program is running on. If Vector is not running on a GCE instance, you must define a credentials file as above.
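
The lookup order described above can be sketched as follows. This is only an illustration of the documented fallback chain, not Vector's implementation: the function name `resolve_credentials_source` and the returned labels are hypothetical, and the `api_key` option is deliberately left out because its precedence is not stated here.

```python
import os
from typing import Mapping, Optional


def resolve_credentials_source(
    credentials_path: Optional[str],
    env: Mapping[str, str] = os.environ,
) -> str:
    """Return a label describing where credentials would be looked up.

    Hypothetical helper: it mirrors the documented order only.
    """
    # 1. An explicit credentials_path in the source configuration wins.
    if credentials_path:
        return f"file:{credentials_path}"
    # 2. Otherwise fall back to the GOOGLE_APPLICATION_CREDENTIALS variable.
    env_path = env.get("GOOGLE_APPLICATION_CREDENTIALS")
    if env_path:
        return f"file:{env_path}"
    # 3. Finally, rely on the service account of the compute instance.
    return "instance-service-account"
```

Passing the environment as a parameter keeps the sketch easy to exercise without touching the real process environment.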

Examples
"/path/to/credentials.json"

decoding

optional object
Configures how frames are decoded into events.

decoding.codec

common optional string literal enum
The decoding method.
Enum options
Option | Description
bytes | Events containing the byte frame as-is.
gelf | Events being parsed from a GELF message.
json | Events being parsed from a JSON string.
native | Events being parsed from Vector’s native protobuf format (EXPERIMENTAL).
native_json | Events being parsed from Vector’s native JSON format (EXPERIMENTAL).
syslog | Events being parsed from a Syslog message.
default: bytes
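
To make the difference between two of the codecs concrete, here is a rough sketch of what `bytes` and `json` decoding do to a single frame. It is a simplification under stated assumptions, not Vector's decoder: the function name `decode_frame` is hypothetical, the `bytes` branch assumes the frame ends up in the `message` field, and the other codecs are not covered.

```python
import json


def decode_frame(frame: bytes, codec: str = "bytes") -> dict:
    """Turn one byte frame into an event-like dict (illustrative only)."""
    if codec == "bytes":
        # The frame is kept as-is; here it is stored under "message" (assumption).
        return {"message": frame.decode("utf-8", errors="replace")}
    if codec == "json":
        # The frame is parsed as JSON and its keys become event fields.
        parsed = json.loads(frame)
        if not isinstance(parsed, dict):
            raise ValueError("this sketch only handles JSON objects")
        return parsed
    raise ValueError(f"codec not covered by this sketch: {codec}")
```

With the default codec a JSON payload stays an opaque string in `message`; with `json` the same payload is expanded into individual fields.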

endpoint

optional string literal
The endpoint from which to pull data.
Examples
"https://us-central1-pubsub.googleapis.com"
default: https://pubsub.googleapis.com

framing

optional object
Configures how incoming byte sequences are split up into byte frames.

framing.character_delimited

optional object
Options for character_delimited framing.
Relevant when: method = `character_delimited`

framing.character_delimited.delimiter

The character used to separate frames.
Examples
"\n"
"\t"

framing.character_delimited.max_length

The maximum frame length limit. Any frames longer than max_length bytes will be discarded entirely.
Examples
65535
102400

framing.method

common optional string literal enum
The framing method.
Enum options
Option | Description
bytes | Byte frames are passed through as-is according to the underlying I/O boundaries (e.g. split between messages or stream segments).
character_delimited | Byte frames which are delimited by a chosen character.
length_delimited | Byte frames which are prefixed by an unsigned big-endian 32-bit integer indicating the length.
newline_delimited | Byte frames which are delimited by a newline character.
octet_counting | Byte frames according to the octet counting format.
default: bytes

framing.newline_delimited

optional object
Options for newline_delimited framing.
Relevant when: method = `newline_delimited`

framing.newline_delimited.max_length

The maximum frame length limit. Any frames longer than max_length bytes will be discarded entirely.
Examples
65535
102400

framing.octet_counting

optional object
Options for octet_counting framing.
Relevant when: method = `octet_counting`

framing.octet_counting.max_length

The maximum frame length limit. Any frames longer than max_length bytes will be discarded entirely.
Examples
65535
102400
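
As a rough illustration of two of the framing methods, the sketch below splits a buffer by a delimiter character (dropping frames longer than `max_length`) and by a big-endian 32-bit length prefix. It is a simplified model, not Vector's framer: the function names are hypothetical, empty frames are skipped, and an incomplete trailing length-prefixed frame is simply ignored, all of which are assumptions of this sketch.

```python
import struct
from typing import List, Optional


def split_character_delimited(
    data: bytes, delimiter: bytes = b"\n", max_length: Optional[int] = None
) -> List[bytes]:
    """Split on a delimiter; frames longer than max_length are discarded."""
    frames = []
    for frame in data.split(delimiter):
        if not frame:
            continue  # skip empty frames (assumption of this sketch)
        if max_length is not None and len(frame) > max_length:
            continue  # over-long frames are discarded entirely
        frames.append(frame)
    return frames


def split_length_delimited(data: bytes) -> List[bytes]:
    """Split frames prefixed by an unsigned big-endian 32-bit length."""
    frames = []
    offset = 0
    while offset + 4 <= len(data):
        (length,) = struct.unpack_from(">I", data, offset)
        start = offset + 4
        end = start + length
        if end > len(data):
            break  # incomplete frame: stop and wait for more data
        frames.append(data[start:end])
        offset = end
    return frames
```

Calling `split_character_delimited(data)` with the default delimiter behaves like newline_delimited framing in this model.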

full_response_size

optional uint
The number of messages in a response to mark a stream as "busy".
This is used to determine if more streams should be started.
The GCP Pub/Sub servers send responses with 100 or more messages when
the subscription is busy.
Examples
100
128
default: 100

keepalive_secs

optional float
The amount of time, in seconds, with no received activity before sending a keepalive request. If this is set larger than 60, you may see periodic errors sent from the server.
Examples
10
default: 60

max_concurrency

optional uint
The maximum number of concurrent stream connections to open at once.
Examples
1
9
default: 5 (concurrency)

poll_time_seconds

optional float
How often to poll the currently active streams to check whether they are all busy and, if so, open a new stream.
Examples
1
5
default: 2 (seconds)

project

required string literal
The project name from which to pull logs.
Examples
"vector-123456"

proxy

optional object
Configures an HTTP(S) proxy for Vector to use. By default, the globally configured proxy is used.

proxy.enabled

optional bool
If false, the proxy is disabled.
default: true

proxy.http

optional string literal
The URL to proxy HTTP requests through.
Examples
"http://foo.bar:3128"

proxy.https

optional string literal
The URL to proxy HTTPS requests through.
Examples
"http://foo.bar:3128"

proxy.no_proxy

optional [string]

A list of hosts to avoid proxying. Allowed patterns here include:

Pattern | Example match
Domain names | example.com matches requests to example.com
Wildcard domains | .example.com matches requests to example.com and its subdomains
IP addresses | 127.0.0.1 matches requests to 127.0.0.1
CIDR blocks | 192.168.0.0/16 matches requests to any IP addresses in this range
Splat | * matches all hosts
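
The pattern kinds in the table above can be sketched as a small matcher. This is an illustrative model of the documented rules rather than Vector's actual matching code: the function name `bypasses_proxy` is hypothetical, and treating a plain domain name as an exact match only is an assumption of this sketch.

```python
import ipaddress
from typing import Iterable


def bypasses_proxy(host: str, no_proxy: Iterable[str]) -> bool:
    """Return True if `host` matches any no_proxy pattern (illustrative)."""
    for pattern in no_proxy:
        pattern = pattern.strip()
        if not pattern:
            continue
        if pattern == "*":
            return True  # splat matches all hosts
        if "/" in pattern:
            # CIDR block: only meaningful when the host is an IP address.
            try:
                network = ipaddress.ip_network(pattern, strict=False)
                if ipaddress.ip_address(host) in network:
                    return True
            except ValueError:
                pass  # invalid block, or host is not an IP address
            continue
        if pattern.startswith("."):
            # Wildcard domain: the domain itself and all of its subdomains.
            if host == pattern[1:] or host.endswith(pattern):
                return True
            continue
        if host == pattern:
            return True  # exact domain name or IP address
    return False
```

For example, `bypasses_proxy("api.example.com", [".example.com"])` is true in this model, while an unrelated host such as `badexample.com` is not matched by the same pattern.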

retry_delay_seconds

optional float
The amount of time to wait between retry attempts after an error. This setting is deprecated and will be removed in a future version.
Examples
0.5
default: 1 (seconds)

retry_delay_secs

optional float
The amount of time to wait between retry attempts after an error.
Examples
0.5
default: 1 (seconds)

subscription

required string literal
The subscription within the project which is configured to receive logs.
Examples
"vector-123456"

tls

optional object
Configures the TLS options for outgoing connections.

tls.alpn_protocols

optional [string]
Sets the list of supported ALPN protocols, which are used during negotiation with the peer. Prioritized in the order they are defined.

tls.ca_file

optional string literal
Absolute path to an additional CA certificate file, in DER or PEM format (X.509), or an inline CA certificate in PEM format.
Examples
"/path/to/certificate_authority.crt"

tls.crt_file

common optional string literal
Absolute path to a certificate file used to identify this connection, in DER or PEM format (X.509) or PKCS#12, or an inline certificate in PEM format. If this is set and is not a PKCS#12 archive, key_file must also be set.
Examples
"/path/to/host_certificate.crt"

tls.key_file

common optional string literal
Absolute path to a private key file used to identify this connection, in DER or PEM format (PKCS#8), or an inline private key in PEM format. If this is set, crt_file must also be set.
Examples
"/path/to/host_certificate.key"

tls.key_pass

optional string literal
Pass phrase used to unlock the encrypted key file. This has no effect unless key_file is set.
Examples
"${KEY_PASS_ENV_VAR}"
"PassWord1"

tls.verify_certificate

optional bool
If true (the default), Vector will validate the TLS certificate of the remote host. Specifically, the issuer is checked but not CRLs (Certificate Revocation Lists).
default: true

tls.verify_hostname

optional bool
If true (the default), Vector will validate the configured remote host name against the remote host’s TLS certificate. Do NOT set this to false unless you understand the risks of not verifying the remote hostname.
default: true

Environment variables

HTTPS_PROXY

common optional string literal

The global URL to proxy HTTPS requests through.

If another HTTPS proxy is set in the configuration file or at a component level, this one will be overridden.

The lowercase variant has priority over the uppercase one.

Examples
http://foo.bar:3128

HTTP_PROXY

common optional string literal

The global URL to proxy HTTP requests through.

If another HTTP proxy is set in the configuration file or at a component level, this one will be overridden.

The lowercase variant has priority over the uppercase one.

Examples
http://foo.bar:3128

NO_PROXY

common optional string literal

List of hosts to avoid proxying globally.

Allowed patterns here include:

Pattern | Example match
Domain names | example.com matches requests to example.com
Wildcard domains | .example.com matches requests to example.com and its subdomains
IP addresses | 127.0.0.1 matches requests to 127.0.0.1
CIDR blocks | 192.168.0.0/16 matches requests to any IP addresses in this range
Splat | * matches all hosts

If another no_proxy value is set in the configuration file or at a component level, this one is overridden.

The lowercase variant has priority over the uppercase one.

Examples
localhost,.example.com,192.168.0.0/16
*

http_proxy

common optional string literal

The global URL to proxy HTTP requests through.

If another HTTP proxy is set in the configuration file or at a component level, this one will be overridden.

The lowercase variant has priority over the uppercase one.

Examples
http://foo.bar:3128

https_proxy

common optional string literal

The global URL to proxy HTTPS requests through.

If another HTTPS proxy is set in the configuration file or at a component level, this one will be overridden.

The lowercase variant has priority over the uppercase one.

Examples
http://foo.bar:3128

no_proxy

common optional string literal

List of hosts to avoid proxying globally.

Allowed patterns here include:

Pattern | Example match
Domain names | example.com matches requests to example.com
Wildcard domains | .example.com matches requests to example.com and its subdomains
IP addresses | 127.0.0.1 matches requests to 127.0.0.1
CIDR blocks | 192.168.0.0/16 matches requests to any IP addresses in this range
Splat | * matches all hosts

If another no_proxy value is set in the configuration file or at a component level, this one is overridden.

The lowercase variant has priority over the uppercase one.

Examples
localhost,.example.com,192.168.0.0/16
*

Outputs

<component_id>

Default output stream of the component. Use this component’s ID as an input to downstream transforms and sinks.

Output Data

Logs

Record

An individual Pub/Sub record
Fields
attributes required object
Attributes that were published with the Pub/Sub record.
Examples
{
  "key": "value"
}
message required string literal
The message from the Pub/Sub record, parsed from the raw data.
Examples
53.126.150.246 - - [01/Oct/2020:11:25:58 -0400] "GET /disintermediate HTTP/2.0" 401 20308
message_id required string literal
The ID of this message, assigned by the server when the message is published. Guaranteed to be unique within the topic.
Examples
2345
source_type required string literal
The name of the source type.
Examples
gcp_pubsub
timestamp required timestamp
The time this message was published in the topic.
Examples
2020-10-10T17:07:36.452332Z
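
Because delivery is at-least-once, a downstream consumer may see the same record more than once. One possible way to cope with that, sketched below, is to drop repeats by `message_id`, which is described above as unique within the topic. The function name and the sample events are hypothetical, and the unbounded in-memory set is a simplification that a real consumer would need to bound.

```python
from typing import Dict, Iterable, Iterator


def dedupe_by_message_id(events: Iterable[Dict]) -> Iterator[Dict]:
    """Yield each event once, keyed on its message_id (illustrative)."""
    seen = set()  # grows without bound; fine for a sketch only
    for event in events:
        message_id = event["message_id"]
        if message_id in seen:
            continue  # a redelivered copy of a record already handled
        seen.add(message_id)
        yield event


# Hypothetical sample events shaped like the record fields described above.
sample = [
    {"message_id": "2345", "message": "first", "source_type": "gcp_pubsub"},
    {"message_id": "2346", "message": "second", "source_type": "gcp_pubsub"},
    {"message_id": "2345", "message": "first", "source_type": "gcp_pubsub"},
]
unique = list(dedupe_by_message_id(sample))
```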

Telemetry

Metrics


component_received_bytes_total

counter
The number of raw bytes accepted by this component from source origins.
component_id required
The Vector component ID.
component_kind required
The Vector component kind.
component_name required
Deprecated, use component_id instead. The value is the same as component_id.
component_type required
The Vector component type.
container_name optional
The name of the container from which the data originated.
file optional
The file from which the data originated.
host optional
The hostname of the system Vector is running on.
mode optional
The connection mode used by the component.
peer_addr optional
The IP from which the data originated.
peer_path optional
The pathname from which the data originated.
pid optional
The process ID of the Vector instance.
pod_name optional
The name of the pod from which the data originated.
uri optional
The sanitized URI from which the data originated.

component_received_event_bytes_total

counter
The number of event bytes accepted by this component either from tagged origins like file and uri, or cumulatively from other origins.
component_id required
The Vector component ID.
component_kind required
The Vector component kind.
component_name required
Deprecated, use component_id instead. The value is the same as component_id.
component_type required
The Vector component type.
container_name optional
The name of the container from which the data originated.
file optional
The file from which the data originated.
host optional
The hostname of the system Vector is running on.
mode optional
The connection mode used by the component.
peer_addr optional
The IP from which the data originated.
peer_path optional
The pathname from which the data originated.
pid optional
The process ID of the Vector instance.
pod_name optional
The name of the pod from which the data originated.
uri optional
The sanitized URI from which the data originated.

component_received_events_total

counter
The number of events accepted by this component either from tagged origins like file and uri, or cumulatively from other origins.
component_id required
The Vector component ID.
component_kind required
The Vector component kind.
component_name required
Deprecated, use component_id instead. The value is the same as component_id.
component_type required
The Vector component type.
container_name optional
The name of the container from which the data originated.
file optional
The file from which the data originated.
host optional
The hostname of the system Vector is running on.
mode optional
The connection mode used by the component.
peer_addr optional
The IP from which the data originated.
peer_path optional
The pathname from which the data originated.
pid optional
The process ID of the Vector instance.
pod_name optional
The name of the pod from which the data originated.
uri optional
The sanitized URI from which the data originated.

component_sent_event_bytes_total

counter
The total number of event bytes emitted by this component.
component_id required
The Vector component ID.
component_kind required
The Vector component kind.
component_name required
Deprecated, use component_id instead. The value is the same as component_id.
component_type required
The Vector component type.
host optional
The hostname of the system Vector is running on.
output optional
The specific output of the component.
pid optional
The process ID of the Vector instance.

component_sent_events_total

counter
The total number of events emitted by this component.
component_id required
The Vector component ID.
component_kind required
The Vector component kind.
component_name required
Deprecated, use component_id instead. The value is the same as component_id.
component_type required
The Vector component type.
host optional
The hostname of the system Vector is running on.
output optional
The specific output of the component.
pid optional
The process ID of the Vector instance.

events_out_total

counter
The total number of events emitted by this component. This metric is deprecated and will be removed in a future version. Use component_sent_events_total instead.
component_id required
The Vector component ID.
component_kind required
The Vector component kind.
component_name required
Deprecated, use component_id instead. The value is the same as component_id.
component_type required
The Vector component type.
host optional
The hostname of the system Vector is running on.
output optional
The specific output of the component.
pid optional
The process ID of the Vector instance.

source_lag_time_seconds

histogram
The difference between the timestamp recorded in each event and the time when it was ingested, expressed as fractional seconds.
component_id required
The Vector component ID.
component_kind required
The Vector component kind.
component_name required
Deprecated, use component_id instead. The value is the same as component_id.
component_type required
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.
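
As a worked example of the value that `source_lag_time_seconds` describes, the sketch below subtracts an event timestamp from an ingestion time and expresses the result as fractional seconds. The function name `lag_seconds` and the ingestion time are made up for illustration; only the published timestamp is taken from the examples on this page.

```python
from datetime import datetime, timezone


def lag_seconds(event_timestamp: datetime, ingested_at: datetime) -> float:
    """Difference between ingestion time and event time, in seconds."""
    return (ingested_at - event_timestamp).total_seconds()


# Event published at 2020-10-10T17:07:36.452332Z, ingested 2.5 seconds later.
published = datetime(2020, 10, 10, 17, 7, 36, 452332, tzinfo=timezone.utc)
ingested = datetime(2020, 10, 10, 17, 7, 38, 952332, tzinfo=timezone.utc)
print(lag_seconds(published, ingested))  # 2.5
```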

How it works

Automatic Concurrency Management

The `gcp_pubsub` source automatically manages the number of concurrent active streams by
monitoring the traffic flowing over the streams.
When a stream receives full responses (as determined by the `full_response_size` setting),
it marks itself as being "busy".
Periodically, the source will poll all the active connections and will start a new stream
if all the active streams are marked as busy and fewer than `max_concurrency` streams are
active.
Conversely, when a stream passes an idle interval (configured by the
`idle_timeout_seconds` setting) with no traffic and no outstanding acknowledgements,
it will drop the connection unless there are no other streams active.
This combination of actions allows this source to respond dynamically to high load levels
without opening up extra connections at startup.
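
The decisions described above can be sketched as three small predicates. This is a simplified model of the described behavior, not Vector's implementation: the `StreamState` class and the function names are hypothetical, and the idle flag stands in for whatever timer the source uses internally.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class StreamState:
    busy: bool = False          # last response was "full"
    idle: bool = False          # an idle interval passed with no traffic
    outstanding_acks: int = 0   # acknowledgements not yet sent


def mark_busy(stream: StreamState, response_size: int,
              full_response_size: int = 100) -> None:
    """A stream is busy when a response holds full_response_size or more messages."""
    stream.busy = response_size >= full_response_size


def should_start_stream(streams: List[StreamState],
                        max_concurrency: int = 5) -> bool:
    """Start a new stream only if every active stream is busy and there is room."""
    return len(streams) < max_concurrency and all(s.busy for s in streams)


def should_drop_stream(stream: StreamState, streams: List[StreamState]) -> bool:
    """Drop an idle stream with no pending acks, unless it is the only one."""
    return stream.idle and stream.outstanding_acks == 0 and len(streams) > 1
```

In this model the periodic poll would call `should_start_stream` once per interval, so concurrency grows one stream at a time under sustained load and shrinks again as streams go idle.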

Context

By default, the gcp_pubsub source augments events with helpful context keys.

GCP Pub/Sub

The gcp_pubsub source streams messages from GCP Pub/Sub, a highly scalable and durable queueing system with at-least-once delivery semantics. Messages are received in a stream and are acknowledged either immediately after they are received or after they have been fully processed by the sink(s), depending on whether any of the sink(s) have the acknowledgements setting enabled.

State

This component is stateless, meaning its behavior is consistent across each input.

Transport Layer Security (TLS)

Vector uses OpenSSL for TLS protocols. You can adjust TLS behavior via the tls.* options.