snowflake_put
This component is mostly stable but breaking changes could still be made outside of major version releases if a fundamental problem with the component is found.
Sends messages to Snowflake stages and, optionally, calls Snowpipe to load this data into one or more tables.
Introduced in version 1.0.0.
- Common
- Advanced
# Common config fields, showing default values
output:
label: ""
snowflake_put:
account: "" # No default (required)
region: us-west-2 # No default (optional)
cloud: aws # No default (optional)
user: "" # No default (required)
password: "" # No default (optional)
private_key_file: "" # No default (optional)
private_key_pass: "" # No default (optional)
role: "" # No default (required)
database: "" # No default (required)
warehouse: "" # No default (required)
schema: "" # No default (required)
stage: "" # No default (required)
path: ""
file_name: ""
file_extension: ""
compression: AUTO
request_id: ""
snowpipe: "" # No default (optional)
batching:
count: 0
byte_size: 0
period: ""
check: ""
max_in_flight: 1
# All config fields, showing default values
output:
label: ""
snowflake_put:
account: "" # No default (required)
region: us-west-2 # No default (optional)
cloud: aws # No default (optional)
user: "" # No default (required)
password: "" # No default (optional)
private_key_file: "" # No default (optional)
private_key_pass: "" # No default (optional)
role: "" # No default (required)
database: "" # No default (required)
warehouse: "" # No default (required)
schema: "" # No default (required)
stage: "" # No default (required)
path: ""
file_name: ""
file_extension: ""
upload_parallel_threads: 4
compression: AUTO
request_id: ""
snowpipe: "" # No default (optional)
client_session_keep_alive: false
batching:
count: 0
byte_size: 0
period: ""
check: ""
processors: [] # No default (optional)
max_in_flight: 1
In order to use a different stage and/or Snowpipe for each message, you can use function interpolations as described here. When using batching, messages are grouped by the calculated stage and Snowpipe and are streamed to individual files in their corresponding stage and, optionally, a Snowpipe insertFiles REST API call will be made for each individual file.
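For instance, here is a sketch of per-message routing (the table_name metadata field and the pipe naming convention are assumptions for illustration, not part of the component):
output:
  snowflake_put:
    account: bento
    user: test@bento.dev
    private_key_file: path_to_ssh_key.pem
    role: ACCOUNTADMIN
    database: BENTO_DB
    warehouse: COMPUTE_WH
    schema: PUBLIC
    # Route each message to the implicit table stage of the table named in its metadata
    stage: "@%${! @table_name }"
    # Call a correspondingly named Snowpipe for each resulting file
    snowpipe: ${! @table_name }_PIPE
    compression: AUTO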
Credentials
Two authentication mechanisms are supported:
- User/password
- Key Pair Authentication
User/password
This is a basic authentication mechanism which allows you to PUT data into a stage. However, it is not compatible with Snowpipe.
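A minimal sketch of this mechanism (the account, user and stage values are placeholders, and the password is read from an environment variable rather than stored in the config):
output:
  snowflake_put:
    account: bento
    user: test@bento.dev
    password: "${SNOWFLAKE_PASSWORD}"
    role: ACCOUNTADMIN
    database: BENTO_DB
    warehouse: COMPUTE_WH
    schema: PUBLIC
    stage: "@%BENTO_TBL"
    compression: NONE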
Key Pair Authentication
This authentication mechanism allows Snowpipe functionality, but it does require configuring an SSH Private Key beforehand. Please consult the documentation for details on how to set it up and assign the Public Key to your user.
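A minimal sketch using an encrypted private key (the key path and passphrase variable are placeholders):
output:
  snowflake_put:
    account: bento
    user: test@bento.dev
    private_key_file: ./rsa_key.p8
    private_key_pass: "${PRIVATE_KEY_PASSPHRASE}"
    role: ACCOUNTADMIN
    database: BENTO_DB
    warehouse: COMPUTE_WH
    schema: PUBLIC
    stage: "@%BENTO_TBL"
    snowpipe: BENTO_PIPE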
Note that the Snowflake documentation used to suggest using this command:
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8
to generate an encrypted SSH private key. However, in this case, it uses an encryption algorithm called pbeWithMD5AndDES-CBC, which is part of PKCS#5 v1.5 and is considered insecure. Due to this, Bento does not support it and, if you wish to use password-protected keys directly, you must use PKCS#5 v2.0 to encrypt them by using the following command (as the current Snowflake docs suggest):
openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 des3 -inform PEM -out rsa_key.p8
If you have an existing key encrypted with PKCS#5 v1.5, you can re-encrypt it with PKCS#5 v2.0 using this command:
openssl pkcs8 -in rsa_key_original.p8 -topk8 -v2 des3 -out rsa_key.p8
Please consult this pkcs8 command documentation for details on PKCS#5 algorithms.
Batching
It's common to want to upload messages to Snowflake as batched archives. The easiest way to do this is to batch your messages at the output level and join the batch of messages with an archive and/or compress processor.
For the optimal batch size, please consult the Snowflake documentation.
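As an illustrative sketch (field values are placeholders), a batch can be concatenated and gzip-compressed at the output level so that it matches the GZIP compression setting:
output:
  snowflake_put:
    account: bento
    user: test@bento.dev
    private_key_file: path_to_ssh_key.pem
    role: ACCOUNTADMIN
    database: BENTO_DB
    warehouse: COMPUTE_WH
    schema: PUBLIC
    stage: "@%BENTO_TBL"
    # GZIP expects pre-compressed content, so the batch is archived and compressed first
    compression: GZIP
    batching:
      count: 100
      period: 5s
      processors:
        - archive:
            format: concatenate
        - compress:
            algorithm: gzip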
Snowpipe
Given a table called BENTO_TBL with one column of type variant:
CREATE OR REPLACE TABLE BENTO_DB.PUBLIC.BENTO_TBL(RECORD variant)
and the following BENTO_PIPE Snowpipe:
CREATE OR REPLACE PIPE BENTO_DB.PUBLIC.BENTO_PIPE AUTO_INGEST = FALSE AS COPY INTO BENTO_DB.PUBLIC.BENTO_TBL FROM (SELECT * FROM @%BENTO_TBL) FILE_FORMAT = (TYPE = JSON COMPRESSION = AUTO)
you can configure Bento to use the implicit table stage @%BENTO_TBL as the stage and BENTO_PIPE as the snowpipe. In this case, you must set compression to AUTO and, if using message batching, you'll need to configure an archive processor with the concatenate format. Since the compression is set to AUTO, the gosnowflake client library will compress the messages automatically, so you don't need to add a compress processor for message batches.
If you add STRIP_OUTER_ARRAY = TRUE in your Snowpipe FILE_FORMAT definition, then you must use json_array instead of concatenate as the archive processor format.
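For example, a sketch of the batching section in that case (account, credentials, stage and snowpipe omitted here; see the examples below):
output:
  snowflake_put:
    # ... account, credentials, stage and snowpipe as in the examples below ...
    compression: AUTO
    batching:
      count: 10
      period: 3s
      processors:
        # json_array wraps the batch in a JSON array, matching STRIP_OUTER_ARRAY = TRUE
        - archive:
            format: json_array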
Note: Only Snowpipes with FILE_FORMAT TYPE JSON are currently supported.
Snowpipe Troubleshooting
Snowpipe provides the insertReport and loadHistoryScan REST API endpoints which can be used to get information about recent Snowpipe calls. In order to query them, you'll first need to generate a valid JWT token for your Snowflake account. There are two methods for doing so:
- Using the snowsql utility:
snowsql --private-key-path rsa_key.p8 --generate-jwt -a <account> -u <user>
- Using the Python sql-api-generate-jwt utility:
python3 sql-api-generate-jwt.py --private_key_file_path=rsa_key.p8 --account=<account> --user=<user>
Once you successfully generate a JWT token and store it in the JWT_TOKEN environment variable, you can, for example, query the insertReport endpoint using curl:
curl -H "Authorization: Bearer ${JWT_TOKEN}" "https://<account>.snowflakecomputing.com/v1/data/pipes/<database>.<schema>.<snowpipe>/insertReport"
If you need to pass in a valid requestId to any of these Snowpipe REST API endpoints, you can set a uuid_v4() string in a metadata field called request_id, log it via the log processor and then configure request_id: ${! @request_id }. Alternatively, you can enable debug logging as described here and Bento will print the Request IDs that it sends to Snowpipe.
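A sketch of that approach (the metadata field name and log message are arbitrary choices for illustration):
pipeline:
  processors:
    # Assign a random UUID v4 to each message and log it for later correlation
    - mapping: |
        meta request_id = uuid_v4()
    - log:
        level: INFO
        message: 'snowpipe request_id: ${! @request_id }'
output:
  snowflake_put:
    # ... account, credentials, stage and snowpipe as in the examples below ...
    request_id: ${! @request_id }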
General Troubleshooting
The underlying gosnowflake driver requires write access to the default directory used for temporary files. Please consult the os.TempDir docs for details on how to change this directory via environment variables.
A silent failure can occur due to this issue, where the underlying gosnowflake driver doesn't return an error and doesn't log a failure if it can't figure out the current username. One way to trigger this behaviour is by running Bento in a Docker container with a non-existent user ID (such as --user 1000:1000).
Performance
This output benefits from sending multiple messages in flight in parallel for improved performance. You can tune the max number of in flight messages (or message batches) with the field max_in_flight.
This output benefits from sending messages as a batch for improved performance. Batches can be formed at both the input and output level. You can find out more in this doc.
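For instance (the values here are illustrative and should be tuned to your workload and warehouse limits):
output:
  snowflake_put:
    # ... account, credentials, stage and snowpipe as in the examples below ...
    max_in_flight: 4
    batching:
      count: 100
      period: 5s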
Examples
- Kafka / realtime brokers
- No compression
- Parquet format with snappy compression
- Automatic compression
- DEFLATE compression
- RAW_DEFLATE compression
Upload message batches from realtime brokers such as Kafka, persisting the batch partition and offsets in the stage path and file name similarly to the Kafka Connector scheme, and call Snowpipe to load them into a table. When batching is configured at the input level, it is done per-partition.
input:
kafka:
addresses:
- localhost:9092
topics:
- foo
consumer_group: bento
batching:
count: 10
period: 3s
processors:
- mapping: |
meta kafka_start_offset = meta("kafka_offset").from(0)
meta kafka_end_offset = meta("kafka_offset").from(-1)
meta batch_timestamp = if batch_index() == 0 { now() }
- mapping: |
meta batch_timestamp = if batch_index() != 0 { meta("batch_timestamp").from(0) }
output:
snowflake_put:
account: bento
user: test@bento.dev
private_key_file: path_to_ssh_key.pem
role: ACCOUNTADMIN
database: BENTO_DB
warehouse: COMPUTE_WH
schema: PUBLIC
stage: "@%BENTO_TBL"
path: bento/BENTO_TBL/${! @kafka_partition }
file_name: ${! @kafka_start_offset }_${! @kafka_end_offset }_${! meta("batch_timestamp") }
upload_parallel_threads: 4
compression: NONE
snowpipe: BENTO_PIPE
Upload concatenated messages into a .json file to a table stage without calling Snowpipe.
output:
snowflake_put:
account: bento
user: test@bento.dev
private_key_file: path_to_ssh_key.pem
role: ACCOUNTADMIN
database: BENTO_DB
warehouse: COMPUTE_WH
schema: PUBLIC
stage: "@%BENTO_TBL"
path: bento
upload_parallel_threads: 4
compression: NONE
batching:
count: 10
period: 3s
processors:
- archive:
format: concatenate
Upload a batch of messages as a .parquet file to a table stage without calling Snowpipe.
output:
snowflake_put:
account: bento
user: test@bento.dev
private_key_file: path_to_ssh_key.pem
role: ACCOUNTADMIN
database: BENTO_DB
warehouse: COMPUTE_WH
schema: PUBLIC
stage: "@%BENTO_TBL"
path: bento
file_extension: parquet
upload_parallel_threads: 4
compression: NONE
batching:
count: 10
period: 3s
processors:
- parquet_encode:
schema:
- name: ID
type: INT64
- name: CONTENT
type: BYTE_ARRAY
default_compression: snappy
Upload concatenated messages compressed automatically into a .gz archive file to a table stage without calling Snowpipe.
output:
snowflake_put:
account: bento
user: test@bento.dev
private_key_file: path_to_ssh_key.pem
role: ACCOUNTADMIN
database: BENTO_DB
warehouse: COMPUTE_WH
schema: PUBLIC
stage: "@%BENTO_TBL"
path: bento
upload_parallel_threads: 4
compression: AUTO
batching:
count: 10
period: 3s
processors:
- archive:
format: concatenate
Upload concatenated messages compressed into a .deflate archive file to a table stage and call Snowpipe to load them into a table.
output:
snowflake_put:
account: bento
user: test@bento.dev
private_key_file: path_to_ssh_key.pem
role: ACCOUNTADMIN
database: BENTO_DB
warehouse: COMPUTE_WH
schema: PUBLIC
stage: "@%BENTO_TBL"
path: bento
upload_parallel_threads: 4
compression: DEFLATE
snowpipe: BENTO_PIPE
batching:
count: 10
period: 3s
processors:
- archive:
format: concatenate
- mapping: |
root = content().compress("zlib")
Upload concatenated messages compressed into a .raw_deflate archive file to a table stage and call Snowpipe to load them into a table.
output:
snowflake_put:
account: bento
user: test@bento.dev
private_key_file: path_to_ssh_key.pem
role: ACCOUNTADMIN
database: BENTO_DB
warehouse: COMPUTE_WH
schema: PUBLIC
stage: "@%BENTO_TBL"
path: bento
upload_parallel_threads: 4
compression: RAW_DEFLATE
snowpipe: BENTO_PIPE
batching:
count: 10
period: 3s
processors:
- archive:
format: concatenate
- mapping: |
root = content().compress("flate")
Fields
account
Account name, which is the same as the Account Identifier as described here. However, when using an Account Locator, the Account Identifier is formatted as <account_locator>.<region_id>.<cloud> and this field needs to be populated using the <account_locator> part.
Type: string
region
Optional region field which needs to be populated when using an Account Locator; it must be set to the <region_id> part of the Account Identifier (<account_locator>.<region_id>.<cloud>).
Type: string
# Examples
region: us-west-2
cloud
Optional cloud platform field which needs to be populated when using an Account Locator; it must be set to the <cloud> part of the Account Identifier (<account_locator>.<region_id>.<cloud>).
Type: string
# Examples
cloud: aws
cloud: gcp
cloud: azure
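Taken together (a sketch; the locator value is made up), an Account Identifier of the form <account_locator>.<region_id>.<cloud>, e.g. ab12345.us-west-2.aws, would map onto these fields as follows:
account: ab12345
region: us-west-2
cloud: aws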
user
Username.
Type: string
password
An optional password.
This field contains sensitive information that usually shouldn't be added to a config directly, read our secrets page for more info.
Type: string
private_key_file
The path to a file containing the private SSH key.
Type: string
private_key_pass
An optional private SSH key passphrase.
This field contains sensitive information that usually shouldn't be added to a config directly, read our secrets page for more info.
Type: string
role
Role.
Type: string
database
Database.
Type: string
warehouse
Warehouse.
Type: string
schema
Schema.
Type: string
stage
Stage name. Use one of the supported stage types. This field supports interpolation functions.
Type: string
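For illustration (the stage names are hypothetical), a named stage and an implicit table stage would be referenced like this:
# Examples
stage: "@my_named_stage"
stage: "@%BENTO_TBL"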
path
Stage path. This field supports interpolation functions.
Type: string
Default: ""
file_name
Stage file name. Will be equal to the Request ID if not set or empty. This field supports interpolation functions.
Type: string
Default: ""
Requires version v4.12.0 or newer
file_extension
Stage file extension. Will be derived from the configured compression if not set or empty. This field supports interpolation functions.
Type: string
Default: ""
Requires version v4.12.0 or newer
# Examples
file_extension: csv
file_extension: parquet
upload_parallel_threads
Specifies the number of threads to use for uploading files.
Type: int
Default: 4
compression
Compression type.
Type: string
Default: "AUTO"
Option | Summary
---|---
AUTO | Compression (gzip) is applied automatically by the output and messages must contain plain-text JSON. Default file_extension: gz.
DEFLATE | Messages must be pre-compressed using the zlib algorithm (with zlib header, RFC1950). Default file_extension: deflate.
GZIP | Messages must be pre-compressed using the gzip algorithm. Default file_extension: gz.
NONE | No compression is applied and messages must contain plain-text JSON. Default file_extension: json.
RAW_DEFLATE | Messages must be pre-compressed using the flate algorithm (without header, RFC1951). Default file_extension: raw_deflate.
ZSTD | Messages must be pre-compressed using the Zstandard algorithm. Default file_extension: zst.
request_id
Request ID. Will be assigned a random UUID (v4) string if not set or empty. This field supports interpolation functions.
Type: string
Default: ""
Requires version v4.12.0 or newer
snowpipe
An optional Snowpipe name. Use the <snowpipe> part from <database>.<schema>.<snowpipe>. This field supports interpolation functions.
Type: string
client_session_keep_alive
Enable Snowflake keepalive mechanism to prevent the client session from expiring after 4 hours (error 390114).
Type: bool
Default: false
batching
Allows you to configure a batching policy.
Type: object
# Examples
batching:
byte_size: 5000
count: 0
period: 1s
batching:
count: 10
period: 1s
batching:
check: this.contains("END BATCH")
count: 0
period: 1m
batching.count
A number of messages at which the batch should be flushed. If 0 disables count based batching.
Type: int
Default: 0
batching.byte_size
An amount of bytes at which the batch should be flushed. If 0 disables size based batching.
Type: int
Default: 0
batching.period
A period in which an incomplete batch should be flushed regardless of its size.
Type: string
Default: ""
# Examples
period: 1s
period: 1m
period: 500ms
batching.check
A Bloblang query that should return a boolean value indicating whether a message should end a batch.
Type: string
Default: ""
# Examples
check: this.type == "end_of_transaction"
batching.processors
A list of processors to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch, therefore splitting the batch into smaller batches using these processors is a no-op.
Type: array
# Examples
processors:
- archive:
format: concatenate
processors:
- archive:
format: lines
processors:
- archive:
format: json_array
max_in_flight
The maximum number of parallel message batches to have in flight at any given time.
Type: int
Default: 1