duckdb_append
This component is mostly stable but breaking changes could still be made outside of major version releases if a fundamental problem with the component is found.
Inserts rows into a DuckDB database using the Appender API.
# Common config fields, showing default values
output:
  label: ""
  duckdb_append:
    dsn: /data/bento.duckdb # No default (required)
    table: "" # No default (required)
    schema: ""
    columns: [] # No default (required)
    args_mapping: root = [ this.duck, this.coins, this.deposited_at ] # No default (required)
    batching:
      count: 0
      byte_size: 0
      period: ""
      jitter: 0
      check: ""
# All config fields, showing default values
output:
  label: ""
  duckdb_append:
    dsn: /data/bento.duckdb # No default (required)
    table: "" # No default (required)
    schema: ""
    columns: [] # No default (required)
    args_mapping: root = [ this.duck, this.coins, this.deposited_at ] # No default (required)
    init_files: [] # No default (optional)
    init_statement: | # No default (optional)
      CREATE TABLE IF NOT EXISTS some_table (
        foo varchar(50) not null,
        bar integer,
        baz varchar(50),
        primary key (foo)
      );
    init_verify_conn: false
    conn_max_idle_time: "" # No default (optional)
    conn_max_life_time: "" # No default (optional)
    conn_max_idle: 2
    conn_max_open: 0 # No default (optional)
    secret_name: "" # No default (optional)
    iam_enabled: false
    azure:
      entra_enabled: false
      token_request_options:
        claims: ""
        enable_cae: false
        scopes:
          - https://ossrdbms-aad.database.windows.net/.default
        tenant_id: ""
    region: ""
    endpoint: ""
    credentials:
      profile: ""
      id: ""
      secret: ""
      token: ""
      from_ec2_role: false
      role: ""
      role_external_id: ""
      expiry_window: ""
    batching:
      count: 0
      byte_size: 0
      period: ""
      jitter: 0
      check: ""
      processors: [] # No default (optional)
Writes rows directly into DuckDB's columnar storage via the Appender API, bypassing SQL parsing entirely. This is faster than sql_insert for bulk ingestion.
The target table must exist before the component connects. Use init_statement to create it if needed.
Type coercions applied to args_mapping output before passing to the Appender:
- JSON integers (json.Number) → int64
- JSON decimals (json.Number) → float64
- RFC3339 strings → time.Time (for TIMESTAMP columns)
- Nested types (maps, slices) are recursively coerced for STRUCT and LIST columns
- All other types are passed through unchanged.
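The coercions listed above can be illustrated with a config that touches each case. This is a minimal sketch, not a tested pipeline: the events table, its column names, and the input field names are hypothetical, and it assumes incoming messages are JSON objects carrying those fields.

```yaml
output:
  duckdb_append:
    dsn: /data/bento.duckdb
    table: events # hypothetical table
    columns: [ id, score, seen_at, tags, origin ]
    # Expected coercions, in column order:
    #   id      JSON integer   -> int64      (BIGINT)
    #   score   JSON decimal   -> float64    (DOUBLE)
    #   seen_at RFC3339 string -> time.Time  (TIMESTAMP)
    #   tags    JSON array     -> coerced recursively (LIST)
    #   origin  JSON object    -> coerced recursively (STRUCT)
    args_mapping: |
      root = [
        this.id,
        this.score,
        this.seen_at,
        this.tags,
        this.origin,
      ]
    init_statement: |
      CREATE TABLE IF NOT EXISTS events (
        id BIGINT,
        score DOUBLE,
        seen_at TIMESTAMP,
        tags VARCHAR[],
        origin STRUCT(city VARCHAR, country VARCHAR)
      )
```

A message such as {"id": 7, "score": 0.5, "seen_at": "2024-01-01T00:00:00Z", "tags": ["a"], "origin": {"city": "Duckburg", "country": "US"}} would be the kind of input this sketch has in mind.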
Rows are flushed to disk on every WriteBatch call. When the component shuts down, Close() also flushes any remaining buffered rows.
Examples
- Table Insert via Appender
Insert rows into a database by bulk-loading them directly into DuckDB's columnar storage via the Appender (no SQL parsing overhead).
output:
  duckdb_append:
    dsn: /data/vault.duckdb
    table: vault_deposits
    columns: [ deposit_id, duck, coins, denomination, deposited_at ]
    args_mapping: |
      root = [
        this.deposit_id,
        this.duck,
        this.coins,
        this.denomination,
        this.deposited_at,
      ]
    init_statement: |
      CREATE TABLE IF NOT EXISTS vault_deposits (
        deposit_id VARCHAR PRIMARY KEY,
        duck VARCHAR NOT NULL,
        coins BIGINT NOT NULL,
        denomination VARCHAR NOT NULL,
        deposited_at TIMESTAMP NOT NULL
      )
    batching:
      count: 10000
      period: 5s
Fields
dsn
Path to the DuckDB database file. Use :memory: for an ephemeral in-process database.
Type: string
# Examples
dsn: /data/bento.duckdb
dsn: ':memory:'
table
Target table name. The table must exist when Connect() is called; use init_statement to create it.
Type: string
schema
DuckDB schema containing the table. Empty string uses the default schema.
Type: string
Default: ""
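As a sketch of targeting a table outside the default schema, the fragment below sets schema alongside table. The analytics schema name is hypothetical, and both the schema and the table are assumed to exist before the component connects.

```yaml
output:
  duckdb_append:
    dsn: /data/bento.duckdb
    schema: analytics # hypothetical schema, assumed to exist already
    table: vault_deposits
    columns: [ duck, coins, deposited_at ]
    args_mapping: root = [ this.duck, this.coins, this.deposited_at ]
```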
columns
Ordered list of column names. Must match the order of values produced by args_mapping.
Type: array
# Examples
columns:
- duck
- coins
- deposited_at
args_mapping
A Bloblang mapping that must evaluate to an array of values, one per column in the same order as columns.
Type: string
# Examples
args_mapping: root = [ this.duck, this.coins, this.deposited_at ]
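Because args_mapping is an ordinary Bloblang mapping, values can be transformed before they reach the Appender. The fragment below is an illustrative sketch with hypothetical field names; it uses the standard Bloblang methods uppercase and or, and keeps the array in the same order as columns.

```yaml
columns: [ duck, coins, denomination ]
args_mapping: |
  root = [
    this.duck.uppercase(),
    this.coins,
    this.denomination.or("unknown"),
  ]
```

Here a message without a denomination field would still produce a three-element array, with "unknown" as the third value.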
init_files
An optional list of file paths containing SQL statements to execute immediately upon the first connection to the target database. This is a useful way to initialise tables before processing data. Glob patterns are supported, including super globs (double star).
Care should be taken to ensure that the statements are idempotent, and therefore would not cause issues when run multiple times after service restarts. If both init_statement and init_files are specified the init_statement is executed after the init_files.
If a statement fails for any reason a warning log will be emitted but the operation of this component will not be stopped.
Type: array
# Examples
init_files:
- ./init/*.sql
init_files:
- ./foo.sql
- ./bar.sql
init_statement
An optional SQL statement to execute immediately upon the first connection to the target database. This is a useful way to initialise tables before processing data. Care should be taken to ensure that the statement is idempotent, and therefore would not cause issues when run multiple times after service restarts.
If both init_statement and init_files are specified the init_statement is executed after the init_files.
If the statement fails for any reason a warning log will be emitted but the operation of this component will not be stopped.
Type: string
# Examples
init_statement: |2
  CREATE TABLE IF NOT EXISTS some_table (
    foo varchar(50) not null,
    bar integer,
    baz varchar(50),
    primary key (foo)
  );
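Since init_statement runs after init_files, the two can be combined so that files prepare prerequisites and the statement creates the target table. The following is a sketch under stated assumptions: the file ./init/001_schema.sql and the analytics schema are hypothetical, and the file is assumed to contain a single idempotent CREATE SCHEMA statement.

```yaml
output:
  duckdb_append:
    dsn: /data/bento.duckdb
    schema: analytics
    table: vault_deposits
    columns: [ duck, coins, deposited_at ]
    args_mapping: root = [ this.duck, this.coins, this.deposited_at ]
    # Runs first. Hypothetical file, assumed to contain:
    #   CREATE SCHEMA IF NOT EXISTS analytics;
    init_files:
      - ./init/001_schema.sql
    # Runs second, so the schema should already exist at this point.
    init_statement: |
      CREATE TABLE IF NOT EXISTS analytics.vault_deposits (
        duck VARCHAR NOT NULL,
        coins BIGINT NOT NULL,
        deposited_at TIMESTAMP NOT NULL
      )
```

Both statements use IF NOT EXISTS so that they remain safe to run again after a service restart.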
init_verify_conn
Whether to verify the database connection on startup by performing a simple ping. This is disabled by default.
Type: bool
Default: false
Requires version 1.2.0 or newer
conn_max_idle_time
An optional maximum amount of time a connection may be idle. Expired connections may be closed lazily before reuse. If value <= 0, connections are not closed due to a connection's idle time.
Type: string
conn_max_life_time
An optional maximum amount of time a connection may be reused. Expired connections may be closed lazily before reuse. If value <= 0, connections are not closed due to a connection's age.
Type: string
conn_max_idle
An optional maximum number of connections in the idle connection pool. If conn_max_open is greater than 0 but less than the new conn_max_idle, then the new conn_max_idle will be reduced to match the conn_max_open limit. If value <= 0, no idle connections are retained. The default max idle connections is currently 2. This may change in a future release.
Type: int
Default: 2
conn_max_open
An optional maximum number of open connections to the database. If conn_max_idle is greater than 0 and the new conn_max_open is less than conn_max_idle, then conn_max_idle will be reduced to match the new conn_max_open limit. If value <= 0, then there is no limit on the number of open connections. The default is 0 (unlimited).
Type: int
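The interaction between the two pool limits can be sketched as follows. The values are illustrative only, and the duration strings assume the usual format used elsewhere in this document (for example 5s or 1m).

```yaml
output:
  duckdb_append:
    dsn: /data/bento.duckdb
    table: vault_deposits
    columns: [ duck, coins, deposited_at ]
    args_mapping: root = [ this.duck, this.coins, this.deposited_at ]
    conn_max_open: 1 # at most one open connection
    conn_max_idle: 2 # reduced to 1 to match the conn_max_open limit
    conn_max_idle_time: 5m # idle connections may be closed after five minutes
    conn_max_life_time: 1h # connections may be closed after one hour of reuse
```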
secret_name
An optional field that can be used to get the Username + Password from AWS Secrets Manager. This will overwrite the Username + Password in the DSN with the values from the Secret only if the driver is set to postgres.
Type: string
Requires version 1.3.0 or newer
iam_enabled
An optional field used to generate an IAM authentication token to connect to an Amazon Relational Database (RDS) DB instance. This will overwrite the Password in the DSN with the generated token only if the drivers are mysql or postgres.
Type: bool
Default: false
Requires version 1.8.0 or newer
azure
Optional fields that can be set to use Azure-based authentication for Azure Database for PostgreSQL.
Type: object
Requires version 1.10.0 or newer
azure.entra_enabled
An optional field used to generate an Entra token to connect to 'Azure Database for PostgreSQL flexible server'. This will create a new connection string with the host, user and database from the DSN field, so you may need to URL encode the DSN. The Default Azure Credential Chain from the Azure SDK is used.
Type: bool
Default: false
Requires version 1.10.0 or newer
azure.token_request_options
Sorry! This field is missing documentation.
Type: object
azure.token_request_options.claims
Set additional claims for the token.
Type: string
Default: ""
Requires version 1.10.0 or newer
azure.token_request_options.enable_cae
Indicates whether to enable Continuous Access Evaluation (CAE) for the requested token.
Type: bool
Default: false
Requires version 1.10.0 or newer
azure.token_request_options.scopes
The list of permission scopes required for the token.
Type: array
Default: ["https://ossrdbms-aad.database.windows.net/.default"]
Requires version 1.10.0 or newer
azure.token_request_options.tenant_id
Identifies the tenant from which to request the token. Azure credentials authenticate in their configured default tenants when this field isn't set.
Type: string
Default: ""
Requires version 1.10.0 or newer
region
The AWS region to target.
Type: string
Default: ""
endpoint
Allows you to specify a custom endpoint for the AWS API.
Type: string
Default: ""
credentials
Optional manual configuration of AWS credentials to use. More information can be found in this document.
Type: object
credentials.profile
A profile from ~/.aws/credentials to use.
Type: string
Default: ""
credentials.id
The ID of credentials to use.
Type: string
Default: ""
credentials.secret
The secret for the credentials being used.
This field contains sensitive information that usually shouldn't be added to a config directly. Read our secrets page for more info.
Type: string
Default: ""
credentials.token
The token for the credentials being used, required when using short term credentials.
Type: string
Default: ""
credentials.from_ec2_role
Use the credentials of a host EC2 machine configured to assume an IAM role associated with the instance.
Type: bool
Default: false
credentials.role
A role ARN to assume.
Type: string
Default: ""
credentials.role_external_id
An external ID to provide when assuming a role.
Type: string
Default: ""
credentials.expiry_window
Allow the credentials to trigger refreshing prior to the credentials actually expiring. This is beneficial so race conditions with expiring credentials do not cause requests to fail. For example '10s' would refresh credentials ten seconds before expiration. Setting to a duration of 0 disables the expiry window.
Type: string
Default: ""
batching
Allows you to configure a batching policy.
Type: object
# Examples
batching:
  byte_size: 5000
  count: 0
  period: 1s

batching:
  count: 10
  period: 1s

batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m

batching:
  count: 10
  jitter: 0.1
  period: 10s
batching.count
The number of messages at which the batch should be flushed. Setting this to 0 disables count-based batching.
Type: int
Default: 0
batching.byte_size
The number of bytes at which the batch should be flushed. Setting this to 0 disables size-based batching.
Type: int
Default: 0
batching.period
A period in which an incomplete batch should be flushed regardless of its size.
Type: string
Default: ""
# Examples
period: 1s
period: 1m
period: 500ms
batching.jitter
A non-negative factor that adds random delay to batch flush intervals, where delay is determined uniformly at random between 0 and jitter * period. For example, with period: 100ms and jitter: 0.1, each flush will be delayed by a random duration between 0-10ms.
Type: float
Default: 0
# Examples
jitter: 0.01
jitter: 0.1
jitter: 1
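As a worked instance of the formula above, the fragment below pairs a longer period with a jitter factor. The values are illustrative: with period: 10s and jitter: 0.1, the upper bound of the random delay is 0.1 * 10s = 1s.

```yaml
batching:
  count: 10000
  period: 10s
  jitter: 0.1 # each flush is delayed by a random duration between 0 and 1s
```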
batching.check
A Bloblang query that should return a boolean value indicating whether a message should end a batch.
Type: string
Default: ""
# Examples
check: this.type == "end_of_transaction"
batching.processors
A list of processors to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch, therefore splitting the batch into smaller batches using these processors is a no-op.
Type: array
# Examples
processors:
  - archive:
      format: concatenate

processors:
  - archive:
      format: lines

processors:
  - archive:
      format: json_array