Skip to main content

Configuration

Common parametersโ€‹

parameterdescription
concurrency.enrichOptional. Default: 256. Number of events that can get enriched at the same time within a chunk (events are processed by chunks in the app).
concurrency.sinkOptional. Default for enrich-pubsub and enrich-rabbitmq: 3. Default for enrich-kinesis: 1. Number of chunks that can get sunk at the same time. WARNING for enrich-kinesis: if greater than 1, records can get checkpointed before they are sunk.
assetsUpdatePeriodOptional. E.g. 7 days. Period after which enrich assets (e.g. the maxmind database for the IpLookups enrichment) should be checked for udpates. Assets will never be updated if this key is missing.
monitoring.sentry.dsnOptional. E.g. http://sentry.acme.com. To track uncaught runtime exceptions in Sentry.
monitoring.metrics.statsd.hostnameOptional. E.g. localhost. Hostname of the StatsD server to send enrichment metrics (latency and event counts) to.
monitoring.metrics.statsd.portOptional. E.g. 8125. Port of the StatsD server.
monitoring.metrics.statsd.periodOptional. E.g. 10 seconds. How frequently to send metrics to StatsD server.
monitoring.metrics.statsd.tagsOptional. E.g. { "env": "prod" }. Key-value pairs attached to each metric sent to StatsD to provide contextual information.
monitoring.metrics.statsd.prefixOptional. Default: snowplow.enrich. Pefix of StatsD metric names.
monitoring.metrics.stdout.periodOptional. E.g. 10 seconds. If set, metrics will be printed in the logs with this frequency.
monitoring.metrics.stdout.prefixOptional. Default: snowplow.enrich. Prefix for the metrics appearing in the logs.
telemetry.disableOptional. Set to true to disable telemetry.
telemetry.userProvidedIdOptional. See here for more information.
featureFlags.acceptInvalidOptional. Default: false. Enrich 3.0.0 introduces the validation of the enriched events against atomic schema before emitting. If set to false, a bad row will be emitted instead of the enriched event if validation fails. If set to true, invalid enriched events will be emitted, as before.
featureFlags.legacyEnrichmentOrderOptional. Default: false. In early versions of enrich-kinesis and enrich-pubsub (>= 3.1.5), the Javascript enrichment incorrectly ran before the currency, weather, and IP Lookups enrichments. Set this flag to true to keep the erroneous behavior of those previous versions.

Instead of a message queue, it's also possible to read collector payloads from files on disk. This can be used for instance for testing purposes. In this case the configuration needs to be as below.

parameterdescription
input.typeRequired. Must be FileSystem.
input.dirRequired. E.g. /input/collectorPayloads/. Directory containing collector payloads encoded with Thrift.

Likewise, it's possible to write enriched events, pii events and bad rows to files instead of PubSub or Kinesis.

To write enriched events to files:

parameterdescription
output.good.typeRequired. Must be FileSystem.
output.good.fileRequired. E.g. /output/enriched. File where enriched events will be written.
output.good.maxBytesOptional. E.g. 1048576. Maximum size of a file in bytes. Triggers file rotation.

To write bad rows to files:

parameterdescription
output.bad.typeRequired. Must be FileSystem.
output.bad.fileRequired. E.g. /output/badRows. File where bad rows will be written.
output.bad.maxBytesOptional. E.g. 1048576. Maximum size of a file in bytes. Triggers file rotation.

To write pii events to files:

parameterdescription
output.pii.typeRequired. Must be FileSystem.
output.pii.fileRequired. E.g. /output/pii. File where pii events will be written.
output.pii.maxBytesOptional. E.g. 1048576. Maximum size of a file in bytes. Triggers file rotation.

enrich-pubsubโ€‹

A minimal configuration file can be found on the Github repo, as well as a comprehensive one.

