cached
This component is experimental and therefore subject to change or removal outside of major version releases.
Cache the result of applying one or more processors to messages identified by a key. If the key already exists within the cache the contents of the message will be replaced with the cached result instead of applying the processors. This component is therefore useful in situations where an expensive set of processors need only be executed periodically.
Introduced in version 1.0.0.
# Config fields, showing default values
label: ""
cached:
  cache: "" # No default (required)
  skip_on: errored() # No default (optional)
  key: my_foo_result # No default (required)
  ttl: "" # No default (optional)
  processors: [] # No default (required)
The format of the data when stored within the cache is a custom and versioned schema chosen to balance performance and storage space. It is therefore not possible to point this processor to a cache that is pre-populated with data that this processor has not created itself.
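One way to respect this constraint is to reserve a cache resource for the cached processor and keep plain values in a separate resource. The following is a minimal sketch: the labels `cached_results` and `lookup_values` and the `foo: bar` entry are hypothetical names chosen for illustration, and it assumes the memory cache's `init_values` field is used for the pre-populated data.

```yaml
cache_resources:
  # Reserved for the cached processor, which stores entries in its own schema
  - label: cached_results
    memory: {}

  # Hypothetical separate resource for plain, pre-populated values.
  # A cached processor should not be pointed at this one.
  - label: lookup_values
    memory:
      init_values:
        foo: bar
```

Keeping the two resources apart means the cached processor only ever reads entries that it wrote itself.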
Examples
- Cached Enrichment
- Periodic Global Enrichment
In the following example we want to enrich messages consumed from Kafka with data specific to the origin topic partition. We do this by placing an http processor within a branch, where the HTTP URL contains interpolation functions with the topic and partition in the path.
However, it would be inefficient to make this HTTP request for every single message, as the result is consistent for all data of a given topic partition. We can solve this by placing our enrichment call within a cached processor where the key contains the topic and partition, so that messages originating from the same topic/partition combination reuse the cached result of an earlier message.
pipeline:
  processors:
    - branch:
        processors:
          - cached:
              key: '${! metadata("kafka_topic") }-${! metadata("kafka_partition").string() }'
              cache: foo_cache
              processors:
                - mapping: 'root = ""'
                - http:
                    url: http://example.com/enrichment/${! metadata("kafka_topic") }/${! metadata("kafka_partition").string() }
                    verb: GET
        result_map: 'root.enrichment = this'

cache_resources:
  - label: foo_cache
    memory:
      # Disable compaction so that cached items never expire
      compaction_interval: ""
In the following example we enrich all messages with the same data obtained from a static URL with an http processor within a branch. However, we expect the data from this URL to change roughly every 10 minutes, so we configure a cached processor with a static key (since this request is consistent for all messages) and a TTL of 10m.
pipeline:
  processors:
    - branch:
        request_map: 'root = ""'
        processors:
          - cached:
              key: static_foo
              cache: foo_cache
              ttl: 10m
              processors:
                - http:
                    url: http://example.com/get/foo.json
                    verb: GET
        result_map: 'root.foo = this'

cache_resources:
  - label: foo_cache
    memory: {}
Fields
cache
The cache resource to read and write processor results from.
Type: string
skip_on
A condition that can be used to skip caching the results from the processors.
Type: string
# Examples
skip_on: errored()
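As a rough sketch of where this condition might be useful, the config below wraps an http processor so that a failed request is not stored and the next message with the same key retries it. The `this.user.id` field and the URL are hypothetical and only serve the illustration.

```yaml
pipeline:
  processors:
    - cached:
        cache: foo_cache
        # Hypothetical field: one cache entry per user
        key: '${! this.user.id }'
        # Do not cache the result if a child processor failed
        skip_on: errored()
        processors:
          - http:
              url: http://example.com/users/${! this.user.id }
              verb: GET

cache_resources:
  - label: foo_cache
    memory: {}
```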
key
A key to be resolved for each message. If the key already exists in the cache then the cached result is used; otherwise the processors are applied and the result is cached under this key. The key could be static and therefore apply generally to all messages, or it could be an interpolated expression that is potentially unique for each message. This field supports interpolation functions.
Type: string
# Examples
key: my_foo_result
key: ${! this.document.id }
key: ${! metadata("kafka_key") }
key: ${! metadata("kafka_topic") }
ttl
An optional expiry period to set for each cache entry. Some caches only have a general TTL and will therefore ignore this setting. This field supports interpolation functions.
Type: string
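Because the field supports interpolation, the expiry could in principle be chosen per message. The sketch below assumes a hypothetical metadata key `cache_ttl` set upstream and falls back to `10m` when it is absent. Whether the per-entry value is honored depends on the cache implementation in use.

```yaml
pipeline:
  processors:
    - cached:
        cache: foo_cache
        key: static_foo
        # Hypothetical metadata key; falls back to 10m when it is not set
        ttl: '${! metadata("cache_ttl").or("10m") }'
        processors:
          - http:
              url: http://example.com/get/foo.json
              verb: GET

cache_resources:
  - label: foo_cache
    memory: {}
```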
processors
The list of processors whose result will be cached.
Type: array