aws_s3_stream
This component is mostly stable but breaking changes could still be made outside of major version releases if a fundamental problem with the component is found.
Streams data to S3 using multipart uploads.
Introduced in version 1.16.0.
```yaml
# Common config fields, showing default values
output:
  label: ""
  aws_s3_stream:
    bucket: "" # No default (required)
    path: logs/${! timestamp_unix() }-${! uuid_v4() }.log # No default (required)
    max_in_flight: 64
    batching:
      count: 0
      byte_size: 0
      period: ""
      jitter: 0
      check: ""
```
```yaml
# All config fields, showing default values
output:
  label: ""
  aws_s3_stream:
    bucket: "" # No default (required)
    path: logs/${! timestamp_unix() }-${! uuid_v4() }.log # No default (required)
    partition_by: [] # No default (optional)
    force_path_style_urls: false
    max_buffer_bytes: 10485760
    max_buffer_count: 10000
    max_buffer_period: 10s
    content_type: application/octet-stream
    content_encoding: "" # No default (optional)
    max_retries: 2
    backoff:
      initial_interval: 1s
      max_interval: 5s
      max_elapsed_time: 30s
    region: ""
    endpoint: ""
    credentials:
      profile: ""
      id: ""
      secret: ""
      token: ""
      from_ec2_role: false
      role: ""
      role_external_id: ""
      expiry_window: ""
    max_in_flight: 64
    batching:
      count: 0
      byte_size: 0
      period: ""
      jitter: 0
      check: ""
      processors: [] # No default (optional)
```
This output writes to S3 using multipart uploads, streaming content incrementally rather than buffering entire files in memory. This makes it ideal for writing large files or continuous streams where memory efficiency is critical.
The partition_by parameter allows you to maintain separate S3 multipart uploads for different
partition values. Messages with matching partition values are written to the same file, and the full
path expression is evaluated only once per partition (allowing use of functions like uuid_v4()
for unique filenames). Without partition_by, each message evaluates the full path independently.
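As a sketch of these semantics (bucket and metadata names here are illustrative), the following keeps one open upload per account, and the uuid_v4() in the path is evaluated only once when each account partition is first seen, so all messages for an account land in the same file:

```yaml
output:
  aws_s3_stream:
    bucket: example-bucket # illustrative name
    path: 'accounts/${! meta("account") }/${! uuid_v4() }.log'
    partition_by:
      - '${! meta("account") }'
```

Omitting partition_by in this config would instead re-evaluate uuid_v4() for every message, producing a new file each time.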
When to Use
Use aws_s3_stream instead of aws_s3 when:
- Writing large files (>100MB) where memory usage is a concern
- Streaming continuous data in memory-constrained environments
- You need per-partition file grouping with dynamic paths
Credentials
By default Bento will use a shared credentials file when connecting to AWS services. You can find out more in this document.
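When the shared credentials file isn't suitable, credentials can be configured manually on the output. As a sketch, assuming a role (the role ARN below is a placeholder):

```yaml
output:
  aws_s3_stream:
    bucket: example-bucket # illustrative name
    path: 'logs/${! uuid_v4() }.log'
    region: us-east-1
    credentials:
      role: arn:aws:iam::123456789012:role/example-role # placeholder ARN
```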
Examples
This example writes streaming log files partitioned by date and service to S3.
```yaml
output:
  aws_s3_stream:
    bucket: my-logs-bucket
    path: 'logs/date=${! meta("date") }/service=${! meta("service") }/${! uuid_v4() }.log'
    # Messages with same date+service go to same file
    partition_by:
      - '${! meta("date") }'
      - '${! meta("service") }'
    max_buffer_bytes: 10485760 # 10MB
    max_buffer_count: 10000
    max_buffer_period: 10s
```
This example demonstrates memory-efficient streaming for large JSON datasets.
```yaml
output:
  aws_s3_stream:
    bucket: data-lake
    path: 'events/date=${! now().ts_format("2006-01-02") }/${! uuid_v4() }.json'
    content_type: application/json
    batching:
      count: 10000
      period: 10s
      processors:
        - archive:
            format: lines
```
Fields
bucket
The S3 bucket to upload files to.
Type: string
path
The path for each file. This field supports interpolation functions.
Type: string
```yaml
# Examples
path: logs/${! timestamp_unix() }-${! uuid_v4() }.log

path: data/date=${! meta("date") }/account=${! meta("account") }/${! uuid_v4() }.json
```
partition_by
Optional list of interpolated string expressions that determine writer partitioning. Messages with the same partition values are written to the same file. The full path is only evaluated once when a new partition is encountered. This allows using functions like uuid_v4() in the path for unique filenames per partition. If omitted, the full path is evaluated per message for backwards compatibility. This field supports interpolation functions.
Type: array
```yaml
# Examples
partition_by:
  - ${! meta("date") }
  - ${! meta("account") }
```
force_path_style_urls
Forces path-style URLs for S3 requests.
Type: bool
Default: false
max_buffer_bytes
Maximum buffer size in bytes before flushing to S3. Default is 10MB.
Type: int
Default: 10485760
max_buffer_count
Maximum number of messages to buffer before flushing to S3.
Type: int
Default: 10000
max_buffer_period
Maximum duration to buffer messages before flushing to S3.
Type: string
Default: "10s"
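These three buffer limits act together, with a flush triggered by whichever is reached first. As an illustrative sketch, to flush smaller parts more frequently (values are examples only; note that S3 multipart uploads require every part except the last to be at least 5MB, so setting max_buffer_bytes below that may cause upload errors):

```yaml
output:
  aws_s3_stream:
    bucket: example-bucket # illustrative name
    path: 'logs/${! uuid_v4() }.log'
    max_buffer_bytes: 5242880 # 5MB, the S3 minimum part size
    max_buffer_count: 5000
    max_buffer_period: 5s
```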
content_type
The content type to set for uploaded files. This field supports interpolation functions.
Type: string
Default: "application/octet-stream"
content_encoding
The content encoding to set for uploaded files (e.g., gzip). This field supports interpolation functions.
Type: string
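For example, gzip-compressed uploads could be produced by compressing each batch with a processor and labelling the object accordingly. This is a sketch, assuming the compress processor is available in your pipeline:

```yaml
output:
  aws_s3_stream:
    bucket: example-bucket # illustrative name
    path: 'logs/${! uuid_v4() }.log.gz'
    content_encoding: gzip
    batching:
      count: 10000
      period: 10s
      processors:
        - archive:
            format: lines
        - compress:
            algorithm: gzip
```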
max_retries
The maximum number of retries for each individual part upload. Set to zero to disable retries.
Type: int
Default: 2
backoff
Determine time intervals and cut offs for retry attempts.
Type: object
backoff.initial_interval
The initial period to wait between retry attempts.
Type: string
Default: "1s"
```yaml
# Examples
initial_interval: 50ms

initial_interval: 1s
```
backoff.max_interval
The maximum period to wait between retry attempts.
Type: string
Default: "5s"
```yaml
# Examples
max_interval: 5s

max_interval: 1m
```
backoff.max_elapsed_time
The maximum overall period of time to spend on retry attempts before the request is aborted.
Type: string
Default: "30s"
```yaml
# Examples
max_elapsed_time: 1m

max_elapsed_time: 1h
```
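Taken together with max_retries, a more patient retry policy for an unreliable network might look like this (values are illustrative, not recommendations):

```yaml
output:
  aws_s3_stream:
    bucket: example-bucket # illustrative name
    path: 'logs/${! uuid_v4() }.log'
    max_retries: 5
    backoff:
      initial_interval: 500ms
      max_interval: 10s
      max_elapsed_time: 2m
```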
region
The AWS region to target.
Type: string
Default: ""
endpoint
Allows you to specify a custom endpoint for the AWS API.
Type: string
Default: ""
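A custom endpoint is commonly used with S3-compatible stores such as MinIO, which typically also require path-style URLs. A sketch, where the endpoint is a placeholder for a local MinIO instance:

```yaml
output:
  aws_s3_stream:
    bucket: example-bucket # illustrative name
    path: 'logs/${! uuid_v4() }.log'
    endpoint: http://localhost:9000 # placeholder endpoint
    region: us-east-1
    force_path_style_urls: true
```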
credentials
Optional manual configuration of AWS credentials to use. More information can be found in this document.
Type: object
credentials.profile
A profile from ~/.aws/credentials to use.
Type: string
Default: ""
credentials.id
The ID of credentials to use.
Type: string
Default: ""
credentials.secret
The secret for the credentials being used.
This field contains sensitive information that usually shouldn't be added to a config directly, read our secrets page for more info.
Type: string
Default: ""
credentials.token
The token for the credentials being used, required when using short term credentials.
Type: string
Default: ""
credentials.from_ec2_role
Use the credentials of a host EC2 machine configured to assume an IAM role associated with the instance.
Type: bool
Default: false
Requires version 1.0.0 or newer
credentials.role
A role ARN to assume.
Type: string
Default: ""
credentials.role_external_id
An external ID to provide when assuming a role.
Type: string
Default: ""
credentials.expiry_window
Allow the credentials to trigger refreshing prior to the credentials actually expiring. This is beneficial so race conditions with expiring credentials do not cause requests to fail. For example '10s' would refresh credentials ten seconds before expiration. Setting to a duration of 0 disables the expiry window.
Type: string
Default: ""
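For example, to refresh assumed-role credentials ten seconds before they expire (the role ARN is a placeholder):

```yaml
output:
  aws_s3_stream:
    bucket: example-bucket # illustrative name
    path: 'logs/${! uuid_v4() }.log'
    credentials:
      role: arn:aws:iam::123456789012:role/example-role # placeholder ARN
      expiry_window: 10s
```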
max_in_flight
The maximum number of messages to have in flight at a given time. Increase this to improve throughput.
Type: int
Default: 64
batching
Allows you to configure a batching policy.
Type: object
```yaml
# Examples
batching:
  byte_size: 5000
  count: 0
  period: 1s

batching:
  count: 10
  period: 1s

batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m

batching:
  count: 10
  jitter: 0.1
  period: 10s
```
batching.count
The number of messages at which the batch should be flushed. A value of 0 disables count-based batching.
Type: int
Default: 0
batching.byte_size
The number of bytes at which the batch should be flushed. A value of 0 disables size-based batching.
Type: int
Default: 0
batching.period
A period in which an incomplete batch should be flushed regardless of its size.
Type: string
Default: ""
```yaml
# Examples
period: 1s

period: 1m

period: 500ms
```
batching.jitter
A non-negative factor that adds random delay to batch flush intervals, where delay is determined uniformly at random between 0 and jitter * period. For example, with period: 100ms and jitter: 0.1, each flush will be delayed by a random duration between 0-10ms.
Type: float
Default: 0
```yaml
# Examples
jitter: 0.01

jitter: 0.1

jitter: 1
```
batching.check
A Bloblang query that should return a boolean value indicating whether a message should end a batch.
Type: string
Default: ""
```yaml
# Examples
check: this.type == "end_of_transaction"
```
batching.processors
A list of processors to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch, therefore splitting the batch into smaller batches using these processors is a no-op.
Type: array
```yaml
# Examples
processors:
  - archive:
      format: concatenate

processors:
  - archive:
      format: lines

processors:
  - archive:
      format: json_array
```