aws_s3_stream

BETA

This component is mostly stable but breaking changes could still be made outside of major version releases if a fundamental problem with the component is found.

Streams data to S3 using multipart uploads.

Introduced in version 1.16.0.

# Common config fields, showing default values
output:
  label: ""
  aws_s3_stream:
    bucket: "" # No default (required)
    path: logs/${! timestamp_unix() }-${! uuid_v4() }.log # No default (required)
    max_in_flight: 64
    batching:
      count: 0
      byte_size: 0
      period: ""
      jitter: 0
      check: ""

This output writes to S3 using multipart uploads, streaming content incrementally rather than buffering entire files in memory. This makes it ideal for writing large files or continuous streams where memory efficiency is critical.

The partition_by parameter allows you to maintain separate S3 multipart uploads for different partition values. Messages with matching partition values are written to the same file, and the full path expression is evaluated only once per partition (allowing use of functions like uuid_v4() for unique filenames). Without partition_by, each message evaluates the full path independently.

When to Use

Use aws_s3_stream instead of aws_s3 when:

  • Writing large files (>100MB) where memory usage is a concern
  • Streaming continuous data in memory-constrained environments
  • You need per-partition file grouping with dynamic paths

Credentials

By default Bento will use a shared credentials file when connecting to AWS services. You can find out more in this document.

Examples

This example writes streaming log files partitioned by date and service to S3.

output:
  aws_s3_stream:
    bucket: my-logs-bucket
    path: 'logs/date=${! meta("date") }/service=${! meta("service") }/${! uuid_v4() }.log'

    # Messages with same date+service go to same file
    partition_by:
      - '${! meta("date") }'
      - '${! meta("service") }'

    max_buffer_bytes: 10485760 # 10MB
    max_buffer_count: 10000
    max_buffer_period: 10s

Fields

bucket

The S3 bucket to upload files to.

Type: string

path

The path for each file. This field supports interpolation functions.

Type: string

# Examples

path: logs/${! timestamp_unix() }-${! uuid_v4() }.log

path: data/date=${! meta("date") }/account=${! meta("account") }/${! uuid_v4() }.json

partition_by

Optional list of interpolated string expressions that determine writer partitioning. Messages with the same partition values are written to the same file. The full path is only evaluated once when a new partition is encountered. This allows using functions like uuid_v4() in the path for unique filenames per partition. If omitted, the full path is evaluated per message for backwards compatibility. This field supports interpolation functions.

Type: array

# Examples

partition_by:
  - ${! meta("date") }
  - ${! meta("account") }

force_path_style_urls

Forces path style URLs for S3 requests.

Type: bool
Default: false

max_buffer_bytes

Maximum buffer size in bytes before flushing to S3. Default is 10MB.

Type: int
Default: 10485760

max_buffer_count

Maximum number of messages to buffer before flushing to S3.

Type: int
Default: 10000

max_buffer_period

Maximum duration to buffer messages before flushing to S3.

Type: string
Default: "10s"
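
As a rough illustration of how the three buffer limits fit together, the sketch below raises the size limit and shortens the time limit. It assumes that a flush happens as soon as any one of the limits is reached, which the field descriptions suggest but do not state outright. The bucket name and the `service` metadata key are placeholders.

```yaml
output:
  aws_s3_stream:
    bucket: my-logs-bucket # placeholder bucket name
    path: 'logs/service=${! meta("service") }/${! uuid_v4() }.log'

    # One file per service; "service" is a hypothetical metadata key
    partition_by:
      - '${! meta("service") }'

    # Assumption: whichever limit is hit first triggers the flush
    max_buffer_bytes: 20971520 # 20 * 1024 * 1024 bytes
    max_buffer_count: 50000
    max_buffer_period: 5s
```

Larger buffers mean fewer, bigger writes to S3 at the cost of more memory held per partition, so these values are worth tuning against the number of partitions open at once.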

content_type

The content type to set for uploaded files. This field supports interpolation functions.

Type: string
Default: "application/octet-stream"

content_encoding

The content encoding to set for uploaded files (e.g., gzip). This field supports interpolation functions.

Type: string

max_retries

The maximum number of retries for each individual part upload. Set to zero to disable retries.

Type: int
Default: 2

backoff

Determines the time intervals and cutoffs for retry attempts.

Type: object

backoff.initial_interval

The initial period to wait between retry attempts.

Type: string
Default: "1s"

# Examples

initial_interval: 50ms

initial_interval: 1s

backoff.max_interval

The maximum period to wait between retry attempts.

Type: string
Default: "5s"

# Examples

max_interval: 5s

max_interval: 1m

backoff.max_elapsed_time

The maximum overall period of time to spend on retry attempts before the request is aborted.

Type: string
Default: "30s"

# Examples

max_elapsed_time: 1m

max_elapsed_time: 1h
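
The retry fields above can be combined as in the following sketch. The values are illustrative rather than recommendations, and the bucket name and path are placeholders.

```yaml
output:
  aws_s3_stream:
    bucket: my-logs-bucket # placeholder bucket name
    path: 'logs/${! meta("date") }/${! uuid_v4() }.log'
    partition_by:
      - '${! meta("date") }'

    # Retry each failed part upload up to 5 times
    max_retries: 5
    backoff:
      initial_interval: 500ms # wait before the first retry
      max_interval: 10s       # upper bound on the wait between retries
      max_elapsed_time: 1m    # abort the request after this much retrying
```

Keeping max_elapsed_time comfortably above max_interval leaves room for more than one retry at the longest wait.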

region

The AWS region to target.

Type: string
Default: ""

endpoint

Allows you to specify a custom endpoint for the AWS API.

Type: string
Default: ""
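
A custom endpoint is often paired with force_path_style_urls when targeting an S3-compatible service. The sketch below assumes such a service is listening locally at http://localhost:9000 and expects path-style requests; the endpoint, region value, and bucket name are all placeholders.

```yaml
output:
  aws_s3_stream:
    bucket: test-bucket # placeholder bucket name
    path: 'data/${! meta("date") }/${! uuid_v4() }.json'
    partition_by:
      - '${! meta("date") }'

    region: us-east-1               # placeholder region
    endpoint: http://localhost:9000 # assumed local S3-compatible service
    force_path_style_urls: true     # request http://host/bucket/key rather than http://bucket.host/key
```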

credentials

Optional manual configuration of AWS credentials to use. More information can be found in this document.

Type: object

credentials.profile

A profile from ~/.aws/credentials to use.

Type: string
Default: ""

credentials.id

The ID of credentials to use.

Type: string
Default: ""

credentials.secret

The secret for the credentials being used.

Secret

This field contains sensitive information that usually shouldn't be added to a config directly. Read our secrets page for more info.

Type: string
Default: ""

credentials.token

The token for the credentials being used, required when using short term credentials.

Type: string
Default: ""

credentials.from_ec2_role

Use the credentials of a host EC2 machine configured to assume an IAM role associated with the instance.

Type: bool
Default: false
Requires version 1.0.0 or newer

credentials.role

A role ARN to assume.

Type: string
Default: ""

credentials.role_external_id

An external ID to provide when assuming a role.

Type: string
Default: ""

credentials.expiry_window

Allow the credentials to trigger refreshing prior to the credentials actually expiring. This is beneficial so race conditions with expiring credentials do not cause requests to fail. For example '10s' would refresh credentials ten seconds before expiration. Setting to a duration of 0 disables the expiry window.

Type: string
Default: ""
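
As a minimal sketch of the credentials object, the following assumes the output should write to S3 through an assumed IAM role. The role ARN, external ID, and bucket name are made-up placeholders.

```yaml
output:
  aws_s3_stream:
    bucket: my-logs-bucket # placeholder bucket name
    path: 'logs/${! meta("date") }/${! uuid_v4() }.log'
    partition_by:
      - '${! meta("date") }'

    credentials:
      # Hypothetical role ARN and external ID, for illustration only
      role: arn:aws:iam::123456789012:role/example-s3-writer
      role_external_id: example-external-id
      # Refresh credentials ten seconds before they expire
      expiry_window: 10s
```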

max_in_flight

The maximum number of messages to have in flight at a given time. Increase this to improve throughput.

Type: int
Default: 64

batching

Allows you to configure a batching policy.

Type: object

# Examples

batching:
  byte_size: 5000
  count: 0
  period: 1s

batching:
  count: 10
  period: 1s

batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m

batching:
  count: 10
  jitter: 0.1
  period: 10s

batching.count

The number of messages at which the batch should be flushed. If set to 0, count-based batching is disabled.

Type: int
Default: 0

batching.byte_size

The number of bytes at which the batch should be flushed. If set to 0, size-based batching is disabled.

Type: int
Default: 0

batching.period

A period in which an incomplete batch should be flushed regardless of its size.

Type: string
Default: ""

# Examples

period: 1s

period: 1m

period: 500ms

batching.jitter

A non-negative factor that adds random delay to batch flush intervals, where delay is determined uniformly at random between 0 and jitter * period. For example, with period: 100ms and jitter: 0.1, each flush will be delayed by a random duration between 0-10ms.

Type: float
Default: 0

# Examples

jitter: 0.01

jitter: 0.1

jitter: 1
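
To make the jitter arithmetic concrete, here is the same calculation applied to a longer period. The values are illustrative only.

```yaml
batching:
  count: 10
  period: 10s
  # Maximum added delay = jitter * period = 0.1 * 10s = 1s,
  # so each flush is delayed by a random duration between 0 and 1s
  jitter: 0.1
```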

batching.check

A Bloblang query that should return a boolean value indicating whether a message should end a batch.

Type: string
Default: ""

# Examples

check: this.type == "end_of_transaction"

batching.processors

A list of processors to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch, therefore splitting the batch into smaller batches using these processors is a no-op.

Type: array

# Examples

processors:
  - archive:
      format: concatenate

processors:
  - archive:
      format: lines

processors:
  - archive:
      format: json_array