Processors
Bento processors are functions applied to messages passing through a pipeline. The function signature allows a processor to mutate or drop messages depending on their content. There are many types on offer, but the most powerful are the mapping and mutation processors.
Processors are set via config, and depending on where in the config they are placed they will be run either immediately after a specific input (set in the input section), on all messages (set in the pipeline section) or before a specific output (set in the output section). Most processors apply to all messages and can be placed in the pipeline section:
pipeline:
  threads: 1
  processors:
    - label: my_cool_mapping
      mapping: |
        root.message = this
        root.meta.link_count = this.links.length()
The threads field in the pipeline section determines how many parallel processing threads are created. You can read more about parallel processing in the pipeline guide.
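Processors can also be attached directly to inputs and outputs via their processors field. As a minimal sketch, assuming a generate input producing dummy documents, input-level processors run immediately after that input, before messages reach the shared pipeline:

input:
  generate:
    interval: 1s
    mapping: 'root = {"links": ["a", "b"]}'
  processors:
    # Runs immediately after this input, before the pipeline section.
    - mapping: root.received_at = now()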
Labels
Processors have an optional field label that can uniquely identify them in observability data such as metrics and logs. This can be useful when running configs with multiple nested processors, as otherwise their metrics labels will be generated based on their composition. For more information check out the metrics documentation.
Error Handling
Some processors have conditions whereby they might fail. Rather than throw these messages into the abyss, Bento still attempts to send them onwards, and has mechanisms for filtering, recovering or dead-letter queuing messages that have failed, which can be read about here.
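As a rough sketch of one such mechanism, a catch block applies its child processors only to messages that have been flagged as failed, which can be used to log and then drop them:

pipeline:
  processors:
    - mapping: root = content().parse_json() # fails on malformed input
    - catch:
        # These processors only run for messages flagged as failed.
        - log:
            level: ERROR
            message: 'Processing failed due to: ${! error() }'
        - mapping: root = deleted() # drop the failed message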
The introduction of error_handling overrides this behaviour: instead, Bento will reject all batches containing errored messages, propagating a nack to the input layer.
Error Logs
Errors that occur during processing can be roughly separated into two groups: unexpected intermittent errors, such as connectivity problems, and logical errors, such as bad input data or unmatched schemas.
All processing errors result in the messages being flagged as failed, error metrics increasing for the given errored processor, and debug level logs being emitted that describe the error. Only errors that are known to be intermittent are also logged at the error level.
The reason for this behaviour is to prevent noisy logging in cases where logical errors are expected and will likely be handled in config. However, this can also make it easy to miss logical errors in configs that lack error handling. If you suspect you are experiencing processing errors and do not wish to add error handling yet, then a quick and easy way to expose those errors is to enable debug level logs with the CLI flag --log.level=debug or by setting the level in config:
logger:
  level: DEBUG
Error Handling Strategies
This configuration field is experimental and therefore breaking changes could be made to it outside of major version releases. Introduced in v1.4.0.
You can override Bento's default error handling using the error_handling.strategy field, changing the behaviour across all processor components. For example, a reject strategy treats any message-level error as a batch-wide failure, immediately rejecting any batch containing errored messages and propagating a nack (negative acknowledgement) to the input layer. The handling of rejected messages then depends on the input component's nack behaviour, which by default triggers reprocessing of the failed messages from scratch.
pipeline:
  processors:
    - mapping: |
        root = throw("error")
    - mapping: |
        root.message = "I'm never reached"

error_handling:
  strategy: reject
This behaviour also bypasses Bento's traditional error handling mechanisms like catch and try (described in Error Handling), since the entire transaction is rejected before messages can reach error handling components.
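For instance, in this hedged sketch the catch block would ordinarily swallow the error, but under the reject strategy the batch is nacked before the message ever reaches it:

pipeline:
  processors:
    - mapping: root = throw("error")
    - catch:
        # Never executed: the errored batch is rejected and nacked
        # at the input layer before reaching this catch block.
        - log:
            message: 'this is never logged'

error_handling:
  strategy: reject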
More stable alternatives to error_handling, such as the catch and try mechanisms described above, could be considered. Future versions will likely see more error_handling strategies that allow for pipeline-wide handling.
Using Processors as Outputs
It might be the case that a processor that results in a side effect, such as the sql_insert or redis processors, is the only side effect of a pipeline, and therefore could be considered the output.
In such cases it's possible to place these processors within a reject output so that they behave the same as regular outputs, where success results in dropping the message with an acknowledgement and failure results in a nack (or retry):
output:
  reject: 'failed to send data: ${! error() }'
  processors:
    - try:
        - redis:
            url: tcp://localhost:6379
            command: sadd
            args_mapping: 'root = [ this.key, this.value ]'
        - mapping: root = deleted()
The way this works is that if your processor with the side effect (redis in this case) succeeds, then the final mapping processor deletes the message, which results in an acknowledgement. If the processor fails, then the try block exits early without executing the mapping processor, and instead the message is routed to the reject output, which nacks the message with an error message containing the error obtained from the redis processor.
Categories
- Mapping. Processors that specialize in restructuring messages: awk, bloblang, javascript, jmespath, jq, json_schema, mapping, mutation.
- Integration. Processors that interact with external services: aws_dynamodb_partiql, aws_lambda, cache, command, couchbase, gcp_bigquery_select, http, redis, redis_script, schema_registry_decode, schema_registry_encode, sql_insert, sql_raw, sql_select, subprocess.
- Parsing. Processors that specialize in translating messages from one format to another: archive, avro, bloblang, compress, decompress, grok, mapping, msgpack, mutation, parquet_decode, parquet_encode, parse_log, protobuf, schema_registry_decode, schema_registry_encode, unarchive, xml.
- Composition. Higher level processors that compose other processors and modify their behaviour: branch, catch, for_each, group_by, group_by_value, insert_part, parallel, processors, retry, switch, try, while, workflow.
- Utility. Processors that provide general utility or do not fit in another category: archive, bounds_check, cached, dedupe, log, metric, rate_limit, resource, select_parts, sleep, split, sync_response, unarchive, wasm.

Batching and Multiple Part Messages
All Bento processors support multiple part messages, which are synonymous with batches. This enables some cool windowed processing capabilities.
Many processors are able to perform their behaviours on specific parts of a message batch, or on all parts, and have a field parts for specifying an array of part indexes they should apply to. If the list of target parts is empty these processors will be applied to all message parts.
Part indexes can be negative, in which case the part is selected from the end counting backwards, starting from -1. For example, if part = -1 then the last part of the message is selected, if part = -2 then the part before the last is selected, and so on.
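As an illustrative sketch, the select_parts processor uses such a parts field, and mixing positive and negative indexes selects from both ends of each batch:

pipeline:
  processors:
    # Keep only the first and last messages of each batch.
    - select_parts:
        parts: [ 0, -1 ]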
Some processors such as dedupe act across an entire batch, when instead we might like to perform them on individual messages of a batch. In this case the for_each processor can be used.
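As a sketch, wrapping dedupe in for_each makes each message of a batch its own unit of work, so it is judged individually rather than alongside its neighbours (the my_cache resource name here is just an example):

cache_resources:
  - label: my_cache
    memory:
      default_ttl: 60s

pipeline:
  processors:
    - for_each:
        # dedupe now runs against each message individually rather
        # than across the batch as a whole.
        - dedupe:
            cache: my_cache
            key: '${! json("id") }'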
You can read more about batching in this document.