parameterdescription
input.subscriptionRequired. E.g. projects/example-project/subscriptions/collectorPayloads. PubSub subscription identifier for the collector payloads.
input.parallelPullCountOptional. Default: 1. Number of threads used internally by permutive library to handle incoming messages. These threads do very little "work" apart from writing the message to a concurrent queue.
input.maxQueueSizeOptional. Default: 3000. Configures the "max outstanding element count" of PubSub. This is the principal way we control concurrency in the app; it puts an upper bound on the number of events in memory at once. An event counts towards this limit starting from when it received by the permutive library, until we ack it (after publishing to output). The value must be large enough that it does not cause the sink to block whilst it is waiting for a batch to be completed. The first of maxQueueSize and maxRequestBytes being reached will pause the consumption.
input.maxRequestBytesOptional. Default: 50000000 (50MB). Configures the "maximum outstanding request bytes" of PubSub subscriber. It puts an upper bound on the events' bytes that can be hold in memory at once before getting acked. The value must be large enough to not cause the sink to block whilst it is waiting for a batch to be completed. The first of maxQueueSize and maxRequestBytes being reached will pause the consumption.
input.maxAckExtensionPeriodOptional. Default: 1 hour. Maximum period a message ack deadline can be extended. A zero duration disables auto deadline extension.
output.good.topicRequired. E.g. projects/example-project/topics/enriched. Name of the PubSub topic that will receive the enriched events.
output.good.attributesOptional. Enriched event fields to add as PubSub message attributes. For example, if this is [ "app_id" ] then the enriched event's app_id field will be an attribute of the PubSub message, as well as being a field within the enriched event.
output.good.delayThresholdOptional. Default: 200 milliseconds. Delay threshold to use for batching. After this amount of time has elapsed, before maxBatchSize and maxBatchBytes have been reached, messages from the buffer will be sent.
output.good.maxBatchSizeOptional. Default: 1000 (PubSub maximum). Maximum number of messages sent within a batch. When the buffer reaches this number of messages they are sent.
output.good.maxBatchBytesOptional. Default: 8000000 (PubSub maximum is 10MB). Maximum number of bytes sent within a batch. When the buffer reaches this size messages are sent.
output.bad.topicRequired. E.g. projects/example-project/topics/badrows. Name of the PubSub topic that will receive the bad rows.
output.bad.delayThresholdSame as output.good.delayThreshold for bad rows.
output.bad.maxBatchSizeSame as output.good.maxBatchSize for bad rows.
output.bad.maxBatchBytesSame as output.good.maxBatchBytes for bad rows.
output.pii.topicOptional. Example: projects/test-project/topics/pii. Should be used in conjunction with the PII pseudonymization enrichment. When configured, enables an extra output topic for writing a pii_transformation event.
output.pii.attributesSame as output.good.attributes for pii events.
output.pii.delayThresholdSame as output.good.delayThreshold for pii events.
output.pii.maxBatchSizeSame as output.good.maxBatchSize for pii events.
output.pii.maxBatchBytesSame as output.good.maxBatchBytes for pii events.

enrich-kinesisโ€‹

A minimal configuration file can be found on the Github repo, as well as a comprehensive one.

