Snowplow Utils Macros

caution

This page is auto-generated from our dbt packages; some information may be incomplete

Snowplow Utils

App Id Filter

macros/utils/app_id_filter.sql

Description

Generates a SQL filter on the app_id column for the values provided in app_ids.

Arguments

  • app_ids (list): List of app_id values to include in the filter

Returns

app_id in (...) if any app_ids are provided, otherwise true.

Usage

app_id_filter(['web', 'mobile', 'news'])

-- returns
app_id in ('web', 'mobile', 'news')
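
As the source below shows, an empty list renders a filter that always passes:

app_id_filter([])

-- returns
true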

Details

Code
Source
{% macro app_id_filter(app_ids) %}

{%- if app_ids|length -%}

app_id in ('{{ app_ids|join("','") }}') --filter on app_id if provided

{%- else -%}

true

{%- endif -%}

{% endmacro %}

Referenced By

Cast To Tstamp

macros/utils/cross_db/timestamp_functions.sql

Description

This macro does not currently have a description.

Details

Code
Source
{% macro cast_to_tstamp(tstamp_literal) -%}
{% if tstamp_literal is none or tstamp_literal|lower in ['null',''] %}
cast(null as {{type_timestamp()}})
{% else %}
cast('{{tstamp_literal}}' as {{type_timestamp()}})
{% endif %}
{%- endmacro %}
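
A minimal usage sketch (timestamp literal assumed for illustration; the rendered type comes from the adapter's type_timestamp()):

{{ snowplow_utils.cast_to_tstamp('2023-01-01 12:00:00') }}

-- renders to something like
cast('2023-01-01 12:00:00' as timestamp)

{{ snowplow_utils.cast_to_tstamp(none) }}

-- renders to something like
cast(null as timestamp)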

Depends On

  • macro.dbt.type_timestamp

Referenced By

Coalesce Field Paths

macros/utils/bigquery/combine_column_versions/coalesce_field_paths.sql

Description

This macro does not currently have a description.

Details

Code
Source
{% macro coalesce_field_paths(paths, field_alias, include_field_alias, relation_alias) %}

{% set relation_alias = '' if relation_alias is none else relation_alias~'.' %}

{% set field_alias = '' if not include_field_alias else ' as '~field_alias %}

{% set joined_paths = relation_alias~paths|join(', '~relation_alias) %}

{% set coalesced_field_paths = 'coalesce('~joined_paths~')'~field_alias %}

{{ return(coalesced_field_paths) }}

{% endmacro %}
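
A minimal usage sketch, with hypothetical column-version paths, showing how the paths are coalesced in order and aliased:

{% set coalesced = snowplow_utils.coalesce_field_paths(
    paths=['context_1_0_1.user_id', 'context_1_0_0.user_id'],
    field_alias='user_id',
    include_field_alias=true,
    relation_alias='a'
) %}

-- coalesced renders to
coalesce(a.context_1_0_1.user_id, a.context_1_0_0.user_id) as user_id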

Referenced By

Combine Column Versions

macros/utils/bigquery/combine_column_versions/combine_column_versions.sql

Description

This macro does not currently have a description.

Details

Code
Source
{% macro combine_column_versions(relation, column_prefix, required_fields=[], nested_level=none, level_filter='equalto', relation_alias=none, include_field_alias=true, array_index=0, max_nested_level=15, exclude_versions=[]) %}

{# Create field_alias if not supplied i.e. is not tuple #}
{% set required_fields_tmp = required_fields %}
{% set required_fields = [] %}
{% for field in required_fields_tmp %}
{% set field_tuple = snowplow_utils.get_field_alias(field) %}
{% do required_fields.append(field_tuple) %}
{% endfor %}

{% set required_field_names = required_fields|map(attribute=0)|list %}

{# Determines correct level_limit. This limits recursive iterations during unnesting. #}
{% set level_limit = snowplow_utils.get_level_limit(nested_level, level_filter, required_field_names) %}

{# Limit level_limit to max_nested_level if required #}
{% set level_limit = max_nested_level if level_limit is none or level_limit > max_nested_level else level_limit %}

{%- set matched_columns = snowplow_utils.get_columns_in_relation_by_column_prefix(relation, column_prefix) -%}

{# Removes excluded versions, assuming column name ends with a version of format 'X_X_X' #}
{%- set filter_columns_by_version = snowplow_utils.exclude_column_versions(matched_columns, exclude_versions) -%}

{%- set flattened_fields_by_col_version = [] -%}

{# Flatten fields within each column version. Returns nested arrays of dicts. #}
{# Dict: {'field_name': str, 'field_alias': str, 'flattened_path': str, 'nested_level': int} #}
{% for column in filter_columns_by_version|sort(attribute='name', reverse=true) %}
{% set flattened_fields = snowplow_utils.flatten_fields(fields=column.fields,
parent=column,
path=column.name,
array_index=array_index,
level_limit=level_limit
) %}

{% do flattened_fields_by_col_version.append(flattened_fields) %}

{% endfor %}

{# Flatten nested arrays and merges fields across col version. Returns array of dicts containing all field_paths for field. #}
{# Dict: {'field_name': str, 'flattened_field_paths': str, 'nested_level': int} #}
{% set merged_fields = snowplow_utils.merge_fields_across_col_versions(flattened_fields_by_col_version) %}

{# Filters merged_fields based on required_fields if provided, or the level filter if provided. Default return all fields. #}
{% set matched_fields = snowplow_utils.get_matched_fields(fields=merged_fields,
required_field_names=required_field_names,
nested_level=nested_level,
level_filter=level_filter
) %}

{% set coalesced_field_paths = [] %}

{% for field in matched_fields %}

{% set passed_field_alias = required_fields|selectattr(0, "equalto", field.field_name)|map(attribute=1)|list %}
{% set default_field_alias = field.field_name|replace('.', '_') %}
{# Use passed_field_alias from required_fields if supplied #}
{% set field_alias = default_field_alias if not passed_field_alias|length else passed_field_alias[0] %}

{# Coalesce each field's path across all version of columns, ordered by latest col version. #}
{% set coalesced_field_path = snowplow_utils.coalesce_field_paths(paths=field.field_paths,
field_alias=field_alias,
include_field_alias=include_field_alias,
relation_alias=relation_alias) %}

{% do coalesced_field_paths.append(coalesced_field_path) %}

{% endfor %}

{# Returns array of all coalesced field paths #}
{{ return(coalesced_field_paths) }}

{% endmacro %}

Depends On

Referenced By

Current Timestamp In Utc

macros/utils/cross_db/timestamp_functions.sql

Description

This macro does not currently have a description.

Details

Code
Source
{% macro current_timestamp_in_utc() -%}
{{ return(adapter.dispatch('current_timestamp_in_utc', 'snowplow_utils')()) }}
{%- endmacro %}
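
A minimal usage sketch; the dispatched implementation renders the warehouse-appropriate expression for the current UTC timestamp:

select {{ snowplow_utils.current_timestamp_in_utc() }} as now_utc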

Depends On

  • macro.dbt.current_timestamp
  • macro.dbt.type_timestamp

Referenced By

Exclude Column Versions

macros/utils/bigquery/combine_column_versions/exclude_column_versions.sql

Description

This macro does not currently have a description.

Details

Code
Source
{% macro exclude_column_versions(columns, exclude_versions) %}
{%- set filtered_columns_by_version = [] -%}
{% for column in columns %}
{%- set col_version = column.name[-5:] -%}
{% if col_version not in exclude_versions %}
{% do filtered_columns_by_version.append(column) %}
{% endif %}
{% endfor %}

{{ return(filtered_columns_by_version) }}

{% endmacro %}
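
A minimal usage sketch with hypothetical column names; versions are matched on the final 5 characters of each column name (e.g. '1_0_0'):

{% set columns = snowplow_utils.get_columns_in_relation_by_column_prefix(ref('events'), 'contexts_com_acme_context') %}
{% set filtered = snowplow_utils.exclude_column_versions(columns, exclude_versions=['1_0_0']) %}
{# keeps e.g. contexts_com_acme_context_1_0_1, drops contexts_com_acme_context_1_0_0 #}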

Referenced By

Flatten Fields

macros/utils/bigquery/combine_column_versions/flatten_fields.sql

Description

This macro does not currently have a description.

Details

Code
Source
{% macro flatten_fields(fields, parent, path, array_index, level_limit=none, level_counter=1, flattened_fields=[], field_name='') %}

{% for field in fields %}

{# Only recurse up-until level_limit #}
{% if level_limit is not none and level_counter > level_limit %}
{{ return(flattened_fields) }}
{% endif %}

{# If parent column is an array then take element [array_index]. #}
{% set delimiter = '[safe_offset(%s)].'|format(array_index) if parent.mode == 'REPEATED' else '.' %}
{% set path = path~delimiter~field.name %}
{% set field_name = field_name~'.'~field.name if field_name != '' else field_name~field.name %}

{% set field_dict = {
'field_name': field_name,
'path': path,
'nested_level': level_counter
} %}

{% do flattened_fields.append(field_dict) %}

{# If field has nested fields recurse to extract all fields, unless array. #}
{% if field.dtype == 'RECORD' and field.mode != 'REPEATED' %}

{{ snowplow_utils.flatten_fields(
fields=field.fields,
parent=field,
level_limit=level_limit,
level_counter=level_counter+1,
path=path,
flattened_fields=flattened_fields,
field_name=field_name
) }}

{% endif %}

{% endfor %}

{{ return(flattened_fields) }}

{% endmacro %}

Depends On

Referenced By

Get Array To String

macros/utils/cross_db/get_array_to_string.sql

Description

This macro takes care of harmonising cross-db array to string type functions. The macro supports a custom delimiter if you don't want to use a comma with no space (default).

Arguments

  • array_column (string): Name of the column to join into a string
  • column_prefix (string): Table alias for the array_column
  • delimiter (string): (Optional) String that determines how to delimit your array values. Default ','

Returns

The data warehouse appropriate sql to convert an array to a string.

Usage

select
...
{{ snowplow_utils.get_array_to_string('my_array_column', 'a', ', ') }}
...
from ... a

Details

Code
Source


{%- macro get_array_to_string(array_column, column_prefix, delimiter=',') -%}
{{ return(adapter.dispatch('get_array_to_string', 'snowplow_utils')(array_column, column_prefix, delimiter)) }}
{%- endmacro -%}


Referenced By

Get Cluster By

macros/utils/get_cluster_by.sql

Description

This macro does not currently have a description.

Details

Code
Source
{% macro get_cluster_by(bigquery_cols=none, snowflake_cols=none) %}

{%- do exceptions.warn("Warning: the `get_cluster_by` macro is deprecated and will be removed in a future version of the package, please use `get_value_by_target_type` instead.") -%}


{% if target.type == 'bigquery' %}
{{ return(bigquery_cols) }}
{% elif target.type == 'snowflake' %}
{{ return(snowflake_cols) }}
{% endif %}

{% endmacro %}

Get Columns In Relation By Column Prefix

macros/utils/get_columns_in_relation_by_column_prefix.sql

Description

This macro returns an array of column objects within a relation that start with the given column prefix. This is useful when you have multiple versions of a column within a table and want to dynamically identify all versions.

Arguments

  • relation (relation): A table or ref type object to get the columns from
  • column_prefix (string): The prefix string to search for matching columns

Returns

An array of [column objects](https://docs.getdbt.com/reference/dbt-classes#column). The name of each column can be accessed with the name property.

Usage

get_columns_in_relation_by_column_prefix(ref('snowplow_web_base_events_this_run'), 'domain')

-- returns
['domain_sessionid', 'domain_userid', 'domain_sessionidx',...]

{% set matched_columns = snowplow_utils.get_columns_in_relation_by_column_prefix(
relation=ref('snowplow_web_base_events_this_run'),
column_prefix='custom_context_1_0_'
) %}

{% for column in matched_columns %}
{{ column.name }}
{% endfor %}

-- renders to something like:
'custom_context_1_0_1'
'custom_context_1_0_2'
'custom_context_1_0_3'

Details

Code
Source
{% macro get_columns_in_relation_by_column_prefix(relation, column_prefix) %}

{# Prevent introspective queries during parsing #}
{%- if not execute -%}
{{ return('') }}
{% endif %}

{%- set columns = adapter.get_columns_in_relation(relation) -%}

{# get_columns_in_relation returns uppercase cols for snowflake so uppercase column_prefix #}
{%- set column_prefix = column_prefix.upper() if target.type == 'snowflake' else column_prefix -%}

{%- set matched_columns = [] -%}

{# add columns with matching prefix to matched_columns #}
{% for column in columns %}
{% if column.name.startswith(column_prefix) %}
{% do matched_columns.append(column) %}
{% endif %}
{% endfor %}

{% if matched_columns|length %}
{{ return(matched_columns) }}
{% else %}
{{ exceptions.raise_compiler_error("Snowplow: No columns found with prefix "~column_prefix) }}
{% endif %}

{% endmacro %}

Referenced By

Get Enabled Snowplow Models

macros/incremental_hooks/get_enabled_snowplow_models.sql

Description

This macro does not currently have a description.

Details

Code
Source
{% macro get_enabled_snowplow_models(package_name, graph_object=none, models_to_run=var("models_to_run","")) -%}

{# Override dbt graph object if graph_object is passed. Testing purposes #}
{% if graph_object is not none %}
{% set graph = graph_object %}
{% endif %}

{# models_to_run optionally passed using dbt ls command. This returns a string of models to be run. Split into list #}
{% if models_to_run|length %}
{% set selected_models = models_to_run.split(" ") %}
{% else %}
{% set selected_models = none %}
{% endif %}

{% set enabled_models = [] %}
{% set untagged_snowplow_models = [] %}
{% set snowplow_model_tag = package_name+'_incremental' %}
{% set snowplow_events_this_run_path = 'model.'+package_name+'.'+package_name+'_base_events_this_run' %}

{% if execute %}

{% set nodes = graph.nodes.values() | selectattr("resource_type", "equalto", "model") %}

{% for node in nodes %}
{# If selected_models is specified, filter for these models #}
{% if selected_models is none or node.name in selected_models %}

{% if node.config.enabled and snowplow_model_tag not in node.tags and snowplow_events_this_run_path in node.depends_on.nodes %}

{%- do untagged_snowplow_models.append(node.name) -%}

{% endif %}

{% if node.config.enabled and snowplow_model_tag in node.tags %}

{%- do enabled_models.append(node.name) -%}

{% endif %}

{% endif %}

{% endfor %}

{% if untagged_snowplow_models|length %}
{#
Raises an error for models that reference snowplow_base_events_this_run but lack the '<package_name>_incremental' tag
Without this tagging these models will not be inserted into the manifest, breaking the incremental logic.
Only catches first degree dependencies rather than all downstream models
#}
{%- do exceptions.raise_compiler_error("Snowplow Warning: Untagged models referencing '"+package_name+"_base_events_this_run'. Please refer to the Snowplow docs on tagging. "
+ "Models: "+ ', '.join(untagged_snowplow_models)) -%}

{% endif %}

{% endif %}

{{ return(enabled_models) }}

{%- endmacro %}

Referenced By

Get Field Alias

macros/utils/bigquery/combine_column_versions/get_field_alias.sql

Description

This macro does not currently have a description.

Details

Code
Source
{% macro get_field_alias(field) %}

{# Check if field is supplied as tuple e.g. (field_name, field_alias) #}
{% if field is iterable and field is not string %}
{{ return(field) }}
{% else %}
{{ return((field, field|replace('.', '_'))) }}
{% endif %}

{% endmacro %}
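
A minimal usage sketch; a bare field name gains a default alias with dots replaced by underscores, while a (field_name, field_alias) tuple is returned unchanged:

{{ snowplow_utils.get_field_alias('page.url') }}

-- returns
('page.url', 'page_url')

{{ snowplow_utils.get_field_alias(('page.url', 'url')) }}

-- returns
('page.url', 'url')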

Referenced By

Get Incremental Manifest Status

macros/incremental_hooks/get_incremental_manifest_status.sql

Description

This macro does not currently have a description.

Details

Code
Source
{% macro get_incremental_manifest_status(incremental_manifest_table, models_in_run) -%}

{# In case of not execute just return empty strings to avoid hitting database #}
{% if not execute %}
{{ return(['', '', '', '']) }}
{% endif %}

{% set target_relation = adapter.get_relation(
database=incremental_manifest_table.database,
schema=incremental_manifest_table.schema,
identifier=incremental_manifest_table.name) %}

{% if target_relation is not none %}

{% set last_success_query %}
select min(last_success) as min_last_success,
max(last_success) as max_last_success,
coalesce(count(*), 0) as models
from {{ incremental_manifest_table }}
where model in ({{ snowplow_utils.print_list(models_in_run) }})
{% endset %}

{% set results = run_query(last_success_query) %}

{% if execute %}

{% set min_last_success = results.columns[0].values()[0] %}
{% set max_last_success = results.columns[1].values()[0] %}
{% set models_matched_from_manifest = results.columns[2].values()[0] %}
{% set has_matched_all_models = true if models_matched_from_manifest == models_in_run|length else false %}

{{ return([min_last_success, max_last_success, models_matched_from_manifest, has_matched_all_models]) }}

{% endif %}


{% else %}

{% do exceptions.warn("Snowplow Warning: " ~ incremental_manifest_table ~ " does not exist. This is expected if you are compiling a fresh installation of the dbt-snowplow-* packages.") %}

{{ return(['9999-01-01 00:00:00', '9999-01-01 00:00:00', 0, false]) }}

{% endif %}


{%- endmacro %}

Depends On

Referenced By

Get Incremental Manifest Table Relation

macros/incremental_hooks/get_incremental_manifest_table_relation.sql

Description

This macro does not currently have a description.

Details

Code
Source
{% macro get_incremental_manifest_table_relation(package_name) %}

{%- set incremental_manifest_table = ref(package_name~'_incremental_manifest') -%}

{{ return(incremental_manifest_table) }}

{% endmacro %}

Referenced By

Get Level Limit

macros/utils/bigquery/combine_column_versions/get_level_limit.sql

Description

This macro does not currently have a description.

Details

Code
Source
{% macro get_level_limit(level, level_filter, required_field_names) %}

{% set accepted_level_filters = ['equalto','lessthan','greaterthan'] %}

{% if level_filter is not in accepted_level_filters %}
{% set incompatible_level_filter_error_message -%}
Error: Incompatible level filter arg. Accepted args: {{accepted_level_filters|join(', ')}}
{%- endset %}
{{ return(snowplow_utils.throw_compiler_error(incompatible_level_filter_error_message)) }}
{% endif %}

{% if level is not none and required_field_names|length %}
{% set double_filter_error_message -%}
Error: Cannot filter fields by both `required_fields` and `level` arg. Please use only one.
{%- endset %}
{{ return(snowplow_utils.throw_compiler_error(double_filter_error_message)) }}
{% endif %}

{% if required_field_names|length and level_filter != 'equalto' %}
{% set required_fields_error_message -%}
Error: To filter fields using `required_fields` arg, `level_filter` must be set to `equalto`
{%- endset %}
{{ return(snowplow_utils.throw_compiler_error(required_fields_error_message)) }}
{% endif %}

{# level_limit is inclusive #}

{% if level is not none %}

{% if level_filter == 'equalto' %}

{% set level_limit = level %}

{% elif level_filter == 'lessthan' %}

{% set level_limit = level -1 %}

{% elif level_filter == 'greaterthan' %}

{% set level_limit = none %}

{% endif %}

{% elif required_field_names|length %}

{% set field_depths = [] %}
{% for field in required_field_names %}
{% set field_depth = field.split('.')|length %}
{% do field_depths.append(field_depth) %}
{% endfor %}

{% set level_limit = field_depths|max %}

{% else %}

{# Case when selecting all available fields #}

{% set level_limit = none %}

{% endif %}

{{ return(level_limit) }}

{% endmacro %}
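
A few illustrative calls (hypothetical values), following the branches above:

{{ snowplow_utils.get_level_limit(2, 'lessthan', []) }}                   {# returns 1 #}
{{ snowplow_utils.get_level_limit(2, 'greaterthan', []) }}                {# returns none, i.e. no limit #}
{{ snowplow_utils.get_level_limit(none, 'equalto', ['page.url.path']) }}  {# returns 3, the max field depth #}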

Depends On

Referenced By

Get Matched Fields

macros/utils/bigquery/combine_column_versions/get_matched_fields.sql

Description

This macro does not currently have a description.

Details

Code
Source
{% macro get_matched_fields(fields, required_field_names, nested_level, level_filter) %}

{% if not required_field_names|length %}

{% if nested_level is none %}

{% set matched_fields = fields %}

{% else %}

{% set matched_fields = fields|selectattr('nested_level',level_filter, nested_level)|list %}

{% endif %}

{% else %}

{% set matched_fields = fields|selectattr('field_name','in', required_field_names)|list %}

{% endif %}

{{ return(matched_fields) }}

{% endmacro %}

Referenced By

Get New Event Limits Table Relation

macros/incremental_hooks/get_new_event_limits_table_relation.sql

Description

This macro does not currently have a description.

Details

Code
Source
{% macro get_new_event_limits_table_relation(package_name) %}

{%- set new_event_limits_table = ref(package_name~'_base_new_event_limits') -%}

{{ return(new_event_limits_table) }}

{% endmacro %}

Referenced By

Get Optional Fields

macros/utils/bigquery/get_optional_fields.sql

Description

This macro does not currently have a description.

Details

Code
Source
{% macro get_optional_fields(enabled, fields, col_prefix, relation, relation_alias, include_field_alias=true) -%}

{%- if enabled -%}

{%- set combined_fields = snowplow_utils.combine_column_versions(
relation=relation,
column_prefix=col_prefix,
required_fields=fields|map(attribute='field')|list,
relation_alias=relation_alias,
include_field_alias=include_field_alias
) -%}

{{ combined_fields|join(',\n') }}

{%- else -%}

{% for field in fields %}

{%- set field_alias = snowplow_utils.get_field_alias(field.field)[1] -%}

cast(null as {{ field.dtype }}) as {{ field_alias }} {%- if not loop.last %}, {% endif %}
{% endfor %}

{%- endif -%}

{% endmacro %}

Depends On

Referenced By

Get Partition By

macros/utils/get_partition_by.sql

Description

This macro does not currently have a description.

Details

Code
Source
{%- macro get_partition_by(bigquery_partition_by=none, databricks_partition_by=none) -%}

{%- do exceptions.warn("Warning: the `get_partition_by` macro is deprecated and will be removed in a future version of the package, please use `get_value_by_target_type` instead.") -%}

{% if target.type == 'bigquery' %}
{{ return(bigquery_partition_by) }}
{% elif target.type in ['databricks', 'spark'] %}
{{ return(databricks_partition_by) }}
{% endif %}

{%- endmacro -%}

Get Quarantine Sql

macros/incremental_hooks/quarantine_sessions.sql

Description

This macro does not currently have a description.

Details

Code
Source
{% macro get_quarantine_sql(relation, max_session_length) %}

{# Find sessions exceeding max_session_days #}
{% set quarantine_sql -%}

select
session_id

from {{ relation }}
-- '=' since end_tstamp is restricted to start_tstamp + max_session_days
where end_tstamp = {{ snowplow_utils.timestamp_add(
'day',
max_session_length,
'start_tstamp'
) }}

{%- endset %}

{{ return(quarantine_sql) }}

{% endmacro %}

Depends On

Referenced By

Get Run Limits

macros/incremental_hooks/get_run_limits.sql

Description

This macro does not currently have a description.

Details

Code
Source
{% macro get_run_limits(min_last_success, max_last_success, models_matched_from_manifest, has_matched_all_models, start_date) -%}

{% set start_tstamp = snowplow_utils.cast_to_tstamp(start_date) %}
{% set min_last_success = snowplow_utils.cast_to_tstamp(min_last_success) %}
{% set max_last_success = snowplow_utils.cast_to_tstamp(max_last_success) %}

{% if not execute %}
{{ return('') }}
{% endif %}

{% if models_matched_from_manifest == 0 %}
{# If no snowplow models are in the manifest, start from start_tstamp #}
{% do snowplow_utils.log_message("Snowplow: No data in manifest. Processing data from start_date") %}

{% set run_limits_query %}
select {{start_tstamp}} as lower_limit,
least({{ snowplow_utils.timestamp_add('day', var("snowplow__backfill_limit_days", 30), start_tstamp) }},
{{ snowplow_utils.current_timestamp_in_utc() }}) as upper_limit
{% endset %}

{% elif not has_matched_all_models %}
{# If a new Snowplow model is added which isn't already in the manifest, replay all events up to upper_limit #}
{% do snowplow_utils.log_message("Snowplow: New Snowplow incremental model. Backfilling") %}

{% set run_limits_query %}
select {{ start_tstamp }} as lower_limit,
least({{ max_last_success }},
{{ snowplow_utils.timestamp_add('day', var("snowplow__backfill_limit_days", 30), start_tstamp) }}) as upper_limit
{% endset %}

{% elif min_last_success != max_last_success %}
{# If all models in the run exists in the manifest but are out of sync, replay from the min last success to the max last success #}
{% do snowplow_utils.log_message("Snowplow: Snowplow incremental models out of sync. Syncing") %}

{% set run_limits_query %}
select {{ snowplow_utils.timestamp_add('hour', -var("snowplow__lookback_window_hours", 6), min_last_success) }} as lower_limit,
least({{ max_last_success }},
{{ snowplow_utils.timestamp_add('day', var("snowplow__backfill_limit_days", 30), min_last_success) }}) as upper_limit
{% endset %}

{% else %}
{# Else standard run of the model #}
{% do snowplow_utils.log_message("Snowplow: Standard incremental run") %}

{% set run_limits_query %}
select
{{ snowplow_utils.timestamp_add('hour', -var("snowplow__lookback_window_hours", 6), min_last_success) }} as lower_limit,
least({{ snowplow_utils.timestamp_add('day', var("snowplow__backfill_limit_days", 30), min_last_success) }},
{{ snowplow_utils.current_timestamp_in_utc() }}) as upper_limit
{% endset %}

{% endif %}

{{ return(run_limits_query) }}

{% endmacro %}

Depends On

Referenced By

Get Schemas By Pattern

macros/utils/get_schemas_by_pattern.sql

Description

Given a pattern, finds and returns all schemas that match that pattern. Note that on Databricks any single-character match (_) will not be translated properly, because Databricks uses a regex expression rather than a SQL like clause.

Arguments

  • schema_pattern (string): The pattern for the schema(s) you wish to find. For all non-Databricks targets this should be in the usual SQL like form. % is translated automatically for Databricks, but other special characters may not be

Returns

A list of schemas that match the pattern provided.
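
Usage

A minimal sketch, assuming a hypothetical schema prefix:

{% set schemas = snowplow_utils.get_schemas_by_pattern('dbt_ci_%') %}

{% for schema in schemas %}
{{ log(schema, info=true) }}
{% endfor %}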

Details

Code
Source
{% macro get_schemas_by_pattern(schema_pattern) %}
{{ return(adapter.dispatch('get_schemas_by_pattern', 'snowplow_utils')
(schema_pattern)) }}
{% endmacro %}

Depends On

  • macro.dbt.replace
  • macro.dbt.run_query
  • macro.dbt_utils.get_tables_by_pattern_sql

Referenced By

Get Sde Or Context

macros/utils/get_sde_or_context.sql

Description

This macro exists for Redshift and Postgres users to more easily select their self-describing event and context tables and apply de-duplication before joining onto their (already de-duplicated) events table. The root_id and root_tstamp columns are by default returned as schema_name_id and schema_name_tstamp respectively, where schema_name is the value in the schema_name column of the table. In cases where multiple entities may be sent in the context (e.g. products in search results), you should set the single_entity argument to false and use an additional criterion in your join (see the Snowplow docs for further details).

Note that it is the responsibility of the user to ensure they have no duplicate names when using this macro multiple times, or when a schema column name matches one already in the events table. In these cases the prefix argument should be used and aliasing applied to the output.

Arguments

  • schema (string): The schema your context or sde table is in
  • identifier (string): The table name of your context or sde
  • lower_limit (string): Lower limit to filter the root_tstamp field on, only used if both lower and upper are provided
  • upper_limit (string): Upper limit to filter the root_tstamp field on, only used if both lower and upper are provided
  • prefix (string): A string to prefix (additional _ added automatically) the column names with. If not provided root_id and root_tstamp will be prefixed with the schema name.

Returns

CTE sql for deduplicating records from the schema table, without the schema details columns. The final CTE is the name of the original table.

Usage

With at most one entity per context:

with {{ snowplow_utils.get_sde_or_context('atomic', 'nl_basjes_yauaa_context_1', "'2023-01-01'", "'2023-02-01'")}}

select
...
from my_events_table a
left join nl_basjes_yauaa_context_1 b on
a.event_id = b.yauaa_context__id
and a.collector_tstamp = b.yauaa_context__tstamp

With the possibility of multiple entities per context, your events table must already be de-duped but still have a field with the number of duplicates:

with {{ snowplow_utils.get_sde_or_context('atomic', 'nl_basjes_yauaa_context_1', "'2023-01-01'", "'2023-02-01'", single_entity = false)}}

select
...
from my_events_table a
left join nl_basjes_yauaa_context_1 b on
a.event_id = b.yauaa_context__id
and a.collector_tstamp = b.yauaa_context__tstamp
and mod(b.yauaa_context__index, a.duplicate_count) = 0

Details

Code
Source
{% macro get_sde_or_context(schema, identifier, lower_limit, upper_limit, prefix = none, single_entity = true) %}
{{ return(adapter.dispatch('get_sde_or_context', 'snowplow_utils')(schema, identifier, lower_limit, upper_limit, prefix, single_entity)) }}
{% endmacro %}

Depends On

  • macro.dbt_utils.get_single_value

Get Session Lookback Limit

macros/incremental_hooks/get_session_lookback_limit.sql

Description

This macro does not currently have a description.

Details

Code
Source
{% macro get_session_lookback_limit(lower_limit) %}

{% if not execute %}
{{ return('')}}
{% endif %}

{% set limit_query %}
select
{{ snowplow_utils.timestamp_add(
'day',
-var("snowplow__session_lookback_days", 365),
lower_limit) }} as session_lookback_limit

{% endset %}

{% set results = run_query(limit_query) %}

{% if execute %}

{% set session_lookback_limit = snowplow_utils.cast_to_tstamp(results.columns[0].values()[0]) %}

{{ return(session_lookback_limit) }}

{% endif %}

{% endmacro %}

Depends On

Referenced By

Get Snowplow Delete Insert Sql

macros/materializations/snowplow_incremental/common/get_snowplow_delete_insert_sql.sql

Description

This macro does not currently have a description.

Details

Code
Source
{% macro get_snowplow_delete_insert_sql(target, source, unique_key, dest_cols_csv, predicates) -%}
{{ adapter.dispatch('get_snowplow_delete_insert_sql', 'snowplow_utils')(target, source, unique_key, dest_cols_csv, predicates) }}
{%- endmacro %}

Depends On

Get Snowplow Merge Sql

macros/materializations/snowplow_incremental/common/get_snowplow_merge_sql.sql

Description

This macro does not currently have a description.

Details

Code
Source
{% macro get_snowplow_merge_sql(target, source, unique_key, dest_columns, predicates, include_sql_header) -%}
{{ adapter.dispatch('get_snowplow_merge_sql', 'snowplow_utils')(target, source, unique_key, dest_columns, predicates, include_sql_header) }}
{%- endmacro %}

Depends On

Get Snowplow Upsert Limits Sql

macros/materializations/snowplow_incremental/common/get_snowplow_upsert_limits_sql.sql

Description

This macro does not currently have a description.

Details

Code
Source
{% macro get_snowplow_upsert_limits_sql(tmp_relation, upsert_date_key, disable_upsert_lookback) -%}
{{ adapter.dispatch('get_snowplow_upsert_limits_sql', 'snowplow_utils')(tmp_relation, upsert_date_key, disable_upsert_lookback) }}
{%- endmacro %}

Depends On

Get Split To Array

macros/utils/cross_db/get_split_to_array.sql

Description

This macro takes care of harmonising cross-db split to array type functions. The macro supports a custom delimiter if your string is not delimited by a comma with no space (default).

Arguments

  • string_column (string): Name of the column to split into an array
  • column_prefix (string): Table alias for the string_column
  • delimiter (string): (Optional) String that determines how to split your string. Default ','

Returns

The data warehouse appropriate sql to perform a split to array.

Usage

select
...
{{ snowplow_utils.get_split_to_array('my_string_column', 'a', ', ') }}
...
from ... a

Details

Code
Source


{%- macro get_split_to_array(string_column, column_prefix, delimiter=',') -%}
{{ return(adapter.dispatch('get_split_to_array', 'snowplow_utils')(string_column, column_prefix, delimiter)) }}
{%- endmacro -%}


Referenced By

Get String Agg

macros/utils/cross_db/get_string_agg.sql

Description

This macro takes care of harmonising cross-db list_agg, string_agg type functions. These are aggregate functions (i.e. to be used with a group by) that take values from grouped rows and concatenate them into a single string. This macro supports ordering values by an arbitrary column and ensuring unique values (i.e. applying distinct).

Note that databricks does not have list/string_agg function so a more complex expression is used.

Arguments

  • base_column (string): Name of the column to aggregate values for
  • column_prefix (string): Table alias for the base_column
  • separator (string): (Optional) String to use to separate your values. Default ','
  • order_by_column (string): (Optional) Column to order your values by before aggregating. Default base_column
  • sort_numeric (boolean): (Optional) Is the column you are ordering by a numeric value (regardless of stored type). Default false
  • order_by_column_prefix (string): (Optional) Table alias for the order_by_column. Default column_prefix
  • is_distinct (boolean): (Optional) Do you want to apply distinct to your values. Will be applied after ordering. Default false
  • order_desc (boolean): (Optional) Do you wish to apply the ordering descending. Default false

Returns

The data warehouse appropriate sql to perform a list/string_agg.

Usage

select
...
{{ snowplow_utils.get_string_agg('base_column', 'column_prefix', ';', 'order_by_col', sort_numeric=true, order_by_column_prefix='order_by_column_prefix', is_distinct=true, order_desc=true) }},
...
from ...
group by ...

Details

Code
Source


{%- macro get_string_agg(base_column, column_prefix, separator=',', order_by_column=base_column, sort_numeric=false, order_by_column_prefix=column_prefix, is_distinct=false, order_desc=false) -%}

{{ return(adapter.dispatch('get_string_agg', 'snowplow_utils')(base_column, column_prefix, separator, order_by_column, sort_numeric, order_by_column_prefix, is_distinct, order_desc)) }}

{%- endmacro -%}


Referenced By

Get Successful Models

macros/incremental_hooks/get_successful_models.sql

Description

This macro does not currently have a description.

Details

Code
Source
{% macro get_successful_models(models=[], run_results=results) -%}

{% set successful_models = [] %}
{# Remove the patch version from dbt version #}
{% set dbt_version_trunc = dbt_version.split('.')[0:2]|join('.')|float %}

{% if execute %}

{% for res in run_results -%}
{# Filter for models #}
{% if res.node.unique_id.startswith('model.') %}

{% set is_model_to_include = true if not models|length or res.node.name in models else false %}

{# run_results schema changed between dbt v0.18 and v0.19 so different methods to define success #}
{% if dbt_version_trunc <= 0.18 %}
{% set skipped = true if res.status is none and res.skip else false %}
{% set errored = true if res.status == 'ERROR' else false %}
{% set success = true if not (skipped or errored) else false %}
{% else %}
{% set success = true if res.status == 'success' else false %}
{% endif %}

{% if success and is_model_to_include %}

{%- do successful_models.append(res.node.name) -%}

{% endif %}

{% endif %}

{% endfor %}

{{ return(successful_models) }}

{% endif %}

{%- endmacro %}

Referenced By

Get Value By Target

macros/utils/get_value_by_target.sql

Description

This macro is designed to dynamically return values based on the target (target.name) you are running against. Your target names are defined in your profiles.yml file. This can be useful for dynamically changing variables within your project, depending on whether you are running in dev or prod.

Arguments

  • dev_value: Value to use if target is development
  • default_value: Value to use if target is not development
  • dev_target_name (string): (Optional) Name of the development target. Default dev

Returns

The value relevant to the target environment

Usage


# dbt_project.yml
...
vars:
snowplow_web:
snowplow__backfill_limit_days: "{{ snowplow_utils.get_value_by_target(dev_value=1, default_value=30, dev_target_name='dev') }}"

Details

Code
Source
{% macro get_value_by_target(dev_value, default_value, dev_target_name='dev') %}

{% if target.name == dev_target_name %}
{% set value = dev_value %}
{% else %}
{% set value = default_value %}
{% endif %}

{{ return(value) }}

{% endmacro %}

Referenced By

Get Value By Target Type

macros/utils/get_value_by_target_type.sql

Description

Returns the value provided based on the target.type. This is useful when you need a different value based on which warehouse is being used e.g. cluster fields or partition keys.

Arguments

  • bigquery_val (string): (Optional) Value to return if the target.type is bigquery. Default None
  • snowflake_val (string): (Optional) Value to return if the target.type is snowflake. Default None
  • redshift_val (string): (Optional) Value to return if the target.type is redshift. Default None
  • postgres_val (string): (Optional) Value to return if the target.type is postgres. Default None
  • databricks_val (string): (Optional) Value to return if the target.type is databricks. Default None

Returns

The appropriate value for the target warehouse type, or an error if not an expected target type.
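
Usage

A minimal sketch (column names are hypothetical), e.g. setting a warehouse-specific cluster key in a model config:

{{
  config(
    cluster_by=snowplow_utils.get_value_by_target_type(
      bigquery_val=['session_id'],
      snowflake_val=['to_date(start_tstamp)']
    )
  )
}}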

Details

Code
Source
{%- macro get_value_by_target_type(bigquery_val=none, snowflake_val=none, redshift_val=none, postgres_val=none, databricks_val=none) -%}

{% if target.type == 'bigquery' %}
{{ return(bigquery_val) }}
{% elif target.type == 'snowflake' %}
{{ return(snowflake_val) }}
{% elif target.type == 'redshift' %}
{{ return(redshift_val) }}
{% elif target.type == 'postgres' %}
{{ return(postgres_val) }}
{% elif target.type in ['databricks', 'spark'] %}
{{ return(databricks_val) }}
{% else %}
{{ exceptions.raise_compiler_error("Snowplow: Unexpected target type "~target.type) }}
{% endif %}

{%- endmacro -%}

Referenced By

Is Run With New Events

macros/utils/is_run_with_new_events.sql

Description

This macro is designed for use with Snowplow data modelling packages like snowplow-web. It can be used in any incremental model to effectively block the model from being updated with old data it has already consumed. This saves cost and prevents historical data from being overwritten with partially complete data (due to a batch back-fill, for instance).

The macro utilizes the snowplow_[platform]_incremental_manifest table to determine whether the model from which the macro is called, i.e. {{ this }}, has already consumed the data in the given run. If it has, false is returned. If the run contains new data, true is returned.

For the sessions lifecycle identifier the manifest is not used, as that table is not included in it.

Arguments

  • package_name (string): The modeling package name e.g. snowplow-mobile

Returns

true if the run contains new events not previously consumed by this model, false otherwise.

Usage


{{
config(
materialized='incremental',
unique_key='screen_view_id',
upsert_date_key='start_tstamp'
)
}}

select
...

from {{ ref('snowplow_mobile_base_events_this_run' ) }}
where {{ snowplow_utils.is_run_with_new_events('snowplow_mobile') }} --returns false if run doesn't contain new events.

Details

Code
Source
{% macro is_run_with_new_events(package_name) %}

{%- set new_event_limits_relation = snowplow_utils.get_new_event_limits_table_relation(package_name) -%}
{%- set incremental_manifest_relation = snowplow_utils.get_incremental_manifest_table_relation(package_name) -%}

{% if is_incremental() %}

{%- set node_identifier = this.identifier -%}
{%- set base_sessions_lifecycle_identifier = package_name+'_base_sessions_lifecycle_manifest' -%}

{# base_sessions_lifecycle not included in manifest so query directly. Otherwise use the manifest for performance #}
{%- if node_identifier == base_sessions_lifecycle_identifier -%}
{# Technically should be max(end_tstamp) but table is partitioned on start_tstamp so cheaper to use.
Worst case we update the manifest during a backfill when we don't need to, which should be very rare. #}
{% set has_been_processed_query %}
select
case when
(select upper_limit from {{ new_event_limits_relation }}) <= (select max(start_tstamp) from {{this}})
then false
else true end
{% endset %}

{%- else -%}

{% set has_been_processed_query %}
select
case when
(select upper_limit from {{ new_event_limits_relation }})
<= (select last_success from {{ incremental_manifest_relation }} where model = '{{node_identifier}}')
then false
else true end
{% endset %}

{%- endif -%}

{% set results = run_query(has_been_processed_query) %}

{% if execute %}
{% set has_new_events = results.columns[0].values()[0] | as_bool() %}
{# Snowflake: dbt 0.18 returns bools as ints. Ints are not accepted as predicates in Snowflake. Cast to be safe. #}
{% set has_new_events = 'cast('~has_new_events~' as boolean)' %}
{% endif %}

{% else %}

{% set has_new_events = true %}

{% endif %}

{{ return(has_new_events) }}

{% endmacro %}

Depends On

Referenced By

Log Message

macros/utils/log_message.sql

Description

A wrapper macro around dbt_utils.pretty_log_format that uses the snowplow__has_log_enabled variable to determine whether the log is also printed to stdout.

Arguments

  • message (string): The string message to print.
  • is_printed (boolean): (Optional) Boolean value to determine if the log is also printed to stdout. Default var('snowplow__has_log_enabled', true)
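
Usage

A minimal sketch; the message string is illustrative:

{% do snowplow_utils.log_message('Snowplow: Processing data') %}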

Details

Code
Source
{% macro log_message(message, is_printed=var('snowplow__has_log_enabled', true)) %}
{{ log(dbt_utils.pretty_log_format(message), info=is_printed) }}
{% endmacro %}

Depends On

  • macro.dbt_utils.pretty_log_format

Referenced By

Materialization Snowplow Incremental Bigquery

macros/materializations/snowplow_incremental/bigquery/snowplow_incremental.sql

Description

This macro does not currently have a description.

Details

Code
Source
{% materialization snowplow_incremental, adapter='bigquery' -%}

{%- set full_refresh_mode = (should_full_refresh()) -%}

{# Required keys. Throws error if not present #}
{%- set unique_key = config.require('unique_key') -%}
{%- set raw_partition_by = config.require('partition_by', none) -%}
{%- set partition_by = adapter.parse_partition_by(raw_partition_by) -%}

{# Raise error if dtype is int64. Unsupported. #}
{% if partition_by.data_type == 'int64' %}
{%- set wrong_dtype_message -%}
Datatype int64 is not supported by 'snowplow_incremental'
Please use one of the following: timestamp | date | datetime
{%- endset -%}
{% do exceptions.raise_compiler_error(wrong_dtype_message) %}
{% endif %}

{% set disable_upsert_lookback = config.get('disable_upsert_lookback') %}

{%- set target_relation = this %}
{%- set existing_relation = load_relation(this) %}
{%- set tmp_relation = make_temp_relation(this) %}

{# Validate early so we don't run SQL if the strategy is invalid or missing keys #}
{% set strategy = snowplow_utils.snowplow_validate_get_incremental_strategy(config) -%}

{%- set cluster_by = config.get('cluster_by', none) -%}

{{ run_hooks(pre_hooks) }}

{% if existing_relation is none %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% elif existing_relation.is_view %}
{#-- There's no way to atomically replace a view with a table on BQ --#}
{{ adapter.drop_relation(existing_relation) }}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% elif full_refresh_mode %}
{#-- If the partition/cluster config has changed, then we must drop and recreate --#}
{% if not adapter.is_replaceable(existing_relation, partition_by, cluster_by) %}
{% do log("Hard refreshing " ~ existing_relation ~ " because it is not replaceable") %}
{{ adapter.drop_relation(existing_relation) }}
{% endif %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% else %}
{% set dest_columns = adapter.get_columns_in_relation(existing_relation) %}

{% set build_sql = snowplow_utils.snowplow_merge(
tmp_relation,
target_relation,
unique_key,
partition_by,
dest_columns,
disable_upsert_lookback) %}

{% endif %}

{%- call statement('main') -%}
{{ build_sql }}
{% endcall %}

{{ run_hooks(post_hooks) }}

{% set target_relation = this.incorporate(type='table') %}

{% do persist_docs(target_relation, model) %}

{{ return({'relations': [target_relation]}) }}

{%- endmaterialization %}

Depends On

Materialization Snowplow Incremental Databricks

macros/materializations/snowplow_incremental/databricks/snowplow_incremental.sql

Description

This macro does not currently have a description.

Details

Code
Source
{% materialization snowplow_incremental, adapter='databricks' -%}
{%- set full_refresh_mode = (should_full_refresh()) -%}

{# Required keys. Throws error if not present #}
{%- set unique_key = config.require('unique_key') -%}
{%- set upsert_date_key = config.require('upsert_date_key') -%}

{% set disable_upsert_lookback = config.get('disable_upsert_lookback') %}

{% set target_relation = this %}
{% set existing_relation = load_relation(this) %}
{% set tmp_relation = make_temp_relation(this) %}

{# Validate early so we don't run SQL if the strategy is invalid or missing keys #}
{% set strategy = snowplow_utils.snowplow_validate_get_incremental_strategy(config) -%}

-- setup
{{ run_hooks(pre_hooks, inside_transaction=False) }}

-- `BEGIN` happens here:
{{ run_hooks(pre_hooks, inside_transaction=True) }}

{% if existing_relation is none %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% elif existing_relation.is_view %}
{#-- Can't overwrite a view with a table - we must drop --#}
{{ log("Dropping relation " ~ target_relation ~ " because it is a view and this model is a table.") }}
{% do adapter.drop_relation(existing_relation) %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% elif full_refresh_mode %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% else %}
{% do run_query(create_table_as(True, tmp_relation, sql)) %}
{% do adapter.expand_target_column_types(
from_relation=tmp_relation,
to_relation=target_relation) %}

{%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%}

{% set build_sql = snowplow_utils.snowplow_merge( tmp_relation,
target_relation,
unique_key,
upsert_date_key,
dest_columns,
disable_upsert_lookback)%}
{% endif %}

{%- call statement('main') -%}
{{ build_sql }}
{%- endcall -%}

{{ run_hooks(post_hooks, inside_transaction=True) }}

-- `COMMIT` happens here
{{ adapter.commit() }}

{{ run_hooks(post_hooks, inside_transaction=False) }}

{% set target_relation = target_relation.incorporate(type='table') %}
{% do persist_docs(target_relation, model) %}

{{ return({'relations': [target_relation]}) }}

{%- endmaterialization %}

Depends On

Materialization Snowplow Incremental Default

macros/materializations/snowplow_incremental/default/snowplow_incremental.sql

Description

This macro does not currently have a description.

Details

Code
Source
{% materialization snowplow_incremental, default -%}

{% set full_refresh_mode = flags.FULL_REFRESH %}

{# Required keys. Throws error if not present #}
{%- set unique_key = config.require('unique_key') -%}
{%- set upsert_date_key = config.require('upsert_date_key') -%}

{% set disable_upsert_lookback = config.get('disable_upsert_lookback') %}

{% set target_relation = this %}
{% set existing_relation = load_relation(this) %}
{% set tmp_relation = make_temp_relation(this) %}

{{ run_hooks(pre_hooks, inside_transaction=False) }}

-- `BEGIN` happens here:
{{ run_hooks(pre_hooks, inside_transaction=True) }}

{% set to_drop = [] %}
{% if existing_relation is none %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% elif existing_relation.is_view or full_refresh_mode %}
{#-- Make sure the backup doesn't exist so we don't encounter issues with the rename below #}
{% set backup_identifier = existing_relation.identifier ~ "__dbt_backup" %}
{% set backup_relation = existing_relation.incorporate(path={"identifier": backup_identifier}) %}
{% do adapter.drop_relation(backup_relation) %}

{% do adapter.rename_relation(target_relation, backup_relation) %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% do to_drop.append(backup_relation) %}
{% else %}
{% set tmp_relation = make_temp_relation(target_relation) %}
{% do run_query(create_table_as(True, tmp_relation, sql)) %}
{% do adapter.expand_target_column_types(
from_relation=tmp_relation,
to_relation=target_relation) %}
{%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%}
{% set build_sql = snowplow_utils.snowplow_delete_insert(
tmp_relation,
target_relation,
unique_key,
upsert_date_key,
dest_columns,
disable_upsert_lookback) %}
{% endif %}

{% call statement("main") %}
{{ build_sql }}
{% endcall %}

{% if existing_relation is none or existing_relation.is_view or should_full_refresh() %}
{% do create_indexes(target_relation) %}
{% endif %}

{{ run_hooks(post_hooks, inside_transaction=True) }}

-- `COMMIT` happens here
{% do adapter.commit() %}

{% for rel in to_drop %}
{% do adapter.drop_relation(rel) %}
{% endfor %}

{{ run_hooks(post_hooks, inside_transaction=False) }}

{{ return({'relations': [target_relation]}) }}

{%- endmaterialization %}

Depends On

  • macro.dbt.create_indexes
  • macro.dbt.create_table_as
  • macro.dbt.load_relation
  • macro.dbt.make_temp_relation
  • macro.dbt.run_hooks
  • macro.dbt.run_query
  • macro.dbt.should_full_refresh
  • macro.dbt.statement
  • macro.snowplow_utils.snowplow_delete_insert

Materialization Snowplow Incremental Snowflake

macros/materializations/snowplow_incremental/snowflake/snowplow_incremental.sql

Description

This macro does not currently have a description.

Details

Code
Source
{% materialization snowplow_incremental, adapter='snowflake' -%}

{% set original_query_tag = set_query_tag() %}

{%- set full_refresh_mode = (should_full_refresh()) -%}

{# Required keys. Throws error if not present #}
{%- set unique_key = config.require('unique_key') -%}
{%- set upsert_date_key = config.require('upsert_date_key') -%}

{% set disable_upsert_lookback = config.get('disable_upsert_lookback') %}

{% set target_relation = this %}
{% set existing_relation = load_relation(this) %}
{% set tmp_relation = make_temp_relation(this) %}

{# Validate early so we don't run SQL if the strategy is invalid or missing keys #}
{% set strategy = snowplow_utils.snowplow_validate_get_incremental_strategy(config) -%}

-- setup
{{ run_hooks(pre_hooks, inside_transaction=False) }}

-- `BEGIN` happens here:
{{ run_hooks(pre_hooks, inside_transaction=True) }}

{% if existing_relation is none %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% elif existing_relation.is_view %}
{#-- Can't overwrite a view with a table - we must drop --#}
{{ log("Dropping relation " ~ target_relation ~ " because it is a view and this model is a table.") }}
{% do adapter.drop_relation(existing_relation) %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% elif full_refresh_mode %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% else %}
{% do run_query(create_table_as(True, tmp_relation, sql)) %}
{% do adapter.expand_target_column_types(
from_relation=tmp_relation,
to_relation=target_relation) %}

{%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%}

{% set build_sql = snowplow_utils.snowplow_snowflake_get_incremental_sql(strategy,
tmp_relation,
target_relation,
unique_key,
upsert_date_key,
dest_columns,
disable_upsert_lookback)%}
{% endif %}

{%- call statement('main') -%}
{{ build_sql }}
{%- endcall -%}

{{ run_hooks(post_hooks, inside_transaction=True) }}

-- `COMMIT` happens here
{{ adapter.commit() }}

{{ run_hooks(post_hooks, inside_transaction=False) }}

{% set target_relation = target_relation.incorporate(type='table') %}
{% do persist_docs(target_relation, model) %}

{% do unset_query_tag(original_query_tag) %}

{{ return({'relations': [target_relation]}) }}

{%- endmaterialization %}

Depends On

Materialization Snowplow Incremental Spark

macros/materializations/snowplow_incremental/spark/snowplow_incremental.sql

Description

This macro does not currently have a description.

Details

Code
Source
{% materialization snowplow_incremental, adapter='spark' -%}
{%- set full_refresh_mode = (should_full_refresh()) -%}

{# Required keys. Throws error if not present #}
{%- set unique_key = config.require('unique_key') -%}
{%- set upsert_date_key = config.require('upsert_date_key') -%}

{% set disable_upsert_lookback = config.get('disable_upsert_lookback') %}

{% set target_relation = this %}
{% set existing_relation = load_relation(this) %}
{% set tmp_relation = make_temp_relation(this) %}

{# Validate early so we don't run SQL if the strategy is invalid or missing keys #}
{% set strategy = snowplow_utils.snowplow_validate_get_incremental_strategy(config) -%}

-- setup
{{ run_hooks(pre_hooks, inside_transaction=False) }}

-- `BEGIN` happens here:
{{ run_hooks(pre_hooks, inside_transaction=True) }}

{% if existing_relation is none %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% elif existing_relation.is_view %}
{#-- Can't overwrite a view with a table - we must drop --#}
{{ log("Dropping relation " ~ target_relation ~ " because it is a view and this model is a table.") }}
{% do adapter.drop_relation(existing_relation) %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% elif full_refresh_mode %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% else %}
{% do run_query(create_table_as(True, tmp_relation, sql)) %}
{% do adapter.expand_target_column_types(
from_relation=tmp_relation,
to_relation=target_relation) %}

{%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%}

{% set build_sql = snowplow_utils.snowplow_merge( tmp_relation,
target_relation,
unique_key,
upsert_date_key,
dest_columns,
disable_upsert_lookback)%}
{% endif %}

{%- call statement('main') -%}
{{ build_sql }}
{%- endcall -%}

{{ run_hooks(post_hooks, inside_transaction=True) }}

-- `COMMIT` happens here
{{ adapter.commit() }}

{{ run_hooks(post_hooks, inside_transaction=False) }}

{% set target_relation = target_relation.incorporate(type='table') %}
{% do persist_docs(target_relation, model) %}

{{ return({'relations': [target_relation]}) }}

{%- endmaterialization %}

Depends On

Merge Fields Across Col Versions

macros/utils/bigquery/combine_column_versions/merge_fields_across_col_versions.sql

Description

This macro does not currently have a description.

Details

Code
Source
{% macro merge_fields_across_col_versions(fields_by_col_version) %}

{# Flatten nested list of dicts into single list #}
{% set all_cols = fields_by_col_version|sum(start=[]) %}

{% set all_field_names = all_cols|map(attribute="field_name")|list %}

{% set unique_field_names = all_field_names|unique|list %}

{% set merged_fields = [] %}

{% for field_name in unique_field_names %}

{# Get all field_paths per field. Returned as array. #}
{% set field_paths = all_cols|selectattr('field_name','equalto', field_name)|map(attribute='path')|list %}

{# Get nested_level of field. Returned as single element array. #}
{% set nested_level = all_cols|selectattr('field_name',"equalto", field_name)|map(attribute='nested_level')|list%}

{% set merged_field = {
'field_name': field_name,
'field_paths': field_paths,
'nested_level': nested_level[0]
} %}

{% do merged_fields.append(merged_field) %}

{% endfor %}

{{ return(merged_fields) }}

{% endmacro %}

Referenced By

N Timedeltas Ago

macros/utils/n_timedeltas_ago.sql

Description

This macro takes the current timestamp and subtracts n units, as defined by the timedelta_attribute, from it. This is achieved using the Python datetime module, rather than querying your database. By combining this with the get_value_by_target macro, you can dynamically set dates depending on your environment.

Arguments

  • n (integer): The number of timedeltas to subtract from the current timestamp
  • timedelta_attribute (string): The type of units to subtract. This can be any valid attribute of the timedelta object

Returns

Current timestamp minus n units.

Usage


{{ snowplow_utils.n_timedeltas_ago(1, 'weeks') }}
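
For example, combined with get_value_by_target to process less history in dev (variable name and values assumed for illustration):

# dbt_project.yml
vars:
  snowplow__start_date: "{{ snowplow_utils.get_value_by_target(dev_value=snowplow_utils.n_timedeltas_ago(1, 'weeks'), default_value='2020-01-01', dev_target_name='dev') }}"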

Details

Code
Source
{% macro n_timedeltas_ago(n, timedelta_attribute) %}

{% set arg_dict = {timedelta_attribute: n} %}
{% set now = modules.datetime.datetime.now() %}
{% set n_timedeltas_ago = (now - modules.datetime.timedelta(**arg_dict)) %}

{{ return(n_timedeltas_ago) }}

{% endmacro %}

Post Ci Cleanup

macros/utils/post_ci_cleanup.sql

Description

This macro deletes all schemas that start with the specified schema_pattern, mostly for use before/after CI testing to ensure a clean start and removal of data after CI tests.

Arguments

  • schema_pattern (string): The prefix of the schema(s) to delete
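
Usage

Typically invoked as a run-operation; the schema pattern here is illustrative:

dbt run-operation post_ci_cleanup --args '{schema_pattern: dbt_ci}'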

Details

Code
Source
{% macro post_ci_cleanup(schema_pattern=target.schema) %}

{# Get all schemas with the target.schema prefix #}
{% set schemas = snowplow_utils.get_schemas_by_pattern(schema_pattern~'%') %}

{% if schemas|length %}

{%- if target.type in ['databricks', 'spark'] -%}
{# Generate sql to drop all identified schemas #}
{% for schema in schemas -%}
{%- set drop_schema_sql -%}
DROP SCHEMA IF EXISTS {{schema}} CASCADE;
{%- endset -%}

{% do run_query(drop_schema_sql) %}

{% endfor %}

{%- else -%}
{# Generate sql to drop all identified schemas #}
{% set drop_schema_sql -%}

{% for schema in schemas -%}
DROP SCHEMA IF EXISTS {{schema}} CASCADE;
{% endfor %}

{%- endset %}

{# Drop schemas #}
{% do run_query(drop_schema_sql) %}

{%- endif -%}

{% endif %}

{% endmacro %}

Depends On

Print List

macros/utils/print_list.sql

Description

Prints an array as a separator-delimited list of quoted items.

Arguments

  • list (array): Array object to print the (quoted) items of
  • separator (string): (Optional) The character(s) to separate the items by. Default ','

Returns

Separated output of items in the list, quoted.
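
Usage

A minimal sketch (list values illustrative):

{{ snowplow_utils.print_list(['web', 'mobile', 'news']) }}

-- renders to something like
'web', 'mobile', 'news'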

Details

Code
Source
{% macro print_list(list, separator = ',') %}

{%- for item in list %} '{{item}}' {%- if not loop.last %}{{separator}}{% endif %} {% endfor -%}

{% endmacro %}

Referenced By

Print Run Limits

macros/incremental_hooks/get_incremental_manifest_status.sql

Description

This macro does not currently have a description.

Details

Code
Source
{% macro print_run_limits(run_limits_relation) -%}

{% set run_limits_query %}
select lower_limit, upper_limit from {{ run_limits_relation }}
{% endset %}

{# Derive limits from the manifest instead of selecting from the limits table, since run_query executes during the 2nd parse, when the limits table is yet to be updated. #}
{% set results = run_query(run_limits_query) %}

{% if execute %}

{% set lower_limit = snowplow_utils.tstamp_to_str(results.columns[0].values()[0]) %}
{% set upper_limit = snowplow_utils.tstamp_to_str(results.columns[1].values()[0]) %}
{% set run_limits_message = "Snowplow: Processing data between " + lower_limit + " and " + upper_limit %}

{% do snowplow_utils.log_message(run_limits_message) %}

{% endif %}

{%- endmacro %}
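
A minimal sketch of calling this macro (the relation name is hypothetical):

{% do snowplow_utils.print_run_limits(ref('snowplow_web_base_new_event_limits')) %}

-- logs, e.g.
-- Snowplow: Processing data between '2023-01-01 00:00:00' and '2023-01-02 00:00:00'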

Depends On

Referenced By

Quarantine Sessions

macros/incremental_hooks/quarantine_sessions.sql

Description

Dispatches to an adapter-specific implementation that quarantines sessions exceeding the maximum session length in the given source relation.

Details

Code
Source
{% macro quarantine_sessions(package_name, max_session_length, src_relation=this) %}

{{ return(adapter.dispatch('quarantine_sessions', 'snowplow_utils')(package_name, max_session_length, src_relation=src_relation)) }}

{% endmacro %}

Depends On

Referenced By

Return Base New Event Limits

macros/incremental_hooks/return_base_new_event_limits.sql

Description

Queries the lower limit, upper limit, and session start limit (the lower limit minus snowplow__max_session_days) from the base events limits table and returns them as timestamp literals. If the table does not yet exist, warns and returns dummy far-future timestamps.

Details

Code
Source
{% macro return_base_new_event_limits(base_events_this_run) -%}

{# If not in execute mode, return empty strings to avoid hitting the database #}
{% if not execute %}
{{ return(['','',''])}}
{% endif %}

{% set target_relation = adapter.get_relation(
database=base_events_this_run.database,
schema=base_events_this_run.schema,
identifier=base_events_this_run.name) %}

{% if target_relation is not none %}

{% set limit_query %}
select
lower_limit,
upper_limit,
{{ snowplow_utils.timestamp_add('day',
-var("snowplow__max_session_days", 3),
'lower_limit') }} as session_start_limit

from {{ base_events_this_run }}
{% endset %}

{% set results = run_query(limit_query) %}

{% if execute %}

{% set lower_limit = snowplow_utils.cast_to_tstamp(results.columns[0].values()[0]) %}
{% set upper_limit = snowplow_utils.cast_to_tstamp(results.columns[1].values()[0]) %}
{% set session_start_limit = snowplow_utils.cast_to_tstamp(results.columns[2].values()[0]) %}

{{ return([lower_limit, upper_limit, session_start_limit]) }}

{% endif %}

{% else %}

{% do exceptions.warn("Snowplow Warning: " ~ base_events_this_run ~ " does not exist. This is expected if you are compiling a fresh installation of the dbt-snowplow-* packages.") %}

{% set dummy_limit = snowplow_utils.cast_to_tstamp('9999-01-01 00:00:00') %}

{{ return([dummy_limit, dummy_limit, dummy_limit]) }}

{% endif %}

{%- endmacro %}
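
A sketch of typical usage (the model name is hypothetical):

{% set results = snowplow_utils.return_base_new_event_limits(ref('snowplow_web_base_new_event_limits')) %}
{% set lower_limit = results[0] %}
{% set upper_limit = results[1] %}
{% set session_start_limit = results[2] %}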

Depends On

Referenced By

Return Limits From Model

macros/utils/return_limits_from_model.sql

Description

Calculates and returns the minimum (lower) and maximum (upper) values of the specified columns within the specified table. Useful for finding the range of values in a column of a table.

Arguments

  • model (relation): A string or ref type object to refer to a model or table to return limits from
  • lower_limit_col (string): The column to take the min of to get the lower limit
  • upper_limit_col (string): The column to take the max of to get the upper limit

Returns

A list of two objects: the lower and upper values from the respective columns in the model

Details

Code
Source
{% macro return_limits_from_model(model, lower_limit_col, upper_limit_col) -%}

{# If not in execute mode, return empty strings to avoid hitting the database #}
{% if not execute %}
{{ return(['','']) }}
{% endif %}

{% set target_relation = adapter.get_relation(
database=model.database,
schema=model.schema,
identifier=model.name) %}

{% if target_relation is not none %}

{% set limit_query %}
select
min({{lower_limit_col}}) as lower_limit,
max({{upper_limit_col}}) as upper_limit
from {{ model }}
{% endset %}

{% set results = run_query(limit_query) %}

{% if execute %}

{# If there is no data within the limits, warn the user, otherwise they may be stuck here forever #}
{%- if results.columns[0].values()[0] is none or results.columns[1].values()[0] is none -%}
{# Currently warnings do not actually do anything other than text in logs, this makes it more visible https://github.com/dbt-labs/dbt-core/issues/6721 #}
{{ snowplow_utils.log_message("Snowplow Warning: *************") }}
{% do exceptions.warn("Snowplow Warning: No data in "~this~" for date range from variables, please modify your run variables to include data if this is not expected.") %}
{{ snowplow_utils.log_message("Snowplow Warning: *************") }}
{# This allows for bigquery to still run the same way the other warehouses do, but also ensures no data is processed #}
{% set lower_limit = snowplow_utils.cast_to_tstamp('9999-01-01 00:00:00') %}
{% set upper_limit = snowplow_utils.cast_to_tstamp('9999-01-02 00:00:00') %}
{%- else -%}
{% set lower_limit = snowplow_utils.cast_to_tstamp(results.columns[0].values()[0]) %}
{% set upper_limit = snowplow_utils.cast_to_tstamp(results.columns[1].values()[0]) %}
{%- endif -%}

{{ return([lower_limit, upper_limit]) }}

{% endif %}

{% else %}

{% do exceptions.warn("Snowplow Warning: " ~ model ~ " does not exist. This is expected if you are compiling a fresh installation of the dbt-snowplow-* packages.") %}
{% set dummy_limit = snowplow_utils.cast_to_tstamp('9999-01-01 00:00:00') %}

{{ return([dummy_limit, dummy_limit]) }}

{% endif %}


{% endmacro %}
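
For example (model and column names are hypothetical):

{% set limits = snowplow_utils.return_limits_from_model(ref('snowplow_web_base_sessions_lifecycle_manifest'), 'start_tstamp', 'end_tstamp') %}
{% set lower_limit = limits[0] %}
{% set upper_limit = limits[1] %}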

Depends On

Referenced By

Set Query Tag

macros/utils/set_query_tag.sql

Description

This macro takes a statement as an argument and generates the SQL command that sets it as the query_tag on Snowflake databases; on other databases it does nothing. It can therefore be used to safely set the query_tag regardless of database type.

Arguments

  • statement (string): The statement to use as the query_tag within Snowflake

Returns

An alter session command setting the query_tag to the statement on Snowflake, otherwise nothing

Usage


{{ snowplow_utils.set_query_tag('snowplow_query_tag') }}

Details

Code
Source
{%- macro set_query_tag(statement) -%}
{{ return(adapter.dispatch('set_query_tag', 'snowplow_utils')(statement)) }}
{%- endmacro -%}
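
On Snowflake, the usage above renders roughly as the following (a sketch, not verbatim output); on all other warehouses it renders nothing:

alter session set query_tag = 'snowplow_query_tag';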


Referenced By

Snowplow Delete From Manifest

macros/utils/snowplow_delete_from_manifest.sql

Description

Deletes the given model(s) from the specified incremental manifest table, logging which models were removed and which were not present in the manifest.

Details

Code
Source
{% macro snowplow_delete_from_manifest(models, incremental_manifest_table) %}

{# Ensure models is a list #}
{%- if models is string -%}
{%- set models = [models] -%}
{%- endif -%}

{# No models to delete or not in execute mode #}
{% if not models|length or not execute %}
{{ return('') }}
{% endif %}

{# Get the manifest table to ensure it exists #}
{%- set incremental_manifest_table_exists = adapter.get_relation(incremental_manifest_table.database,
incremental_manifest_table.schema,
incremental_manifest_table.name) -%}

{%- if not incremental_manifest_table_exists -%}
{{return(dbt_utils.log_info("Snowplow: "+incremental_manifest_table|string+" does not exist"))}}
{%- endif -%}

{# Get all models in the manifest and compare to list of models to delete #}
{%- set models_in_manifest = dbt_utils.get_column_values(table=incremental_manifest_table, column='model') -%}
{%- set unmatched_models, matched_models = [], [] -%}

{%- for model in models -%}

{%- if model in models_in_manifest -%}
{%- do matched_models.append(model) -%}
{%- else -%}
{%- do unmatched_models.append(model) -%}
{%- endif -%}

{%- endfor -%}

{%- if not matched_models|length -%}
{{return(dbt_utils.log_info("Snowplow: None of the supplied models exist in the manifest"))}}
{%- endif -%}

{% set delete_statement %}
{%- if target.type in ['databricks', 'spark'] -%}
delete from {{ incremental_manifest_table }} where model in ({{ snowplow_utils.print_list(matched_models) }});
{%- else -%}
-- We don't need a transaction, but Redshift requires a commit statement while BigQuery does not; using a transaction covers both.
begin;
delete from {{ incremental_manifest_table }} where model in ({{ snowplow_utils.print_list(matched_models) }});
commit;
{%- endif -%}
{% endset %}

{%- do run_query(delete_statement) -%}

{%- if matched_models|length -%}
{% do snowplow_utils.log_message("Snowplow: Deleted models "+snowplow_utils.print_list(matched_models)+" from the manifest") %}
{%- endif -%}

{%- if unmatched_models|length -%}
{% do snowplow_utils.log_message("Snowplow: Models "+snowplow_utils.print_list(unmatched_models)+" do not exist in the manifest") %}
{%- endif -%}

{% endmacro %}
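
A sketch of a direct call (the model and manifest names are hypothetical):

{{ snowplow_utils.snowplow_delete_from_manifest(['snowplow_web_page_views'], ref('snowplow_web_incremental_manifest')) }}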

Depends On

Referenced By

Snowplow Delete Insert

macros/materializations/snowplow_incremental/common/snowplow_delete_insert.sql

Description

Dispatches to the adapter-specific implementation of the delete+insert strategy used by the snowplow_incremental materialization.

Details

Code
Source
{% macro snowplow_delete_insert(tmp_relation, target_relation, unique_key, upsert_date_key, dest_columns, disable_upsert_lookback) -%}
{{ adapter.dispatch('snowplow_delete_insert', 'snowplow_utils')(tmp_relation, target_relation, unique_key, upsert_date_key, dest_columns, disable_upsert_lookback) }}
{%- endmacro %}

Depends On

Referenced By

Snowplow Incremental Post Hook

macros/incremental_hooks/snowplow_incremental_post_hook.sql

Description

On-run-end hook that records the successfully executed Snowplow models for the given package in its incremental manifest table, based on the base events table for this run.

Details

Code
Source
{% macro snowplow_incremental_post_hook(package_name) %}

{% set enabled_snowplow_models = snowplow_utils.get_enabled_snowplow_models(package_name) -%}

{% set successful_snowplow_models = snowplow_utils.get_successful_models(models=enabled_snowplow_models) -%}

{% set incremental_manifest_table = snowplow_utils.get_incremental_manifest_table_relation(package_name) -%}

{% set base_events_this_run_table = ref(package_name~'_base_events_this_run') -%}

{{ snowplow_utils.update_incremental_manifest_table(incremental_manifest_table, base_events_this_run_table, successful_snowplow_models) }}

{% endmacro %}
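
A sketch of wiring this up as an on-run-end hook in dbt_project.yml (the package name is an assumption):

on-run-end:
  - "{{ snowplow_utils.snowplow_incremental_post_hook('snowplow_web') }}"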

Depends On

Snowplow Is Incremental

macros/materializations/snowplow_incremental/common/snowplow_is_incremental.sql

Description

Deprecated: use dbt's is_incremental() instead. Returns true when the model already exists as a table, is materialized as incremental or snowplow_incremental, and a full refresh has not been requested.

Details

Code
Source
{% macro snowplow_is_incremental() %}
{#-- do not run introspective queries in parsing #}
{% if not execute %}
{{ return(False) }}
{% else %}

{%- set error_message = "Warning: the `snowplow_is_incremental` macro is deprecated, as is the materialization, and should be replaced with dbt's `is_incremental()` macro and incremental materialization. It will be removed completely in a future version of the package." -%}
{%- do exceptions.warn(error_message) -%}

{% set relation = adapter.get_relation(this.database, this.schema, this.table) %}
{{ return(relation is not none
and relation.type == 'table'
and model.config.materialized in ['incremental','snowplow_incremental']
and not should_full_refresh()) }}
{% endif %}
{% endmacro %}

Depends On

  • macro.dbt.should_full_refresh

Referenced By

Snowplow Merge

macros/materializations/snowplow_incremental/common/snowplow_merge.sql

Description

Dispatches to the adapter-specific implementation of the merge strategy used by the snowplow_incremental materialization.

Details

Code
Source
{% macro snowplow_merge(tmp_relation, target_relation, unique_key, upsert_date_key, dest_columns, disable_upsert_lookback) -%}
{{ adapter.dispatch('snowplow_merge', 'snowplow_utils')(tmp_relation, target_relation, unique_key, upsert_date_key, dest_columns, disable_upsert_lookback) }}
{%- endmacro %}

Depends On

Referenced By

Snowplow Mobile Delete From Manifest

macros/utils/snowplow_delete_from_manifest.sql

Description

Deletes the given model(s) from the snowplow_mobile incremental manifest, by calling snowplow_delete_from_manifest.

Details

Code
Source
{% macro snowplow_mobile_delete_from_manifest(models) %}

{{ snowplow_utils.snowplow_delete_from_manifest(models, ref('snowplow_mobile_incremental_manifest')) }}

{% endmacro %}

Depends On

Snowplow Snowflake Get Incremental Sql

macros/materializations/snowplow_incremental/snowflake/snowplow_incremental.sql

Description

Selects the Snowflake SQL for the requested incremental strategy: snowplow_merge for 'merge', snowplow_delete_insert for 'delete+insert'; any other strategy raises a compiler error.

Details

Code
Source
{% macro snowplow_snowflake_get_incremental_sql(strategy, tmp_relation, target_relation, unique_key, upsert_date_key, dest_columns, disable_upsert_lookback) %}
{% if strategy == 'merge' %}
{% do return(snowplow_utils.snowplow_merge(tmp_relation, target_relation, unique_key, upsert_date_key, dest_columns, disable_upsert_lookback)) %}
{% elif strategy == 'delete+insert' %}
{% do return(snowplow_utils.snowplow_delete_insert(tmp_relation, target_relation, unique_key, upsert_date_key, dest_columns, disable_upsert_lookback)) %}
{% else %}
{% do exceptions.raise_compiler_error('invalid strategy: ' ~ strategy) %}
{% endif %}
{% endmacro %}

Depends On

Referenced By

Snowplow Validate Get Incremental Strategy

macros/materializations/snowplow_incremental/common/snowplow_validate_get_incremental_strategy.sql

Description

Dispatches to the adapter-specific implementation that validates and returns the configured incremental strategy.

Details

Code
Source
{% macro snowplow_validate_get_incremental_strategy(config) -%}
{{ adapter.dispatch('snowplow_validate_get_incremental_strategy', 'snowplow_utils')(config) }}
{%- endmacro %}

Depends On

Referenced By

Snowplow Web Delete From Manifest

macros/utils/snowplow_delete_from_manifest.sql

Description

Deletes the given model(s) from the snowplow_web incremental manifest, by calling snowplow_delete_from_manifest.

Details

Code
Source
{% macro snowplow_web_delete_from_manifest(models) %}

{{ snowplow_utils.snowplow_delete_from_manifest(models, ref('snowplow_web_incremental_manifest')) }}

{% endmacro %}
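
For example, to remove a model from the web manifest via a run-operation (the model name is hypothetical):

dbt run-operation snowplow_web_delete_from_manifest --args "{models: 'snowplow_web_page_views'}"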

Depends On

Throw Compiler Error

macros/utils/throw_compiler_error.sql

Description

Raises a compiler error with the given message, unless errors are disabled via the snowplow__disable_errors variable, in which case the message is returned instead.

Details

Code
Source
{% macro throw_compiler_error(error_message, disable_error=var("snowplow__disable_errors", false)) %}

{% if disable_error %}

{{ return(error_message) }}

{% else %}

{{ exceptions.raise_compiler_error(error_message) }}

{% endif %}

{% endmacro %}
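
A minimal sketch (the message is hypothetical):

{{ snowplow_utils.throw_compiler_error('Snowplow: invalid configuration supplied') }}

With snowplow__disable_errors set to true, the same call returns the message instead of halting compilation.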

Referenced By

Timestamp Add

macros/utils/cross_db/timestamp_functions.sql

Description

Cross-database macro that adds a number of datepart intervals to a timestamp, dispatching to adapter-specific implementations built on dbt's dateadd.

Details

Code
Source
{% macro timestamp_add(datepart, interval, tstamp) %}
{{ return(adapter.dispatch('timestamp_add', 'snowplow_utils')(datepart, interval, tstamp)) }}
{% endmacro %}
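
For example, mirroring the call used in return_base_new_event_limits above:

{{ snowplow_utils.timestamp_add('day', -3, 'lower_limit') }}

-- on most warehouses this renders roughly as
-- dateadd(day, -3, lower_limit)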

Depends On

  • macro.dbt.dateadd

Referenced By

Timestamp Diff

macros/utils/cross_db/timestamp_functions.sql

Description

Cross-database macro that returns the difference between two timestamps in the given datepart, dispatching to adapter-specific implementations built on dbt's datediff.

Details

Code
Source
{% macro timestamp_diff(first_tstamp, second_tstamp, datepart) %}
{{ return(adapter.dispatch('timestamp_diff', 'snowplow_utils')(first_tstamp, second_tstamp, datepart)) }}
{% endmacro %}
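
For illustration (column names are hypothetical):

{{ snowplow_utils.timestamp_diff('start_tstamp', 'end_tstamp', 'second') }}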

Depends On

  • macro.dbt.datediff

Referenced By

To Unixtstamp

macros/utils/cross_db/timestamp_functions.sql

Description

Cross-database macro that converts a timestamp to a unix timestamp, dispatching to adapter-specific implementations.

Details

Code
Source
{%- macro to_unixtstamp(tstamp) -%}
{{ adapter.dispatch('to_unixtstamp', 'snowplow_utils')(tstamp) }}
{%- endmacro %}

Depends On

Referenced By

Tstamp To Str

macros/utils/tstamp_to_str.sql

Description

Renders a Python datetime object as a quoted 'YYYY-MM-DD HH:MM:SS' string literal.

Details

Code
Source
{% macro tstamp_to_str(tstamp) -%}
'{{ tstamp.strftime("%Y-%m-%d %H:%M:%S") }}'
{%- endmacro %}
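
For illustration:

-- given a Python datetime of 2023-05-01 09:30:00, renders
'2023-05-01 09:30:00'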

Referenced By

Type Max String

macros/utils/cross_db/datatypes.sql

Description

Cross-database macro returning the warehouse's maximum-length string type, dispatching to adapter-specific implementations built on dbt's type_string.

Details

Code
Source
{%- macro type_max_string() -%}
{{ return(adapter.dispatch('type_max_string', 'snowplow_utils')()) }}
{%- endmacro -%}


Depends On

  • macro.dbt.type_string

Referenced By

Unnest

macros/utils/cross_db/unnest.sql

Description

Cross-database macro that unnests an array column into one row per element, dispatching to adapter-specific implementations.

Details

Code
Source
{%- macro unnest(id_column, unnest_column, field_alias, source_table) -%}
{{ return(adapter.dispatch('unnest', 'snowplow_utils')(id_column, unnest_column, field_alias, source_table)) }}
{%- endmacro -%}
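
A sketch of a call (all names are hypothetical):

{{ snowplow_utils.unnest('event_id', 'contexts', 'context', 'my_events_table') }}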


Referenced By

Update Incremental Manifest Table

macros/incremental_hooks/update_incremental_manifest_table.sql

Description

Dispatches to the adapter-specific implementation that updates the incremental manifest table with the models that ran successfully, based on the base events table for this run.

Details

Code
Source
{% macro update_incremental_manifest_table(manifest_table, base_events_table, models) -%}

{{ return(adapter.dispatch('update_incremental_manifest_table', 'snowplow_utils')(manifest_table, base_events_table, models)) }}

{% endmacro %}

Referenced By
