Memory Enrichment Table
Introducing the memory enrichment table!
We are excited to announce the memory
enrichment table!
Special thanks to @esensar for implementing this feature and to Quad9 for sponsoring this work.
Data Model
The memory table operates on logs and accepts VRL objects. Each key-value pair is stored as a separate entry in the table, associating a value with its corresponding key. Value here refers to VRL values.
Building a cache
enrichment_tables:
memory_table:
type: memory
ttl: 60
flush_interval: 5
inputs: [ "cache_generator" ]
sources:
demo_logs_test:
type: "demo_logs"
format: "json"
transforms:
demo_logs_processor:
type: "remap"
inputs: [ "demo_logs_test" ]
source: |
. = parse_json!(.message)
user_id = get!(., path: ["user-identifier"])
# Check if we already have a cached value for this user in the enrichment table
existing, err = get_enrichment_table_record("memory_table", { "key": user_id })
if err == null {
# A cached value exists; reuse it.
# The `existing` object has this structure:
# { "key": user_id, "value": {...}, "ttl": 50 }
. = existing.value
.source = "cache"
} else {
# No cached value found, process the event and prepare new data
.referer = parse_url!(.referer)
.referer.host = encode_punycode!(.referer.host)
.source = "transform"
}
cache_generator:
type: "remap"
inputs: [ "demo_logs_processor" ]
source: |
# Check if this user is already in the cache
key_value = get!(., path: ["user-identifier"])
existing, err = get_enrichment_table_record("memory_table", { "key": key_value })
if err != null {
# No cached value found, store the processed data in the enrichment table
data = .
# The memory enrichment table stores all key-value pairs it receives.
# To structure it correctly, we create an object where:
# - The key is the "user-identifier".
# - The value is the rest of the processed event data.
. = set!(value: {}, path: [get!(data, path: ["user-identifier"])], data: data)
} else {
# Already cached, do nothing
. = {}
}
# After some time, processed events will start having their "source" set to "cache",
# indicating that the data is being retrieved from the enrichment table.
sinks:
console:
inputs: [ "demo_logs_processor" ]
target: "stdout"
type: "console"
encoding:
codec: "json"
json:
pretty: true
You can imagine a real world scenario where the demo_logs_processor
has to do some expensive
calculation the first time it encounters a key. Subsequently, every time the same key is encountered
the processing step will be skipped since the pre-computed value is present in the table. The values
can expire; in that case, the computation step will be repeated.
Populate with other components
This new table type also accepts inputs
, meaning that sources and/or transforms can populate it with data, similar to how a sink operates.
The data can then be queried like any other enrichment table. The configuration above demonstrates this behavior.
We plan to make this component even more flexible in the future. For example, it can also act as a source. This exercise raises some important questions on component flexibility. The end goal is treating components as nodes in a graph, unlocking even greater possibilities, such as chaining sinks.