# gcs

## gcs_streamer
This connector provides the ability to stream events into Google Cloud Storage.

Authentication is handled via GCP authentication; see the `token` configuration option below.

As long as the (`name`, `bucket`) metadata pair does not change, consecutive events are appended to the same GCS object.
### Modes of operation
All our object storage connectors that perform uploads (`s3_streamer` and `gcs_streamer`) support the same two modes of operation.
#### yolo
`yolo` mode reports every event as successfully delivered, regardless of whether the upload to GCS succeeded or not.
This mode is intended for "fire-and-forget" use cases, where it is not important if a single event in the middle of an upload failed or is missing.
The upload continues nonetheless and is only finished when an event with another bucket or name is received.
This mode is well suited for e.g. aggregating time-series events into GCS objects by time unit (e.g. hour or day), where it is not crucial that the resulting GCS object is a 1:1 mirror of all the events, in the correct order and without gaps. A `yolo` connector definition is sketched below.
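A minimal sketch of a `yolo` mode connector definition; the connector name, bucket name and codec are placeholders, and the definition would live inside a flow as in the full example at the end of this page:

```troy
define connector timeseries_upload from gcs_streamer
with
  config = {
    # "yolo": events are acknowledged even if the upload to GCS fails
    "mode": "yolo",
    # placeholder default bucket, can be overridden per event via $gcs_streamer.bucket
    "bucket": "my-timeseries-bucket"
  },
  codec = "json"
end;
```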
#### consistent

`consistent` mode, on the other hand, is well suited when uploading consistent and correct GCS objects is important. In this mode, the `gcs_streamer` fails the whole upload whenever even a single event could not be successfully uploaded to GCS. This guarantees that only complete and consistent GCS objects show up in your buckets. Due to this logic, however, it is possible to lose events and whole uploads if the upstream connectors don't replay failed events. The `wal` connector and the `kafka_consumer` can replay events, so it makes sense to pair the `gcs_streamer` in `consistent` mode with those.

This mode guarantees that only complete GCS objects with all events in the right order are uploaded to GCS; objects whose upload failed for some reason are deleted, and the upstream is expected to retry the upload. It is well suited for uploading complete files, like images or documents. The full example at the end of this page uses `consistent` mode.
### Metadata

Two metadata fields are required for the connector to work: `$gcs_streamer.name` (used as the object name) and `$gcs_streamer.bucket` (the name of the bucket the object will be placed in). A script setting both fields is sketched below.
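A minimal script sketch that sets both fields; the object and bucket names are placeholders:

```troy
define script set_gcs_meta
script
  # both fields are required unless a default bucket is configured on the connector
  let $gcs_streamer = {
    "name": "events.json",        # placeholder object name
    "bucket": "my-upload-bucket"  # placeholder bucket name
  };
  event
end;
```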
### Configuration

All configuration options, aside from `token`, are optional. A connector definition using some of these options is sketched after the table.
| name | description | default |
|------|-------------|---------|
| `url` | The HTTP(S) endpoint to which the requests will be made | `"https://storage.googleapis.com/upload/storage/v1"` |
| `bucket` | The optional bucket to stream events into if not overwritten by the event metadata `$gcs_streamer.bucket` | |
| `mode` | The mode of operation for this connector. See Modes of operation above. | |
| `connect_timeout` | The timeout for the connection (in nanoseconds) | `10_000_000_000` (10 seconds) |
| `buffer_size` | The size of a single request body, in bytes (must be divisible by 256KiB, as required by Google) | `8388608` (8MiB, the minimum recommended by Google) |
| `max_retries` | The number of retries to perform for a failed request to GCS | `3` |
| `backoff_base_time` | Base waiting time in nanoseconds for exponential backoff when retrying failed requests to GCS | `25_000_000` (25 ms) |
| `token` | The authentication token. See GCP authentication. | |
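A sketch of a connector definition that sets a default bucket and tunes timeouts and retries; all names and values are illustrative and the definition would live inside a flow as in the example below:

```troy
define connector uploads from gcs_streamer
with
  config = {
    "mode": "consistent",
    # placeholder default bucket, used when $gcs_streamer.bucket is not set
    "bucket": "my-default-bucket",
    # 5 seconds, expressed in nanoseconds
    "connect_timeout": 5_000_000_000,
    "max_retries": 5
  },
  codec = "json"
end;
```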
### Example
```troy
define flow main
flow
  use std::{size, time::nanos};

  define connector metronome from metronome
  with
    config = {"interval": nanos::from_millis(10)}
  end;

  define connector output from gcs_streamer
  with
    config = {
      "mode": "consistent",
      # request body size must be a multiple of 256KiB
      "buffer_size": size::kiB(256),
      "backoff_base_time": nanos::from_millis(200)
    },
    codec = "json"
  end;

  define pipeline main
  pipeline
    define script add_meta
    script
      # group events into one object per 4 consecutive event ids
      let file_id = event.id - (event.id % 4);
      # set the object name and bucket required by the gcs_streamer
      let $gcs_streamer = {
        "name": "my_file_#{file_id}.txt",
        "bucket": "tremor-test-bucket"
      };
      emit {"a": "B"}
    end;

    create script add_meta from add_meta;

    select event from in into add_meta;
    select event from add_meta into out;
    select event from add_meta/err into err;
  end;

  define connector console from stdio
  with
    codec = "json"
  end;

  create connector s1 from metronome;
  create connector s2 from output;
  create connector errors from console;
  create pipeline main;

  connect /connector/s1 to /pipeline/main;
  connect /pipeline/main to /connector/s2;
  connect /pipeline/main/err to /connector/errors;
end;

deploy flow main;
```