parameterdescription
input.appNameOptional. Default: snowplow-enrich-kinesis. Name of the application which the KCL daemon should assume. A DynamoDB table with this name will be created.
input.streamNameRequired. E.g. raw. Name of the Kinesis stream with the collector payloads to read from.
input.regionOptional. E.g. eu-central-1. Region where the Kinesis stream is located. This field is optional if it can be resolved with AWS region provider chain. It checks places like env variables, system properties, AWS profile file.
input.initialPosition.typeOptional. Default: TRIM_HORIZON. Set the initial position to consume the Kinesis stream. Possible values: LATEST (most recent data), TRIM_HORIZON (oldest available data), AT_TIMESTAMP (start from the record at or after the specified timestamp).
input.initialPosition.timestampRequired for AT_TIMESTAMP. E.g. 2020-07-17T10:00:00Z.
input.retrievalMode.typeOptional. Default: Polling. Set the mode for retrieving records. Possible values: Polling or FanOut.
input.retrievalMode.maxRecordsRequired for Polling. Default: 10000. Maximum size of a batch returned by a call to getRecords. Records are checkpointed after a batch has been fully processed, thus the smaller maxRecords, the more often records can be checkpointed into DynamoDb, but possibly reducing the throughput.
input.bufferSizeOptional. Default: 3. Size of the internal buffer used when reading messages from Kinesis, each buffer holding up to maxRecords from above.
input.customEndpointOptional. E.g. http://localhost:4566. Endpoint url configuration to override aws kinesis endpoints. Can be used to specify local endpoint when using localstack.
input.dynamodbCustomEndpointOptional. E.g. http://localhost:4566. Endpoint url configuration to override aws dyanomdb endpoint for Kinesis checkpoints lease table. Can be used to specify local endpoint when using localstack.
input.cloudwatchCustomEndpointOptional. E.g. http://localhost:4566. Endpoint url configuration to override aws cloudwatch endpoint for metrics. Can be used to specify local endpoint when using localstack.
output.good.streamNameRequired. E.g. enriched. Name of the Kinesis stream to write to the enriched events.
output.good.regionSame as input.region for enriched events stream.
output.good.partitionKeyOptional. How the output stream will be partitioned in Kinesis. Events with the same partition key value will go to the same shard. Possible values: event_id, event_fingerprint, domain_userid, network_userid, user_ipaddress, domain_sessionid, user_fingerprint. If not specified, the partition key will be a random UUID.
output.good.backoffPolicy.minBackoffOptional. Default: 100 milliseconds. Minimum backoff before retrying when writing fails with internal errors.
output.good.backoffPolicy.maxBackoffOptional. Default: 10 seconds. Maximum backoff before retrying when writing fails with internal errors.
output.good.backoffPolicy.maxRetriesOptional. Default: 10. Maximum number of retries for internal errors.
output.good.throttledBackoffPolicy.minBackoff (since 3.4.1)Optional. Default: 100 milliseconds. Minimum backoff before retrying when writing fails in case of throughput exceeded.
output.good.throttledBackoffPolicy.maxBackoff (since 3.4.1)Optional. Default: 1 second. Maximum backoff before retrying when writing fails in case of throughput exceeded. Writing is retried forever.
output.good.recordLimitOptional. Default: 500 (maximum allowed). Limits the number of events in a single PutRecords request. Several requests are made in parallel.
output.good.customEndpointOptional. E.g. http://localhost:4566. To use a custom Kinesis endpoint.
output.bad.streamNameRequired. E.g. bad. Name of the Kinesis stream to write to the bad rows.
output.bad.regionSame as output.good.region for bad rows.
output.bad.backoffPolicy.minBackoffSame as output.good.backoffPolicy.minBackoff for bad rows.
output.bad.backoffPolicy.maxBackoffSame as output.good.backoffPolicy.maxBackoff for bad rows.
output.bad.backoffPolicy.maxRetriesSame as output.good.backoffPolicy.maxRetries for bad rows.
output.bad.throttledBackoffPolicy.minBackoff (since 3.4.1)Same as output.good.throttledBackoffPolicy.minBackoff for bad rows.
output.bad.throttledBackoffPolicy.maxBackoff (since 3.4.1)Same as output.good.throttledBackoffPolicy.maxBackoff for bad rows.
output.bad.recordLimitSame as output.good.recordLimit for bad rows.
output.bad.customEndpointSame as output.good.customEndpoint for pii events.
output.pii.streamNameOptional. E.g. pii. Should be used in conjunction with the PII pseudonymization enrichment. When configured, enables an extra output stream for writing a pii_transformation event.
output.pii.regionSame as output.good.region for pii events.
output.pii.partitionKeySame as output.good.partitionKey for pii events.
output.pii.backoffPolicy.minBackoffSame as output.good.backoffPolicy.minBackoff for pii events.
output.pii.backoffPolicy.maxBackoffSame as output.good.backoffPolicy.maxBackoff for pii events.
output.pii.backoffPolicy.maxRetriesSame as output.good.backoffPolicy.maxRetries for pii events.
output.pii.throttledBackoffPolicy.minBackoff (since 3.4.1)Same as output.good.throttledBackoffPolicy.minBackoff for pii events.
output.pii.throttledBackoffPolicy.maxBackoff (since 3.4.1)Same as output.good.throttledBackoffPolicy.maxBackoff for pii events.
output.pii.recordLimitSame as output.good.recordLimit for pii events.
output.pii.customEndpointSame as output.good.customEndpoint for pii events.

enrich-kafkaโ€‹

A minimal configuration file can be found on the Github repo, as well as a comprehensive one.

