Skip to main content

Spark

The Spark job reads bad rows from an S3 location, writes the recovered payloads to Kinesis, and stores the unrecovered and unrecoverable payloads in other S3 buckets.

Building

To build the fat jar, run:

sbt spark/assembly

Running

Using the JAR directly (which is hosted at s3://snowplow-hosted-assets/3-enrich/snowplow-event-recovery/):

spark-submit \
--class com.snowplowanalytics.snowplow.event.recovery.Main \
--master master-url \
--deploy-mode deploy-mode \
snowplow-event-recovery-spark-0.1.1.jar \
--input s3://bad-rows-location/ \
--region eu-central-1 \
--output collector-payloads \
--failedOutput s3://unrecovered-collector-payloads-location/ \
--unrecoverableOutput s3://unrecoverable-collector-payloads-location/ \
--config base64-encoded-configuration

Or through an EMR step:

aws emr add-steps --cluster-id j-XXXXXXXX --steps \
Name=snowplow-event-recovery,\
Type=CUSTOM_JAR,\
Jar=s3://snowplow-hosted-assets/3-enrich/snowplow-event-recovery/snowplow-event-recovery-spark-0.2.0.jar,\
MainClass=com.snowplowanalytics.snowplow.event.recovery.Main,\
Args=[--input,s3://bad-rows-location/,--region,eu-central-1,--output,collector-payloads,--failedOutput,s3://unrecovered-collector-payloads-location/,--unrecoverableOutput,s3://unrecoverable-collector-payloads-location/,--config,base64-encoded-configuration],\
ActionOnFailure=CONTINUE

Or using Dataflow Runner, with emr-config.json:

{
"schema": "iglu:com.snowplowanalytics.dataflowrunner/ClusterConfig/avro/1-1-0",
"data": {
"name": "emr-recovery-cluster",
"logUri": "s3://logs/",
"region": "eu-central-1",
"credentials": {
"accessKeyId": "{{secret "aws-access-key-id"}}",
"secretAccessKey": "{{secret "aws-secret-access-key"}}"
},
"roles": {
"jobflow": "EMR_EC2_DefaultRole",
"service": "{{ prefix }}-event-recovery"
},
"ec2": {
"amiVersion": "5.17.0",
"keyName": "some-key-name",
"location": {
"vpc": {
"subnetId": "subnet-sample"
}
},
"instances": {
"master": {
"type": "m1.medium",
"ebsConfiguration": {
"ebsOptimized": true,
"ebsBlockDeviceConfigs": [
{
"volumesPerInstance": 12,
"volumeSpecification": {
"iops": 8,
"sizeInGB": 10,
"volumeType": "gp2"
}
}
]
}
},
"core": {
"type": "m1.medium",
"count": 1
},
"task": {
"type": "m1.medium",
"count": 0,
"bid": "0.015"
}
}
},
"tags": [
{
"key": "client",
"value": "com.snplow.eng"
},
{
"key": "job",
"value": "recovery"
}
],
"applications": [ "Hadoop", "Spark" ]
}
}

And emr-playbook.json:

{
"schema": "iglu:com.snowplowanalytics.dataflowrunner/PlaybookConfig/avro/1-0-1",
"data": {
"region": "eu-central-1",
"credentials": {
"accessKeyId": "{{secret "aws-access-key-id"}}",
"secretAccessKey": "{{secret "aws-secret-access-key"}}"
},
"steps": [
{
"type": "CUSTOM_JAR",
"name": "Staging of bad rows",
"actionOnFailure": "CANCEL_AND_WAIT",
"jar": "/usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar",
"arguments": [
"--src",
"s3n://${BUCKET_ID}/recovery/enriched/bad/run=2019-01-12-15-04-23/",
"--dest",
"s3n://${BUCKET_ID}/stage_01_19/"
]
},
{
"type": "CUSTOM_JAR",
"name": "Move to HDFS",
"actionOnFailure": "CANCEL_AND_WAIT",
"jar": "/usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar",
"arguments": [
"--src",
"s3n://${BUCKET_ID}/stage_01_19/",
"--dest",
"hdfs:///local/to-recover/",
"--outputCodec",
"none"
]
},
{
"type": "CUSTOM_JAR",
"name": "snowplow-event-recovery",
"actionOnFailure": "CANCEL_AND_WAIT",
"jar": "command-runner.jar",
"arguments": [
"spark-submit",
"--class",
"com.snowplowanalytics.snowplow.event.recovery.Main",
"--master",
"yarn",
"--deploy-mode",
"cluster",
"s3://bad-rows/snowplow-event-recovery-spark-0.2.0.jar",
"--input",
"hdfs:///local/to-recover/",
"--output",
"good-events",
"--region",
"eu-central-1",
"--failedOutput",
"s3://bad-rows/left",
"--unrecoverableOutput",
"s3://bad-rows/left/unrecoverable",
"--resolver",
"eyJzY2hlbWEiOiJpZ2x1OmNvbS5zbm93cGxvd2FuYWx5dGljcy5pZ2x1L3Jlc29sdmVyLWNvbmZpZy9qc29uc2NoZW1hLzEtMC0xIiwiZGF0YSI6eyJjYWNoZVNpemUiOjAsInJlcG9zaXRvcmllcyI6W3sibmFtZSI6ICJJZ2x1IENlbnRyYWwiLCJwcmlvcml0eSI6IDAsInZlbmRvclByZWZpeGVzIjogWyAiY29tLnNub3dwbG93YW5hbHl0aWNzIiBdLCJjb25uZWN0aW9uIjogeyJodHRwIjp7InVyaSI6Imh0dHA6Ly9pZ2x1Y2VudHJhbC5jb20ifX19LHsibmFtZSI6IlByaXYiLCJwcmlvcml0eSI6MCwidmVuZG9yUHJlZml4ZXMiOlsiY29tLnNub3dwbG93YW5hbHl0aWNzIl0sImNvbm5lY3Rpb24iOnsiaHR0cCI6eyJ1cmkiOiJodHRwczovL3Jhdy5naXRodWJ1c2VyY29udGVudC5jb20vcGVlbC9zY2hlbWFzL21hc3RlciJ9fX1dfX0=",
"--config",
"eyJzY2hlbWEiOiJpZ2x1OmNvbS5zbm93cGxvd2FuYWx5dGljcy5zbm93cGxvdy9yZWNvdmVyaWVzL2pzb25zY2hlbWEvMi0wLTEiLCJkYXRhIjp7ImlnbHU6Y29tLnNub3dwbG93YW5hbHl0aWNzLnNub3dwbG93LmJhZHJvd3MvZW5yaWNobWVudF9mYWlsdXJlcy9qc29uc2NoZW1hLzEtMC0qIjpbeyJuYW1lIjoibWFpbi1mbG93IiwiY29uZGl0aW9ucyI6W10sInN0ZXBzIjpbXX1dLCJpZ2x1OmNvbS5zbm93cGxvd2FuYWx5dGljcy5zbm93cGxvdy5iYWRyb3dzL2FkYXB0ZXJfZmFpbHVyZXMvanNvbnNjaGVtYS8xLTAtKiI6W3sibmFtZSI6Im1haW4tZmxvdyIsImNvbmRpdGlvbnMiOltdLCJzdGVwcyI6W119XSwiaWdsdTpjb20uc25vd3Bsb3dhbmFseXRpY3Muc25vd3Bsb3cuYmFkcm93cy9zY2hlbWFfdmlvbGF0aW9ucy9qc29uc2NoZW1hLzEtMC0qIjpbeyJuYW1lIjoibWFpbi1mbG93IiwiY29uZGl0aW9ucyI6W10sInN0ZXBzIjpbXX1dLCJpZ2x1OmNvbS5zbm93cGxvd2FuYWx5dGljcy5zbm93cGxvdy5iYWRyb3dzL3RyYWNrZXJfcHJvdG9jb2xfdmlvbGF0aW9ucy9qc29uc2NoZW1hLzEtMC0qIjpbeyJuYW1lIjoibWFpbi1mbG93IiwiY29uZGl0aW9ucyI6W10sInN0ZXBzIjpbXX1dfX0="

]
},

{
"type": "CUSTOM_JAR",
"name": "Back to S3",
"actionOnFailure": "CANCEL_AND_WAIT",
"jar": "/usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar",
"arguments": [
"--src",
"hdfs:///local/recovered/",
"--dest",
"s3n://${BUCKET_ID}/raw/"
]
}
],
"tags": [
{
"key": "client",
"value": "com.snowplowanalytics"
},
{
"key": "job",
"value": "recovery"
}
]
}
}

Run:

dataflow-runner run-transient --emr-config ./emr-config.json --emr-playbook ./emr-playbook.json
Was this page helpful?