Memory Enrichment Table

We are excited to announce the memory enrichment table, a new enrichment table type that stores its data in memory and can be filled by other components in your topology.

Special thanks to @esensar for implementing this feature and to Quad9 for sponsoring this work.

Data Model

The memory table operates on logs and accepts VRL objects. Each top-level key-value pair of an incoming object is stored as a separate entry in the table, so every key maps to exactly one value. A value can be any VRL value, including a nested object.
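As a rough illustration of this model, the sketch below looks up one entry by key. The transform name `lookup_example`, the input `some_input`, and the keys `user-1` and `plan` are hypothetical; the table name and the record shape follow the cache example in the next section.

```yaml
# Hypothetical object written to the table:
#   { "user-1": { "plan": "pro" }, "user-2": 42 }
# Under the model above this yields two entries, keyed "user-1" and "user-2".
transforms:
  lookup_example:                # hypothetical transform name
    type: "remap"
    inputs: [ "some_input" ]     # hypothetical upstream component
    source: |
      record, err = get_enrichment_table_record("memory_table", { "key": "user-1" })

      if err == null {
        # `record` is expected to look like:
        # { "key": "user-1", "value": { "plan": "pro" }, "ttl": ... }
        .plan = record.value.plan
      }
```

The lookup condition uses the `key` field, and the stored value comes back under `value`, so one written object with many keys can serve many independent lookups.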

Building a cache

enrichment_tables:
  memory_table:
    type: memory
    ttl: 60
    flush_interval: 5
    inputs: [ "cache_generator" ]

sources:
  demo_logs_test:
    type: "demo_logs"
    format: "json"

transforms:
  demo_logs_processor:
    type: "remap"
    inputs: [ "demo_logs_test" ]
    source: |
      . = parse_json!(.message)
      user_id = get!(., path: ["user-identifier"])

      # Check if we already have a cached value for this user in the enrichment table
      existing, err = get_enrichment_table_record("memory_table", { "key": user_id })

      if err == null {
        # A cached value exists; reuse it.
        # The `existing` object has this structure:
        # { "key": user_id, "value": {...}, "ttl": 50 }
        . = existing.value
        .source = "cache"
      } else {
        # No cached value found, process the event and prepare new data
        .referer = parse_url!(.referer)
        .referer.host = encode_punycode!(.referer.host)
        .source = "transform"
      }

  cache_generator:
    type: "remap"
    inputs: [ "demo_logs_processor" ]
    source: |
      # Check if this user is already in the cache
      key_value = get!(., path: ["user-identifier"])
      existing, err = get_enrichment_table_record("memory_table", { "key": key_value })

      if err != null {
        # No cached value found, store the processed data in the enrichment table
        data = .

        # The memory enrichment table stores all key-value pairs it receives.
        # To structure it correctly, we create an object where:
        # - The key is the "user-identifier".
        # - The value is the rest of the processed event data.
        . = set!(value: {}, path: [get!(data, path: ["user-identifier"])], data: data)
      } else {
        # Already cached, do nothing
        . = {}
      }

# After some time, processed events will start having their "source" set to "cache",
# indicating that the data is being retrieved from the enrichment table.
sinks:
  console:
    inputs: [ "demo_logs_processor" ]
    target: "stdout"
    type: "console"
    encoding:
      codec: "json"
      json:
        pretty: true

In a real-world scenario, demo_logs_processor might have to perform an expensive calculation the first time it encounters a key. Every later event with the same key skips that step, because the pre-computed value is already in the table. Entries expire once their TTL elapses; after that, the next event with that key triggers the computation again and the result is cached anew.
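To make the expiry behavior concrete, here is the table definition from the example again with annotated options. This is only a sketch: the comments assume that both `ttl` and `flush_interval` are expressed in seconds, which the example itself does not state.

```yaml
enrichment_tables:
  memory_table:
    type: memory
    # Assumption: seconds. An entry written now stops being returned about
    # 60 seconds later, so the next event with that key takes the
    # "transform" branch and the value is recomputed and stored again.
    ttl: 60
    # Assumption: seconds. How often pending writes become visible to
    # lookups, so a new entry may take up to this long to appear.
    flush_interval: 5
    inputs: [ "cache_generator" ]
```

A shorter `ttl` keeps cached values fresher at the cost of repeating the expensive step more often.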

Use as a sink

This new table type can also be used as a sink: other components feed it data, which can then be queried like any other enrichment table. For example, if you have another source that can populate the cache, you can add the table as a sink like this:

  memory_table_sink:
    inputs: [ "another_source_or_transform" ]
    type: memory_enrichment_table
    ttl: 60
    flush_interval: 5

We plan to make this component even more flexible in the future; for example, it could also act as a source. This work raises important questions about component flexibility. The end goal is to treat components as nodes in a graph, which would unlock further possibilities such as chaining sinks.