s2
This component is mostly stable but breaking changes could still be made outside of major version releases if a fundamental problem with the component is found.
Sends messages to an S2 stream.
# Common config fields, showing default values
output:
  label: ""
  s2:
    basin: "" # No default (required)
    auth_token: "" # No default (required)
    stream: "" # No default (required)
    fencing_token: aGVsbG8gczI= # No default (optional)
# All config fields, showing default values
output:
  label: ""
  s2:
    basin: "" # No default (required)
    auth_token: "" # No default (required)
    stream: "" # No default (required)
    fencing_token: aGVsbG8gczI= # No default (optional)
    batching:
      count: 0
      byte_size: 0
      period: ""
      jitter: 0
      check: ""
      processors: [] # No default (optional)
    max_in_flight: 64
Generate an authentication token by logging in to the web console at s2.dev.
Metadata
The metadata attributes are set as S2 record headers. Currently, only string attribute values are supported.
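As a minimal sketch of how metadata might be attached before messages reach this output, the config below sets two string metadata keys in a mapping processor. The key names (source, episode) and their values are hypothetical, and the basin and stream names are reused from the example on this page. Each key would be expected to appear as an S2 record header.

```yaml
pipeline:
  processors:
    - mapping: |
        # No root assignment here, so the payload is expected to pass through unchanged.
        # Hypothetical metadata keys; only string values are supported by the output.
        meta source = "towel_blinkenlights_nl"
        meta episode = "4"

output:
  s2:
    basin: my-favorite-basin
    stream: starwars
    auth_token: "${S2_AUTH_TOKEN}"
```

Numeric or boolean values would need to be converted to strings before being assigned as metadata.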
Batching
The plugin expects batched inputs. Messages are batched automatically by Bento.
By default, Bento disables batching based on the count, byte_size, and period parameters, but the plugin enables batching by setting both count and byte_size to the maximum values supported by S2. It also sets a flush period of 5ms as a reasonable default.
Note: An S2 record batch can be at most 1MiB, but the plugin limits the size of each message to 256KiB because the Bento size limit doesn't take metadata into account. Moreover, the metered size of a message in S2 will be greater than the byte size of the same Bento message.
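For illustration, a batching policy might be set explicitly as sketched below. The values are arbitrary examples rather than recommendations, and it is an assumption that explicitly set fields take precedence over the defaults the plugin applies.

```yaml
output:
  s2:
    basin: my-favorite-basin
    stream: starwars
    auth_token: "${S2_AUTH_TOKEN}"
    batching:
      count: 100          # flush after 100 messages
      byte_size: 524288   # flush at 512KiB (512 * 1024 bytes), below the 1MiB batch limit
      period: 20ms        # flush an incomplete batch after 20ms
```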
Examples
- ASCII Starwars
Consume a network stream into an S2 stream
input:
  label: towel_blinkenlights_nl
  socket:
    network: tcp
    address: towel.blinkenlights.nl:23
    scanner:
      lines: {}

output:
  label: s2_starwars
  s2:
    basin: my-favorite-basin
    stream: starwars
    auth_token: "${S2_AUTH_TOKEN}"
Fields
basin
Basin name
Type: string
auth_token
Authentication token for S2 account
This field contains sensitive information that usually shouldn't be added to a config directly. Read our secrets page for more info.
Type: string
stream
Stream name
Type: string
fencing_token
Enforce a fencing token (base64 encoded)
Type: string
# Examples
fencing_token: aGVsbG8gczI=
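The example value aGVsbG8gczI= is the base64 encoding of the string "hello s2". As a sketch, the token could be supplied from the environment instead of being written into the config, using the same interpolation syntax as auth_token in the example above. S2_FENCING_TOKEN is a hypothetical variable name.

```yaml
output:
  s2:
    basin: my-favorite-basin
    stream: starwars
    auth_token: "${S2_AUTH_TOKEN}"
    # Hypothetical environment variable. Its value must already be base64 encoded,
    # for example aGVsbG8gczI= for the raw token "hello s2".
    fencing_token: "${S2_FENCING_TOKEN}"
```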
batching
Allows you to configure a batching policy.
Type: object
# Examples
batching:
  byte_size: 5000
  count: 0
  period: 1s

batching:
  count: 10
  period: 1s

batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m

batching:
  count: 10
  jitter: 0.1
  period: 10s
batching.count
The number of messages at which the batch should be flushed. If 0, count-based batching is disabled.
Type: int
Default: 0
batching.byte_size
The number of bytes at which the batch should be flushed. If 0, size-based batching is disabled.
Type: int
Default: 0
batching.period
A period in which an incomplete batch should be flushed regardless of its size.
Type: string
Default: ""
# Examples
period: 1s
period: 1m
period: 500ms
batching.jitter
A non-negative factor that adds random delay to batch flush intervals, where the delay is determined uniformly at random between 0 and jitter * period. For example, with period: 100ms and jitter: 0.1, each flush will be delayed by a random duration between 0 and 10ms.
Type: float
Default: 0
# Examples
jitter: 0.01
jitter: 0.1
jitter: 1
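To make the arithmetic concrete, here is a small sketch with illustrative values that combines period and jitter. The upper bound of the added delay is jitter * period.

```yaml
batching:
  count: 0
  period: 1s
  jitter: 0.5   # each flush is delayed by a random duration between 0 and 0.5 * 1s = 500ms
```

A jitter of 1 with the same period would allow delays of up to a full second.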
batching.check
A Bloblang query that should return a boolean value indicating whether a message should end a batch.
Type: string
Default: ""
# Examples
check: this.type == "end_of_transaction"
batching.processors
A list of processors to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch, therefore splitting the batch into smaller batches using these processors is a no-op.
Type: array
# Examples
processors:
  - archive:
      format: concatenate

processors:
  - archive:
      format: lines

processors:
  - archive:
      format: json_array
max_in_flight
The maximum number of messages to have in flight at a given time. Increase this to improve throughput.
Type: int
Default: 64