s2

BETA

This component is mostly stable but breaking changes could still be made outside of major version releases if a fundamental problem with the component is found.

Consumes records from S2 streams

# Common config fields, showing default values
input:
  label: ""
  s2:
    basin: "" # No default (required)
    auth_token: "" # No default (required)
    streams: null # No default (required)
    cache: "" # No default (required)

Generate an authentication token by logging in to the web console at s2.dev.

Cache

This input requires a cache resource so that, after a restart, it can resume reading from the record following the last acknowledged one.

To learn more about setting up a cache resource, see the Cache docs for Bento.

Metadata

This input adds the following metadata fields to each message in addition to the record headers:

  • s2_basin: The S2 basin where the origin stream lives.
  • s2_stream: The origin S2 stream.
  • s2_seq_num: Sequence number of the record in the origin stream formatted as a string.

All header values are loosely converted to strings when they are set as metadata.

Note: The header of an S2 command record has no name, so its value is set under the s2_command metadata key instead.
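
As an illustration of how these metadata fields might be used downstream, the following sketch copies them into the message body with a mapping processor. It assumes the standard Bloblang functions content() and meta() and the number() method; the output field names (body, basin, stream, seq_num) are hypothetical and not part of this input.

```yaml
pipeline:
  processors:
    - mapping: |
        # Wrap the raw record body together with its S2 origin.
        root.body = content().string()
        root.basin = meta("s2_basin")
        root.stream = meta("s2_stream")
        # s2_seq_num is stored as a string, so parse it if a number is needed.
        root.seq_num = meta("s2_seq_num").number()
```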

Examples

Fetch records from all the streams with the prefix my-favorite-prefix/ in the basin.

cache_resources:
  - label: s2_seq_num
    file:
      directory: s2_seq_num_cache

input:
  label: s2_input
  s2:
    basin: my-favorite-basin
    streams: my-favorite-prefix/
    auth_token: "${S2_AUTH_TOKEN}"
    cache: s2_seq_num

output:
  label: stdout
  stdout:
    codec: lines

Fields

basin

Basin name

Type: string

auth_token

Authentication token for S2 account

Secret

This field contains sensitive information that usually shouldn't be added to a config directly. Read our secrets page for more info.

Type: string

streams

Streams prefix or list of streams to subscribe to

Type: unknown
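
Since this field accepts either a prefix or a list, the two forms might look like the sketch below. The stream names are hypothetical, and the list syntax is assumed to be a plain YAML sequence of stream names.

```yaml
# Prefix form: subscribe to every stream whose name starts with the prefix.
input:
  s2:
    basin: my-favorite-basin
    streams: my-favorite-prefix/
    auth_token: "${S2_AUTH_TOKEN}"
    cache: s2_seq_num

# List form (alternative to the above): subscribe to specific streams only.
# input:
#   s2:
#     basin: my-favorite-basin
#     streams:
#       - my-favorite-prefix/orders   # hypothetical stream name
#       - my-favorite-prefix/refunds  # hypothetical stream name
#     auth_token: "${S2_AUTH_TOKEN}"
#     cache: s2_seq_num
```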

cache

Label of the cache resource used to store the sequence number

Type: string

max_in_flight

Optionally sets a limit on the number of messages that can be flowing through a Bento stream pending acknowledgment from the input at any given time. Once a message has been either acknowledged or rejected (nacked) it is no longer considered pending. If the input produces logical batches then each batch is considered a single count against the maximum. WARNING: Batching policies at the output level will stall if this field limits the number of messages below the batching threshold. Zero (default) or lower implies no limit.

Type: int
Default: 0

update_streams_interval

Interval at which the list of streams is refreshed dynamically

Type: string
Default: "1m"
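
A minimal sketch of a config that sets the two optional fields alongside the required ones is shown below. The values 64 and 30s are illustrative assumptions, not recommendations.

```yaml
input:
  label: s2_input
  s2:
    basin: my-favorite-basin
    streams: my-favorite-prefix/
    auth_token: "${S2_AUTH_TOKEN}"
    cache: s2_seq_num
    max_in_flight: 64             # illustrative: at most 64 unacknowledged messages
    update_streams_interval: 30s  # illustrative: refresh the stream list every 30 seconds
```

If the output uses a batching policy, keep max_in_flight at or above the batch size, since a lower limit stalls the batch as described under max_in_flight.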