Reduce
Collapse multiple log events into a single event based on a set of conditions and merge strategies
Configuration
Example configurations
{
  "transforms": {
    "my_transform_id": {
      "type": "reduce",
      "inputs": [
        "my-source-or-transform-id"
      ]
    }
  }
}

[transforms.my_transform_id]
type = "reduce"
inputs = [ "my-source-or-transform-id" ]

transforms:
  my_transform_id:
    type: reduce
    inputs:
      - my-source-or-transform-id

{
  "transforms": {
    "my_transform_id": {
      "type": "reduce",
      "inputs": [
        "my-source-or-transform-id"
      ],
      "expire_after_ms": 30000,
      "flush_period_ms": 1000,
      "group_by": [
        "request_id"
      ]
    }
  }
}

[transforms.my_transform_id]
type = "reduce"
inputs = [ "my-source-or-transform-id" ]
expire_after_ms = 30_000
flush_period_ms = 1_000
group_by = [ "request_id" ]

transforms:
  my_transform_id:
    type: reduce
    inputs:
      - my-source-or-transform-id
    expire_after_ms: 30000
    flush_period_ms: 1000
    group_by:
      - request_id
end_every_period_ms
optional uint
ends_when
optional condition
A condition used to distinguish the final event of a transaction.
If this condition resolves to true for an event, the current transaction is immediately flushed with this event.
Available syntaxes
Syntax | Description | Example |
---|---|---|
vrl | A Vector Remap Language (VRL) Boolean expression. | .status_code != 200 && !includes(["info", "debug"], .severity) |
datadog_search | A Datadog Search query string. | *stack |
is_log | Whether the incoming event is a log. | |
is_metric | Whether the incoming event is a metric. | |
is_trace | Whether the incoming event is a trace. | |
Shorthand for VRL
If you opt for the vrl syntax for this condition, you can set the condition as a string via the condition parameter, without needing to specify both a source and a type. The table below shows some examples:
Config format | Example |
---|---|
YAML | condition: .status == 200 |
TOML | condition = ".status == 200" |
JSON | "condition": ".status == 200" |
Condition config examples
Standard VRL
ends_when:
  type: "vrl"
  source: ".status == 500"

ends_when = { type = "vrl", source = ".status == 500" }

"ends_when": {
  "type": "vrl",
  "source": ".status == 500"
}
expire_after_ms
optional uint
default: 30000 (milliseconds)
flush_period_ms
optional uint
default: 1000 (milliseconds)
graph
optional object
Extra graph configuration.
Configures the output for this component when it is generated with the graph command.
graph.node_attributes
optional object
Node attributes to add to this component’s node in the resulting graph.
They are added to the node as provided.
graph.node_attributes.*
required string literal
group_by
optional [string]
An ordered list of fields by which to group events.
Each group with matching values for the specified keys is reduced independently, allowing you to keep independent event streams separate. When no fields are specified, all events are combined in a single group.
For example, if group_by = ["host", "region"], then all incoming events that have the same host and region are grouped together before being reduced.
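The grouping rule described above can be modelled in a few lines of Python. This is only an illustrative sketch, not Vector's implementation: `group_events` is a hypothetical helper, events are plain dictionaries, and treating a missing field as `None` is an assumption.

```python
from collections import defaultdict


def group_events(events, group_by):
    """Bucket events by the tuple of values found at the group_by fields."""
    groups = defaultdict(list)
    for event in events:
        # Events with identical values for every listed field share a key.
        # Assumption: a missing field is treated as None.
        key = tuple(event.get(field) for field in group_by)
        groups[key].append(event)
    return dict(groups)


events = [
    {"host": "a", "region": "eu", "message": "one"},
    {"host": "a", "region": "us", "message": "two"},
    {"host": "a", "region": "eu", "message": "three"},
]

# Two groups: ("a", "eu") and ("a", "us"); each would be reduced independently.
groups = group_events(events, ["host", "region"])

# With no fields, every event lands in the single group keyed by ().
all_in_one = group_events(events, [])
```

Each resulting bucket corresponds to one independent stream of events to reduce.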
inputs
required [string]
A list of upstream source or transform IDs.
Wildcards (*) are supported.
See configuration for more info.
max_events
optional uint
merge_strategies
optional object
A map of field names to custom merge strategies.
For each field specified, the given strategy is used for combining events rather than the default behavior.
The default behavior is as follows:
- The first value of a string field is kept and subsequent values are discarded.
- For timestamp fields, the first is kept and a new field [field-name]_end is added with the last received timestamp value.
- Numeric values are summed.
- For nested paths, the field value is retrieved and then reduced using the default strategies mentioned above (unless explicitly specified otherwise).
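The default rules listed above can be sketched as a small fold over a list of events. This is a simplified model under stated assumptions, not Vector's code: `reduce_default` is a hypothetical name, events are flat dictionaries (nested paths are left out), timestamps are represented as `datetime` objects, and setting `[field-name]_end` from the first timestamp seen is an assumption.

```python
from datetime import datetime


def _is_number(value):
    # bool is a subclass of int in Python, so exclude it explicitly.
    return isinstance(value, (int, float)) and not isinstance(value, bool)


def reduce_default(events):
    """Fold flat event dicts into one dict using the default rules."""
    merged = {}
    for event in events:
        for field, value in event.items():
            if field not in merged:
                merged[field] = value  # the first value is always kept
                if isinstance(value, datetime):
                    # Assumption: the _end field starts at the first timestamp.
                    merged[f"{field}_end"] = value
            elif isinstance(merged[field], datetime) and isinstance(value, datetime):
                merged[f"{field}_end"] = value  # last received timestamp
            elif _is_number(merged[field]) and _is_number(value):
                merged[field] += value  # numeric values are summed
            # Strings: the first value stays, later values are discarded.
    return merged


first = datetime(2020, 10, 7, 12, 33, 21)
last = datetime(2020, 10, 7, 12, 33, 22)
merged = reduce_default([
    {"message": "first", "duration_ms": 5, "timestamp": first},
    {"message": "second", "duration_ms": 7, "timestamp": last},
])
```

Here `merged` keeps the first `message`, sums `duration_ms`, keeps the first `timestamp`, and records the last one as `timestamp_end`.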
merge_strategies.*
required string literal enum
Option | Description |
---|---|
array | Append each value to an array. |
concat | Concatenate each string value, delimited with a space. |
concat_newline | Concatenate each string value, delimited with a newline. |
concat_raw | Concatenate each string, without a delimiter. |
discard | Discard all but the first value found. |
flat_unique | Create a flattened array of all unique values. |
longest_array | Keep the longest array seen. |
max | Keep the maximum numeric value seen. |
min | Keep the minimum numeric value seen. |
retain | Discard all but the last value found. Works as a way to coalesce by not retaining null. |
shortest_array | Keep the shortest array seen. |
sum | Sum all numeric values. |
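To make the options in the table concrete, the sketch below maps each strategy name to a small Python function applied to the list of values seen for one field. It is an illustrative model only: `STRATEGIES` and `merge_field` are hypothetical names, the first-seen ordering of `flat_unique` is an assumption, and `retain` is modelled as "last value that is not `None`".

```python
def _flatten(values):
    """Yield scalars from arbitrarily nested lists."""
    for value in values:
        if isinstance(value, list):
            yield from _flatten(value)
        else:
            yield value


def _unique(values):
    """Keep the first occurrence of each value (ordering is an assumption)."""
    seen = []
    for value in values:
        if value not in seen:
            seen.append(value)
    return seen


# Each entry takes the ordered list of values seen for one field.
STRATEGIES = {
    "array": lambda vs: list(vs),
    "concat": lambda vs: " ".join(vs),
    "concat_newline": lambda vs: "\n".join(vs),
    "concat_raw": lambda vs: "".join(vs),
    "discard": lambda vs: vs[0],
    "flat_unique": lambda vs: _unique(_flatten(vs)),
    "longest_array": lambda vs: max(vs, key=len),
    "max": lambda vs: max(vs),
    "min": lambda vs: min(vs),
    # Assumption: None values are skipped when picking the last value.
    "retain": lambda vs: next((v for v in reversed(vs) if v is not None), None),
    "shortest_array": lambda vs: min(vs, key=len),
    "sum": lambda vs: sum(vs),
}


def merge_field(strategy, values):
    """Apply the named strategy to the values collected for one field."""
    return STRATEGIES[strategy](values)


joined = merge_field("concat_newline", ["line one", "line two"])
tags = merge_field("flat_unique", [["a", "b"], ["b", "c"], "d"])
```

In this model `joined` is the two strings separated by a newline and `tags` is a flat list with duplicates removed.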
starts_when
optional condition
A condition used to distinguish the first event of a transaction.
If this condition resolves to true for an event, the previous transaction is flushed (without this event) and a new transaction is started.
Available syntaxes
Syntax | Description | Example |
---|---|---|
vrl | A Vector Remap Language (VRL) Boolean expression. | .status_code != 200 && !includes(["info", "debug"], .severity) |
datadog_search | A Datadog Search query string. | *stack |
is_log | Whether the incoming event is a log. | |
is_metric | Whether the incoming event is a metric. | |
is_trace | Whether the incoming event is a trace. | |
Shorthand for VRL
If you opt for the vrl syntax for this condition, you can set the condition as a string via the condition parameter, without needing to specify both a source and a type. The table below shows some examples:
Config format | Example |
---|---|
YAML | condition: .status == 200 |
TOML | condition = ".status == 200" |
JSON | "condition": ".status == 200" |
Condition config examples
Standard VRL
starts_when:
  type: "vrl"
  source: ".status == 500"

starts_when = { type = "vrl", source = ".status == 500" }

"starts_when": {
  "type": "vrl",
  "source": ".status == 500"
}
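The difference between the two boundary conditions, as described for starts_when and ends_when above, can be modelled as follows. This is a sketch under assumptions rather than Vector's implementation: `split_transactions` is a hypothetical helper, the conditions are plain Python predicates, checking the start condition before the end condition is an assumption, and the final flush of leftovers stands in for time-based expiry.

```python
def split_transactions(events, starts_when=None, ends_when=None):
    """Split an ordered event stream into transactions (lists of events)."""
    transactions, current = [], []
    for event in events:
        if starts_when is not None and starts_when(event):
            # Flush the previous transaction WITHOUT this event,
            # then start a new transaction with it.
            if current:
                transactions.append(current)
            current = [event]
        elif ends_when is not None and ends_when(event):
            # Flush the current transaction WITH this event.
            current.append(event)
            transactions.append(current)
            current = []
        else:
            current.append(event)
    if current:
        # Stand-in for the expiry flush of an unfinished transaction.
        transactions.append(current)
    return transactions


events = [{"status": 200}, {"status": 500}, {"status": 200}, {"status": 200}]

# ends_when: the matching event closes the transaction it belongs to.
ended = split_transactions(events, ends_when=lambda e: e["status"] == 500)

# starts_when: the matching event opens a new transaction.
started = split_transactions(events, starts_when=lambda e: e["status"] == 500)
```

With the same predicate, `ended` places the 500 event at the end of the first transaction, while `started` places it at the beginning of the second.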
Outputs
<component_id>
Telemetry
Metrics
component_discarded_events_total
counter
… filter transform, or false if due to an error.
component_errors_total
counter
component_received_event_bytes_total
counter
component_received_events_count
histogram
A histogram of the number of events passed in each internal batch in Vector’s internal topology.
Note that this is separate from sink-level batching. It is mostly useful for low-level debugging of performance issues in Vector caused by small internal batches.
component_received_events_total
counter
component_sent_event_bytes_total
counter
component_sent_events_total
counter
stale_events_flushed_total
counter
utilization
gauge
Examples
Merge Ruby exceptions
Given this event...
[{"log":{"host":"host-1.hostname.com","message":"foobar.rb:6:in `/': divided by 0 (ZeroDivisionError)","pid":1234,"tid":5678,"timestamp":"2020-10-07T12:33:21.223543Z"}},{"log":{"host":"host-1.hostname.com","message":" from foobar.rb:6:in `bar'","pid":1234,"tid":5678,"timestamp":"2020-10-07T12:33:21.223543Z"}},{"log":{"host":"host-1.hostname.com","message":" from foobar.rb:2:in `foo'","pid":1234,"tid":5678,"timestamp":"2020-10-07T12:33:21.223543Z"}},{"log":{"host":"host-1.hostname.com","message":" from foobar.rb:9:in `\u003cmain\u003e'","pid":1234,"tid":5678,"timestamp":"2020-10-07T12:33:21.223543Z"}},{"log":{"host":"host-1.hostname.com","message":"Hello world, I am a new log","pid":1234,"tid":5678,"timestamp":"2020-10-07T12:33:22.123528Z"}}]
transforms:
  my_transform_id:
    type: reduce
    inputs:
      - my-source-or-transform-id
    group_by:
      - host
      - pid
      - tid
    merge_strategies:
      message: concat_newline
    starts_when: match(string!(.message), r'^[^\s]')

[transforms.my_transform_id]
type = "reduce"
inputs = [ "my-source-or-transform-id" ]
group_by = [ "host", "pid", "tid" ]
starts_when = "match(string!(.message), r'^[^\\s]')"

[transforms.my_transform_id.merge_strategies]
message = "concat_newline"

{
  "transforms": {
    "my_transform_id": {
      "type": "reduce",
      "inputs": [
        "my-source-or-transform-id"
      ],
      "group_by": [
        "host",
        "pid",
        "tid"
      ],
      "merge_strategies": {
        "message": "concat_newline"
      },
      "starts_when": "match(string!(.message), r'^[^\\s]')"
    }
  }
}

[{"log":{"host":"host-1.hostname.com","message":"foobar.rb:6:in `/': divided by 0 (ZeroDivisionError)\n from foobar.rb:6:in `bar'\n from foobar.rb:2:in `foo'\n from foobar.rb:9:in `\u003cmain\u003e'","pid":1234,"tid":5678,"timestamp":"2020-10-07T12:33:21.223543Z"}},{"log":{"host":"host-1.hostname.com","message":"Hello world, I am a new log","pid":1234,"tid":5678,"timestamp":"2020-10-07T12:33:22.123528Z"}}]
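The way this example's starts_when pattern and concat_newline strategy interact can be traced with a short Python model. It is a hedged illustration, not Vector itself: `reduce_messages` is a hypothetical helper, only the message field is modelled, and all five events are assumed to fall into one group because they share host, pid, and tid.

```python
import re

messages = [
    "foobar.rb:6:in `/': divided by 0 (ZeroDivisionError)",
    " from foobar.rb:6:in `bar'",
    " from foobar.rb:2:in `foo'",
    " from foobar.rb:9:in `<main>'",
    "Hello world, I am a new log",
]

# Same pattern as the starts_when condition: the line begins with non-whitespace.
STARTS_WHEN = re.compile(r"^[^\s]")


def reduce_messages(lines):
    """Merge continuation lines into the preceding transaction's message."""
    merged = []
    for line in lines:
        if STARTS_WHEN.match(line) or not merged:
            merged.append(line)  # a new transaction begins with this line
        else:
            merged[-1] += "\n" + line  # concat_newline: join with a newline
    return merged


result = reduce_messages(messages)
```

The indented "from ..." lines do not match the pattern, so they are appended to the exception line, and the final unindented line starts a second transaction.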
Reduce Rails logs into a single transaction
Given this event...
[{"log":{"message":"Received GET /path","request_id":"abcd1234","request_params":{"key":"val"},"request_path":"/path","timestamp":"2020-10-07T12:33:21.223543Z"}},{"log":{"message":"Executed query in 5.2ms","query":"SELECT * FROM table","query_duration_ms":5.2,"request_id":"abcd1234","timestamp":"2020-10-07T12:33:21.832345Z"}},{"log":{"message":"Rendered partial _partial.erb in 2.3ms","render_duration_ms":2.3,"request_id":"abcd1234","template":"_partial.erb","timestamp":"2020-10-07T12:33:22.457423Z"}},{"log":{"message":"Executed query in 7.8ms","query":"SELECT * FROM table","query_duration_ms":7.8,"request_id":"abcd1234","timestamp":"2020-10-07T12:33:22.543323Z"}},{"log":{"message":"Sent 200 in 15.2ms","request_id":"abcd1234","response_duration_ms":5.2,"response_status":200,"timestamp":"2020-10-07T12:33:22.742322Z"}}]
transforms:
  my_transform_id:
    type: reduce
    inputs:
      - my-source-or-transform-id

[transforms.my_transform_id]
type = "reduce"
inputs = [ "my-source-or-transform-id" ]

{
  "transforms": {
    "my_transform_id": {
      "type": "reduce",
      "inputs": [
        "my-source-or-transform-id"
      ]
    }
  }
}

{
  "query_duration_ms": 13,
  "render_duration_ms": 2.3,
  "request_id": "abcd1234",
  "request_params": {
    "key": "val"
  },
  "request_path": "/path",
  "response_duration_ms": 5.2,
  "status": 200,
  "timestamp": "2020-10-07T12:33:21.223543Z",
  "timestamp_end": "2020-10-07T12:33:22.742322Z"
}