Google Cloud Storage Loader
Overview
Cloud Storage Loader is a Dataflow job that dumps events from an input Pub/Sub subscription into a Cloud Storage bucket.
Cloud Storage Loader is built on top of Apache Beam and its Scala wrapper, Scio.
Setup guide
Running
Cloud Storage Loader comes both as a Docker image and a ZIP archive.
Docker image
The Docker image can be found on Docker Hub.
A container can be run as follows:
docker run \
-v $PWD/config:/snowplow/config \ # if running outside GCP
-e GOOGLE_APPLICATION_CREDENTIALS=/snowplow/config/credentials.json \ # if running outside GCP
snowplow/snowplow-google-cloud-storage-loader:0.5.4 \
--runner=DataFlowRunner \
--jobName=[JOB-NAME] \
--project=[PROJECT] \
--streaming=true \
--workerZone=[ZONE] \
--inputSubscription=projects/[PROJECT]/subscriptions/[SUBSCRIPTION] \
--outputDirectory=gs://[BUCKET]/YYYY/MM/dd/HH/ \ # partitions by date
--outputFilenamePrefix=output \ # optional
--shardTemplate=-W-P-SSSSS-of-NNNNN \ # optional
--outputFilenameSuffix=.txt \ # optional
--windowDuration=5 \ # optional, in minutes
--compression=none \ # optional, gzip, bz2 or none
--numShards=1 # optional
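If you run the loader from outside GCP, the mounted credentials file is a service account JSON key. As a sketch, such a key could be created with gcloud as shown below; the service account name and key path are placeholders, not something the loader provides:
# Create a JSON key for an existing service account and place it where the
# container expects to find it (placeholder service account and project).
gcloud iam service-accounts keys create $PWD/config/credentials.json \
  --iam-account=[SERVICE-ACCOUNT]@[PROJECT].iam.gserviceaccount.com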
To display the help message:
docker run snowplow/snowplow-google-cloud-storage-loader:0.5.4 \
--help
To display documentation about Cloud Storage Loader-specific options:
docker run snowplow/snowplow-google-cloud-storage-loader:0.5.4 \
--help=com.snowplowanalytics.storage.googlecloudstorage.loader.Options
ZIP archive
The archive is hosted on GitHub at this URI:
https://github.com/snowplow-incubator/snowplow-google-cloud-storage-loader/releases/download/0.5.4/snowplow-google-cloud-storage-loader-0.5.4.zip
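For example, assuming wget and unzip are available locally, the archive can be fetched and unpacked like this:
wget https://github.com/snowplow-incubator/snowplow-google-cloud-storage-loader/releases/download/0.5.4/snowplow-google-cloud-storage-loader-0.5.4.zip
unzip snowplow-google-cloud-storage-loader-0.5.4.zip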
Once unzipped, the artifact can be run as follows:
./bin/snowplow-google-cloud-storage-loader \
--runner=DataFlowRunner \
--project=[PROJECT] \
--streaming=true \
--workerZone=[ZONE] \
--inputSubscription=projects/[PROJECT]/subscriptions/[SUBSCRIPTION] \
--outputDirectory=gs://[BUCKET]/YYYY/MM/dd/HH/ \ # partitions by date
--outputFilenamePrefix=output \ # optional
--shardTemplate=-W-P-SSSSS-of-NNNNN \ # optional
--outputFilenameSuffix=.txt \ # optional
--windowDuration=5 \ # optional, in minutes
--compression=none \ # optional, gzip, bz2 or none
--numShards=1 # optional
To display the help message:
./bin/snowplow-google-cloud-storage-loader --help
To display documentation about Cloud Storage Loader-specific options:
./bin/snowplow-google-cloud-storage-loader --help=com.snowplowanalytics.storage.googlecloudstorage.loader.Options
Configuration
Cloud Storage Loader specific options
--inputSubscription=String
The Cloud Pub/Sub subscription to read from, formatted like projects/[PROJECT]/subscriptions/[SUB]. Required.
--outputDirectory=gs://[BUCKET]/
The Cloud Storage directory to output files to, ending in /. Required.
--outputFilenamePrefix=String
The prefix for output files. Default: output. Optional.
--shardTemplate=String
A valid shard template as described here, which will be part of the filenames. Default: -W-P-SSSSS-of-NNNNN. Optional.
--outputFilenameSuffix=String
The suffix for output files. Default: .txt. Optional.
--windowDuration=Int
The window duration in minutes. Default: 5. Optional.
--compression=String
The compression used (gzip, bz2 or none). Note that bz2 can't be loaded into BigQuery. Default: no compression. Optional.
--numShards=Int
The maximum number of output shards produced when writing. Default: 1. Optional.
--dateFormat=YYYY/MM/dd/HH/
A date format string used for partitioning via date in outputDirectory and partitionedOutputDirectory. Default: YYYY/MM/dd/HH/. Optional. For example, the date format YYYY/MM/dd/HH/ would produce a directory structure like this:
gs://bucket/
└── 2022
    └── 12
        └── 15
            ├── ...
            ├── 18
            ├── 19
            ├── 20
            └── ...
--partitionedOutputDirectory=gs://[BUCKET]/
The Cloud Storage directory to output files to, partitioned by schema, ending with /. Unpartitioned data will be sent to outputDirectory. Optional.
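As a sketch of how these options combine, the following ZIP invocation enables schema partitioning, gzip compression, 10-minute windows and up to 4 shards per window (all bracketed values, as well as the partitioned/ path, are placeholders):
./bin/snowplow-google-cloud-storage-loader \
  --runner=DataFlowRunner \
  --jobName=[JOB-NAME] \
  --project=[PROJECT] \
  --streaming=true \
  --workerZone=[ZONE] \
  --inputSubscription=projects/[PROJECT]/subscriptions/[SUBSCRIPTION] \
  --outputDirectory=gs://[BUCKET]/YYYY/MM/dd/HH/ \
  --partitionedOutputDirectory=gs://[BUCKET]/partitioned/ \
  --windowDuration=10 \
  --compression=gzip \
  --numShards=4
With this configuration, schema-partitioned files land under the partitioned directory, unpartitioned data is written to outputDirectory, and files are produced as gzip-compressed shards every 10 minutes.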
Dataflow options
To run the Cloud Storage Loader on Dataflow, it is also necessary to specify additional configuration options. None of these options have default values, and they are all required.
--runner=DataFlowRunner
Passing the string DataFlowRunner specifies that we want to run on Dataflow.
--jobName=[NAME]
A name for the Dataflow job that will be created.
--project=[PROJECT]
The name of your GCP project.
--streaming=true
Pass true to notify Dataflow that we're running a streaming application.
--workerZone=[ZONE]
The zone where the Dataflow nodes (effectively GCP Compute Engine nodes) will be launched.
--region=[REGION]
The region where the Dataflow job will be launched.
--gcpTempLocation=gs://[BUCKET]/
The GCS bucket where temporary files necessary to run the job (e.g. JARs) will be stored.
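Putting the Dataflow options together with the required loader options, a complete launch might look like the following sketch (all bracketed values, as well as the tmp/ path, are placeholders):
./bin/snowplow-google-cloud-storage-loader \
  --runner=DataFlowRunner \
  --jobName=[JOB-NAME] \
  --project=[PROJECT] \
  --streaming=true \
  --region=[REGION] \
  --workerZone=[ZONE] \
  --gcpTempLocation=gs://[BUCKET]/tmp/ \
  --inputSubscription=projects/[PROJECT]/subscriptions/[SUBSCRIPTION] \
  --outputDirectory=gs://[BUCKET]/YYYY/MM/dd/HH/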
The full list of options can be found at https://cloud.google.com/dataflow/pipelines/specifying-exec-params#setting-other-cloud-pipeline-options.