Skip to content

Commit

Permalink
Initial version.
Browse files Browse the repository at this point in the history
  • Loading branch information
slyapustin committed Jun 19, 2022
1 parent 1771dfa commit 0cffa97
Show file tree
Hide file tree
Showing 10 changed files with 138 additions and 0 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -127,3 +127,7 @@ dmypy.json

# Pyre type checker
.pyre/

# Ignore some Airflow folders
dags/files/
logs/
2 changes: 2 additions & 0 deletions Procfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@

web: airflow webserver --port $PORT --daemon & airflow scheduler
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,6 @@
# airflow
Airflow Demo on Heroku

## References

- [Airflow Tutorial](https://airflow.apache.org/docs/apache-airflow/stable/tutorial.html)
10 changes: 10 additions & 0 deletions airflow.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
[core]
sql_alchemy_conn = $DATABASE_URL

# Do not load the example DAGs
load_examples = False

[webserver]
authenticate = True
auth_backend = airflow.contrib.auth.backends.password_auth
# Required for role-based access control (RBAC).
# NOTE(review): moved off the value line — Airflow's config parser does not
# strip inline comments, so "True # ..." would be read as part of the value.
rbac = True
22 changes: 22 additions & 0 deletions app.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{
"name": "Airflow Demo on Heroku",
"description": "Airflow Demo on Heroku",
"repository": "https://github.com/slyapustin/celery-flower-heroku",
"keywords": [
"airflow",
"heroku",
"python"
],
"addons": [
"heroku-postgresql:hobby-dev"
],
"env": {
"AIRFLOW_HOME": {
"description": "Airflow Home Directory Location",
"value": "/app"
}
},
"scripts": {
"postdeploy": "airflow initdb"
}
}
79 changes: 79 additions & 0 deletions dags/etl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import datetime
import pendulum
import os

import requests
from airflow.decorators import dag, task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.postgres.operators.postgres import PostgresOperator


@dag(
    # Run every 5 minutes (use "0 0 * * *" for a daily schedule instead).
    schedule_interval="*/5 * * * *",
    start_date=pendulum.datetime(2022, 6, 19, tz="UTC"),
    catchup=False,  # do not backfill runs between start_date and now
    dagrun_timeout=datetime.timedelta(minutes=60),
)
def Etl():
    """ETL pipeline: download an employees CSV and upsert it into Postgres.

    Steps:
      1. Create the target and staging tables (idempotent DDL files).
      2. ``get_data``: fetch the sample CSV and bulk-COPY it into
         ``employees_temp``.
      3. ``merge_data``: upsert distinct staging rows into ``employees``.
    """
    create_employees_table = PostgresOperator(
        task_id="create_employees_table",
        postgres_conn_id="tutorial_pg_conn",
        sql="sql/employees_schema.sql",
    )

    create_employees_temp_table = PostgresOperator(
        task_id="create_employees_temp_table",
        postgres_conn_id="tutorial_pg_conn",
        sql="sql/employees_temp_schema.sql",
    )

    @task
    def get_data():
        """Download the sample CSV and COPY it into the staging table."""
        # NOTE: configure this as appropriate for your airflow environment
        data_path = "/opt/airflow/dags/files/employees.csv"
        os.makedirs(os.path.dirname(data_path), exist_ok=True)

        url = "https://raw.githubusercontent.com/apache/airflow/main/docs/apache-airflow/pipeline_example.csv"

        # Fail the task on network errors or non-2xx responses instead of
        # silently writing an error page into the CSV file; the timeout
        # prevents the task from hanging forever on a dead connection.
        response = requests.get(url, timeout=60)
        response.raise_for_status()

        with open(data_path, "w") as file:
            file.write(response.text)

        postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn")
        conn = postgres_hook.get_conn()
        try:
            cur = conn.cursor()
            with open(data_path, "r") as file:
                cur.copy_expert(
                    "COPY employees_temp FROM STDIN WITH CSV HEADER DELIMITER AS ',' QUOTE '\"'",
                    file,
                )
            conn.commit()
        finally:
            # Release the pooled connection even if the COPY fails.
            conn.close()

    @task
    def merge_data():
        """Upsert distinct staging rows into the employees table.

        Returns 0 on success (kept for backward compatibility with the
        original return value); raises on failure so Airflow marks the
        task as failed.
        """
        # FIX: the derived table needs an alias ("t") — PostgreSQL rejects
        # an unaliased subquery in FROM with a syntax error.
        query = """
            INSERT INTO employees
            SELECT *
            FROM (
                SELECT DISTINCT *
                FROM employees_temp
            ) t
            ON CONFLICT ("Serial Number") DO UPDATE
            SET "Serial Number" = excluded."Serial Number";
        """
        postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn")
        conn = postgres_hook.get_conn()
        # FIX: the previous `except Exception: return 1` swallowed every
        # error, so Airflow reported success even when the merge failed.
        # Let exceptions propagate to fail the task instead.
        try:
            cur = conn.cursor()
            cur.execute(query)
            conn.commit()
            return 0
        finally:
            conn.close()

    [create_employees_table, create_employees_temp_table] >> get_data() >> merge_data()


dag = Etl()
7 changes: 7 additions & 0 deletions dags/sql/employees_schema.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- Target table for the ETL pipeline (loaded by dags/etl.py).
-- "Serial Number" is the primary key the merge step's ON CONFLICT relies on.
-- IF NOT EXISTS keeps the DDL idempotent across DAG runs.
CREATE TABLE IF NOT EXISTS employees (
"Serial Number" NUMERIC PRIMARY KEY,
"Company Name" TEXT,
"Employee Markme" TEXT,
"Description" TEXT,
"Leave" INTEGER
);
8 changes: 8 additions & 0 deletions dags/sql/employees_temp_schema.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
-- Staging table for the ETL pipeline: dropped and recreated each run so the
-- CSV COPY in dags/etl.py always starts from an empty table.
-- Columns mirror the target "employees" table exactly.
DROP TABLE IF EXISTS employees_temp;
CREATE TABLE employees_temp (
"Serial Number" NUMERIC PRIMARY KEY,
"Company Name" TEXT,
"Employee Markme" TEXT,
"Description" TEXT,
"Leave" INTEGER
);
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
apache-airflow==2.3.2
1 change: 1 addition & 0 deletions runtime.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
python-3.9.13

0 comments on commit 0cffa97

Please sign in to comment.