Google Cloud Storage Loader
Overview
Cloud Storage Loader is a Dataflow job that dumps events from an input Pub/Sub subscription into a Cloud Storage bucket.
Cloud Storage Loader is built on top of Apache Beam and its Scala wrapper, Scio.
Setup guide
Running
Cloud Storage Loader is available both as a Docker image and as a ZIP archive.
Docker image
The Docker image can be found on Docker Hub.
A container can be run as follows. The -v and -e flags are needed only when running outside GCP; the YYYY/MM/dd/HH/ suffix in --outputDirectory partitions output by date; and the options from --outputFilenamePrefix onward are optional, with defaults and accepted values described under Configuration below:
docker run \
  -v $PWD/config:/snowplow/config \
  -e GOOGLE_APPLICATION_CREDENTIALS=/snowplow/config/credentials.json \
  snowplow/snowplow-google-cloud-storage-loader:0.5.4 \
  --runner=DataFlowRunner \
  --jobName=[JOB-NAME] \
  --project=[PROJECT] \
  --streaming=true \
  --workerZone=[ZONE] \
  --inputSubscription=projects/[PROJECT]/subscriptions/[SUBSCRIPTION] \
  --outputDirectory=gs://[BUCKET]/YYYY/MM/dd/HH/ \
  --outputFilenamePrefix=output \
  --shardTemplate=-W-P-SSSSS-of-NNNNN \
  --outputFilenameSuffix=.txt \
  --windowDuration=5 \
  --compression=none \
  --numShards=1
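Once the job has been submitted, you can check that it is running in the Dataflow section of the GCP console or, for example, with the gcloud CLI (a sketch assuming gcloud is installed and authenticated; [PROJECT] and [REGION] are placeholders):
# list the currently active Dataflow jobs in the project and region
gcloud dataflow jobs list --project=[PROJECT] --region=[REGION] --status=active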
To display the help message:
docker run snowplow/snowplow-google-cloud-storage-loader:0.5.4 \
--help
To display documentation about Cloud Storage Loader-specific options:
docker run snowplow/snowplow-google-cloud-storage-loader:0.5.4 \
--help=com.snowplowanalytics.storage.googlecloudstorage.loader.Options
ZIP archive
The archive is hosted on GitHub at this URI:
https://github.com/snowplow-incubator/snowplow-google-cloud-storage-loader/releases/download/0.5.4/snowplow-google-cloud-storage-loader-0.5.4.zip
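For example, the archive can be downloaded and unpacked with curl and unzip (assuming both are installed):
# download the 0.5.4 release archive from GitHub
curl -L -O https://github.com/snowplow-incubator/snowplow-google-cloud-storage-loader/releases/download/0.5.4/snowplow-google-cloud-storage-loader-0.5.4.zip
# extract it in the current directory
unzip snowplow-google-cloud-storage-loader-0.5.4.zip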
Once unzipped, the artifact can be run as follows (as in the Docker example, the options from --outputFilenamePrefix onward are optional):
./bin/snowplow-google-cloud-storage-loader \
  --runner=DataFlowRunner \
  --project=[PROJECT] \
  --streaming=true \
  --workerZone=[ZONE] \
  --inputSubscription=projects/[PROJECT]/subscriptions/[SUBSCRIPTION] \
  --outputDirectory=gs://[BUCKET]/YYYY/MM/dd/HH/ \
  --outputFilenamePrefix=output \
  --shardTemplate=-W-P-SSSSS-of-NNNNN \
  --outputFilenameSuffix=.txt \
  --windowDuration=5 \
  --compression=none \
  --numShards=1
To display the help message:
./bin/snowplow-google-cloud-storage-loader --help
To display documentation about Cloud Storage Loader-specific options:
./bin/snowplow-google-cloud-storage-loader --help=com.snowplowanalytics.storage.googlecloudstorage.loader.Options
Configuration
Cloud Storage Loader-specific options
--inputSubscription=String
The Cloud Pub/Sub subscription to read from, formatted like projects/[PROJECT]/subscriptions/[SUB]. Required.
--outputDirectory=gs://[BUCKET]/
The Cloud Storage directory to output files to, ending in /. Required.
--outputFilenamePrefix=String
The prefix for output files. Default: output. Optional.
--shardTemplate=String
A valid shard template, as described in the Apache Beam documentation, which will be part of the filenames. Default: -W-P-SSSSS-of-NNNNN. Optional.
--outputFilenameSuffix=String
The suffix for output files. Default: .txt. Optional.
--windowDuration=Int
The window duration in minutes. Default: 5. Optional.
--compression=String
The compression used (gzip, bz2 or none). Note that bz2 can't be loaded into BigQuery. Default: no compression. Optional.
--numShards=Int
The maximum number of output shards produced when writing. Default: 1. Optional.
--dateFormat=YYYY/MM/dd/HH/
A date format string used for partitioning by date in outputDirectory and partitionedOutputDirectory. Default: YYYY/MM/dd/HH/. Optional. For example, the date format YYYY/MM/dd/HH/ would produce a directory structure like this:
gs://bucket/
└── 2022
    └── 12
        └── 15
            ├── ...
            ├── 18
            ├── 19
            ├── 20
            └── ...
--partitionedOutputDirectory=gs://[BUCKET]/
The Cloud Storage directory to output files to, partitioned by schema, ending with /. Unpartitioned data will be sent to outputDirectory. Optional.
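As an illustration only, a sketch of an invocation that partitions output by schema, compresses files with gzip and uses 10-minute windows might look like this (all bracketed values are placeholders):
./bin/snowplow-google-cloud-storage-loader \
  --runner=DataFlowRunner \
  --project=[PROJECT] \
  --streaming=true \
  --workerZone=[ZONE] \
  --inputSubscription=projects/[PROJECT]/subscriptions/[SUBSCRIPTION] \
  --outputDirectory=gs://[BUCKET]/YYYY/MM/dd/HH/ \
  --partitionedOutputDirectory=gs://[BUCKET]/partitioned/ \
  --dateFormat=YYYY/MM/dd/HH/ \
  --windowDuration=10 \
  --compression=gzip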
Dataflow options
To run the Cloud Storage Loader on Dataflow, it is also necessary to specify additional configuration options. None of these options have default values, and they are all required.
--runner=DataFlowRunner
Passing the string DataFlowRunner specifies that we want to run on Dataflow.
--jobName=[NAME]
Specify a name for the Dataflow job that will be created.
--project=[PROJECT]
The name of your GCP project.
--streaming=true
Pass true to notify Dataflow that we're running a streaming application.
--workerZone=[ZONE]
The zone where the Dataflow nodes (effectively GCP Compute Engine nodes) will be launched.
--region=[REGION]
The region where the Dataflow job will be launched.
--gcpTempLocation=gs://[BUCKET]/
The GCS bucket where temporary files necessary to run the job (e.g. JARs) will be stored.
The full list of pipeline options can be found at https://cloud.google.com/dataflow/pipelines/specifying-exec-params#setting-other-cloud-pipeline-options.
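As a sketch only, these Dataflow options can be combined with the loader-specific options in a single invocation (all bracketed values are placeholders, and the ZIP distribution is assumed):
./bin/snowplow-google-cloud-storage-loader \
  --runner=DataFlowRunner \
  --jobName=[JOB-NAME] \
  --project=[PROJECT] \
  --streaming=true \
  --region=[REGION] \
  --workerZone=[ZONE] \
  --gcpTempLocation=gs://[BUCKET]/ \
  --inputSubscription=projects/[PROJECT]/subscriptions/[SUBSCRIPTION] \
  --outputDirectory=gs://[BUCKET]/YYYY/MM/dd/HH/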