Skip to main content

sql_raw

Executes an arbitrary SQL query for each message.

Introduced in version 1.0.0.

# Common config fields, showing default values
output:
label: ""
sql_raw:
driver: "" # No default (required)
dsn: clickhouse://username:password@host1:9000,host2:9000/database?dial_timeout=200ms&max_execution_time=60 # No default (required)
query: INSERT INTO footable (foo, bar, baz) VALUES (?, ?, ?); # No default (required)
args_mapping: root = [ this.cat.meow, this.doc.woofs[0] ] # No default (optional)
max_in_flight: 64
batching:
count: 0
byte_size: 0
period: ""
jitter: 0
check: ""

Examples

Here we insert rows into a database by populating the columns id, name and topic with values extracted from messages and metadata:

output:
sql_raw:
driver: mysql
dsn: foouser:foopassword@tcp(localhost:3306)/foodb
query: "INSERT INTO footable (id, name, topic) VALUES (?, ?, ?);"
args_mapping: |
root = [
this.user.id,
this.user.name,
metadata("kafka_topic").string(),
]

Fields

driver

A database driver to use.

Type: string
Options: mysql, postgres, clickhouse, mssql, sqlite, oracle, snowflake, trino, gocosmos, spanner.

dsn

A Data Source Name to identify the target database.

Drivers

The following is a list of supported drivers, their placeholder style, and their respective DSN formats:

DriverData Source Name Format
clickhouseclickhouse://[username[:password]@][netloc][:port]/dbname[?param1=value1&...&paramN=valueN]
mysql[username[:password]@][protocol[(address)]]/dbname[?param1=value1&...&paramN=valueN]
postgrespostgres://[user[:password]@][netloc][:port][/dbname][?param1=value1&...]
mssqlsqlserver://[user[:password]@][netloc][:port][?database=dbname&param1=value1&...]
sqlitefile:/path/to/filename.db[?param&=value1&...]
oracleoracle://[username[:password]@][netloc][:port]/service_name?server=server2&server=server3
snowflakeusername[:password]@account_identifier/dbname/schemaname[?param1=value&...&paramN=valueN]
spannerprojects/[project]/instances/[instance]/databases/dbname
trinohttp[s]://user[:pass]@host[:port][?parameters]
gocosmosAccountEndpoint=<cosmosdb-endpoint>;AccountKey=<cosmosdb-account-key>[;TimeoutMs=<timeout-in-ms>][;Version=<cosmosdb-api-version>][;DefaultDb/Db=<db-name>][;AutoId=<true/false>][;InsecureSkipVerify=<true/false>]

Please note that the postgres driver enforces SSL by default, you can override this with the parameter sslmode=disable if required.

The snowflake driver supports multiple DSN formats. Please consult the docs for more details. For key pair authentication, the DSN has the following format: <snowflake_user>@<snowflake_account>/<db_name>/<schema_name>?warehouse=<warehouse>&role=<role>&authenticator=snowflake_jwt&privateKey=<base64_url_encoded_private_key>, where the value for the privateKey parameter can be constructed from an unencrypted RSA private key file rsa_key.p8 using openssl enc -d -base64 -in rsa_key.p8 | basenc --base64url -w0 (you can use gbasenc insted of basenc on OSX if you install coreutils via Homebrew). If you have a password-encrypted private key, you can decrypt it using openssl pkcs8 -in rsa_key_encrypted.p8 -out rsa_key.p8. Also, make sure fields such as the username are URL-encoded.

The gocosmos driver is still experimental, but it has support for hierarchical partition keys as well as cross-partition queries. Please refer to the SQL notes for details.

Type: string

# Examples

dsn: clickhouse://username:password@host1:9000,host2:9000/database?dial_timeout=200ms&max_execution_time=60

dsn: foouser:foopassword@tcp(localhost:3306)/foodb

dsn: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable

dsn: oracle://foouser:foopass@localhost:1521/service_name

query

The query to execute. The style of placeholder to use depends on the driver, some drivers require question marks (?) whereas others expect incrementing dollar signs ($1, $2, and so on) or colons (:1, :2 and so on). The style to use is outlined in this table:

DriverPlaceholder Style
clickhouseDollar sign
mysqlQuestion mark
postgresDollar sign
mssqlQuestion mark
sqliteQuestion mark
oracleColon
snowflakeQuestion mark
spannerQuestion mark
trinoQuestion mark
gocosmosColon

Type: string

# Examples

query: INSERT INTO footable (foo, bar, baz) VALUES (?, ?, ?);

unsafe_dynamic_query

Whether to enable interpolation functions in the query. Great care should be made to ensure your queries are defended against injection attacks.

Type: bool
Default: false

args_mapping

An optional Bloblang mapping which should evaluate to an array of values matching in size to the number of placeholder arguments in the field query.

Type: string

# Examples

args_mapping: root = [ this.cat.meow, this.doc.woofs[0] ]

args_mapping: root = [ metadata("user.id").string() ]

max_in_flight

The maximum number of inserts to run in parallel.

Type: int
Default: 64

