gcp_pubsub
Sends messages to a GCP Cloud Pub/Sub topic. Metadata from messages is sent as attributes.
```yml
# Common config fields, showing default values
output:
  label: ""
  gcp_pubsub:
    project: "" # No default (required)
    topic: "" # No default (required)
    endpoint: ""
    max_in_flight: 64
    count_threshold: 100
    delay_threshold: 10ms
    byte_threshold: 1000000
    metadata:
      exclude_prefixes: []
    batching:
      count: 0
      byte_size: 0
      period: ""
      jitter: 0
      check: ""
```
```yml
# All config fields, showing default values
output:
  label: ""
  gcp_pubsub:
    project: "" # No default (required)
    topic: "" # No default (required)
    endpoint: ""
    ordering_key: "" # No default (optional)
    max_in_flight: 64
    count_threshold: 100
    delay_threshold: 10ms
    byte_threshold: 1000000
    publish_timeout: 1m0s
    metadata:
      exclude_prefixes: []
    flow_control:
      max_outstanding_bytes: -1
      max_outstanding_messages: 1000
      limit_exceeded_behavior: block
    batching:
      count: 0
      byte_size: 0
      period: ""
      jitter: 0
      check: ""
      processors: [] # No default (optional)
```
For information on how to set up credentials, check out this guide.
Troubleshooting
If you're consistently seeing `Failed to send message to gcp_pubsub: context deadline exceeded` error logs without any further information, it is possible that you are encountering https://github.com/warpstreamlabs/bento/issues/1042, which occurs when metadata values contain characters that are not valid UTF-8. This can frequently occur when consuming from Kafka, as the key metadata field may be populated with an arbitrary binary value, but this issue is not exclusive to Kafka.
If you are blocked by this issue then a workaround is to delete the specific problematic keys:
```yml
pipeline:
  processors:
    - mapping: |
        meta kafka_key = deleted()
```
Or delete all keys with:
```yml
pipeline:
  processors:
    - mapping: meta = deleted()
```
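If you need to keep the metadata rather than drop it, an alternative sketch is to re-encode the binary value so that it becomes valid UTF-8. The mapping below assumes the problematic key is `kafka_key`; substitute whichever keys apply to your pipeline:

```yml
pipeline:
  processors:
    - mapping: |
        # Hypothetical workaround: base64-encode the raw binary key so the
        # resulting attribute value is guaranteed to be valid UTF-8.
        meta kafka_key = @kafka_key.encode("base64")
```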
Fields
project
The project ID of the topic to publish to.
Type: string
topic
The topic to publish to. This field supports interpolation functions.
Type: string
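As a sketch of how interpolation can be used here, the following hypothetical config routes each message to a topic derived from one of its metadata fields (the `topic_suffix` key is an assumption for illustration):

```yml
output:
  gcp_pubsub:
    project: my-project # hypothetical project ID
    topic: events-${! meta("topic_suffix") }
```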
endpoint
An optional endpoint to override the default of `pubsub.googleapis.com:443`. This can be used to connect to a region-specific Pub/Sub endpoint. For a list of valid values, check out this document.
Type: string
Default: ""
```yml
# Examples

endpoint: us-central1-pubsub.googleapis.com:443

endpoint: us-west3-pubsub.googleapis.com:443
```
ordering_key
The ordering key to use for publishing messages. This field supports interpolation functions.
Type: string
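Note that Pub/Sub only preserves ordering among messages that share the same ordering key. As an illustrative sketch, the config below derives the key from a hypothetical `customer_id` field in the message payload, so that messages for the same customer are delivered in publish order:

```yml
output:
  gcp_pubsub:
    project: my-project # hypothetical
    topic: orders
    ordering_key: ${! json("customer_id") } # hypothetical payload field
```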
max_in_flight
The maximum number of messages to have in flight at a given time. Increasing this may improve throughput.
Type: int
Default: 64
count_threshold
Publish a pubsub buffer when it has this many messages.
Type: int
Default: 100
delay_threshold
Publish a non-empty pubsub buffer after this delay has passed.
Type: string
Default: "10ms"
byte_threshold
Publish a batch when its size in bytes reaches this value.
Type: int
Default: 1000000
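The three thresholds above control the client's internal buffering, and a buffer is published as soon as any one of them is reached. As a hypothetical tuning sketch for a high-volume topic, larger counts and a slightly longer delay trade a little latency for fewer, bigger publish calls (all values below are illustrative assumptions):

```yml
output:
  gcp_pubsub:
    project: my-project # hypothetical
    topic: events
    max_in_flight: 128
    count_threshold: 500 # publish once 500 messages are buffered...
    delay_threshold: 50ms # ...or after 50ms, whichever comes first...
    byte_threshold: 5000000 # ...or once the buffer reaches ~5MB
```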
publish_timeout
The maximum length of time to wait before abandoning a publish attempt for a message.
Type: string
Default: "1m0s"
```yml
# Examples

publish_timeout: 10s

publish_timeout: 5m

publish_timeout: 60m
```
metadata
Specify criteria for which metadata values are sent as attributes. All metadata values are sent by default.
Type: object
metadata.exclude_prefixes
Provide a list of explicit metadata key prefixes to be excluded when adding metadata to sent messages.
Type: array
Default: []
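For example, the following hypothetical config forwards all metadata as attributes except keys beginning with `kafka_` or `_internal` (both prefixes are assumptions for illustration):

```yml
output:
  gcp_pubsub:
    project: my-project # hypothetical
    topic: events
    metadata:
      exclude_prefixes: [ kafka_, _internal ]
```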
flow_control
For a given topic, configures the PubSub client's internal buffer for messages to be published.
Type: object
flow_control.max_outstanding_bytes
Maximum size of buffered messages to be published. If less than or equal to zero, this is disabled.
Type: int
Default: -1
flow_control.max_outstanding_messages
Maximum number of buffered messages to be published. If less than or equal to zero, this is disabled.
Type: int
Default: 1000
flow_control.limit_exceeded_behavior
Configures the behavior when trying to publish additional messages while the flow controller is full. The available options are `block` (the default), `ignore` (disable flow control), and `signal_error` (publish results will return an error).
Type: string
Default: "block"
Options: `ignore`, `block`, `signal_error`.
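As a sketch, the following hypothetical config caps the publish buffer at roughly 100MiB and 5000 messages, and surfaces an error from publish results instead of blocking when the limits are hit:

```yml
output:
  gcp_pubsub:
    project: my-project # hypothetical
    topic: events
    flow_control:
      max_outstanding_bytes: 104857600 # ~100MiB
      max_outstanding_messages: 5000
      limit_exceeded_behavior: signal_error
```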
batching
Configures a batching policy on this output. While the PubSub client maintains its own internal buffering mechanism, preparing larger batches of messages can further trade off some latency for throughput.
Type: object
```yml
# Examples

batching:
  byte_size: 5000
  count: 0
  period: 1s

batching:
  count: 10
  period: 1s

batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m

batching:
  count: 10
  jitter: 0.1
  period: 10s
```
batching.count
A number of messages at which the batch should be flushed. If set to 0, count-based batching is disabled.
Type: int
Default: 0
batching.byte_size
An amount of bytes at which the batch should be flushed. If set to 0, size-based batching is disabled.
Type: int
Default: 0
batching.period
A period in which an incomplete batch should be flushed regardless of its size.
Type: string
Default: ""
```yml
# Examples

period: 1s

period: 1m

period: 500ms
```
batching.jitter
A non-negative factor that adds random delay to batch flush intervals, where delay is determined uniformly at random between 0
and jitter * period
. For example, with period: 100ms
and jitter: 0.1
, each flush will be delayed by a random duration between 0-10ms
.
Type: float
Default: 0
```yml
# Examples

jitter: 0.01

jitter: 0.1

jitter: 1
```
batching.check
A Bloblang query that should return a boolean value indicating whether a message should end a batch.
Type: string
Default: ""
```yml
# Examples

check: this.type == "end_of_transaction"
```
batching.processors
A list of processors to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch; therefore, splitting the batch into smaller batches using these processors is a no-op.
Type: array
```yml
# Examples

processors:
  - archive:
      format: concatenate

processors:
  - archive:
      format: lines

processors:
  - archive:
      format: json_array
```