
Overview

This is a project to load Snowplow enriched events into Google BigQuery.

Technical architecture

This project consists of three independent applications:

  1. Snowplow BigQuery Loader, an Apache Beam job which reads Snowplow enriched data from Google PubSub, transforms it into a BigQuery-friendly format and loads it. It also writes encountered types into an auxiliary types PubSub topic
  2. Snowplow BigQuery Mutator, a Scala app which reads the types PubSub topic and performs the table mutations necessary to add new columns
  3. Snowplow BigQuery Forwarder, an auxiliary Apache Beam job used to recover data from the failedInserts topic (e.g. after mutation lag)

Snowplow BigQuery Loader

Overview

An Apache Beam job intended to run on Google Dataflow, loading enriched data from the enriched PubSub topic into Google BigQuery.

Algorithm

The Loader:

  • Reads Snowplow enriched events from the input PubSub subscription
  • Uses the JSON transformer from the Snowplow Scala Analytics SDK to convert those enriched events into JSONs
  • Uses the Iglu Client to fetch JSON Schemas for contexts and self-describing events
  • Uses Iglu Schema DDL to transform contexts and self-describing events into BigQuery format
  • Writes transformed data into BigQuery
  • Writes all encountered Iglu types into the types PubSub topic
  • Writes all data that failed to be processed into the badRows PubSub topic
  • Writes data that was successfully transformed, but failed to be loaded, into the failedInserts topic (see the pipeline sketch after this list)
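For orientation, here is a minimal, hedged sketch of the pipeline shape in Scala, written directly against the Apache Beam Java SDK. The project, subscription and table names are placeholders, and the trivial ToTableRowFn stands in for the Loader's real transformation (Analytics SDK parsing plus Iglu-driven shaping); it is not the Loader's actual code.

```scala
import com.google.api.services.bigquery.model.TableRow
import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO
import org.apache.beam.sdk.io.gcp.bigquery.{BigQueryIO, InsertRetryPolicy}
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.{CreateDisposition, WriteDisposition}
import org.apache.beam.sdk.transforms.{DoFn, ParDo}
import org.apache.beam.sdk.transforms.DoFn.ProcessElement

// Placeholder transformation: the real Loader parses the enriched TSV with the
// Scala Analytics SDK and builds schema-aware rows via Iglu Schema DDL
class ToTableRowFn extends DoFn[String, TableRow] {
  @ProcessElement
  def process(ctx: DoFn[String, TableRow]#ProcessContext): Unit =
    ctx.output(new TableRow().set("raw", ctx.element()))
}

object LoaderSketch {
  def main(args: Array[String]): Unit = {
    val pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args: _*).create())

    val rows = pipeline
      // Read enriched events from the input subscription (path is a placeholder)
      .apply(PubsubIO.readStrings()
        .fromSubscription("projects/my-project/subscriptions/enriched-sub"))
      .apply(ParDo.of(new ToTableRowFn))

    // Load rows into BigQuery, retrying transient errors; rows that BigQuery rejects
    // (e.g. because of mutation lag) are exposed via getFailedInserts on the result,
    // which is how they end up in the failedInserts topic
    rows.apply(
      BigQueryIO.writeTableRows()
        .to("my-project:snowplow.events")
        .withCreateDisposition(CreateDisposition.CREATE_NEVER)
        .withWriteDisposition(WriteDisposition.WRITE_APPEND)
        .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()))

    pipeline.run()
  }
}
```

The failed-inserts handling is what makes mutation lag recoverable: rejected rows are not lost, but re-published for the Forwarder to pick up later.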

Snowplow BigQuery Mutator

Overview

This is a Scala app which reads data from the types PubSub topic and performs table mutations.

Algorithm

The algorithm is as follows:

  • Read messages from the types topic
  • Check whether a message contains a type that has not been encountered yet (by consulting an internal cache)
  • If the message contains a new type, double-check it against the connected BigQuery table
  • If the type is not in the table, fetch its JSON Schema from an Iglu Registry
  • Transform the JSON Schema into a BigQuery column definition
  • Add the column to the connected BigQuery table (see the sketch after this list)
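As an illustration of the final step only, adding a NULLABLE column with the google-cloud-bigquery Java client could look roughly like this. The dataset, table and column names are placeholders, and the real Mutator derives the column type from the type's JSON Schema via Iglu Schema DDL rather than assuming STRING.

```scala
import scala.collection.JavaConverters._
import com.google.cloud.bigquery.{BigQueryOptions, Field, LegacySQLTypeName, Schema, StandardTableDefinition, TableId}

object MutatorSketch {
  private val bigquery = BigQueryOptions.getDefaultInstance.getService

  // In-memory cache of columns we have already added (the real Mutator keeps richer state)
  private val seen = scala.collection.mutable.Set.empty[String]

  /** Add a column for a newly observed type, unless it already exists. */
  def addColumnIfMissing(dataset: String, tableName: String, columnName: String): Unit =
    if (!seen.contains(columnName)) {
      val table = bigquery.getTable(TableId.of(dataset, tableName))
      val existing = table.getDefinition[StandardTableDefinition].getSchema.getFields.asScala.toList
      if (!existing.exists(_.getName == columnName)) {
        // Assumed STRING for illustration; the real column definition comes from the JSON Schema
        val updated = existing :+ Field.of(columnName, LegacySQLTypeName.STRING)
        table.toBuilder
          .setDefinition(StandardTableDefinition.of(Schema.of(updated: _*)))
          .build()
          .update()
      }
      seen += columnName
    }
}
```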

Snowplow BigQuery Forwarder

Overview

An Apache Beam job intended to run on Google Dataflow, loading data from the failedInserts topic into BigQuery.

This is a very simple, generic Dataflow job. The same task could be performed with a standard Dataflow template job, but template jobs can only read from a topic, not from a subscription, which means such a job would have to run permanently and would sit idle 99% of the time.

Note: Forwarder is a streaming job, which means it will not terminate automatically once it has loaded everything into BigQuery; it must be stopped manually once it has processed enough records.
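A minimal sketch of such a job in Scala with the Beam Java SDK might look like the following; the subscription and table names are placeholders, not the Forwarder's actual configuration. Since each failedInserts message is already BigQuery-ready JSON, the job only parses it back into a TableRow and re-inserts it.

```scala
import com.google.api.client.json.jackson2.JacksonFactory
import com.google.api.services.bigquery.model.TableRow
import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.{CreateDisposition, WriteDisposition}
import org.apache.beam.sdk.transforms.{DoFn, ParDo}
import org.apache.beam.sdk.transforms.DoFn.ProcessElement

// Each failedInserts message is a BigQuery JSON row, so parsing is straightforward
class JsonToTableRowFn extends DoFn[String, TableRow] {
  @ProcessElement
  def process(ctx: DoFn[String, TableRow]#ProcessContext): Unit =
    ctx.output(JacksonFactory.getDefaultInstance.fromString(ctx.element(), classOf[TableRow]))
}

object ForwarderSketch {
  def main(args: Array[String]): Unit = {
    val pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args: _*).create())

    pipeline
      // Reading from a subscription is exactly what the template jobs cannot do
      .apply(PubsubIO.readStrings()
        .fromSubscription("projects/my-project/subscriptions/failed-inserts-sub"))
      .apply(ParDo.of(new JsonToTableRowFn))
      .apply(BigQueryIO.writeTableRows()
        .to("my-project:snowplow.events")
        .withCreateDisposition(CreateDisposition.CREATE_NEVER)
        .withWriteDisposition(WriteDisposition.WRITE_APPEND))

    pipeline.run()
  }
}
```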

Mutation lag

BigQuery Loader inserts data into BigQuery in near real time. At the same time, it sinks shredded_type payloads into the types topic approximately every 5 seconds. It can then take another 10-15 seconds for Mutator to fetch and parse the message and execute an ALTER TABLE statement against the table.

If a new type arrives from the input subscription during this window, before Mutator has had a chance to handle it, BigQuery rejects the rows containing it and they are sent to the failedInserts topic. This topic contains JSON objects ready to be loaded into BigQuery (i.e. not in the canonical Snowplow enriched event format).

To load this data from failedInserts into BigQuery, you can use the Forwarder job. It reads from a subscription on failedInserts and simply performs insert statements.

Topics and message formats

Snowplow BigQuery Loader uses Google PubSub topics and subscriptions to store intermediate data and to communicate between the applications.

  • input subscription - data enriched by Beam Enrich, in the canonical TSV+JSON format
  • types topic - all shredded types encountered by the Loader are written here as iglu:com.snowplowanalytics.snowplow/shredded_type/jsonschema/1-0-0 self-describing payloads, at roughly 5-second intervals
  • typesSubscription subscription - the subscription to the types topic used by Mutator, carrying the same iglu:com.snowplowanalytics.snowplow/shredded_type/jsonschema/1-0-0 self-describing payloads
  • badRows topic - data that could not be processed by the Loader (e.g. due to Iglu Registry unavailability), in the bad rows format
  • failedInserts topic - data that was successfully transformed by the Loader but failed to load into BigQuery (usually due to mutation lag), in BigQuery JSON format