init_files

An optional list of file paths containing SQL statements to execute immediately upon the first connection to the target database. This is a useful way to initialise tables before processing data. Glob patterns are supported, including super globs (double star).

Care should be taken to ensure that the statements are idempotent, and therefore would not cause issues when run multiple times after service restarts. If both init_statement and init_files are specified the init_statement is executed after the init_files.

If a statement fails for any reason a warning log will be emitted but the operation of this component will not be stopped.

Type: array
Requires version 1.0.0 or newer

# Examples

init_files:
- ./init/*.sql

init_files:
- ./foo.sql
- ./bar.sql

init_statement

An optional SQL statement to execute immediately upon the first connection to the target database. This is a useful way to initialise tables before processing data. Care should be taken to ensure that the statement is idempotent, and therefore would not cause issues when run multiple times after service restarts.

If both init_statement and init_files are specified the init_statement is executed after the init_files.

If the statement fails for any reason a warning log will be emitted but the operation of this component will not be stopped.

Type: string
Requires version 1.0.0 or newer

# Examples

init_statement: |2
CREATE TABLE IF NOT EXISTS some_table (
foo varchar(50) not null,
bar integer,
baz varchar(50),
primary key (foo)
) WITHOUT ROWID;

init_verify_conn

Whether to verify the database connection on startup by performing a simple ping, by default this is disabled.

Type: bool
Default: false
Requires version 1.2.0 or newer

conn_max_idle_time

An optional maximum amount of time a connection may be idle. Expired connections may be closed lazily before reuse. If value <= 0, connections are not closed due to a connections idle time.

Type: string

conn_max_life_time

An optional maximum amount of time a connection may be reused. Expired connections may be closed lazily before reuse. If value <= 0, connections are not closed due to a connections age.

Type: string

conn_max_idle

An optional maximum number of connections in the idle connection pool. If conn_max_open is greater than 0 but less than the new conn_max_idle, then the new conn_max_idle will be reduced to match the conn_max_open limit. If value <= 0, no idle connections are retained. The default max idle connections is currently 2. This may change in a future release.

Type: int
Default: 2

conn_max_open

An optional maximum number of open connections to the database. If conn_max_idle is greater than 0 and the new conn_max_open is less than conn_max_idle, then conn_max_idle will be reduced to match the new conn_max_open limit. If value <= 0, then there is no limit on the number of open connections. The default is 0 (unlimited).

Type: int

secret_name

An optional field that can be used to get the Username + Password from AWS Secrets Manager. This will overwrite the Username + Password in the DSN with the values from the Secret only if the driver is set to postgres.

Type: string

region

The AWS region to target.

Type: string
Default: ""

endpoint

Allows you to specify a custom endpoint for the AWS API.

Type: string
Default: ""

credentials

Optional manual configuration of AWS credentials to use. More information can be found in this document.

Type: object

credentials.profile

A profile from ~/.aws/credentials to use.

Type: string
Default: ""

credentials.id

The ID of credentials to use.

Type: string
Default: ""

credentials.secret

The secret for the credentials being used.

Secret

This field contains sensitive information that usually shouldn't be added to a config directly, read our secrets page for more info.

Type: string
Default: ""

credentials.token

The token for the credentials being used, required when using short term credentials.

Type: string
Default: ""

credentials.from_ec2_role

Use the credentials of a host EC2 machine configured to assume an IAM role associated with the instance.

Type: bool
Default: false
Requires version 1.0.0 or newer

credentials.role

A role ARN to assume.

Type: string
Default: ""

credentials.role_external_id

An external ID to provide when assuming a role.

Type: string
Default: ""

batching

Allows you to configure a batching policy.

Type: object

# Examples

batching:
byte_size: 5000
count: 0
period: 1s

batching:
count: 10
period: 1s

batching:
check: this.contains("END BATCH")
count: 0
period: 1m

batching:
count: 10
jitter: 0.1
period: 10s

batching.count

A number of messages at which the batch should be flushed. If 0 disables count based batching.

Type: int
Default: 0

batching.byte_size

An amount of bytes at which the batch should be flushed. If 0 disables size based batching.

Type: int
Default: 0

batching.period

A period in which an incomplete batch should be flushed regardless of its size.

Type: string
Default: ""

# Examples

period: 1s

period: 1m

period: 500ms

batching.jitter

A non-negative factor that adds random delay to batch flush intervals, where delay is determined uniformly at random between 0 and jitter * period. For example, with period: 100ms and jitter: 0.1, each flush will be delayed by a random duration between 0-10ms.

Type: float
Default: 0

# Examples

jitter: 0.01

jitter: 0.1

jitter: 1

batching.check

A Bloblang query that should return a boolean value indicating whether a message should end a batch.

Type: string
Default: ""

# Examples

check: this.type == "end_of_transaction"

batching.processors

A list of processors to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch, therefore splitting the batch into smaller batches using these processors is a no-op.

Type: array

# Examples

processors:
- archive:
format: concatenate

processors:
- archive:
format: lines

processors:
- archive:
format: json_array