
Querying failed events in Athena and BigQuery

Athena on AWS and BigQuery on GCP let you query your failed events directly, using the files in cloud storage as a back-end data source. For example, in Athena:

SELECT data.failure.messages FROM adapter_failures
WHERE from_iso8601_timestamp(data.failure.timestamp) > timestamp '2020-04-01'

This approach is great for debugging your pipeline without the need to load your failed events into a separate database.

Before you can query this data, you need to create the corresponding tables in Athena or BigQuery, as explained below. Each failed event type (e.g. schema violations, adapter failures) has a different schema, so you will need one table per event type.

Creating the tables

Go to the Athena dashboard and use the query editor. Start by creating a database (replace {{ DATABASE }} with the name of your pipeline, e.g. prod1 or qa1):

CREATE DATABASE IF NOT EXISTS {{ DATABASE }}

Then run each SQL statement provided in the badrows-tables repository by copying it into the Athena query editor. We recommend creating all tables, although you can skip the ones you are not interested in.

Placeholders

Note that the SQL statements contain a few placeholders which you will need to edit before you can create the tables:

  • {{ DATABASE }} — as above, change this to the name of your pipeline, e.g. prod1 or qa1.
  • s3://{{ BUCKET }}/{{ PIPELINE }} — this should point to the directory in S3 where your bad rows files are stored.


Querying the data

As an example of using your Athena tables, you might start by counting the failed events of each type from the last week. Repeat this query for each table you have created:

SELECT COUNT(*) FROM schema_violations
WHERE from_iso8601_timestamp(data.failure.timestamp) > DATE_ADD('day', -7, now())
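
Rather than running the count once per table, the per-table queries can be combined with UNION ALL so that all counts come back in one result. The following is a minimal sketch, assuming you created both the schema_violations and adapter_failures tables; the failure_type and failures column aliases are illustrative, and you would add one branch per table that exists in your database:

```sql
-- Sketch: weekly failed event counts for two tables in a single result.
-- Assumes both tables exist; add or remove UNION ALL branches as needed.
SELECT 'schema_violations' AS failure_type, COUNT(*) AS failures
FROM schema_violations
WHERE from_iso8601_timestamp(data.failure.timestamp) > DATE_ADD('day', -7, now())
UNION ALL
SELECT 'adapter_failures' AS failure_type, COUNT(*) AS failures
FROM adapter_failures
WHERE from_iso8601_timestamp(data.failure.timestamp) > DATE_ADD('day', -7, now())
```

Each branch scans only its own table, so the amount of data read should be the same as running the queries separately.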


If you have schema violations, you might want to find which tracker sent the events by grouping them on the app_id:

SELECT data.payload.enriched.app_id, COUNT(*) FROM schema_violations
WHERE from_iso8601_timestamp(data.failure.timestamp) > DATE_ADD('day', -7, now())
GROUP BY data.payload.enriched.app_id
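
To see whether the failures for an application are ongoing or were a one-off spike, the same query can be broken down by day. This is a sketch that reuses the fields above and adds Athena's date_trunc function; the day and failures aliases are illustrative:

```sql
-- Sketch: schema violations per app_id per day over the last week.
SELECT data.payload.enriched.app_id AS app_id,
       date_trunc('day', from_iso8601_timestamp(data.failure.timestamp)) AS day,
       COUNT(*) AS failures
FROM schema_violations
WHERE from_iso8601_timestamp(data.failure.timestamp) > DATE_ADD('day', -7, now())
GROUP BY 1, 2
ORDER BY 1, 2
```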

You can dig deeper into the error messages to get an explanation of the 10 most recent failures:

SELECT message.field AS field,
message.value AS value,
message.error AS error,
message.json AS json,
message.schemaKey AS schemaKey,
message.schemaCriterion AS schemaCriterion
FROM schema_violations
CROSS JOIN UNNEST(data.failure.messages) AS t(message)
ORDER BY data.failure.timestamp DESC
LIMIT 10
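
If the same few schemas account for most of the failures, aggregating the unnested messages can be more informative than reading individual rows. The following sketch assumes the schemaKey field is populated for the messages you care about (it may be null for some kinds of violation); the schema_key and failures aliases are illustrative:

```sql
-- Sketch: which schemas caused the most violation messages in the last week.
SELECT message.schemaKey AS schema_key,
       COUNT(*) AS failures
FROM schema_violations
CROSS JOIN UNNEST(data.failure.messages) AS t(message)
WHERE from_iso8601_timestamp(data.failure.timestamp) > DATE_ADD('day', -7, now())
GROUP BY message.schemaKey
ORDER BY failures DESC
```

Note that this counts messages rather than events: a single failed event with several messages contributes one row per message.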