Skip to main content

s2

BETA

This component is mostly stable but breaking changes could still be made outside of major version releases if a fundamental problem with the component is found.

Sends messages to an S2 stream.

# Common config fields, showing default values
output:
label: ""
s2:
basin: "" # No default (required)
auth_token: "" # No default (required)
stream: "" # No default (required)
fencing_token: aGVsbG8gczI= # No default (optional)

Generate an authentication token by logging onto the web console at s2.dev.

Metadata

The metadata attributes are set as S2 record headers. Currently, only string attribute values are supported.

Batching

The plugin expects batched inputs. Messages are batched automatically by Bento.

By default, Bento disables batching based on count, byte_size, and period parameters, but the plugin enables batching setting both count and byte_size to the maximum values supported by S2. It also sets a flush period of 5ms as a reasonable default.

Note: An S2 record batch can be a maximum of 1MiB but the plugin limits the size of a message to 256KiB since the Bento size limit doesn't take metadata into account. Moreover, the metered size of the same Bento message will be greater than the byte size of a Bento message.

Examples

Consume a network stream into an S2 stream

input:
label: towel_blinkenlights_nl
socket:
network: tcp
address: towel.blinkenlights.nl:23
scanner:
lines: {}

output:
label: s2_starwars
s2:
basin: my-favorite-basin
stream: starwars
auth_token: "${S2_AUTH_TOKEN}"

Fields

basin

Basin name

Type: string

auth_token

Authentication token for S2 account

Secret

This field contains sensitive information that usually shouldn't be added to a config directly, read our secrets page for more info.

Type: string

stream

Stream name

Type: string

fencing_token

Enforce a fencing token (base64 encoded)

Type: string

# Examples

fencing_token: aGVsbG8gczI=

batching

Allows you to configure a batching policy.

Type: object

# Examples

batching:
byte_size: 5000
count: 0
period: 1s

batching:
count: 10
period: 1s

batching:
check: this.contains("END BATCH")
count: 0
period: 1m

batching:
count: 10
jitter: 0.1
period: 10s

batching.count

A number of messages at which the batch should be flushed. If 0 disables count based batching.

Type: int
Default: 0

batching.byte_size

An amount of bytes at which the batch should be flushed. If 0 disables size based batching.

Type: int
Default: 0

batching.period

A period in which an incomplete batch should be flushed regardless of its size.

Type: string
Default: ""

# Examples

period: 1s

period: 1m

period: 500ms

batching.jitter

A non-negative factor that adds random delay to batch flush intervals, where delay is determined uniformly at random between 0 and jitter * period. For example, with period: 100ms and jitter: 0.1, each flush will be delayed by a random duration between 0-10ms.

Type: float
Default: 0

# Examples

jitter: 0.01

jitter: 0.1

jitter: 1

batching.check

A Bloblang query that should return a boolean value indicating whether a message should end a batch.

Type: string
Default: ""

# Examples

check: this.type == "end_of_transaction"

batching.processors

A list of processors to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch, therefore splitting the batch into smaller batches using these processors is a no-op.

Type: array

# Examples

processors:
- archive:
format: concatenate

processors:
- archive:
format: lines

processors:
- archive:
format: json_array

max_in_flight

The maximum number of messages to have in flight at a given time. Increase this to improve throughput.

Type: int
Default: 64