
s2

EXPERIMENTAL

This component is experimental and therefore subject to change or removal outside of major version releases.

Consumes records from S2 streams

Introduced in version 1.5.0.

# Common config fields, showing default values
input:
  label: ""
  s2:
    basin: "" # No default (required)
    access_token: "" # No default (required)
    streams: null # No default (required)
    cache: "" # No default (required)

Generate an authentication token by logging in to the web console at s2.dev.

Cache

This input requires a cache resource, which it uses to resume consumption after the last acknowledged record.

To learn more about setting up a cache resource, see the Cache docs for Bento.

Metadata

This input adds the following metadata fields to each message in addition to the record headers:

  • s2_basin: The S2 basin where the origin stream lives.
  • s2_stream: The origin S2 stream.
  • s2_seq_num: Sequence number of the record in the origin stream formatted as a string.

All the header values are loosely converted to strings as metadata attributes.

Note: An S2 command record has no header name, so its header value is stored under the s2_command metadata key.
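
As a rough illustration of how these metadata fields might be used downstream, the sketch below copies them into the message body with a Bloblang mapping processor. The output field names (payload, basin, stream, seq_num) are hypothetical and chosen only for this example; the metadata keys are the ones listed above.

```yaml
pipeline:
  processors:
    - mapping: |
        # Keep the original record body as a string (hypothetical field name).
        root.payload = content().string()
        # Copy the metadata set by the s2 input into the structured message.
        root.basin = metadata("s2_basin")
        root.stream = metadata("s2_stream")
        # s2_seq_num is provided as a string, as noted above.
        root.seq_num = metadata("s2_seq_num")
```

This fragment is meant to sit alongside an input section that uses the s2 input; it does not define one itself.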

Examples

Fetch records from all the streams with the prefix my-favorite-prefix/ in the basin.

cache_resources:
  - label: s2_seq_num
    file:
      directory: s2_seq_num_cache

input:
  label: s2_input
  s2:
    basin: my-favorite-basin
    streams: my-favorite-prefix/
    access_token: "${S2_ACCESS_TOKEN}"
    cache: s2_seq_num

output:
  label: stdout
  stdout:
    codec: lines

Fields

basin

Basin name

Type: string

access_token

Access token for S2 account

Secret

This field contains sensitive information that usually shouldn't be added to a config directly. Read our secrets page for more info.

Type: string

streams

Streams prefix or list of streams to subscribe to

Type: unknown
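
Since this field accepts either a prefix or a list of streams, the list form would presumably look like the sketch below. The stream names are hypothetical, and the use of a plain YAML sequence is an assumption based on the field description rather than a confirmed schema. The cache label is assumed to refer to a cache resource defined elsewhere in the config.

```yaml
input:
  s2:
    basin: my-favorite-basin
    access_token: "${S2_ACCESS_TOKEN}"
    cache: s2_seq_num # assumes a cache resource with this label exists
    streams:
      - my-favorite-prefix/orders   # hypothetical stream name
      - my-favorite-prefix/payments # hypothetical stream name
```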

cache

Cache resource label for storing sequence number

Type: string

max_in_flight

Optionally sets a limit on the number of messages that can be flowing through a Bento stream pending acknowledgment from the input at any given time. Once a message has been either acknowledged or rejected (nacked) it is no longer considered pending. If the input produces logical batches then each batch is considered a single count against the maximum. WARNING: Batching policies at the output level will stall if this field limits the number of messages below the batching threshold. Zero (default) or lower implies no limit.

Type: int
Default: 0

update_streams_interval

Interval after which the streams list should update dynamically

Type: string
Default: "1m"

backoff_duration

Duration to back off before reconnecting to a stream

Type: string
Default: "100ms"
Requires version 1.6.0 or newer

start_seq_num

Start consuming the stream from either the earliest or the latest sequence number

Type: string
Default: "earliest"
Requires version 1.6.0 or newer
Options: earliest, latest.
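
To show how the optional fields fit together, here is a minimal sketch that sets each of them next to the required ones. The specific values (64, 30s, 500ms) are illustrative assumptions, not recommendations, and the cache label is assumed to refer to a cache resource defined elsewhere in the config.

```yaml
input:
  label: s2_input
  s2:
    basin: my-favorite-basin
    access_token: "${S2_ACCESS_TOKEN}"
    streams: my-favorite-prefix/
    cache: s2_seq_num # assumes a cache resource with this label exists
    max_in_flight: 64             # illustrative limit; 0 (default) means no limit
    update_streams_interval: 30s  # illustrative; default is 1m
    backoff_duration: 500ms       # illustrative; default is 100ms (requires 1.6.0 or newer)
    start_seq_num: latest         # earliest (default) or latest (requires 1.6.0 or newer)
```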