parameterdescription
input.topicNameRequired. Name of the Kafka topic to read collector payloads from.
input.bootstrapServersRequired. A list of host:port pairs to use for establishing the initial connection to the Kafka cluster
input.consumerConfOptional. Kafka consumer configuration. See the docs for all properties.
output.good.topicNameRequired. Name of the Kafka topic to write to
output.good.bootstrapServersRequired. A list of host:port pairs to use for establishing the initial connection to the Kafka cluster
output.good.producerConfOptional. Kafka producer configuration. See the docs for all properties
output.good.partitionKeyOptional. Enriched event field to use as Kafka partition key
output.good.headersOptional. Enriched event fields to add as Kafka record headers
output.pii.topicNameOptional. Name of the Kafka topic to write to
output.pii.bootstrapServersOptional. A list of host:port pairs to use for establishing the initial connection to the Kafka cluster
output.pii.producerConfOptional. Kafka producer configuration. See the docs for all properties
output.pii.partitionKeyOptional. Enriched event field to use as Kafka partition key
output.pii.headersOptional. Enriched event fields to add as Kafka record headers
output.bad.topicNameOptional. Name of the Kafka topic to write to
output.bad.bootstrapServersOptional. A list of host:port pairs to use for establishing the initial connection to the Kafka cluster
output.bad.producerConfOptional. Kafka producer configuration. See the docs for all properties

enrich-rabbitmq-experimentalโ€‹

A minimal configuration file can be found on the Github repo, as well as a comprehensive one.

parameterdescription
input.queueRequired. E.g. raw. Queue to read collector payloads from.
input.cluster.nodes.hostRequired. E.g. localhost. Hostname of RabbitMQ cluster node.
input.cluster.nodes.portRequired. E.g. 5672. Port of RabbitMQ cluster node.
input.cluster.usernameRequired. E.g. guest. Username to connect to the cluster.
input.cluster.passwordRequired. E.g. guest. Password to connect to the cluster.
input.cluster.virtualHostRequired. E.g. "/". Virtual host to use when connecting to the cluster.
input.cluster.sslOptional. Default: false. Whether to use SSL or not to communicate with the cluster.
input.cluster.connectionTimeoutOptional. Default: 5. Timeout for the connection to the cluster (in seconds).
input.cluster.internalQueueSizeOptional. Default: 1000. Size of the fs2โ€™s bounded queue used internally to communicate with the AMQP Java driver.
input.cluster.automaticRecoveryOptional. Default: true. Whether the AMQP Java driver should try to recover broken connections.
input.cluster.requestedHeartbeatOptional. Default: 100. Interval to check that the TCP connection to the cluster is still alive.
input.checkpointBackoff.minBackoffOptional. Default: 100 ms. Minimum period before retrying to checkpoint.
input.checkpointBackoff.maxBackoffOptional. Default: 10 seconds. Maximum period to retry checkpoint.
input.checkpointBackoff.maxRetriesOptional. Default: 10. Maximum number of retries for checkpointing.
output.good.exchangeRequired. E.g. enriched. Exchange to send the enriched events to.
output.good.routingKeyRequired. E.g. routingKey. Routing key to use when sending the enriched events to the exchange.
output.good.cluster.nodes.hostRequired. E.g. localhost. Hostname of RabbitMQ cluster node.
output.good.cluster.nodes.portRequired. E.g. 5672. Port of RabbitMQ cluster node.
output.good.cluster.usernameRequired. E.g. guest. Username to connect to the cluster.
output.good.cluster.passwordRequired. E.g. guest. Password to connect to the cluster.
output.good.cluster.virtualHostRequired. E.g. "/". Virtual host to use when connecting to the cluster.
output.good.cluster.sslOptional. Default: false. Whether to use SSL or not to communicate with the cluster.
output.good.cluster.connectionTimeoutOptional. Default: 5. Timeout for the connection to the cluster (in seconds).
output.good.cluster.internalQueueSizeOptional. Default: 1000. Size of the fs2โ€™s bounded queue used internally to communicate with the AMQP Java driver.
output.good.cluster.automaticRecoveryOptional. Default: true. Whether the AMQP Java driver should try to recover broken connections.
output.good.cluster.requestedHeartbeatOptional. Default: 100. Interval to check that the TCP connection to the cluster is still alive.
output.good.backoffPolicy.minBackoffOptional. Default: 100 ms. Minimum period before retrying if writing to RabbitMQ fails.
output.good.backoffPolicy.maxBackoffOptional. Default: 10 seconds. Maximum period before retrying if writing to RabbitMQ fails.
output.good.backoffPolicy.retriesOptional. Default: 10. Maximum number of retry if writing to RabbitMQ fails. If maxRetries is reached the app crashes.
output.bad.exchangeLike output.good.exchange for bad rows.
output.bad.routingKeyLike output.good.routingKey for bad rows.
output.bad.cluster.nodes.hostLike output.good.cluster.nodes.host for bad rows.
output.bad.cluster.nodes.portLike output.good.cluster.nodes.port for bad rows.
output.bad.cluster.usernameLike output.good.cluster.username for bad rows.
output.bad.cluster.passwordLike output.good.cluster.password for bad rows.
output.bad.cluster.virtualHostLike output.good.cluster.virtualHost for bad rows.
output.bad.cluster.sslLike output.good.cluster.ssl for bad rows.
output.bad.cluster.connectionTimeoutLike output.good.cluster.connectionTimeout for bad rows.
output.bad.cluster.internalQueueSizeLike output.good.cluster.internalQueueSize for bad rows.
output.bad.cluster.automaticRecoveryLike output.good.cluster.automaticRecovery for bad rows.
output.bad.cluster.requestedHeartbeatLike output.good.cluster.requestedHeartbeat for bad rows.
output.bad.backoffPolicy.minBackoffLike output.good.backoffPolicy.minBackoff for bad rows.
output.bad.backoffPolicy.maxBackoffLike output.good.backoffPolicy.maxBackoff for bad rows.
output.bad.backoffPolicy.retriesLike output.good.backoffPolicy.retries for bad rows.
output.pii.exchangeLike output.good.exchange for pii events.
output.pii.routingKeyLike output.good.routingKey for pii events.
output.pii.cluster.nodes.hostLike output.good.cluster.nodes.host for pii events.
output.pii.cluster.nodes.portLike output.good.cluster.nodes.port for pii events.
output.pii.cluster.usernameLike output.good.cluster.username for pii events.
output.pii.cluster.passwordLike output.good.cluster.password for pii events.
output.pii.cluster.virtualHostLike output.good.cluster.virtualHost for pii events.
output.pii.cluster.sslLike output.good.cluster.ssl for pii events.
output.pii.cluster.connectionTimeoutLike output.good.cluster.connectionTimeout for pii events.
output.pii.cluster.internalQueueSizeLike output.good.cluster.internalQueueSize for pii events.
output.pii.cluster.automaticRecoveryLike output.good.cluster.automaticRecovery for pii events.
output.pii.cluster.requestedHeartbeatLike output.good.cluster.requestedHeartbeat for pii events.
output.pii.backoffPolicy.minBackoffLike output.good.backoffPolicy.minBackoff for pii events.
output.pii.backoffPolicy.maxBackoffLike output.good.backoffPolicy.maxBackoff for pii events.
output.pii.backoffPolicy.retriesLike output.good.backoffPolicy.retries for pii events.

