cached
This component is experimental and therefore subject to change or removal outside of major version releases.
Cache the result of applying one or more processors to messages identified by a key. If the key already exists within the cache, the contents of the message are replaced with the cached result instead of applying the processors. This component is therefore useful in situations where an expensive set of processors only needs to be executed periodically.
Introduced in version 1.0.0.
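The config below is a minimal sketch of the lookup-or-process behaviour described above. It rests on a few assumptions that are not part of the original documentation:

- each message is a JSON document with a `user_id` field;
- the label `user_cache` is hypothetical;
- the lookup URL is a placeholder.

When a key is missing from the cache, the http processor runs and its result is stored under that key. Later messages with the same `user_id`, arriving within the TTL, have their contents replaced by the cached result without the request being made again.

```yaml
pipeline:
  processors:
    - cached:
        # Hypothetical label; must match a resource declared under cache_resources
        cache: user_cache
        # Assumes each message is JSON with a user_id field
        key: '${! json("user_id") }'
        # Keep each cached result for one hour
        ttl: 1h
        processors:
          # Only runs when the key is not found in the cache
          - http:
              url: http://example.com/users/${! json("user_id") }
              verb: GET

cache_resources:
  - label: user_cache
    memory: {}
```

Because there is no branch here, the http response replaces the whole message. Wrap the cached processor in a branch, as in the examples below, if the result should be merged into the original message instead.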
# Config fields, showing default values
label: ""
cached:
  cache: "" # No default (required)
  skip_on: errored() # No default (optional)
  key: my_foo_result # No default (required)
  ttl: "" # No default (optional)
  processors: [] # No default (required)
The format of the data when stored within the cache is a custom and versioned schema chosen to balance performance and storage space. It is therefore not possible to point this processor to a cache that is pre-populated with data that this processor has not created itself.
Examples
- Cached Enrichment
- Periodic Global Enrichment
Cached Enrichment
In the following example we want to enrich messages consumed from Kafka with data specific to the origin topic partition. We do this by placing an http processor within a branch, where the HTTP URL contains interpolation functions that put the topic and partition in the path.
However, it would be inefficient to make this HTTP request for every single message, as the result is the same for all data of a given topic partition. We can solve this by placing our enrichment call within a cached processor whose key contains the topic and partition. Messages that originate from the same topic/partition combination then reuse the cached result of the first request made for that combination.
pipeline:
  processors:
    - branch:
        processors:
          - cached:
              key: '${! meta("kafka_topic") }-${! meta("kafka_partition") }'
              cache: foo_cache
              processors:
                - mapping: 'root = ""'
                - http:
                    url: http://example.com/enrichment/${! meta("kafka_topic") }/${! meta("kafka_partition") }
                    verb: GET
        result_map: 'root.enrichment = this'

cache_resources:
  - label: foo_cache
    memory:
      # Disable compaction so that cached items never expire
      compaction_interval: ""
Periodic Global Enrichment
In the following example we enrich all messages with the same data, obtained from a static URL with an http processor within a branch. However, we expect the data from this URL to change roughly every 10 minutes, so we configure a cached processor with a static key (since this request is the same for all messages) and a TTL of 10m.
pipeline:
  processors:
    - branch:
        request_map: 'root = ""'
        processors:
          - cached:
              key: static_foo
              cache: foo_cache
              ttl: 10m
              processors:
                - http:
                    url: http://example.com/get/foo.json
                    verb: GET
        result_map: 'root.foo = this'

cache_resources:
  - label: foo_cache
    memory: {}
Fields
cache
The label of the cache resource that processor results are read from and written to.
Type: string
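Since the value is the label of a resource declared under cache_resources, results are not limited to an in-memory cache. The sketch below keeps results in Redis instead. It assumes a Redis server reachable at `tcp://localhost:6379`, and the label `shared_cache` is hypothetical. Because of the storage format caveat described earlier, the cache should only ever be populated by this processor.

```yaml
pipeline:
  processors:
    - cached:
        # Hypothetical label; must match the cache resource declared below
        cache: shared_cache
        key: static_foo
        ttl: 10m
        processors:
          - http:
              url: http://example.com/get/foo.json
              verb: GET

cache_resources:
  - label: shared_cache
    redis:
      # Assumes a Redis server at this address
      url: tcp://localhost:6379
```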
skip_on
A condition that can be used to skip caching the results from the processors.
Type: string
# Examples
skip_on: errored()
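The sketch below shows where skip_on sits in a full config. It reuses the periodic enrichment example and adds `skip_on: errored()`. If the http request fails, the errored result is passed on but is not written to the cache. Since nothing is stored for the key, a later message should attempt the request again rather than receive a cached failure for the full TTL.

```yaml
pipeline:
  processors:
    - branch:
        request_map: 'root = ""'
        processors:
          - cached:
              key: static_foo
              cache: foo_cache
              ttl: 10m
              # Do not store results that were flagged as errored
              skip_on: errored()
              processors:
                - http:
                    url: http://example.com/get/foo.json
                    verb: GET
        result_map: 'root.foo = this'

cache_resources:
  - label: foo_cache
    memory: {}
```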