read_until
Reads messages from a child input until a consumed message passes a Bloblang query, at which point the input closes. It is also possible to configure a timeout after which the input is closed if no new messages arrive in that period.
# Config fields, showing default values
input:
  label: ""
  read_until:
    input: null # No default (required)
    check: this.type == "foo" # No default (optional)
    idle_timeout: 5s # No default (optional)
    restart_input: false
Messages are read continuously while the query check returns false. When the query returns true, the message that triggered the check is sent out and the input is closed. Use this to define inputs where the stream should end once a certain message appears.
If idle_timeout is configured, the input is closed when no new messages arrive within that period. Use this field if you want to empty out and close an input that doesn't have a logical end.
Sometimes inputs close themselves. For example, when the file input type reaches the end of a file it will shut down. By default this type will also shut down. If you wish for the input type to be restarted every time it shuts down until the query check is met then set restart_input to true.
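As a minimal sketch of the restart behaviour described above, the config below wraps a file input and reopens it each time it reaches the end of the file, until a message satisfies the check. The path ./events.jsonl and the type value "done" are hypothetical placeholders, not values taken from this page.

```yaml
# Sketch only: the path and the "done" marker are assumptions for illustration.
input:
  read_until:
    # Stop once a message with this (hypothetical) type is seen.
    check: this.type == "done"
    # Reopen the child input whenever it closes itself before the check passes.
    restart_input: true
    input:
      file:
        paths: [ ./events.jsonl ]
```

With restart_input left at its default of false, the same config would stop at the end of the file even if no message had passed the check.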
Metadata
A metadata key bento_read_until containing the value final is added to the first part of the message that triggers the input to stop.
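One way this metadata might be used downstream is to flag the final message in a mapping processor. The sketch below assumes the Bloblang metadata function and the or method are available in your version, and the field name is_last is purely illustrative.

```yaml
# Sketch only: is_last is a hypothetical field name.
pipeline:
  processors:
    - mapping: |
        root = this
        # The key is only set on the message that triggered the stop, so fall
        # back to an empty string for every other message.
        root.is_last = metadata("bento_read_until").or("") == "final"
```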
Fields
input
The child input to consume from.
Type: input
check
A Bloblang query that should return a boolean value indicating whether the input should now be closed.
Type: string
# Examples
check: this.type == "foo"
check: count("messages") >= 100
idle_timeout
The maximum amount of time without receiving new messages after which the input is closed.
Type: string
# Examples
idle_timeout: 5s
restart_input
Whether the input should be reopened if it closes itself before the condition has resolved to true.
Type: bool
Default: false
Examples
- Consume N Messages
- Read from Kafka and close when empty
A common reason to use this input is to consume only N messages from an input and then stop. This can easily be done with the count function:
# Only read 100 messages, and then exit.
input:
  read_until:
    check: count("messages") >= 100
    input:
      kafka:
        addresses: [ TODO ]
        topics: [ foo, bar ]
        consumer_group: foogroup
Another common use for this input is a job that consumes all available messages and exits once the source is empty:
# Consume all messages and exit once 5s have passed since the last message.
input:
  read_until:
    idle_timeout: 5s
    input:
      kafka:
        addresses: [ TODO ]
        topics: [ foo, bar ]
        consumer_group: foogroup