Enriched events validation against atomic schemaโ€‹

Enriched events are expected to match atomic schema. However, until 3.0.0, it was never checked that the enriched events emitted by enrich were valid. If an event is not valid against atomic schema, a bad row should be emitted instead of the enriched event. However, this is a breaking change, and we want to give some time to users to adapt, in case today they are working downstream with enriched events that are not valid against atomic. For this reason, this new validation was added as a feature that can be deactivated like that:

"featureFlags": {
"acceptInvalid": true
}

In this case, enriched events that are not valid against atomic schema will still be emitted as before, so that enrich 3.0.0 can be fully backward compatible. It will be possible to know if the new validation would have had an impact by 2 ways:

  1. A new metric invalid_enriched has been introduced. It reports the number of enriched events that were not valid against atomic schema. As the other metrics, it can be seen on stdout and/or StatsD.
  2. Each time there is an enriched event invalid against atomic schema, a line will be logged with the bad row (add -Dorg.slf4j.simpleLogger.log.InvalidEnriched=debug to the JAVA_OPTS to see it).

If acceptInvalid is set to false, a bad row will be emitted instead of the enriched event in case it's not valid against atomic schema.

When we'll know that all our customers don't have any invalid enriched events any more, we'll remove the feature flags and it will be impossible to emit invalid enriched events.

Enrichmentsโ€‹

The list of the enrichments that can be configured can be found on this page.

Was this page helpful?