gcp_bigquery_select
This component is experimental and therefore subject to change or removal outside of major version releases.
Executes a SELECT
query against BigQuery and replaces messages with the rows returned.
Introduced in version 1.0.0.
- Common
- Advanced
# Common config fields, showing default values
label: ""
gcp_bigquery_select:
project: "" # No default (required)
table: bigquery-public-data.samples.shakespeare # No default (required)
columns: [] # No default (optional)
where: type = ? and created_at > ? # No default (optional)
job_labels: {}
args_mapping: root = [ "article", now().ts_format("2006-01-02") ] # No default (optional)
prefix: "" # No default (optional)
suffix: "" # No default (optional)
# All config fields, showing default values
label: ""
gcp_bigquery_select:
project: "" # No default (required)
table: bigquery-public-data.samples.shakespeare # No default (required)
columns: [] # No default (optional)
columns_mapping: "" # No default (optional)
where: type = ? and created_at > ? # No default (optional)
job_labels: {}
args_mapping: root = [ "article", now().ts_format("2006-01-02") ] # No default (optional)
prefix: "" # No default (optional)
suffix: "" # No default (optional)
unsafe_dynamic_query: false
Examples
- Word count
- Unsafe Dynamic Query
Given a stream of English terms, enrich the messages with the word count from Shakespeare's public works:
pipeline:
processors:
- branch:
processors:
- gcp_bigquery_select:
project: test-project
table: bigquery-public-data.samples.shakespeare
columns:
- word
- sum(word_count) as total_count
where: word = ?
suffix: |
GROUP BY word
ORDER BY total_count DESC
LIMIT 10
args_mapping: root = [ this.term ]
result_map: |
root.count = this.get("0.total_count")
An example to show the use of the unsafe_dynamic_query field:
# {"table": "test.people", "columns": ["name", "age", "city"], "args": ["London", "Paris", "Dublin"]}
pipeline:
processors:
- gcp_bigquery_select:
project: ${GCP_PROJECT}
table: ${! this.table } # test.people
columns_mapping: root = this.columns #["name", "age", "city"]
where: ${! "city IN ("+"?,".repeat(this.args.length()-1)+"?)" } # city IN (?,?,?)
args_mapping: root = this.args # ["London", "Paris", "Dublin"]
unsafe_dynamic_query: true
Fields
project
GCP project where the query job will execute.
Type: string
table
Fully-qualified BigQuery table name to query. This field supports interpolation functions.
Type: string
# Examples
table: bigquery-public-data.samples.shakespeare
columns
A list of columns to query.
Type: array
columns_mapping
An optional Bloblang mapping which should evaluate to an array of column names to query.
Type: string
Requires version 1.5.0 or newer
where
An optional where clause to add. Placeholder arguments are populated with the args_mapping
field. Placeholders should always be question marks (?
).
Type: string
# Examples
where: type = ? and created_at > ?
where: user_id = ?
job_labels
A list of labels to add to the query job.
Type: object
Default: {}
args_mapping
An optional Bloblang mapping which should evaluate to an array of values matching in size to the number of placeholder arguments in the field where
.
Type: string
# Examples
args_mapping: root = [ "article", now().ts_format("2006-01-02") ]
prefix
An optional prefix to prepend to the select query (before SELECT).
Type: string
suffix
An optional suffix to append to the select query.
Type: string
unsafe_dynamic_query
Whether to enable interpolation functions in the columns_mapping, table & where fields. When unsafe_dynamic_query
is set to true, you should provide a bloblang mapping via the columns_mapping
config field, and not columns
. Great care should be made to ensure your queries are defended against injection attacks.
Type: bool
Default: false
Requires version 1.5.0 or newer