Reduce
Collapse multiple log events into a single event based on a set of conditions and merge strategies.
Configuration
Example configurations
{
  "transforms": {
    "my_transform_id": {
      "type": "reduce",
      "inputs": [
        "my-source-or-transform-id"
      ]
    }
  }
}

[transforms.my_transform_id]
type = "reduce"
inputs = [ "my-source-or-transform-id" ]

---
transforms:
  my_transform_id:
    type: reduce
    inputs:
      - my-source-or-transform-id

{
  "transforms": {
    "my_transform_id": {
      "type": "reduce",
      "inputs": [
        "my-source-or-transform-id"
      ],
      "expire_after_ms": 30000,
      "flush_period_ms": 1000,
      "group_by": [
        "request_id"
      ]
    }
  }
}

[transforms.my_transform_id]
type = "reduce"
inputs = [ "my-source-or-transform-id" ]
expire_after_ms = 30_000
flush_period_ms = 1_000
group_by = [ "request_id" ]

---
transforms:
  my_transform_id:
    type: reduce
    inputs:
      - my-source-or-transform-id
    expire_after_ms: 30000
    flush_period_ms: 1000
    group_by:
      - request_id

ends_when
optional condition
A condition used to distinguish the final event of a transaction. If this condition resolves to true for an event, the current transaction is immediately flushed with this event.
Available syntaxes
Syntax | Description | Example |
---|---|---|
vrl | A Vector Remap Language (VRL) Boolean expression. | .status_code != 200 && !includes(["info", "debug"], .severity) |
datadog_search | A Datadog Search query string. | *stack |
is_log | Whether the incoming event is a log. | |
is_metric | Whether the incoming event is a metric. | |
is_trace | Whether the incoming event is a trace. | |
Shorthand for VRL
If you opt for the vrl syntax for this condition, you can set the condition as a string via the condition parameter, without needing to specify both a source and a type. The table below shows some examples:
Config format | Example |
---|---|
TOML | condition = ".status == 200" |
YAML | condition: .status == 200 |
JSON | "condition": ".status == 200" |
Condition config examples
Standard VRL
ends_when = { type = "vrl", source = ".status == 500" }

ends_when:
  type: "vrl"
  source: ".status == 500"

"ends_when": {
  "type": "vrl",
  "source": ".status == 500"
}
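As a minimal sketch of how the shorthand fits into a complete transform, the fragment below sets ends_when to a bare VRL string, the same form the Ruby example on this page uses for starts_when. The transform ID collapse_requests, the input ID, and the request_id field are placeholders chosen for illustration.

```toml
# Hypothetical IDs and field names; adjust to your own topology.
[transforms.collapse_requests]
type = "reduce"
inputs = [ "my-source-or-transform-id" ]
group_by = [ "request_id" ]
# VRL shorthand: a plain string instead of { type = "vrl", source = "..." }.
ends_when = ".status == 500"
```

With this condition, the event whose status is 500 is included in the transaction it closes.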
expire_after_ms
optional uint
default: 30000 (milliseconds)
flush_period_ms
optional uint
default: 1000 (milliseconds)
group_by
optional [string]
An ordered list of fields by which to group events.
Each group with matching values for the specified keys is reduced independently, allowing you to keep independent event streams separate. When no fields are specified, all events are combined in a single group.
For example, if group_by = ["host", "region"], then all incoming events that have the same host and region are grouped together before being reduced.
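The host and region case described above could be written as the following fragment. This is only a sketch: the transform ID per_host_region and the input ID are placeholders, and it assumes your events actually carry host and region fields.

```toml
# Hypothetical transform ID; host and region must exist on the incoming events.
[transforms.per_host_region]
type = "reduce"
inputs = [ "my-source-or-transform-id" ]
# Events sharing both values are reduced together; other combinations stay separate.
group_by = [ "host", "region" ]
```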
inputs
required [string]
A list of upstream source or transform IDs.
Wildcards (*) are supported.
See configuration for more info.
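A short sketch of a wildcard input follows. The app-* pattern is a made-up naming scheme used only for illustration; it is meant to match every upstream component whose ID starts with app-.

```toml
[transforms.my_transform_id]
type = "reduce"
# "app-*" is a hypothetical pattern; the second entry is an exact component ID.
inputs = [ "app-*", "my-source-or-transform-id" ]
```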
max_events
optional uint
merge_strategies
optional object
A map of field names to custom merge strategies. For each field specified, the given strategy is used for combining events rather than the default behavior.
The default behavior is as follows:
- The first value of a string field is kept and subsequent values are discarded.
- For timestamp fields, the first is kept and a new field [field-name]_end is added with the last received timestamp value.
- Numeric values are summed.
merge_strategies.*
required string literal enum
Option | Description |
---|---|
array | Append each value to an array. |
concat | Concatenate each string value, delimited with a space. |
concat_newline | Concatenate each string value, delimited with a newline. |
concat_raw | Concatenate each string, without a delimiter. |
discard | Discard all but the first value found. |
flat_unique | Create a flattened array of all unique values. |
longest_array | Keep the longest array seen. |
max | Keep the maximum numeric value seen. |
min | Keep the minimum numeric value seen. |
retain | Discard all but the last value found. Works as a way to coalesce by not retaining |
shortest_array | Keep the shortest array seen. |
sum | Sum all numeric values. |
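To show how per-field strategies override the defaults, here is a sketch that borrows field names from the Rails example on this page. The choice of strategy for each field is illustrative, not a recommendation.

```toml
[transforms.my_transform_id]
type = "reduce"
inputs = [ "my-source-or-transform-id" ]
group_by = [ "request_id" ]

[transforms.my_transform_id.merge_strategies]
# Strings default to "keep the first value"; these two keep every value instead.
message = "concat_newline"
query = "array"
# Numbers default to being summed; keep only the slowest query instead.
query_duration_ms = "max"
# Keep the last status seen rather than summing it.
response_status = "retain"
```

Fields that are not listed under merge_strategies continue to use the default behavior for their type.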
starts_when
optional condition
A condition used to distinguish the first event of a transaction. If this condition resolves to true for an event, the previous transaction is flushed (without this event) and a new transaction is started.
Available syntaxes
Syntax | Description | Example |
---|---|---|
vrl | A Vector Remap Language (VRL) Boolean expression. | .status_code != 200 && !includes(["info", "debug"], .severity) |
datadog_search | A Datadog Search query string. | *stack |
is_log | Whether the incoming event is a log. | |
is_metric | Whether the incoming event is a metric. | |
is_trace | Whether the incoming event is a trace. | |
Shorthand for VRL
If you opt for the vrl syntax for this condition, you can set the condition as a string via the condition parameter, without needing to specify both a source and a type. The table below shows some examples:
Config format | Example |
---|---|
TOML | condition = ".status == 200" |
YAML | condition: .status == 200 |
JSON | "condition": ".status == 200" |
Condition config examples
Standard VRL
starts_when = { type = "vrl", source = ".status == 500" }

starts_when:
  type: "vrl"
  source: ".status == 500"

"starts_when": {
  "type": "vrl",
  "source": ".status == 500"
}
Outputs
<component_id>
Telemetry
Metrics
Every metric below also carries a deprecated tag whose value is the same as component_id; use component_id instead.
component_received_event_bytes_total
counter
component_received_events_count
histogram
A histogram of the number of events passed in each internal batch in Vector’s internal topology.
Note that this is separate from sink-level batching. It is mostly useful for low-level debugging of performance issues in Vector caused by small internal batches.
component_received_events_total
counter
component_sent_event_bytes_total
counter
component_sent_events_total
counter
events_in_total
counter
Use component_received_events_total instead.
events_out_total
counter
Use component_sent_events_total instead.
processed_bytes_total
counter
processed_events_total
counter
Superseded by the component_received_events_total and component_sent_events_total metrics.
stale_events_flushed_total
counter
utilization
gauge
Examples
Merge Ruby exceptions
Given these events...
[
{"log":{"host":"host-1.hostname.com","message":"foobar.rb:6:in `/': divided by 0 (ZeroDivisionError)","pid":1234,"tid":5678,"timestamp":"2020-10-07T12:33:21.223543Z"}},
{"log":{"host":"host-1.hostname.com","message":" from foobar.rb:6:in `bar'","pid":1234,"tid":5678,"timestamp":"2020-10-07T12:33:21.223543Z"}},
{"log":{"host":"host-1.hostname.com","message":" from foobar.rb:2:in `foo'","pid":1234,"tid":5678,"timestamp":"2020-10-07T12:33:21.223543Z"}},
{"log":{"host":"host-1.hostname.com","message":" from foobar.rb:9:in `\u003cmain\u003e'","pid":1234,"tid":5678,"timestamp":"2020-10-07T12:33:21.223543Z"}},
{"log":{"host":"host-1.hostname.com","message":"Hello world, I am a new log","pid":1234,"tid":5678,"timestamp":"2020-10-07T12:33:22.123528Z"}}
]
...and this configuration...
[transforms.my_transform_id]
type = "reduce"
inputs = [ "my-source-or-transform-id" ]
group_by = [ "host", "pid", "tid" ]
starts_when = "match(string!(.message), r'^[^\\s]')"
[transforms.my_transform_id.merge_strategies]
message = "concat_newline"

---
transforms:
  my_transform_id:
    type: reduce
    inputs:
      - my-source-or-transform-id
    group_by:
      - host
      - pid
      - tid
    merge_strategies:
      message: concat_newline
    starts_when: match(string!(.message), r'^[^\s]')

{
  "transforms": {
    "my_transform_id": {
      "type": "reduce",
      "inputs": [
        "my-source-or-transform-id"
      ],
      "group_by": [
        "host",
        "pid",
        "tid"
      ],
      "merge_strategies": {
        "message": "concat_newline"
      },
      "starts_when": "match(string!(.message), r'^[^\\s]')"
    }
  }
}
...these events are produced:
[
{"log":{"host":"host-1.hostname.com","message":"foobar.rb:6:in `/': divided by 0 (ZeroDivisionError)\n from foobar.rb:6:in `bar'\n from foobar.rb:2:in `foo'\n from foobar.rb:9:in `\u003cmain\u003e'","pid":1234,"tid":5678,"timestamp":"2020-10-07T12:33:21.223543Z"}},
{"log":{"host":"host-1.hostname.com","message":"Hello world, I am a new log","pid":1234,"tid":5678,"timestamp":"2020-10-07T12:33:22.123528Z"}}
]
Reduce Rails logs into a single transaction
Given these events...
[
{"log":{"message":"Received GET /path","request_id":"abcd1234","request_params":{"key":"val"},"request_path":"/path","timestamp":"2020-10-07T12:33:21.223543Z"}},
{"log":{"message":"Executed query in 5.2ms","query":"SELECT * FROM table","query_duration_ms":5.2,"request_id":"abcd1234","timestamp":"2020-10-07T12:33:21.832345Z"}},
{"log":{"message":"Rendered partial _partial.erb in 2.3ms","render_duration_ms":2.3,"request_id":"abcd1234","template":"_partial.erb","timestamp":"2020-10-07T12:33:22.457423Z"}},
{"log":{"message":"Executed query in 7.8ms","query":"SELECT * FROM table","query_duration_ms":7.8,"request_id":"abcd1234","timestamp":"2020-10-07T12:33:22.543323Z"}},
{"log":{"message":"Sent 200 in 15.2ms","request_id":"abcd1234","response_duration_ms":5.2,"response_status":200,"timestamp":"2020-10-07T12:33:22.742322Z"}}
]
...and this configuration...
[transforms.my_transform_id]
type = "reduce"
inputs = [ "my-source-or-transform-id" ]

---
transforms:
  my_transform_id:
    type: reduce
    inputs:
      - my-source-or-transform-id

{
  "transforms": {
    "my_transform_id": {
      "type": "reduce",
      "inputs": [
        "my-source-or-transform-id"
      ]
    }
  }
}
...this event is produced:
{
  "query_duration_ms": 13,
  "render_duration_ms": 2.3,
  "request_id": "abcd1234",
  "request_params": {
    "key": "val"
  },
  "request_path": "/path",
  "response_duration_ms": 5.2,
  "response_status": 200,
  "timestamp": "2020-10-07T12:33:21.223543Z",
  "timestamp_end": "2020-10-07T12:33:22.742322Z"
}
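The configuration in this example sets no group_by, so every incoming event falls into a single group. When several requests are interleaved, a variant along the following lines may be closer to what you want. It is a sketch only: the transform ID rails_requests is a placeholder, and it assumes that only the final log line of a request carries a response_status field, as in the sample events above.

```toml
# Hypothetical transform ID; field names are taken from the sample events above.
[transforms.rails_requests]
type = "reduce"
inputs = [ "my-source-or-transform-id" ]
# Keep each request's log lines in their own transaction.
group_by = [ "request_id" ]
# Flush as soon as the line carrying the response status arrives,
# instead of waiting for the transaction to expire.
ends_when = "exists(.response_status)"

[transforms.rails_requests.merge_strategies]
# Keep every log line rather than only the first message.
message = "array"
```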