sql
This component is deprecated and will be removed in the next major version release. Please consider moving onto alternative components.
Executes an arbitrary SQL query for each message.
- Common
- Advanced
# Common config fields, showing default values
output:
label: ""
sql:
driver: "" # No default (required)
data_source_name: "" # No default (required)
query: INSERT INTO footable (foo, bar, baz) VALUES (?, ?, ?); # No default (required)
args_mapping: root = [ this.cat.meow, this.doc.woofs[0] ] # No default (optional)
max_in_flight: 64
batching:
count: 0
byte_size: 0
period: ""
jitter: 0
check: ""
# All config fields, showing default values
output:
label: ""
sql:
driver: "" # No default (required)
data_source_name: "" # No default (required)
query: INSERT INTO footable (foo, bar, baz) VALUES (?, ?, ?); # No default (required)
args_mapping: root = [ this.cat.meow, this.doc.woofs[0] ] # No default (optional)
max_in_flight: 64
batching:
count: 0
byte_size: 0
period: ""
jitter: 0
check: ""
processors: [] # No default (optional)
Alternatives
For basic inserts use the sql_insert output. For more complex queries use the sql_raw output.
Fields
driver
A database driver to use.
Type: string
Options: mysql, postgres, clickhouse, mssql, sqlite, oracle, snowflake, trino, gocosmos, spanner, duckdb.
data_source_name
Data source name.
Type: string
query
The query to execute. The style of placeholder to use depends on the driver, some drivers require question marks (?) whereas others expect incrementing dollar signs ($1, $2, and so on) or colons (:1, :2 and so on). The style to use is outlined in this table:
| Driver | Placeholder Style |
|---|---|
clickhouse | Dollar sign |
mysql | Question mark |
postgres | Dollar sign |
mssql | Question mark |
sqlite | Question mark |
oracle | Colon |
snowflake | Question mark |
spanner | Question mark |
trino | Question mark |
gocosmos | Colon |
duckdb | Question mark |
Type: string
# Examples
query: INSERT INTO footable (foo, bar, baz) VALUES (?, ?, ?);
args_mapping
An optional Bloblang mapping which should evaluate to an array of values matching in size to the number of placeholder arguments in the field query.
Type: string
# Examples
args_mapping: root = [ this.cat.meow, this.doc.woofs[0] ]
args_mapping: root = [ metadata("user.id").string() ]
max_in_flight
The maximum number of inserts to run in parallel.
Type: int
Default: 64
batching
Allows you to configure a batching policy.
Type: object
# Examples
batching:
byte_size: 5000
count: 0
period: 1s
batching:
count: 10
period: 1s
batching:
check: this.contains("END BATCH")
count: 0
period: 1m
batching:
count: 10
jitter: 0.1
period: 10s
batching.count
A number of messages at which the batch should be flushed. If 0 disables count based batching.
Type: int
Default: 0
batching.byte_size
An amount of bytes at which the batch should be flushed. If 0 disables size based batching.
Type: int
Default: 0
batching.period
A period in which an incomplete batch should be flushed regardless of its size.
Type: string
Default: ""
# Examples
period: 1s
period: 1m
period: 500ms
batching.jitter
A non-negative factor that adds random delay to batch flush intervals, where delay is determined uniformly at random between 0 and jitter * period. For example, with period: 100ms and jitter: 0.1, each flush will be delayed by a random duration between 0-10ms.
Type: float
Default: 0
# Examples
jitter: 0.01
jitter: 0.1
jitter: 1
batching.check
A Bloblang query that should return a boolean value indicating whether a message should end a batch.
Type: string
Default: ""
# Examples
check: this.type == "end_of_transaction"
batching.processors
A list of processors to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch, therefore splitting the batch into smaller batches using these processors is a no-op.
Type: array
# Examples
processors:
- archive:
format: concatenate
processors:
- archive:
format: lines
processors:
- archive:
format: json_array