Order Data ETL with Prefect

Set up a daily pipeline to ingest raw Order API data into a PostgreSQL database.

TODOs:

  1. Define schema with SQLAlchemy
    • Pandas type inference isn't great
    • It currently forces analysts to handle type conversions themselves, especially for datetimes
  2. Define order summary view

Table of Contents

  • Setup
    • Python Environment
    • Configuration
  • Usage
  • Design
  • Improvements

Setup

Developed using Python >= 3.7 for language features such as typing and dataclasses.

Python Environment

  • Recommend using an isolated environment manager like virtualenv, pyenv, pipenv, etc.
    • Install dependencies: pip install -r requirements.txt

Configuration

  • To avoid hardcoding resources, the pipeline pulls the following from environment variables:
    • _PGSQL_HOSTNAME - Name of the db host to connect
    • _PGSQL_PORT - Port number on the host
    • _PGSQL_USER - Username for auth
    • _PGSQL_PASS - Password for auth
    • _PGSQL_DATABASE - Default database name to use
    • _AWS_S3_URL_SOURCE - S3 URL to the source data (object URI not given)

Note: while not strictly necessary, I use direnv for local development to automatically export these environment variables based on the current working directory. It reads a dotfile called .envrc, which is git-ignored so that sensitive information like credentials stays local.

In cloud environments, I tend to use services like Secrets Manager or HashiCorp Vault to fetch credentials at runtime or rely on default compute metadata for authentication within the cloud environment. The trade-off is that it makes the code less portable and introduces vendor lock-in.
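
For illustration, the configuration could be read into a small dataclass like the sketch below. The PipelineConfig class and load_config helper are hypothetical (not code from this repo); only the variable names are the ones listed above.

```python
import os
from dataclasses import dataclass


@dataclass
class PipelineConfig:
    """Connection settings pulled from environment variables (hypothetical helper)."""
    pg_host: str
    pg_port: int
    pg_user: str
    pg_pass: str
    pg_database: str
    s3_source_url: str


def load_config() -> PipelineConfig:
    # Fail fast with a KeyError if a required variable is missing.
    return PipelineConfig(
        pg_host=os.environ["_PGSQL_HOSTNAME"],
        pg_port=int(os.environ["_PGSQL_PORT"]),
        pg_user=os.environ["_PGSQL_USER"],
        pg_pass=os.environ["_PGSQL_PASS"],
        pg_database=os.environ["_PGSQL_DATABASE"],
        s3_source_url=os.environ["_AWS_S3_URL_SOURCE"],
    )
```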

Usage

Run using a LocalExecutor with python pipeline.py

The pipeline is configured with a daily schedule, which would take effect once it is deployed to a compute environment.
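
As a minimal sketch of how the flow might be wired up, assuming Prefect 1.x (which matches the Prefect Server / GraphQL references below); the task bodies and flow name are illustrative, not the repo's actual code:

```python
from datetime import timedelta

from prefect import Flow, task
from prefect.executors import LocalExecutor
from prefect.schedules import IntervalSchedule


@task
def extract():
    ...  # e.g. download the raw order JSON dump from S3


@task
def transform(raw):
    ...  # e.g. normalize the JSON into dataframes


@task
def load(frames):
    ...  # e.g. write the dataframes to PostgreSQL


# Attach a daily schedule; it takes effect when the flow is run or deployed.
schedule = IntervalSchedule(interval=timedelta(days=1))

with Flow("order-etl", schedule=schedule) as flow:
    frames = transform(extract())
    load(frames)

if __name__ == "__main__":
    # flow.run() honors the schedule; pass run_on_schedule=False to execute once immediately.
    flow.run(executor=LocalExecutor())
```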

Design

Prefect was chosen as a lightweight, Pythonic ETL tool that is easy to run and test locally as well as to deploy to a production compute environment.

  • Only the Python package is used for this demo; otherwise I would use Docker to run Prefect Server, which provides PostgreSQL-backed metadata storage, a GraphQL API, a web UI, etc.

I started off intending to use the Prefect PostgreSQL ExecuteMany task, but found that the lower-level database client and SQL string manipulation it requires would make the pipeline more complicated. I opted to use Pandas instead, but in doing so gave up control over the table schemas. I've left that out due to time; otherwise the schemas would be well defined with SQLAlchemy data-type boilerplate.
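
For reference, one way to regain schema control while keeping Pandas for the load is to pass explicit SQLAlchemy types to DataFrame.to_sql. The sketch below is illustrative only; the connection string is a placeholder and the table and column names are assumptions, not the repo's actual schema.

```python
import pandas as pd
import sqlalchemy as sa

# Placeholder connection string; in the pipeline it would be built from the env vars above.
engine = sa.create_engine("postgresql://user:password@host:5432/orders")

# Explicit column types sidestep Pandas' inference (e.g. datetimes landing as TEXT).
ORDER_DTYPES = {
    "order_id": sa.types.Text(),
    "user_id": sa.types.Text(),
    "order_total": sa.types.Numeric(12, 2),
    "created_at": sa.types.TIMESTAMP(timezone=True),
}


def load_orders(df: pd.DataFrame) -> None:
    # Pandas still performs the write, but the table is created with the declared types.
    df.to_sql("orders", engine, if_exists="append", index=False, dtype=ORDER_DTYPES)
```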

Database normalization could be better. There is an argument, though, that OLAP workloads are better served by columnar data stores, which work best with denormalized tables because joins are expensive, whereas row-oriented relational databases like PostgreSQL are designed for OLTP.

Improvements

  • Load tasks should ideally be concurrent (see the sketch after this list)
  • Append-only pipeline and index order tables by run_date
    • Keep historical record of orders
    • Would need to upsert users
  • Better database normalization
    • Namely, split orders into fact and dimensions
    • There may be other entities that could be factored out, such as location and device/app
  • The current implementation won't scale to larger datasets:
    • Ideally, hit the API directly (asynchronously) and store intermediate results in S3
      • Avoids iterating over every file and every row within each file
    • Larger data volumes would benefit from parallelism, e.g. Dask instead of Pandas
      • Currently relies on loading the entire data dump into memory
        • If blob storage can be used, the workers' memory footprint can be reduced
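
As a sketch of the concurrency point above, in Prefect 1.x the load could be fanned out with Task.map and run in parallel under a LocalDaskExecutor. The task names and file listing below are illustrative assumptions, not the repo's actual code.

```python
from prefect import Flow, task
from prefect.executors import LocalDaskExecutor


@task
def list_source_files() -> list:
    # e.g. enumerate the JSON dumps under the S3 prefix
    return []


@task
def load_file(path: str) -> None:
    ...  # parse one file and write its rows to PostgreSQL


with Flow("order-etl-parallel") as flow:
    files = list_source_files()
    load_file.map(files)  # one mapped task per file

if __name__ == "__main__":
    # LocalDaskExecutor runs mapped tasks concurrently on local threads or processes.
    flow.run(executor=LocalDaskExecutor())
```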
