Memory Enrichment Table

We are excited to announce the memory enrichment table!

Special thanks to @esensar for implementing this feature and to Quad9 for sponsoring this work.

Data model

The memory table operates on logs and accepts VRL objects. Each key-value pair of an incoming object is stored as a separate entry in the table, with the key serving as the lookup key and the value, which can be any VRL value, as the stored data.
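
For example (a purely illustrative object; the field names and user identifiers are assumptions, not something the table requires), sending the following VRL object to a memory table creates two separate entries, one keyed by "user_1" and one by "user_2", each holding the corresponding nested object as its value:

{
  "user_1": { "referer": "https://example.com", "source": "transform" },
  "user_2": { "referer": "https://example.org", "source": "transform" }
}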

Building a cache

enrichment_tables:
  memory_table:
    type: memory
    ttl: 60
    flush_interval: 5
    inputs: [ "cache_generator" ]

sources:
  demo_logs_test:
    type: "demo_logs"
    format: "json"

transforms:
  demo_logs_processor:
    type: "remap"
    inputs: [ "demo_logs_test" ]
    source: |
      . = parse_json!(.message)
      user_id = get!(., path: ["user-identifier"])

      # Check if we already have a cached value for this user in the enrichment table
      existing, err = get_enrichment_table_record("memory_table", { "key": user_id })

      if err == null {
        # A cached value exists; reuse it.
        # The `existing` object has this structure:
        # { "key": user_id, "value": {...}, "ttl": 50 }
        . = existing.value
        .source = "cache"
      } else {
        # No cached value found, process the event and prepare new data
        .referer = parse_url!(.referer)
        .referer.host = encode_punycode!(.referer.host)
        .source = "transform"
      }      

  cache_generator:
    type: "remap"
    inputs: [ "demo_logs_processor" ]
    source: |
      # Check if this user is already in the cache
      key_value = get!(., path: ["user-identifier"])
      existing, err = get_enrichment_table_record("memory_table", { "key": key_value })

      if err != null {
        # No cached value found, store the processed data in the enrichment table
        data = .

        # The memory enrichment table stores all key-value pairs it receives.
        # To structure it correctly, we create an object where:
        # - The key is the "user-identifier".
        # - The value is the rest of the processed event data.
        . = set!(value: {}, path: [get!(data, path: ["user-identifier"])], data: data)
      } else {
        # Already cached, do nothing
        . = {}
      }      

# After some time, processed events will start having their "source" set to "cache",
# indicating that the data is being retrieved from the enrichment table.
sinks:
  console:
    inputs: [ "demo_logs_processor" ]
    target: "stdout"
    type: "console"
    encoding:
      codec: "json"
      json:
        pretty: true

You can imagine a real-world scenario where demo_logs_processor has to perform some expensive calculation the first time it encounters a key. Every subsequent time the same key is encountered, the processing step is skipped because the pre-computed value is already present in the table. Values can also expire; when that happens, the computation step is simply repeated.
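
Because the record returned by get_enrichment_table_record also carries the entry's remaining ttl, a variant of demo_logs_processor could refresh entries shortly before they expire. Here is a minimal sketch of that lookup, assuming the ttl field is expressed in seconds and using an arbitrary 10-second threshold; the companion cache_generator would need a matching change so it re-stores entries in that case:

user_id = get!(., path: ["user-identifier"])
existing, err = get_enrichment_table_record("memory_table", { "key": user_id })

# Remaining lifetime of the cached entry, defaulting to 0 when the lookup failed
ttl_left = to_int(existing.ttl) ?? 0

if err == null && ttl_left > 10 {
  # Still comfortably fresh; reuse the cached value
  . = existing.value
  .source = "cache"
} else {
  # Missing or about to expire; recompute the data
  .referer = parse_url!(.referer)
  .referer.host = encode_punycode!(.referer.host)
  .source = "transform"
}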

Populate with other components

This new table type also accepts inputs, meaning that sources and/or transforms can populate it with data, much like events flow into a sink. The data can then be queried like any other enrichment table. The configuration above demonstrates this behavior.
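
Stripped down to just the relevant wiring, the piece of the configuration above that makes this work is the inputs field on the table definition, which accepts component IDs just like a sink's inputs would:

enrichment_tables:
  memory_table:
    type: memory
    ttl: 60
    flush_interval: 5
    # Any source or transform can be listed here to feed the table
    inputs: [ "cache_generator" ]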

We plan to make this component even more flexible in the future; for example, it could also act as a source. This work raises some important questions about component flexibility. The end goal is to treat components as nodes in a graph, unlocking even greater possibilities, such as chaining sinks.