From 0cffa97608307a79b76d70d80b9d163a8b08c331 Mon Sep 17 00:00:00 2001
From: Sergey Lyapustin
Date: Sun, 19 Jun 2022 15:15:57 +0200
Subject: [PATCH] Initial version.

---
 .gitignore                         |  4 ++
 Procfile                           |  2 +
 README.md                          |  4 ++
 airflow.cfg                        | 10 ++++
 app.json                           | 22 +++++++++
 dags/etl.py                        | 79 ++++++++++++++++++++++++++++++
 dags/sql/employees_schema.sql      |  7 +++
 dags/sql/employees_temp_schema.sql |  8 +++
 requirements.txt                   |  1 +
 runtime.txt                        |  1 +
 10 files changed, 138 insertions(+)
 create mode 100644 Procfile
 create mode 100644 airflow.cfg
 create mode 100644 app.json
 create mode 100644 dags/etl.py
 create mode 100644 dags/sql/employees_schema.sql
 create mode 100644 dags/sql/employees_temp_schema.sql
 create mode 100644 requirements.txt
 create mode 100644 runtime.txt

diff --git a/.gitignore b/.gitignore
index b6e4761..67d5c24 100644
--- a/.gitignore
+++ b/.gitignore
@@ -127,3 +127,7 @@ dmypy.json
 
 # Pyre type checker
 .pyre/
+
+# Ignore some Airflow folders
+dags/files/
+logs/
diff --git a/Procfile b/Procfile
new file mode 100644
index 0000000..1664a04
--- /dev/null
+++ b/Procfile
@@ -0,0 +1,2 @@
+
+web: airflow webserver --port $PORT --daemon & airflow scheduler
diff --git a/README.md b/README.md
index 0d5442d..dcf1205 100644
--- a/README.md
+++ b/README.md
@@ -1,2 +1,6 @@
 # airflow
 Airflow Demo on Heroku
+
+## References
+
+- [Airflow Tutorial](https://airflow.apache.org/docs/apache-airflow/stable/tutorial.html)
diff --git a/airflow.cfg b/airflow.cfg
new file mode 100644
index 0000000..2a61ab4
--- /dev/null
+++ b/airflow.cfg
@@ -0,0 +1,10 @@
+[core]
+sql_alchemy_conn = $DATABASE_URL
+
+# Do not load example DAGs
+load_examples = False
+
+[webserver]
+authenticate = True
+auth_backend = airflow.contrib.auth.backends.password_auth
+rbac = True # Required for role-based access control
diff --git a/app.json b/app.json
new file mode 100644
index 0000000..aedf478
--- /dev/null
+++ b/app.json
@@ -0,0 +1,22 @@
+{
+  "name": "Airflow Demo on Heroku",
+  "description": "Airflow Demo on Heroku",
+  "repository": "https://github.com/slyapustin/celery-flower-heroku",
+  "keywords": [
+    "airflow",
+    "heroku",
+    "python"
+  ],
+  "addons": [
+    "heroku-postgresql:hobby-dev"
+  ],
+  "env": {
+    "AIRFLOW_HOME": {
+      "description": "Airflow Home Directory Location",
+      "value": "/app"
+    }
+  },
+  "scripts": {
+    "postdeploy": "airflow db init"
+  }
+}
\ No newline at end of file
diff --git a/dags/etl.py b/dags/etl.py
new file mode 100644
index 0000000..510d3ba
--- /dev/null
+++ b/dags/etl.py
@@ -0,0 +1,79 @@
+import datetime
+import pendulum
+import os
+
+import requests
+from airflow.decorators import dag, task
+from airflow.providers.postgres.hooks.postgres import PostgresHook
+from airflow.providers.postgres.operators.postgres import PostgresOperator
+
+
+@dag(
+    # schedule_interval="0 0 * * *",
+    schedule_interval="*/5 * * * *",
+    start_date=pendulum.datetime(2022, 6, 19, tz="UTC"),
+    catchup=False,
+    dagrun_timeout=datetime.timedelta(minutes=60),
+)
+def Etl():
+    create_employees_table = PostgresOperator(
+        task_id="create_employees_table",
+        postgres_conn_id="tutorial_pg_conn",
+        sql="sql/employees_schema.sql",
+    )
+
+    create_employees_temp_table = PostgresOperator(
+        task_id="create_employees_temp_table",
+        postgres_conn_id="tutorial_pg_conn",
+        sql="sql/employees_temp_schema.sql",
+    )
+
+    @task
+    def get_data():
+        # NOTE: configure this as appropriate for your airflow environment
+        data_path = "/opt/airflow/dags/files/employees.csv"
+        os.makedirs(os.path.dirname(data_path), exist_ok=True)
+
+        url = "https://raw.githubusercontent.com/apache/airflow/main/docs/apache-airflow/pipeline_example.csv"
"https://raw.githubusercontent.com/apache/airflow/main/docs/apache-airflow/pipeline_example.csv" + + response = requests.request("GET", url) + + with open(data_path, "w") as file: + file.write(response.text) + + postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn") + conn = postgres_hook.get_conn() + cur = conn.cursor() + with open(data_path, "r") as file: + cur.copy_expert( + "COPY employees_temp FROM STDIN WITH CSV HEADER DELIMITER AS ',' QUOTE '\"'", + file, + ) + conn.commit() + + @task + def merge_data(): + query = """ + INSERT INTO employees + SELECT * + FROM ( + SELECT DISTINCT * + FROM employees_temp + ) + ON CONFLICT ("Serial Number") DO UPDATE + SET "Serial Number" = excluded."Serial Number"; + """ + try: + postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn") + conn = postgres_hook.get_conn() + cur = conn.cursor() + cur.execute(query) + conn.commit() + return 0 + except Exception as e: + return 1 + + [create_employees_table, create_employees_temp_table] >> get_data() >> merge_data() + + +dag = Etl() diff --git a/dags/sql/employees_schema.sql b/dags/sql/employees_schema.sql new file mode 100644 index 0000000..fa6363c --- /dev/null +++ b/dags/sql/employees_schema.sql @@ -0,0 +1,7 @@ +CREATE TABLE IF NOT EXISTS employees ( + "Serial Number" NUMERIC PRIMARY KEY, + "Company Name" TEXT, + "Employee Markme" TEXT, + "Description" TEXT, + "Leave" INTEGER + ); \ No newline at end of file diff --git a/dags/sql/employees_temp_schema.sql b/dags/sql/employees_temp_schema.sql new file mode 100644 index 0000000..01bb0da --- /dev/null +++ b/dags/sql/employees_temp_schema.sql @@ -0,0 +1,8 @@ +DROP TABLE IF EXISTS employees_temp; + CREATE TABLE employees_temp ( + "Serial Number" NUMERIC PRIMARY KEY, + "Company Name" TEXT, + "Employee Markme" TEXT, + "Description" TEXT, + "Leave" INTEGER + ); \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..c9dded9 --- /dev/null +++ b/requirements.txt @@ -0,0 +1 @@ +apache-airflow==2.3.2 diff --git a/runtime.txt b/runtime.txt new file mode 100644 index 0000000..c6f7782 --- /dev/null +++ b/runtime.txt @@ -0,0 +1 @@ +python-3.9.13