Reduce

Collapse multiple log events into a single event based on a set of conditions and merge strategies

status: stable | egress: stream | state: stateful

Configuration

Example configurations

Minimal configuration (JSON):

{
  "transforms": {
    "my_transform_id": {
      "type": "reduce",
      "inputs": [
        "my-source-or-transform-id"
      ]
    }
  }
}

Minimal configuration (TOML):

[transforms.my_transform_id]
type = "reduce"
inputs = [ "my-source-or-transform-id" ]

Minimal configuration (YAML):

transforms:
  my_transform_id:
    type: reduce
    inputs:
      - my-source-or-transform-id

Configuration with optional settings (JSON):

{
  "transforms": {
    "my_transform_id": {
      "type": "reduce",
      "inputs": [
        "my-source-or-transform-id"
      ],
      "expire_after_ms": 30000,
      "flush_period_ms": 1000,
      "group_by": [
        "request_id"
      ]
    }
  }
}

Configuration with optional settings (TOML):

[transforms.my_transform_id]
type = "reduce"
inputs = [ "my-source-or-transform-id" ]
expire_after_ms = 30_000
flush_period_ms = 1_000
group_by = [ "request_id" ]

Configuration with optional settings (YAML):

transforms:
  my_transform_id:
    type: reduce
    inputs:
      - my-source-or-transform-id
    expire_after_ms: 30000
    flush_period_ms: 1000
    group_by:
      - request_id

end_every_period_ms

optional uint
If supplied, every time this interval elapses for a given grouping, the reduced value for that grouping is flushed. Checked every flush_period_ms.
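
As a minimal sketch of how this option might sit alongside flush_period_ms, the fragment below flushes the reduced value of each group every 10 seconds, with the interval checked once per second. The request_id field and both values are illustrative assumptions, not recommendations.

```yaml
transforms:
  my_transform_id:
    type: reduce
    inputs:
      - my-source-or-transform-id
    group_by:
      - request_id              # hypothetical field name
    end_every_period_ms: 10000  # flush each group's reduced value every 10 s
    flush_period_ms: 1000       # how often the interval is checked
```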

ends_when

optional condition

A condition used to distinguish the final event of a transaction.

If this condition resolves to true for an event, the current transaction is immediately flushed with this event.

ends_when.source

The text of the condition. The syntax of the condition depends on the value of type.

ends_when.type

The type of condition to supply. See Available syntaxes below for a list of available types for this transform.

Available syntaxes

Syntax | Description | Example
vrl | A Vector Remap Language (VRL) Boolean expression. | .status_code != 200 && !includes(["info", "debug"], .severity)
datadog_search | A Datadog Search query string. | *stack
is_log | Whether the incoming event is a log. |
is_metric | Whether the incoming event is a metric. |
is_trace | Whether the incoming event is a trace. |

Shorthand for VRL

If you opt for the vrl syntax for this condition, you can set ends_when directly to the condition string, without needing to specify both a source and a type. The table below shows some examples:

Config format | Example
YAML | ends_when: .status == 200
TOML | ends_when = ".status == 200"
JSON | "ends_when": ".status == 200"

Condition config examples

Standard VRL

YAML:

ends_when:
  type: "vrl"
  source: ".status == 500"

TOML:

ends_when = { type = "vrl", source = ".status == 500" }

JSON:

"ends_when": {
  "type": "vrl",
  "source": ".status == 500"
}

Datadog Search

YAML:

ends_when:
  type: "datadog_search"
  source: "*stack"

TOML:

ends_when = { type = "datadog_search", source = "*stack" }

JSON:

"ends_when": {
  "type": "datadog_search",
  "source": "*stack"
}

VRL shorthand

YAML:

ends_when: ".status == 500"

TOML:

ends_when = ".status == 500"

JSON:

"ends_when": ".status == 500"

expire_after_ms

optional uint
The maximum period of time to wait after the last event is received, in milliseconds, before a combined event should be considered complete.
default: 30000 (milliseconds)

flush_period_ms

optional uint
The interval to check for and flush any expired events, in milliseconds.
default: 1000 (milliseconds)
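
The two timers work together: an expired group is only noticed when the periodic check runs. As a rough sketch with the default values written out explicitly, a group that stops receiving events would be flushed at the first check after 30 seconds of inactivity. The exact timing is an assumption inferred from the two descriptions above.

```yaml
transforms:
  my_transform_id:
    type: reduce
    inputs:
      - my-source-or-transform-id
    expire_after_ms: 30000  # a group is considered complete 30 s after its last event
    flush_period_ms: 1000   # expired groups are looked for once per second
```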

graph

optional object

Extra graph configuration.

Configures the output for this component when the topology is generated with the graph command.

graph.node_attributes

optional object

Node attributes to add to this component’s node in the resulting graph.

They are added to the node as provided.

graph.node_attributes.*
required string literal
A single graph node attribute in the Graphviz DOT language.
Examples
{
  "color": "red",
  "name": "Example Node",
  "width": "5.0"
}
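
For context, here is a sketch of where these attributes might be placed in a full transform definition. It reuses the attribute values from the example above; the nesting under graph.node_attributes follows the option names in this section.

```yaml
transforms:
  my_transform_id:
    type: reduce
    inputs:
      - my-source-or-transform-id
    graph:
      node_attributes:
        color: red
        name: Example Node
        width: "5.0"   # quoted so the value stays a string
```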

group_by

optional [string]

An ordered list of fields by which to group events.

Each group with matching values for the specified keys is reduced independently, allowing you to keep independent event streams separate. When no fields are specified, all events are combined in a single group.

For example, if group_by = ["host", "region"], then all incoming events that have the same host and region are grouped together before being reduced.

Array string literal
Examples
[
  "request_id",
  "user_id",
  "transaction_id"
]
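
The host and region case described above could be written as the following YAML fragment. This is a sketch only; host and region are the field names used in the prose example and must exist on your events for the grouping to be meaningful.

```yaml
transforms:
  my_transform_id:
    type: reduce
    inputs:
      - my-source-or-transform-id
    group_by:
      - host     # events are grouped by matching host ...
      - region   # ... and matching region before being reduced
```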

inputs

required [string]

A list of upstream source or transform IDs.

Wildcards (*) are supported.

See configuration for more info.

Array string literal
Examples
[
  "my-source-or-transform-id",
  "prefix-*"
]

max_events

optional uint
The maximum number of events to group together.
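
As a sketch, the fragment below caps each request_id group at 100 events. The value 100 and the request_id field are illustrative assumptions. What happens when the cap is reached (presumably the group is flushed) is not spelled out in the description above.

```yaml
transforms:
  my_transform_id:
    type: reduce
    inputs:
      - my-source-or-transform-id
    group_by:
      - request_id   # hypothetical field name
    max_events: 100  # group at most 100 events together
```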

merge_strategies

optional object

A map of field names to custom merge strategies.

For each field specified, the given strategy is used for combining events rather than the default behavior.

The default behavior is as follows:

  • The first value of a string field is kept and subsequent values are discarded.
  • For timestamp fields the first is kept and a new field [field-name]_end is added with the last received timestamp value.
  • Numeric values are summed.
  • For nested paths, the field value is retrieved and then reduced using the default strategies mentioned above (unless explicitly specified otherwise).
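
To override these defaults for selected fields, a merge_strategies map might look like the sketch below. The field names (message, duration_ms, tags, status) are hypothetical. The strategy names are taken from the enum options listed under merge_strategies.*. Fields not named in the map keep the default behavior.

```yaml
transforms:
  my_transform_id:
    type: reduce
    inputs:
      - my-source-or-transform-id
    merge_strategies:
      message: concat_newline  # join string values with newlines
      duration_ms: max         # keep the largest value instead of summing
      tags: flat_unique        # flattened array of all unique values
      status: retain           # keep the last value found
```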

merge_strategies.*

required string literal enum
An individual merge strategy.
Enum options
Option | Description
array | Append each value to an array.
concat | Concatenate each string value, delimited with a space.
concat_newline | Concatenate each string value, delimited with a newline.
concat_raw | Concatenate each string, without a delimiter.
discard | Discard all but the first value found.
flat_unique | Create a flattened array of all unique values.
longest_array | Keep the longest array seen.
max | Keep the maximum numeric value seen.
min | Keep the minimum numeric value seen.
retain | Discard all but the last value found. Works as a way to coalesce by not retaining null.
shortest_array | Keep the shortest array seen.
sum | Sum all numeric values.
Examples
"array"
"concat"
"concat_newline"
"concat_raw"
"discard"
"flat_unique"
"longest_array"
"max"
"min"
"retain"
"shortest_array"
"sum"

starts_when

optional condition

A condition used to distinguish the first event of a transaction.

If this condition resolves to true for an event, the previous transaction is flushed (without this event) and a new transaction is started.

starts_when.source

The text of the condition. The syntax of the condition depends on the value of type.

starts_when.type

The type of condition to supply. See Available syntaxes below for a list of available types for this transform.

Available syntaxes

Syntax | Description | Example
vrl | A Vector Remap Language (VRL) Boolean expression. | .status_code != 200 && !includes(["info", "debug"], .severity)
datadog_search | A Datadog Search query string. | *stack
is_log | Whether the incoming event is a log. |
is_metric | Whether the incoming event is a metric. |
is_trace | Whether the incoming event is a trace. |

Shorthand for VRL

If you opt for the vrl syntax for this condition, you can set starts_when directly to the condition string, without needing to specify both a source and a type. The table below shows some examples:

Config format | Example
YAML | starts_when: .status == 200
TOML | starts_when = ".status == 200"
JSON | "starts_when": ".status == 200"

Condition config examples

Standard VRL

YAML:

starts_when:
  type: "vrl"
  source: ".status == 500"

TOML:

starts_when = { type = "vrl", source = ".status == 500" }

JSON:

"starts_when": {
  "type": "vrl",
  "source": ".status == 500"
}

Datadog Search

YAML:

starts_when:
  type: "datadog_search"
  source: "*stack"

TOML:

starts_when = { type = "datadog_search", source = "*stack" }

JSON:

"starts_when": {
  "type": "datadog_search",
  "source": "*stack"
}

VRL shorthand

YAML:

starts_when: ".status == 500"

TOML:

starts_when = ".status == 500"

JSON:

"starts_when": ".status == 500"

Outputs

<component_id>

Default output stream of the component. Use this component’s ID as an input to downstream transforms and sinks.
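
As an illustration of wiring the default output downstream, the sketch below feeds the reduced events into a console sink. The sink ID my_sink_id is a hypothetical name, and the console sink with JSON encoding is just one convenient choice for inspecting the output.

```yaml
transforms:
  my_transform_id:
    type: reduce
    inputs:
      - my-source-or-transform-id

sinks:
  my_sink_id:              # hypothetical sink ID
    type: console
    inputs:
      - my_transform_id    # the reduce transform's component ID
    encoding:
      codec: json
```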

Telemetry

Metrics

component_discarded_events_total

counter
The number of events dropped by this component.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
intentional
True if the events were discarded intentionally, like a filter transform, or false if due to an error.
pid optional
The process ID of the Vector instance.

component_errors_total

counter
The total number of errors encountered by this component.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_type
The Vector component type.
error_type
The type of the error.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.
stage
The stage within the component at which the error occurred.

component_received_event_bytes_total

counter
The number of event bytes accepted by this component either from tagged origins like file and uri, or cumulatively from other origins.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_type
The Vector component type.
container_name optional
The name of the container from which the data originated.
file optional
The file from which the data originated.
host optional
The hostname of the system Vector is running on.
mode optional
The connection mode used by the component.
peer_addr optional
The IP from which the data originated.
peer_path optional
The pathname from which the data originated.
pid optional
The process ID of the Vector instance.
pod_name optional
The name of the pod from which the data originated.
uri optional
The sanitized URI from which the data originated.

component_received_events_count

histogram

A histogram of the number of events passed in each internal batch in Vector’s internal topology.

Note that this is separate from sink-level batching. It is mostly useful for low-level debugging of performance issues in Vector caused by small internal batches.

component_id
The Vector component ID.
component_kind
The Vector component kind.
component_type
The Vector component type.
container_name optional
The name of the container from which the data originated.
file optional
The file from which the data originated.
host optional
The hostname of the system Vector is running on.
mode optional
The connection mode used by the component.
peer_addr optional
The IP from which the data originated.
peer_path optional
The pathname from which the data originated.
pid optional
The process ID of the Vector instance.
pod_name optional
The name of the pod from which the data originated.
uri optional
The sanitized URI from which the data originated.

component_received_events_total

counter
The number of events accepted by this component either from tagged origins like file and uri, or cumulatively from other origins.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_type
The Vector component type.
container_name optional
The name of the container from which the data originated.
file optional
The file from which the data originated.
host optional
The hostname of the system Vector is running on.
mode optional
The connection mode used by the component.
peer_addr optional
The IP from which the data originated.
peer_path optional
The pathname from which the data originated.
pid optional
The process ID of the Vector instance.
pod_name optional
The name of the pod from which the data originated.
uri optional
The sanitized URI from which the data originated.

component_sent_event_bytes_total

counter
The total number of event bytes emitted by this component.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
output optional
The specific output of the component.
pid optional
The process ID of the Vector instance.

component_sent_events_total

counter
The total number of events emitted by this component.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
output optional
The specific output of the component.
pid optional
The process ID of the Vector instance.

stale_events_flushed_total

counter
The number of stale events that Vector has flushed.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

utilization

gauge
A ratio from 0 to 1 of the load on a component. A value of 0 would indicate a completely idle component that is simply waiting for input. A value of 1 would indicate a component that is never idle. This value is updated every 5 seconds.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.
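
One possible way to collect the metrics listed above is to pair Vector's internal_metrics source with a prometheus_exporter sink, as sketched below. The component IDs vector_metrics and prometheus are hypothetical names, and the listen address is an assumption that you should adapt to your environment.

```yaml
sources:
  vector_metrics:             # hypothetical source ID
    type: internal_metrics

sinks:
  prometheus:                 # hypothetical sink ID
    type: prometheus_exporter
    inputs:
      - vector_metrics
    address: 0.0.0.0:9598     # assumed listen address for scraping
```

The reduce transform's metrics can then be picked out by their component_id tag, which carries the transform's ID.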

Examples

Merge Ruby exceptions

Given these events...
[{"log":{"host":"host-1.hostname.com","message":"foobar.rb:6:in `/': divided by 0 (ZeroDivisionError)","pid":1234,"tid":5678,"timestamp":"2020-10-07T12:33:21.223543Z"}},{"log":{"host":"host-1.hostname.com","message":"    from foobar.rb:6:in `bar'","pid":1234,"tid":5678,"timestamp":"2020-10-07T12:33:21.223543Z"}},{"log":{"host":"host-1.hostname.com","message":"    from foobar.rb:2:in `foo'","pid":1234,"tid":5678,"timestamp":"2020-10-07T12:33:21.223543Z"}},{"log":{"host":"host-1.hostname.com","message":"    from foobar.rb:9:in `\u003cmain\u003e'","pid":1234,"tid":5678,"timestamp":"2020-10-07T12:33:21.223543Z"}},{"log":{"host":"host-1.hostname.com","message":"Hello world, I am a new log","pid":1234,"tid":5678,"timestamp":"2020-10-07T12:33:22.123528Z"}}]
...and this configuration...
YAML:

transforms:
  my_transform_id:
    type: reduce
    inputs:
      - my-source-or-transform-id
    group_by:
      - host
      - pid
      - tid
    merge_strategies:
      message: concat_newline
    starts_when: match(string!(.message), r'^[^\s]')

TOML:

[transforms.my_transform_id]
type = "reduce"
inputs = [ "my-source-or-transform-id" ]
group_by = [ "host", "pid", "tid" ]
starts_when = "match(string!(.message), r'^[^\\s]')"

  [transforms.my_transform_id.merge_strategies]
  message = "concat_newline"

JSON:

{
  "transforms": {
    "my_transform_id": {
      "type": "reduce",
      "inputs": [
        "my-source-or-transform-id"
      ],
      "group_by": [
        "host",
        "pid",
        "tid"
      ],
      "merge_strategies": {
        "message": "concat_newline"
      },
      "starts_when": "match(string!(.message), r'^[^\\s]')"
    }
  }
}
...these Vector events are produced:
[{"log":{"host":"host-1.hostname.com","message":"foobar.rb:6:in `/': divided by 0 (ZeroDivisionError)\n    from foobar.rb:6:in `bar'\n    from foobar.rb:2:in `foo'\n    from foobar.rb:9:in `\u003cmain\u003e'","pid":1234,"tid":5678,"timestamp":"2020-10-07T12:33:21.223543Z"}},{"log":{"host":"host-1.hostname.com","message":"Hello world, I am a new log","pid":1234,"tid":5678,"timestamp":"2020-10-07T12:33:22.123528Z"}}]

Reduce Rails logs into a single transaction

Given these events...
[{"log":{"message":"Received GET /path","request_id":"abcd1234","request_params":{"key":"val"},"request_path":"/path","timestamp":"2020-10-07T12:33:21.223543Z"}},{"log":{"message":"Executed query in 5.2ms","query":"SELECT * FROM table","query_duration_ms":5.2,"request_id":"abcd1234","timestamp":"2020-10-07T12:33:21.832345Z"}},{"log":{"message":"Rendered partial _partial.erb in 2.3ms","render_duration_ms":2.3,"request_id":"abcd1234","template":"_partial.erb","timestamp":"2020-10-07T12:33:22.457423Z"}},{"log":{"message":"Executed query in 7.8ms","query":"SELECT * FROM table","query_duration_ms":7.8,"request_id":"abcd1234","timestamp":"2020-10-07T12:33:22.543323Z"}},{"log":{"message":"Sent 200 in 15.2ms","request_id":"abcd1234","response_duration_ms":5.2,"response_status":200,"timestamp":"2020-10-07T12:33:22.742322Z"}}]
...and this configuration...
YAML:

transforms:
  my_transform_id:
    type: reduce
    inputs:
      - my-source-or-transform-id

TOML:

[transforms.my_transform_id]
type = "reduce"
inputs = [ "my-source-or-transform-id" ]

JSON:

{
  "transforms": {
    "my_transform_id": {
      "type": "reduce",
      "inputs": [
        "my-source-or-transform-id"
      ]
    }
  }
}
...this Vector event is produced:
{
  "message": "Received GET /path",
  "query": "SELECT * FROM table",
  "query_duration_ms": 13,
  "render_duration_ms": 2.3,
  "request_id": "abcd1234",
  "request_params": {
    "key": "val"
  },
  "request_path": "/path",
  "response_duration_ms": 5.2,
  "response_status": 200,
  "template": "_partial.erb",
  "timestamp": "2020-10-07T12:33:21.223543Z",
  "timestamp_end": "2020-10-07T12:33:22.742322Z"
}

How it works

State

This component is stateful, meaning its behavior changes based on previous inputs (events). State is not preserved across restarts, so state-dependent behavior resets on restart and depends only on the events received since the most recent restart.