sql_select
This component is mostly stable but breaking changes could still be made outside of major version releases if a fundamental problem with the component is found.
Executes a select query and creates a message for each row received.
Introduced in version 1.0.0.
# Common config fields, showing default values
input:
  label: ""
  sql_select:
    driver: "" # No default (required)
    dsn: clickhouse://username:password@host1:9000,host2:9000/database?dial_timeout=200ms&max_execution_time=60 # No default (required)
    table: foo # No default (required)
    columns: [] # No default (required)
    where: type = ? and created_at > ? # No default (optional)
    args_mapping: root = [ "article", now().ts_format("2006-01-02") ] # No default (optional)
    auto_replay_nacks: true
# All config fields, showing default values
input:
  label: ""
  sql_select:
    driver: "" # No default (required)
    dsn: clickhouse://username:password@host1:9000,host2:9000/database?dial_timeout=200ms&max_execution_time=60 # No default (required)
    table: foo # No default (required)
    columns: [] # No default (required)
    where: type = ? and created_at > ? # No default (optional)
    args_mapping: root = [ "article", now().ts_format("2006-01-02") ] # No default (optional)
    prefix: "" # No default (optional)
    suffix: "" # No default (optional)
    auto_replay_nacks: true
    init_files: [] # No default (optional)
    init_statement: | # No default (optional)
      CREATE TABLE IF NOT EXISTS some_table (
        foo varchar(50) not null,
        bar integer,
        baz varchar(50),
        primary key (foo)
      ) WITHOUT ROWID;
    init_verify_conn: false
    conn_max_idle_time: "" # No default (optional)
    conn_max_life_time: "" # No default (optional)
    conn_max_idle: 2
    conn_max_open: 0 # No default (optional)
    secret_name: "" # No default (optional)
    region: ""
    endpoint: ""
    credentials:
      profile: ""
      id: ""
      secret: ""
      token: ""
      from_ec2_role: false
      role: ""
      role_external_id: ""
Once the rows from the query are exhausted this input shuts down, allowing the pipeline to gracefully terminate (or the next input in a sequence to execute).
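As a rough illustration of the sequencing behaviour described above, the following sketch places two sql_select inputs inside a sequence input, so that the second query only starts once the first has exhausted its rows. It assumes the sequence input is available in your build; the table names and DSN are placeholders, not values defined by this page.

```yaml
input:
  sequence:
    inputs:
      # Runs first and shuts down once all of its rows have been consumed.
      - sql_select:
          driver: postgres
          dsn: postgres://foouser:foopass@localhost:5432/testdb?sslmode=disable
          table: customers # hypothetical table name
          columns: [ '*' ]
      # Starts only after the input above has finished.
      - sql_select:
          driver: postgres
          dsn: postgres://foouser:foopass@localhost:5432/testdb?sslmode=disable
          table: orders # hypothetical table name
          columns: [ '*' ]
```

Once the last input in the sequence is exhausted, the pipeline as a whole can terminate gracefully.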
Examples
- Consume a Table (PostgreSQL)
Here we define a pipeline that consumes all rows of a table that were created within the last hour, by comparing against the unix timestamp stored in the column "created_at":
input:
  sql_select:
    driver: postgres
    dsn: postgres://foouser:foopass@localhost:5432/testdb?sslmode=disable
    table: footable
    columns: [ '*' ]
    where: created_at >= ?
    args_mapping: |
      root = [
        now().ts_unix() - 3600
      ]
Fields
driver
A database driver to use.
Type: string
Options: mysql, postgres, clickhouse, mssql, sqlite, oracle, snowflake, trino, gocosmos, spanner.
dsn
A Data Source Name to identify the target database.
Drivers
The following is a list of supported drivers and their respective DSN formats:
Driver | Data Source Name Format |
---|---|
clickhouse | clickhouse://[username[:password]@][netloc][:port]/dbname[?param1=value1&...&paramN=valueN] |
mysql | [username[:password]@][protocol[(address)]]/dbname[?param1=value1&...&paramN=valueN] |
postgres | postgres://[user[:password]@][netloc][:port][/dbname][?param1=value1&...] |
mssql | sqlserver://[user[:password]@][netloc][:port][?database=dbname&param1=value1&...] |
sqlite | file:/path/to/filename.db[?param1=value1&...] |
oracle | oracle://[username[:password]@][netloc][:port]/service_name?server=server2&server=server3 |
snowflake | username[:password]@account_identifier/dbname/schemaname[?param1=value&...&paramN=valueN] |
spanner | projects/[project]/instances/[instance]/databases/dbname |
trino | http[s]://user[:pass]@host[:port][?parameters] |
gocosmos | AccountEndpoint=<cosmosdb-endpoint>;AccountKey=<cosmosdb-account-key>[;TimeoutMs=<timeout-in-ms>][;Version=<cosmosdb-api-version>][;DefaultDb/Db=<db-name>][;AutoId=<true/false>][;InsecureSkipVerify=<true/false>] |
Please note that the postgres driver enforces SSL by default. You can override this with the parameter sslmode=disable if required.
The snowflake driver supports multiple DSN formats. Please consult the docs for more details. For key pair authentication, the DSN has the following format: <snowflake_user>@<snowflake_account>/<db_name>/<schema_name>?warehouse=<warehouse>&role=<role>&authenticator=snowflake_jwt&privateKey=<base64_url_encoded_private_key>, where the value for the privateKey parameter can be constructed from an unencrypted RSA private key file rsa_key.p8 using openssl enc -d -base64 -in rsa_key.p8 | basenc --base64url -w0 (you can use gbasenc instead of basenc on macOS if you install coreutils via Homebrew). If you have a password-encrypted private key, you can decrypt it using openssl pkcs8 -in rsa_key_encrypted.p8 -out rsa_key.p8. Also, make sure fields such as the username are URL-encoded.
The gocosmos driver is still experimental, but it has support for hierarchical partition keys as well as cross-partition queries. Please refer to the SQL notes for details.
Type: string
# Examples
dsn: clickhouse://username:password@host1:9000,host2:9000/database?dial_timeout=200ms&max_execution_time=60
dsn: foouser:foopassword@tcp(localhost:3306)/foodb
dsn: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable
dsn: oracle://foouser:foopass@localhost:1521/service_name
table
The table to select from.
Type: string
# Examples
table: foo
columns
A list of columns to select.
Type: array
# Examples
columns:
  - '*'
columns:
  - foo
  - bar
  - baz
where
An optional where clause to add. Placeholder arguments are populated with the args_mapping field. Placeholders should always be question marks, and will automatically be converted to dollar syntax when the postgres or clickhouse drivers are used.
Type: string
# Examples
where: type = ? and created_at > ?
where: user_id = ?
args_mapping
An optional Bloblang mapping which should evaluate to an array of values whose length matches the number of placeholder arguments in the where field.
Type: string
# Examples
args_mapping: root = [ "article", now().ts_format("2006-01-02") ]
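To make the relationship between where and args_mapping concrete, here is a minimal sketch in which the where clause has two placeholders and the mapping produces an array of exactly two values in the same order. The table and column names are illustrative assumptions; the DSN reuses the mysql example from the dsn field.

```yaml
input:
  sql_select:
    driver: mysql
    dsn: foouser:foopassword@tcp(localhost:3306)/foodb
    table: footable # hypothetical table name
    columns: [ id, type, created_at ] # hypothetical column names
    # Two question-mark placeholders...
    where: type = ? and created_at > ?
    # ...so the mapping yields an array of two values, in placeholder order.
    args_mapping: |
      root = [
        "article",
        now().ts_format("2006-01-02")
      ]
```

If the array length and the number of placeholders differ, expect the query to fail rather than silently ignore the extra or missing values.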
prefix
An optional prefix to prepend to the select query (before SELECT).
Type: string
suffix
An optional suffix to append to the select query.
Type: string
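As a sketch of how suffix might be used, the config below appends an ordering and a row limit to the generated query. The table and column names are assumptions, and the exact SQL shown in the comment is an approximation of what the component assembles rather than guaranteed output.

```yaml
input:
  sql_select:
    driver: postgres
    dsn: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable
    table: footable
    columns: [ id, created_at ] # hypothetical column names
    where: created_at >= ?
    args_mapping: root = [ now().ts_unix() - 3600 ]
    # Appended after the where clause, giving roughly:
    #   SELECT id, created_at FROM footable WHERE created_at >= $1 ORDER BY created_at DESC LIMIT 100
    suffix: ORDER BY created_at DESC LIMIT 100
```

The prefix field works the same way at the other end of the statement, placing its text before SELECT.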
auto_replay_nacks
Whether messages that are rejected (nacked) at the output level should be automatically replayed indefinitely, eventually resulting in back pressure if the cause of the rejections is persistent. If set to false, these messages will instead be deleted. Disabling auto replays can greatly improve memory efficiency of high throughput streams as the original shape of the data can be discarded immediately upon consumption and mutation.
Type: bool
Default: true
init_files
An optional list of file paths containing SQL statements to execute immediately upon the first connection to the target database. This is a useful way to initialise tables before processing data. Glob patterns are supported, including super globs (double star).
Care should be taken to ensure that the statements are idempotent, and therefore would not cause issues when run multiple times after service restarts. If both init_statement and init_files are specified, the init_statement is executed after the init_files.
If a statement fails for any reason, a warning log will be emitted but the operation of this component will not be stopped.
Type: array
Requires version 1.0.0 or newer
# Examples
init_files:
  - ./init/*.sql
init_files:
  - ./foo.sql
  - ./bar.sql
init_statement
An optional SQL statement to execute immediately upon the first connection to the target database. This is a useful way to initialise tables before processing data. Care should be taken to ensure that the statement is idempotent, and therefore would not cause issues when run multiple times after service restarts.
If both init_statement and init_files are specified, the init_statement is executed after the init_files.
If the statement fails for any reason, a warning log will be emitted but the operation of this component will not be stopped.
Type: string
Requires version 1.0.0 or newer
# Examples
init_statement: |2
  CREATE TABLE IF NOT EXISTS some_table (
    foo varchar(50) not null,
    bar integer,
    baz varchar(50),
    primary key (foo)
  ) WITHOUT ROWID;
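The ordering of init_files and init_statement can be sketched as follows. This is an illustrative config only: it assumes the sqlite driver, a placeholder database path, and SQL files under ./init that are themselves idempotent.

```yaml
input:
  sql_select:
    driver: sqlite
    dsn: file:/path/to/filename.db
    table: some_table
    columns: [ foo, bar, baz ]
    # Executed first, upon the first connection to the database.
    init_files:
      - ./init/*.sql
    # Executed after the init files. IF NOT EXISTS keeps it idempotent,
    # so running it again after a service restart is harmless.
    init_statement: |
      CREATE TABLE IF NOT EXISTS some_table (
        foo varchar(50) not null,
        bar integer,
        baz varchar(50),
        primary key (foo)
      ) WITHOUT ROWID;
```

Because a failed statement only produces a warning log, it is worth checking the logs on first start to confirm the initialisation actually succeeded.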
init_verify_conn
Whether to verify the database connection on startup by performing a simple ping. By default this is disabled.
Type: bool
Default: false
Requires version 1.2.0 or newer
conn_max_idle_time
An optional maximum amount of time a connection may be idle. Expired connections may be closed lazily before reuse. If value <= 0, connections are not closed due to a connection's idle time.
Type: string
conn_max_life_time
An optional maximum amount of time a connection may be reused. Expired connections may be closed lazily before reuse. If value <= 0, connections are not closed due to a connection's age.
Type: string
conn_max_idle
An optional maximum number of connections in the idle connection pool. If conn_max_open is greater than 0 but less than the new conn_max_idle, then the new conn_max_idle will be reduced to match the conn_max_open limit. If value <= 0, no idle connections are retained. The default max idle connections is currently 2. This may change in a future release.
Type: int
Default: 2
conn_max_open
An optional maximum number of open connections to the database. If conn_max_idle is greater than 0 and the new conn_max_open is less than conn_max_idle, then conn_max_idle will be reduced to match the new conn_max_open limit. If value <= 0, then there is no limit on the number of open connections. The default is 0 (unlimited).
Type: int
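The connection pool fields above can be combined along the following lines. This is a hedged sketch: the numbers are arbitrary, and the duration strings (5m, 1h) assume that conn_max_idle_time and conn_max_life_time accept Go-style duration values, which this page does not state explicitly.

```yaml
input:
  sql_select:
    driver: postgres
    dsn: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable
    table: footable
    columns: [ '*' ]
    init_verify_conn: true # ping the database on startup
    conn_max_open: 10 # allow at most 10 open connections
    conn_max_idle: 5 # retain up to 5 idle connections; capped at conn_max_open if set higher
    conn_max_idle_time: 5m # assumed duration format; close connections idle for longer than this
    conn_max_life_time: 1h # assumed duration format; stop reusing connections older than this
```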
secret_name
An optional field that can be used to fetch the username and password from AWS Secrets Manager. The values from the secret overwrite the username and password in the DSN, but only if the driver is set to postgres.
Type: string
region
The AWS region to target.
Type: string
Default: ""
endpoint
Allows you to specify a custom endpoint for the AWS API.
Type: string
Default: ""
credentials
Optional manual configuration of AWS credentials to use. More information can be found in this document.
Type: object
credentials.profile
A profile from ~/.aws/credentials to use.
Type: string
Default: ""
credentials.id
The ID of credentials to use.
Type: string
Default: ""
credentials.secret
The secret for the credentials being used.
This field contains sensitive information that usually shouldn't be added to a config directly. Read our secrets page for more info.
Type: string
Default: ""
credentials.token
The token for the credentials being used, required when using short term credentials.
Type: string
Default: ""
credentials.from_ec2_role
Use the credentials of a host EC2 machine configured to assume an IAM role associated with the instance.
Type: bool
Default: false
Requires version 1.0.0 or newer
credentials.role
A role ARN to assume.
Type: string
Default: ""
credentials.role_external_id
An external ID to provide when assuming a role.
Type: string
Default: ""
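Putting the AWS fields together, a config that pulls database credentials from AWS Secrets Manager might look like the sketch below. The secret name, region, role ARN, external ID and host are all hypothetical, and the layout of the secret itself is not described on this page, so treat this as a starting point rather than a verified setup.

```yaml
input:
  sql_select:
    driver: postgres # secret_name only takes effect with the postgres driver
    # The username and password here are stand-ins that the secret is expected to overwrite.
    dsn: postgres://placeholder:placeholder@db.example.com:5432/foodb
    table: footable
    columns: [ '*' ]
    secret_name: prod/foodb/credentials # hypothetical secret name
    region: us-east-1 # hypothetical region
    credentials:
      role: arn:aws:iam::123456789012:role/example-reader # hypothetical role ARN
      role_external_id: example-external-id # hypothetical external ID
```

Leaving the credentials block empty should fall back to whatever AWS credentials are available in the environment, which avoids placing secrets in the config at all.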