Commit 8dca03a

More updates (#25)
1 parent: a00d25c

6 files changed: 35 additions, 17 deletions

pipeline/config.yml

Lines changed: 6 additions & 3 deletions
@@ -1,6 +1,9 @@
+# Output location for data files. Can be a local directory
+# or a remote path like "s3://path/to/bucket".
+data-dir: ./data-test
 # Whether to run data-processing tasks locally
 # or on the cloud with Coiled.
 local: true
-# Output location for data files. Can be a local directory
-# or a remote path like "s3://path/to/bucket".
-data-dir: ./data
+# If using Coiled (`local: false`), use this Coiled workspace.
+# Defaults to your default workspace.
+workspace: null
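
For reference, a minimal sketch of how the updated config parses, assuming it is read with yaml.safe_load the way pipeline/settings.py reads it; the printed values are simply what the defaults above would yield:

import yaml

# Minimal sketch: load the updated config and inspect the new key.
with open("pipeline/config.yml") as f:
    config = yaml.safe_load(f)

print(config["data-dir"])   # "./data-test" (or a remote path like "s3://...")
print(config["local"])      # True -> run data-processing tasks locally
print(config["workspace"])  # None -> fall back to your default Coiled workspace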

pipeline/dashboard.py

Lines changed: 4 additions & 4 deletions
@@ -7,11 +7,11 @@
 from prefect import flow
 from rich import print
 
-from .settings import DASHBOARD_FILE, LOCAL, REGION
+from .settings import DASHBOARD_FILE, LOCAL, REGION, WORKSPACE
 
 port = 8080
-name = "etl-tpch-dashboard"
-subdomain = "etl-tpch"
+name = "dashboard"
+subdomain = "dashboard"
 
 
 def deploy():
@@ -22,6 +22,7 @@ def deploy():
     else:
         cmd = f"""
         coiled run \
+            --workspace {WORKSPACE} \
             --region {REGION} \
             --vm-type t3.medium \
             -f dashboard.py \
@@ -31,7 +32,6 @@ def deploy():
             -e AWS_ACCESS_KEY_ID={os.environ['AWS_ACCESS_KEY_ID']} \
             -e AWS_SECRET_ACCESS_KEY={os.environ['AWS_SECRET_ACCESS_KEY']} \
             --detach \
-            --keepalive '520 weeks' \
             --name {name} \
             -- \
             {cmd}

pipeline/data.py

Lines changed: 16 additions & 5 deletions
@@ -4,16 +4,24 @@
 
 import coiled
 import duckdb
+import numpy as np
 import pandas as pd
 import psutil
 from dask.distributed import print
 from prefect import flow, task
 
-from .settings import LOCAL, PROCESSED_DIR, REGION, STAGING_DIR, fs, lock_generate
+from .settings import (
+    LOCAL,
+    PROCESSED_DIR,
+    REGION,
+    STAGING_DIR,
+    WORKSPACE,
+    fs,
+    lock_generate,
+)
 
 
 def new_time(t, t_start=None, t_end=None):
-
     d = pd.Timestamp("1998-12-31") - pd.Timestamp("1992-01-01")
     return t_start + (t - pd.Timestamp("1992-01-01")) * ((t_end - t_start) / d)
 
@@ -24,7 +32,7 @@ def new_time(t, t_start=None, t_end=None):
     local=LOCAL,
     region=REGION,
     vm_type="m6i.2xlarge",
-    tags={"workflow": "etl-tpch"},
+    account=WORKSPACE,
 )
 def generate(scale: float, path: os.PathLike) -> None:
     static_tables = ["customer", "nation", "part", "partsupp", "region", "supplier"]
@@ -84,12 +92,15 @@ def generate(scale: float, path: os.PathLike) -> None:
             .rename(columns={"o_orderkey_new": "l_orderkey"})
         )
 
-        # Shift times to be more recent
+        # Shift times to be more recent and lineitem prices to be non-uniform
         if table == "lineitem":
             df["l_shipdate"] = new_time(
-                df["l_shipdate"], t_start=now, t_end=now + pd.Timedelta("7 days")
+                df["l_shipdate"], t_start=now, t_end=now + pd.Timedelta("3 days")
             )
             df = df.rename(columns={"l_shipdate": "l_ship_time"})
+            df["l_extendedprice"] = (
+                np.random.rand(df.shape[0]) * df["l_extendedprice"]
+            )
         cols = [c for c in df.columns if "date" in c]
         df[cols] = new_time(
             df[cols], t_start=now - pd.Timedelta("15 minutes"), t_end=now
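
Since new_time is easy to misread, here is a small worked example of the rescaling and of the new price perturbation; the numbers are illustrative only and not taken from the pipeline:

import numpy as np
import pandas as pd

def new_time(t, t_start=None, t_end=None):
    # Linearly map timestamps from the TPC-H date range
    # [1992-01-01, 1998-12-31] onto [t_start, t_end].
    d = pd.Timestamp("1998-12-31") - pd.Timestamp("1992-01-01")
    return t_start + (t - pd.Timestamp("1992-01-01")) * ((t_end - t_start) / d)

now = pd.Timestamp.now()

# A ship date halfway through the TPC-H range lands halfway through the
# (now shorter) 3-day window used for l_shipdate above.
mid = pd.Timestamp("1995-07-02")
print(new_time(mid, t_start=now, t_end=now + pd.Timedelta("3 days")))
# -> roughly now + 1.5 days

# The price tweak scales each l_extendedprice by a uniform draw from [0, 1),
# so identical TPC-H prices become non-uniform values.
df = pd.DataFrame({"l_extendedprice": [1000.0, 1000.0, 1000.0]})
df["l_extendedprice"] = np.random.rand(df.shape[0]) * df["l_extendedprice"]
print(df["l_extendedprice"])  # three different values between 0 and 1000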

pipeline/preprocess.py

Lines changed: 5 additions & 3 deletions
@@ -11,6 +11,7 @@
     PROCESSED_DIR,
     REGION,
     STAGING_DIR,
+    WORKSPACE,
     fs,
     lock_compact,
     lock_json_to_parquet,
@@ -25,11 +26,11 @@
     retry_jitter_factor=1,
 )
 @coiled.function(
-    name="data-etl",
+    name="json-to-parquet",
     local=LOCAL,
     region=REGION,
     vm_type="m6i.2xlarge",
-    tags={"workflow": "etl-tpch"},
+    account=WORKSPACE,
 )
 def json_file_to_parquet(file):
     """Convert raw JSON data file to Parquet."""
@@ -60,10 +61,11 @@ def json_to_parquet():
 
 @task(log_prints=True)
 @coiled.function(
+    name="compact",
     local=LOCAL,
     region=REGION,
     vm_type="m6i.xlarge",
-    tags={"workflow": "etl-tpch"},
+    account=WORKSPACE,
 )
 def compact(table):
     print(f"Compacting table {table}")

pipeline/reduce.py

Lines changed: 3 additions & 2 deletions
@@ -12,6 +12,7 @@
     PROCESSED_DIR,
     REGION,
     RESULTS_DIR,
+    WORKSPACE,
     fs,
     lock_compact,
     storage_options,
@@ -29,9 +30,9 @@ def unshipped_orders_by_revenue(segment):
     cluster = functools.partial(
         coiled.Cluster,
         name="reduce",
+        workspace=WORKSPACE,
         region=REGION,
-        n_workers=10,
-        tags={"workflow": "etl-tpch"},
+        n_workers=30,
         shutdown_on_close=False,
         idle_timeout="1 minute",
         wait_for_workers=True,
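
For readers unfamiliar with the pattern, the functools.partial here only binds the cluster's arguments (including the new workspace and the larger worker count); the cluster itself is created when the partial is called inside the task. A minimal sketch of how such a factory is typically consumed, with hypothetical values standing in for WORKSPACE and REGION (this is not the pipeline's exact code):

import functools

import coiled
from dask.distributed import Client

cluster = functools.partial(
    coiled.Cluster,
    name="reduce",
    workspace="my-workspace",  # hypothetical; the pipeline passes WORKSPACE
    region="us-east-2",        # hypothetical; the pipeline passes REGION
    n_workers=30,
    shutdown_on_close=False,
    idle_timeout="1 minute",
    wait_for_workers=True,
)

# Calling the partial creates (or, given the fixed name, reuses) the cluster;
# a Client attached to it runs Dask work on its 30 workers.
with cluster() as c, Client(c) as client:
    print(client.dashboard_link)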

pipeline/settings.py

Lines changed: 1 addition & 0 deletions
@@ -10,6 +10,7 @@
     data = yaml.safe_load(f)
 
 LOCAL = data["local"]
+WORKSPACE = data["workspace"]
 ROOT = Path(data["data-dir"]).resolve()
 fs = fsspec.filesystem(ROOT.protocol, use_listings_cache=False)
