Commit a00d25c

Update workflow (#24)
1 parent 36bcd72 commit a00d25c

11 files changed: +227 additions, -285 deletions

dashboard.py

Lines changed: 53 additions & 45 deletions
@@ -1,62 +1,70 @@
-import dask.dataframe as dd
-import plotly.express as px
+import time
+
+import pandas as pd
 import streamlit as st
 
 from pipeline.settings import RESULTS_DIR
 
 
 @st.cache_data
-def get_data(region, part_type):
-    return dd.read_parquet(
-        RESULTS_DIR / region / part_type.upper() / "*.parquet"
-    ).compute()
-
+def get_data(segment):
+    return pd.read_parquet(RESULTS_DIR / f"{segment.lower()}.snappy.parquet")
 
-description = """
-### Recommended Suppliers
-_Some text that explains the business problem being addressed..._
 
-This query finds which supplier should be selected to place an order for a given part in a given region.
+st.markdown(
+    """
+### Top Unshipped Orders
+_Top 50 unshipped orders with the highest revenue._
 """
-st.markdown(description)
-regions = list(map(str.title, ["EUROPE", "AFRICA", "AMERICA", "ASIA", "MIDDLE EAST"]))
-region = st.selectbox(
-    "Region",
-    regions,
-    index=None,
-    placeholder="Please select a region...",
 )
-part_types = list(map(str.title, ["COPPER", "BRASS", "TIN", "NICKEL", "STEEL"]))
-part_type = st.selectbox(
-    "Part Type",
-    part_types,
+
+SEGMENTS = ["automobile", "building", "furniture", "machinery", "household"]
+
+
+def files_exist():
+    # Do we have all the files needed for the dashboard?
+    files = list(RESULTS_DIR.rglob("*.snappy.parquet"))
+    return len(files) == len(SEGMENTS)
+
+
+with st.spinner("Waiting for data..."):
+    while not files_exist():
+        time.sleep(5)
+
+segments = list(
+    map(str.title, ["automobile", "building", "furniture", "machinery", "household"])
+)
+segment = st.selectbox(
+    "Segment",
+    segments,
     index=None,
-    placeholder="Please select a part type...",
+    placeholder="Please select a product segment...",
 )
-if region and part_type:
-    df = get_data(region, part_type)
+if segment:
+    df = get_data(segment)
+    df = df.drop(columns="o_shippriority")
+    df["l_orderkey"] = df["l_orderkey"].map(lambda x: f"{x:09}")
+    df["revenue"] = df["revenue"].round(2)
     df = df.rename(
         columns={
-            "n_name": "Country",
-            "s_name": "Supplier",
-            "s_acctbal": "Balance",
-            "p_partkey": "Part ID",
+            "l_orderkey": "Order ID",
+            "o_order_time": "Date Ordered",
+            "revenue": "Revenue",
         }
    )
-    maxes = df.groupby("Country").Balance.idxmax()
-    data = df.loc[maxes]
-    figure = px.choropleth(
-        data,
-        locationmode="country names",
-        locations="Country",
-        featureidkey="Supplier",
-        color="Balance",
-        color_continuous_scale="viridis",
-        hover_data=["Country", "Supplier", "Balance"],
+
+    df = df.set_index("Order ID")
+    st.dataframe(
+        df.style.format({"Revenue": "${:,}"}),
+        column_config={
+            "Date Ordered": st.column_config.DateColumn(
+                "Date Ordered",
+                format="MM/DD/YYYY",
+                help="Date order was placed",
+            ),
+            "Revenue": st.column_config.NumberColumn(
+                "Revenue (in USD)",
+                help="Total revenue of order",
+            ),
+        },
    )
-    st.plotly_chart(figure, theme="streamlit", use_container_width=True)
-    on = st.toggle("Show data")
-    if on:
-        st.write(
-            df[["Country", "Supplier", "Balance", "Part ID"]], use_container_width=True
-        )
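
A note on the caching above: `@st.cache_data` memoizes `get_data` by its arguments, so re-selecting a segment the user has already viewed returns the cached DataFrame instead of re-reading the parquet file. A minimal sketch of that behaviour (the `load` function and the two-second sleep are illustrative stand-ins, not part of this change):

import time

import pandas as pd
import streamlit as st


@st.cache_data
def load(segment: str) -> pd.DataFrame:
    # Hypothetical stand-in for get_data(); the sleep makes cache hits visible.
    time.sleep(2)
    return pd.DataFrame({"segment": [segment], "revenue": [1234.56]})


start = time.perf_counter()
df = load("automobile")  # first call: runs the function body (~2 s)
df = load("automobile")  # second call: served from the cache, near-instant
st.write(f"Loaded in {time.perf_counter() - start:.2f}s", df)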

environment.yml

Lines changed: 5 additions & 1 deletion
@@ -17,4 +17,8 @@ dependencies:
   - s3fs
   - universal_pathlib <0.2.0
   - boto3
-  - dask-deltatable
+  - deltalake=0.15.3
+  # - dask-deltatable
+  - pip
+  - pip:
+      - git+https://github.com/fjetter/dask-deltatable.git@dask_expr
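
The conda-installed `dask-deltatable` is swapped for a pip install straight from the `dask_expr` branch of the fork, while `deltalake` itself is pinned to 0.15.3. A quick sanity check after rebuilding the environment (a sketch; the check itself is not part of this commit):

# Confirm the pinned deltalake and the branch-installed dask-deltatable resolve.
import deltalake
import dask_deltatable

print(deltalake.__version__)   # expected to match the 0.15.3 pin above
print(dask_deltatable.__file__)  # shows where the pip/git install landed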

pipeline/dashboard.py

Lines changed: 61 additions & 0 deletions
@@ -0,0 +1,61 @@
+import os
+import shlex
+import subprocess
+
+import coiled
+import requests
+from prefect import flow
+from rich import print
+
+from .settings import DASHBOARD_FILE, LOCAL, REGION
+
+port = 8080
+name = "etl-tpch-dashboard"
+subdomain = "etl-tpch"
+
+
+def deploy():
+    print("[green]Deploying dashboard...[/green]")
+    cmd = f"streamlit run {DASHBOARD_FILE} --server.port {port} --server.headless true"
+    if LOCAL:
+        subprocess.Popen(shlex.split(cmd), stdout=subprocess.PIPE)
+    else:
+        cmd = f"""
+        coiled run \
+            --region {REGION} \
+            --vm-type t3.medium \
+            -f dashboard.py \
+            -f pipeline \
+            --subdomain {subdomain} \
+            --port {port} \
+            -e AWS_ACCESS_KEY_ID={os.environ['AWS_ACCESS_KEY_ID']} \
+            -e AWS_SECRET_ACCESS_KEY={os.environ['AWS_SECRET_ACCESS_KEY']} \
+            --detach \
+            --keepalive '520 weeks' \
+            --name {name} \
+            -- \
+            {cmd}
+        """
+        subprocess.run(shlex.split(cmd))
+    print(f"Dashboard is available at [blue]{get_address()}[/blue] :rocket:")
+
+
+def get_address():
+    if LOCAL:
+        return f"http://0.0.0.0:{port}"
+    else:
+        with coiled.Cloud() as cloud:
+            account = cloud.default_account
+        return f"http://{subdomain}.{account}.dask.host:{port}"
+
+
+@flow(log_prints=True)
+def deploy_dashboard():
+    address = get_address()
+    try:
+        r = requests.get(address)
+        r.raise_for_status()
+    except Exception:
+        deploy()
+    else:
+        print("Dashboard is healthy")

pipeline/data.py

Lines changed: 48 additions & 6 deletions
@@ -1,21 +1,29 @@
 import datetime
 import os
+import uuid
 
 import coiled
 import duckdb
+import pandas as pd
 import psutil
 from dask.distributed import print
 from prefect import flow, task
 
 from .settings import LOCAL, PROCESSED_DIR, REGION, STAGING_DIR, fs, lock_generate
 
 
+def new_time(t, t_start=None, t_end=None):
+
+    d = pd.Timestamp("1998-12-31") - pd.Timestamp("1992-01-01")
+    return t_start + (t - pd.Timestamp("1992-01-01")) * ((t_end - t_start) / d)
+
+
 @task(log_prints=True)
 @coiled.function(
     name="data-generation",
     local=LOCAL,
     region=REGION,
-    keepalive="5 minutes",
+    vm_type="m6i.2xlarge",
     tags={"workflow": "etl-tpch"},
 )
 def generate(scale: float, path: os.PathLike) -> None:
@@ -42,7 +50,8 @@ def generate(scale: float, path: os.PathLike) -> None:
         .arrow()
         .column("table_name")
     )
-    for table in map(str, tables):
+    now = pd.Timestamp.now()
+    for table in reversed(sorted(map(str, tables))):
         if table in static_tables and (
             list((STAGING_DIR / table).rglob("*.json"))
             or list((PROCESSED_DIR / table).rglob("*.parquet"))
@@ -51,15 +60,49 @@ def generate(scale: float, path: os.PathLike) -> None:
             continue
         print(f"Exporting table: {table}")
         stmt = f"""select * from {table}"""
-        df = con.sql(stmt).arrow()
+        df = con.sql(stmt).df()
+
+        # Make order IDs unique across multiple data generation cycles
+        if table == "orders":
+            # Generate new, random uuid order IDs
+            df["o_orderkey_new"] = pd.Series(
+                (uuid.uuid4().hex for _ in range(df.shape[0])),
+                dtype="string[pyarrow]",
+            )
+            orderkey_new = df[["o_orderkey", "o_orderkey_new"]].set_index(
+                "o_orderkey"
+            )
+            df = df.drop(columns="o_orderkey").rename(
+                columns={"o_orderkey_new": "o_orderkey"}
+            )
+        elif table == "lineitem":
+            # Join with `orderkey_new` mapping to convert old order IDs to new order IDs
+            df = (
+                df.set_index("l_orderkey")
+                .join(orderkey_new)
+                .reset_index(drop=True)
+                .rename(columns={"o_orderkey_new": "l_orderkey"})
+            )
+
+        # Shift times to be more recent
+        if table == "lineitem":
+            df["l_shipdate"] = new_time(
+                df["l_shipdate"], t_start=now, t_end=now + pd.Timedelta("7 days")
+            )
+            df = df.rename(columns={"l_shipdate": "l_ship_time"})
+        cols = [c for c in df.columns if "date" in c]
+        df[cols] = new_time(
+            df[cols], t_start=now - pd.Timedelta("15 minutes"), t_end=now
+        )
+        df = df.rename(columns={c: c.replace("date", "_time") for c in cols})
 
         outfile = (
             path
             / table
             / f"{table}_{datetime.datetime.now().isoformat().split('.')[0]}.json"
         )
         fs.makedirs(outfile.parent, exist_ok=True)
-        df.to_pandas().to_json(
+        df.to_json(
             outfile,
             date_format="iso",
             orient="records",
@@ -73,7 +116,6 @@ def generate(scale: float, path: os.PathLike) -> None:
 def generate_data():
     with lock_generate:
         generate(
-            scale=0.01,
+            scale=1,
             path=STAGING_DIR,
         )
-    generate.fn.client.restart(wait_for_workers=False)
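
The new `new_time` helper linearly maps timestamps out of the fixed TPC-H date range (1992-01-01 through 1998-12-31) into a target window [t_start, t_end], preserving each timestamp's relative position. A small worked example using the function exactly as defined above (the concrete dates are chosen for illustration):

import pandas as pd


def new_time(t, t_start=None, t_end=None):
    d = pd.Timestamp("1998-12-31") - pd.Timestamp("1992-01-01")
    return t_start + (t - pd.Timestamp("1992-01-01")) * ((t_end - t_start) / d)


now = pd.Timestamp("2024-03-01 12:00")
# 1995-07-02 sits exactly halfway through the TPC-H range, so it lands
# exactly halfway through the 15-minute window used for the *date columns.
t = pd.Timestamp("1995-07-02")
print(new_time(t, t_start=now - pd.Timedelta("15 minutes"), t_end=now))
# -> 2024-03-01 11:52:30 (the midpoint of the window)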

pipeline/monitor.py

Lines changed: 0 additions & 9 deletions
This file was deleted.

pipeline/preprocess.py

Lines changed: 5 additions & 17 deletions
@@ -7,7 +7,6 @@
 from prefect.tasks import exponential_backoff
 
 from .settings import (
-    ARCHIVE_DIR,
     LOCAL,
     PROCESSED_DIR,
     REGION,
@@ -29,7 +28,7 @@
     name="data-etl",
     local=LOCAL,
     region=REGION,
-    keepalive="5 minutes",
+    vm_type="m6i.2xlarge",
     tags={"workflow": "etl-tpch"},
 )
 def json_file_to_parquet(file):
@@ -42,19 +41,10 @@ def json_file_to_parquet(file):
     deltalake.write_deltalake(
         outfile, data, mode="append", storage_options=storage_options
     )
-    print(f"Saved {outfile}")
+    fs.rm(str(file))
     return file
 
 
-@task
-def archive_json_file(file):
-    outfile = ARCHIVE_DIR / file.relative_to(STAGING_DIR)
-    fs.makedirs(outfile.parent, exist_ok=True)
-    fs.mv(str(file), str(outfile))
-
-    return outfile
-
-
 def list_new_json_files():
     return list(STAGING_DIR.rglob("*.json"))
 
@@ -63,18 +53,16 @@ def list_new_json_files():
 def json_to_parquet():
     with lock_json_to_parquet:
         files = list_new_json_files()
-        files = json_file_to_parquet.map(files)
-        futures = archive_json_file.map(files)
+        futures = json_file_to_parquet.map(files)
         for f in futures:
-            print(f"Archived {str(f.result())}")
+            print(f"Processed {str(f.result())}")
 
 
 @task(log_prints=True)
 @coiled.function(
-    name="data-etl",
     local=LOCAL,
     region=REGION,
-    keepalive="5 minutes",
+    vm_type="m6i.xlarge",
     tags={"workflow": "etl-tpch"},
 )
 def compact(table):
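
The body of `compact` falls outside this hunk. With the `deltalake` package pinned in environment.yml, Delta table compaction is typically done along the lines of the sketch below; the helper name, table path handling, and retention settings are assumptions for illustration, not taken from this commit:

from deltalake import DeltaTable


def compact_table(path: str, storage_options=None) -> None:
    # Rewrite many small parquet files into fewer, larger ones...
    dt = DeltaTable(path, storage_options=storage_options)
    dt.optimize.compact()
    # ...then remove files no longer referenced by the table. Retention must
    # be explicitly relaxed to drop recently rewritten files.
    dt.vacuum(retention_hours=0, dry_run=False, enforce_retention_duration=False)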
