Reduce
Collapse multiple log events into a single event based on a set of conditions and merge strategies
status: beta
egress: stream
state: stateful
Reduces multiple log events into a single log event based on a set of
conditions and merge strategies.
Configuration
Example configurations
{
"transforms": {
"my_transform_id": {
"type": "reduce",
"inputs": [
"my-source-or-transform-id"
],
"group_by": [
"request_id"
]
}
}
}
[transforms.my_transform_id]
type = "reduce"
inputs = [ "my-source-or-transform-id" ]
group_by = [ "request_id" ]
---
transforms:
my_transform_id:
type: reduce
inputs:
- my-source-or-transform-id
group_by:
- request_id
{
"transforms": {
"my_transform_id": {
"type": "reduce",
"inputs": [
"my-source-or-transform-id"
],
"ends_when": ".status_code != 200 && !includes([\"info\", \"debug\"], .severity)",
"expire_after_ms": 30000,
"flush_period_ms": 1000,
"group_by": [
"request_id"
],
"merge_strategies": {
"method": "discard",
"path": "discard",
"duration_ms": "sum",
"query": "array"
},
"starts_when": null
}
}
}
[transforms.my_transform_id]
type = "reduce"
inputs = [ "my-source-or-transform-id" ]
ends_when = '.status_code != 200 && !includes(["info", "debug"], .severity)'
expire_after_ms = 30_000
flush_period_ms = 1_000
group_by = [ "request_id" ]
[transforms.my_transform_id.merge_strategies]
method = "discard"
path = "discard"
duration_ms = "sum"
query = "array"
---
transforms:
my_transform_id:
type: reduce
inputs:
- my-source-or-transform-id
ends_when: .status_code != 200 && !includes(["info", "debug"], .severity)
expire_after_ms: 30000
flush_period_ms: 1000
group_by:
- request_id
merge_strategies:
method: discard
path: discard
duration_ms: sum
query: array
starts_when: null
ends_when
optional string literalA condition used to distinguish the final event of a transaction. If this condition resolves to
true
for an event, the current transaction is immediately flushed with this event.expire_after_ms
optional uintA maximum period of time to wait after the last event is received before a combined event should be considered complete.
default:
30000
(milliseconds)flush_period_ms
optional uintControls the frequency that Vector checks for (and flushes) expired events.
default:
1000
(milliseconds)group_by
common optional [string]An ordered list of fields by which to group events. Each group is combined independently, allowing you to keep independent events separate. When no fields are specified, all events will be combined in a single group. Events missing a specified field will be combined in their own group.
inputs
required [string]A list of upstream source or transform
IDs. Wildcards (*
) are supported.
See configuration for more info.
merge_strategies
optional objectA map of field names to custom merge strategies. For each field specified this strategy will be used for combining events rather than the default behavior.
The default behavior is as follows:
- The first value of a string field is kept, subsequent values are discarded.
- For timestamp fields the first is kept and a new field
[field-name]_end
is added with the last received timestamp value. - Numeric values are summed.
merge_strategies.*
required string literal enumThe custom merge strategy to use for a field.
Enum options
Option | Description |
---|---|
array | Each value is appended to an array. |
concat | Concatenate each string value (delimited with a space). |
concat_newline | Concatenate each string value (delimited with a newline). |
discard | Discard all but the first value found. |
flat_unique | Create a flattened array of all the unique values. |
longest_array | Retains the longest array seen |
max | The maximum of all numeric values. |
min | The minimum of all numeric values. |
retain | Discard all but the last value found. Works as a coalesce by not retaining null. |
shortest_array | Retains the shortest array seen |
sum | Sum all numeric values. |
starts_when
optional conditionA condition used to distinguish the first event of a transaction. If this condition resolves to
true
for an event, the previous transaction is flushed (without this event) and a new transaction is started.The text of the condition. The syntax of the condition depends on the value of
type
.Available syntaxes
Syntax | Description | Example |
---|---|---|
vrl | A Vector Remap Language (VRL) Boolean expression. | .status_code != 200 && !includes(["info", "debug"], .severity) |
datadog_search | A Datadog Search query string. | *stack |
Shorthand for VRL
If you opt for the vrl
syntax for this condition, you can set the condition
as a string via the condition
parameter, without needing to specify both a source
and a type
. The
table below shows some examples:
Config format | Example |
---|---|
TOML | condition = ".status == 200" |
YAML | condition: .status == 200 |
JSON | "condition": ".status == 200" |
Condition config examples
Standard VRL
starts_when = { type = "vrl", source = ".status == 500" }
starts_when:
type: "vrl"
source: ".status == 500"
"starts_when": {
"type": "vrl",
"source": ".status == 500"
}
Outputs
<component_id>
Default output stream of the component. Use this component’s ID as an input to downstream transforms and sinks.
Telemetry
Metrics
linkcomponent_received_event_bytes_total
counterThe number of event bytes accepted by this component either from
tagged origins like file and uri, or cumulatively from other origins.
component_id
required
The Vector component ID.
component_kind
required
The Vector component kind.
component_name
required
Deprecated, use
component_id
instead. The value is the same as component_id
.component_type
required
The Vector component type.
container_name
optional
The name of the container from which the data originated.
file
optional
The file from which the data originated.
host
optional
The hostname of the system Vector is running on.
mode
optional
The connection mode used by the component.
peer_addr
optional
The IP from which the data originated.
peer_path
optional
The pathname from which the data originated.
pid
optional
The process ID of the Vector instance.
pod_name
optional
The name of the pod from which the data originated.
uri
optional
The sanitized URI from which the data originated.
component_received_events_count
histogramA histogram of Vector the number of events passed in each internal batch in Vector’s internal topology.
Note that this is separate than sink-level batching. It is mostly useful for low level debugging
performance issues in Vector due to small internal batches.
component_id
required
The Vector component ID.
component_kind
required
The Vector component kind.
component_name
required
Deprecated, use
component_id
instead. The value is the same as component_id
.component_type
required
The Vector component type.
container_name
optional
The name of the container from which the data originated.
file
optional
The file from which the data originated.
host
optional
The hostname of the system Vector is running on.
mode
optional
The connection mode used by the component.
peer_addr
optional
The IP from which the data originated.
peer_path
optional
The pathname from which the data originated.
pid
optional
The process ID of the Vector instance.
pod_name
optional
The name of the pod from which the data originated.
uri
optional
The sanitized URI from which the data originated.
component_received_events_total
counterThe number of events accepted by this component either from tagged
origins like file and uri, or cumulatively from other origins.
component_id
required
The Vector component ID.
component_kind
required
The Vector component kind.
component_name
required
Deprecated, use
component_id
instead. The value is the same as component_id
.component_type
required
The Vector component type.
container_name
optional
The name of the container from which the data originated.
file
optional
The file from which the data originated.
host
optional
The hostname of the system Vector is running on.
mode
optional
The connection mode used by the component.
peer_addr
optional
The IP from which the data originated.
peer_path
optional
The pathname from which the data originated.
pid
optional
The process ID of the Vector instance.
pod_name
optional
The name of the pod from which the data originated.
uri
optional
The sanitized URI from which the data originated.
component_sent_event_bytes_total
counterThe total number of event bytes emitted by this component.
component_id
required
The Vector component ID.
component_kind
required
The Vector component kind.
component_name
required
Deprecated, use
component_id
instead. The value is the same as component_id
.component_type
required
The Vector component type.
host
optional
The hostname of the system Vector is running on.
output
optional
The specific output of the component.
pid
optional
The process ID of the Vector instance.
component_sent_events_total
counterThe total number of events emitted by this component.
component_id
required
The Vector component ID.
component_kind
required
The Vector component kind.
component_name
required
Deprecated, use
component_id
instead. The value is the same as component_id
.component_type
required
The Vector component type.
host
optional
The hostname of the system Vector is running on.
output
optional
The specific output of the component.
pid
optional
The process ID of the Vector instance.
events_in_total
counterThe number of events accepted by this component either from tagged
origins like file and uri, or cumulatively from other origins.
This metric is deprecated and will be removed in a future version.
Use
component_received_events_total
instead.component_id
required
The Vector component ID.
component_kind
required
The Vector component kind.
component_name
required
Deprecated, use
component_id
instead. The value is the same as component_id
.component_type
required
The Vector component type.
container_name
optional
The name of the container from which the data originated.
file
optional
The file from which the data originated.
host
optional
The hostname of the system Vector is running on.
mode
optional
The connection mode used by the component.
peer_addr
optional
The IP from which the data originated.
peer_path
optional
The pathname from which the data originated.
pid
optional
The process ID of the Vector instance.
pod_name
optional
The name of the pod from which the data originated.
uri
optional
The sanitized URI from which the data originated.
events_out_total
counterThe total number of events emitted by this component.
This metric is deprecated and will be removed in a future version.
Use
component_sent_events_total
instead.component_id
required
The Vector component ID.
component_kind
required
The Vector component kind.
component_name
required
Deprecated, use
component_id
instead. The value is the same as component_id
.component_type
required
The Vector component type.
host
optional
The hostname of the system Vector is running on.
output
optional
The specific output of the component.
pid
optional
The process ID of the Vector instance.
processed_bytes_total
counterThe number of bytes processed by the component.
component_id
required
The Vector component ID.
component_kind
required
The Vector component kind.
component_name
required
Deprecated, use
component_id
instead. The value is the same as component_id
.component_type
required
The Vector component type.
container_name
optional
The name of the container from which the bytes originate.
file
optional
The file from which the bytes originate.
host
optional
The hostname of the system Vector is running on.
mode
optional
The connection mode used by the component.
peer_addr
optional
The IP from which the bytes originate.
peer_path
optional
The pathname from which the bytes originate.
pid
optional
The process ID of the Vector instance.
pod_name
optional
The name of the pod from which the bytes originate.
uri
optional
The sanitized URI from which the bytes originate.
processed_events_total
counterThe total number of events processed by this component.
This metric is deprecated in place of using
component_received_events_total
and
component_sent_events_total
metrics.component_id
required
The Vector component ID.
component_kind
required
The Vector component kind.
component_name
required
Deprecated, use
component_id
instead. The value is the same as component_id
.component_type
required
The Vector component type.
host
optional
The hostname of the system Vector is running on.
pid
optional
The process ID of the Vector instance.
stale_events_flushed_total
counterThe number of stale events that Vector has flushed.
component_id
required
The Vector component ID.
component_kind
required
The Vector component kind.
component_name
required
Deprecated, use
component_id
instead. The value is the same as component_id
.component_type
required
The Vector component type.
host
optional
The hostname of the system Vector is running on.
pid
optional
The process ID of the Vector instance.
utilization
gaugeA ratio from 0 to 1 of the load on a component. A value of 0 would indicate a completely idle component that is simply waiting for input. A value of 1 would indicate a that is never idle. This value is updated every 5 seconds.
component_id
required
The Vector component ID.
component_kind
required
The Vector component kind.
component_name
required
Deprecated, use
component_id
instead. The value is the same as component_id
.component_type
required
The Vector component type.
host
optional
The hostname of the system Vector is running on.
pid
optional
The process ID of the Vector instance.
Examples
Merge Ruby exceptions
Given this event...[{"log":{"host":"host-1.hostname.com","message":"foobar.rb:6:in `/': divided by 0 (ZeroDivisionError)","pid":1234,"tid":5678,"timestamp":"2020-10-07T12:33:21.223543Z"}},{"log":{"host":"host-1.hostname.com","message":" from foobar.rb:6:in `bar'","pid":1234,"tid":5678,"timestamp":"2020-10-07T12:33:21.223543Z"}},{"log":{"host":"host-1.hostname.com","message":" from foobar.rb:2:in `foo'","pid":1234,"tid":5678,"timestamp":"2020-10-07T12:33:21.223543Z"}},{"log":{"host":"host-1.hostname.com","message":" from foobar.rb:9:in `\u003cmain\u003e'","pid":1234,"tid":5678,"timestamp":"2020-10-07T12:33:21.223543Z"}},{"log":{"host":"host-1.hostname.com","message":"Hello world, I am a new log","pid":1234,"tid":5678,"timestamp":"2020-10-07T12:33:22.123528Z"}}]
[transforms.my_transform_id]
type = "reduce"
inputs = [ "my-source-or-transform-id" ]
group_by = [ "host", "pid", "tid" ]
starts_when = "match(string!(.message), r'^[^\\s]')"
[transforms.my_transform_id.merge_strategies]
message = "concat_newline"
---
transforms:
my_transform_id:
type: reduce
inputs:
- my-source-or-transform-id
group_by:
- host
- pid
- tid
merge_strategies:
message: concat_newline
starts_when: match(string!(.message), r'^[^\s]')
{
"transforms": {
"my_transform_id": {
"type": "reduce",
"inputs": [
"my-source-or-transform-id"
],
"group_by": [
"host",
"pid",
"tid"
],
"merge_strategies": {
"message": "concat_newline"
},
"starts_when": "match(string!(.message), r'^[^\\s]')"
}
}
}
[{"log":{"host":"host-1.hostname.com","message":"foobar.rb:6:in `/': divided by 0 (ZeroDivisionError)\n from foobar.rb:6:in `bar'\n from foobar.rb:2:in `foo'\n from foobar.rb:9:in `\u003cmain\u003e'","pid":1234,"tid":5678,"timestamp":"2020-10-07T12:33:21.223543Z"}},{"log":{"host":"host-1.hostname.com","message":"Hello world, I am a new log","pid":1234,"tid":5678,"timestamp":"2020-10-07T12:33:22.123528Z"}}]
Reduce Rails logs into a single transaction
Given this event...[{"log":{"message":"Received GET /path","request_id":"abcd1234","request_params":{"key":"val"},"request_path":"/path","timestamp":"2020-10-07T12:33:21.223543Z"}},{"log":{"message":"Executed query in 5.2ms","query":"SELECT * FROM table","query_duration_ms":5.2,"request_id":"abcd1234","timestamp":"2020-10-07T12:33:21.832345Z"}},{"log":{"message":"Rendered partial _partial.erb in 2.3ms","render_duration_ms":2.3,"request_id":"abcd1234","template":"_partial.erb","timestamp":"2020-10-07T12:33:22.457423Z"}},{"log":{"message":"Executed query in 7.8ms","query":"SELECT * FROM table","query_duration_ms":7.8,"request_id":"abcd1234","timestamp":"2020-10-07T12:33:22.543323Z"}},{"log":{"message":"Sent 200 in 15.2ms","request_id":"abcd1234","response_duration_ms":5.2,"response_status":200,"timestamp":"2020-10-07T12:33:22.742322Z"}}]
[transforms.my_transform_id]
type = "reduce"
inputs = [ "my-source-or-transform-id" ]
---
transforms:
my_transform_id:
type: reduce
inputs:
- my-source-or-transform-id
{
"transforms": {
"my_transform_id": {
"type": "reduce",
"inputs": [
"my-source-or-transform-id"
]
}
}
}
{
"query_duration_ms": 13,
"render_duration_ms": 2.3,
"request_id": "abcd1234",
"request_params": {
"key": "val"
},
"request_path": "/path",
"response_duration_ms": 5.2,
"status": 200,
"timestamp": "2020-10-07T12:33:21.223543Z",
"timestamp_end": "2020-10-07T12:33:22.742322Z"
}