gcp_bigquery_write_api

BETA

This component is mostly stable but breaking changes could still be made outside of major version releases if a fundamental problem with the component is found.

Sends messages as new rows to a Google Cloud BigQuery table using the BigQuery Storage Write API.

Introduced in version 1.3.0.

# Common config fields, showing default values
output:
  label: ""
  gcp_bigquery_write_api:
    project: ""
    dataset: "" # No default (required)
    table: "" # No default (required)

You can use the Storage Write API to stream records into BigQuery in real time or to batch process an arbitrarily large number of records and commit them in a single atomic operation.

BigQuery API Limitation

The AppendRows request is limited to 10 MB.

If you experience issues with this limitation, tweak the component's batch policy using the batching field. You can read more at Message Batching.
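For instance, a batch policy along these lines keeps requests below the cap; the 9 MB threshold and 1s period are illustrative values, not documented defaults:

batching:
  byte_size: 9000000 # flush before approaching the 10 MB AppendRows limit
  period: 1s # also flush incomplete batches periodically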

Fields

project

The project ID of the dataset to insert data into. If not set, it is inferred from the credentials or read from the GOOGLE_CLOUD_PROJECT environment variable.

Type: string
Default: ""

dataset

The BigQuery Dataset ID.

Type: string

table

Interpolation of Message Batches

The interpolation is resolved against the first message in the batch, and the resulting string is used for all messages in that batch.

The table to insert messages to. This field supports interpolation functions.

Type: string
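
A hedged sketch of interpolating the table name; the region metadata key is hypothetical and would be set upstream in your pipeline:

output:
  gcp_bigquery_write_api:
    project: my-project # hypothetical project ID
    dataset: my_dataset # hypothetical dataset
    table: 'events_${! metadata("region") }' # resolved from the batch's first message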

endpoint

Used to override the default gRPC and HTTP BigQuery endpoints.

Type: object

endpoint.http

The endpoint used to create the BigQuery client.

Type: string
Default: ""

endpoint.grpc

The endpoint used to create the BigQuery Storage API client.

Type: string
Default: ""
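
A sketch of overriding both endpoints; the values below are the public Google endpoints, shown only as placeholders for whatever regional endpoint or emulator you actually target:

output:
  gcp_bigquery_write_api:
    project: my-project # hypothetical project ID
    dataset: my_dataset
    table: my_table
    endpoint:
      http: https://bigquery.googleapis.com # BigQuery client endpoint
      grpc: bigquerystorage.googleapis.com:443 # Storage Write API client endpoint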

stream_type

Storage API Stream Types

Only DEFAULT stream types are currently enabled. Future versions will see support extended to COMMITTED, BUFFERED, and PENDING.

Sets the type of stream this write client manages.

Type: string
Default: "DEFAULT"

Option: DEFAULT
Summary: DefaultStream most closely mimics the legacy BigQuery tabledata.insertAll semantics. Successful inserts are committed immediately, and there is no offset tracking, since all writes go into a default stream that always exists for a table.

batching

Allows you to configure a batching policy.

Type: object

# Examples

batching:
  byte_size: 5000
  count: 0
  period: 1s

batching:
  count: 10
  period: 1s

batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m

batching:
  count: 10
  jitter: 0.1
  period: 10s

batching.count

The number of messages at which the batch should be flushed. A value of 0 disables count-based batching.

Type: int
Default: 0

batching.byte_size

The number of bytes at which the batch should be flushed. A value of 0 disables size-based batching.

Type: int
Default: 0

batching.period

A period after which an incomplete batch is flushed regardless of its size.

Type: string
Default: ""

# Examples

period: 1s

period: 1m

period: 500ms

batching.jitter

A non-negative factor that adds random delay to batch flush intervals, where the delay is determined uniformly at random between 0 and jitter * period. For example, with period: 100ms and jitter: 0.1, each flush is delayed by a random duration between 0 and 10ms.

Type: float
Default: 0

# Examples

jitter: 0.01

jitter: 0.1

jitter: 1

batching.check

A Bloblang query that should return a boolean value indicating whether a message should end a batch.

Type: string
Default: ""

# Examples

check: this.type == "end_of_transaction"

batching.processors

A list of processors to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Note that all resulting messages are flushed as a single batch; splitting the batch into smaller batches with these processors is therefore a no-op.

Type: array

# Examples

processors:
  - archive:
      format: concatenate

processors:
  - archive:
      format: lines

processors:
  - archive:
      format: json_array

max_in_flight

The maximum number of message batches to have in flight at a given time. Increase this to improve throughput.

Type: int
Default: 64
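
A hedged end-to-end sketch putting batching and max_in_flight together; all identifiers and tuning values below are illustrative, not prescribed defaults:

output:
  label: bq_events # hypothetical label
  gcp_bigquery_write_api:
    project: my-project # inferred from credentials if omitted
    dataset: analytics # hypothetical dataset ID
    table: events # hypothetical table
    max_in_flight: 64 # the default; raise to trade memory for throughput
    batching:
      count: 500 # illustrative flush threshold
      byte_size: 9000000 # stay below the 10 MB AppendRows cap
      period: 1s # flush incomplete batches every second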