hamilton + quokka support #58
Comments
Comment by skrawcz
Thoughts to think about:
Comment by skrawcz
Sketch of some code converting the hello world example:

```python
# Assumed imports: Polars for pl.Series, Hamilton's extract_columns decorator,
# and Quokka's context/stream classes (the Quokka import paths are assumptions).
import polars as pl
from hamilton.function_modifiers import extract_columns
from pyquokka.datastream import DataStream, GroupedDataStream
from pyquokka.df import QuokkaContext


# Data Loading
# Filtering is part of data loading -- do we also expose columns like this?
@extract_columns(*["l_quantity", "l_extendedprice", "l_discount", "l_tax", "l_returnflag", "l_linestatus"])
def lineitem(qc: QuokkaContext, path: str,
             filter: str = "l_shipdate <= date '1998-12-01' - interval '90' day") -> DataStream:
    """Loads and filters data from the lineitem table"""
    ds: DataStream = qc.read_csv(path, sep="|", has_header=True)
    if filter:
        ds = ds.filter(filter)
    return ds


# transforms we want people to write
def disc_price(l_extendedprice: pl.Series, l_discount: pl.Series) -> pl.Series:
    """Computes the discounted price"""
    return l_extendedprice * (1 - l_discount)


def charge(l_extendedprice: pl.Series, l_discount: pl.Series, l_tax: pl.Series) -> pl.Series:
    """Computes the charge"""
    return l_extendedprice * (1 - l_discount) * (1 + l_tax)


# `groupby` is a proposed decorator for this integration; it is not defined here.
@groupby("l_returnflag", "l_linestatus", order_by=[...])
def grouped_lineitem(l_quantity: pl.Series, l_extendedprice: pl.Series,
                     disc_price: pl.Series, charge: pl.Series, l_discount: pl.Series,
                     l_returnflag: pl.Series, l_linestatus: pl.Series) -> GroupedDataStream:
    pass


# maybe more subtly
def grouped_lineitem(l_returnflag: pl.Series, l_linestatus: pl.Series, *,
                     l_quantity: pl.Series, l_extendedprice: pl.Series,
                     disc_price: pl.Series, charge: pl.Series, l_discount: pl.Series,
                     ) -> GroupedDataStream:
    pass
```
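The "maybe more subtly" signature appears to separate the group keys (ordinary parameters before `*`) from the columns to aggregate (keyword-only parameters after it). A minimal stdlib sketch of how an adapter might read that split with `inspect.signature`, assuming this interpretation of the signature; `split_group_signature` is a hypothetical helper, and plain `list` annotations stand in for `pl.Series` so it runs without Polars or Quokka.

```python
import inspect


def split_group_signature(fn):
    """Hypothetical helper: split a function's parameters into group keys
    (positional-or-keyword params) and aggregated columns (keyword-only params)."""
    keys, values = [], []
    for name, param in inspect.signature(fn).parameters.items():
        if param.kind is inspect.Parameter.KEYWORD_ONLY:
            values.append(name)
        elif param.kind is inspect.Parameter.POSITIONAL_OR_KEYWORD:
            keys.append(name)
    return keys, values


# Same shape as the proposed grouped_lineitem, with list stand-ins for pl.Series.
def grouped_lineitem(l_returnflag: list, l_linestatus: list, *,
                     l_quantity: list, l_extendedprice: list,
                     disc_price: list, charge: list, l_discount: list):
    pass


keys, values = split_group_signature(grouped_lineitem)
print(keys)    # ['l_returnflag', 'l_linestatus']
print(values)  # ['l_quantity', 'l_extendedprice', 'disc_price', 'charge', 'l_discount']
```

An adapter reading this could then issue the groupby on `keys` and aggregate `values`, without needing a separate decorator argument for the grouping columns.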
Comment by skrawcz
Parking a thought: what about just enabling Hamilton-style functions instead of `with_column`? Basically, a DataStream goes in and another DataStream comes out.

```python
# Assumed imports; QuokkaGraphAdapter_V2 is the adapter being sketched in this
# issue and is not defined here.
import polars as pl
from hamilton import ad_hoc_utils, base, driver


def disc_price(l_extendedprice: pl.Series, l_discount: pl.Series) -> pl.Series:
    """Computes the discounted price"""
    return l_extendedprice * (1 - l_discount)


def charge(l_extendedprice: pl.Series, l_discount: pl.Series, l_tax: pl.Series) -> pl.Series:
    """Computes the charge"""
    return l_extendedprice * (1 - l_discount) * (1 + l_tax)


def main(qc, path):
    # Wrap the plain functions in a module so the driver can build a DAG from them.
    temp_module = ad_hoc_utils.create_temporary_module(disc_price, charge)
    adapter = QuokkaGraphAdapter_V2(base.DictResult())
    lineitem = qc.read_csv(path, sep="|", has_header=True)
    d = lineitem.filter("l_shipdate <= date '1998-12-01' - interval '90' day")
    dr = driver.Driver({}, temp_module, adapter=adapter)
    # No inputs are passed, so the filtered stream `d` never reaches the DAG here.
    d = dr.execute(["disc_price", "charge"])
    f = d.groupby(["l_returnflag", "l_linestatus"], orderby=["l_returnflag", "l_linestatus"]).agg(
        {
            "l_quantity": ["sum", "avg"],
            "l_extendedprice": ["sum", "avg"],
            "disc_price": "sum",
            "charge": "sum",
            "l_discount": "avg",
            "*": "count",
        }
    )
    return f.collect()
```
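The idea relies on Hamilton matching function parameter names to column names, so `disc_price` and `charge` can be computed from whatever columns the stream carries. A stdlib-only sketch of that name-based wiring, assuming a dict of equal-length lists in place of a DataStream and row-wise list math in place of vectorized `pl.Series` arithmetic; `run_column_functions` is a hypothetical stand-in for what the driver plus adapter would do, not Hamilton's implementation.

```python
import inspect


def disc_price(l_extendedprice, l_discount):
    """Computes the discounted price, row by row."""
    return [p * (1 - d) for p, d in zip(l_extendedprice, l_discount)]


def charge(l_extendedprice, l_discount, l_tax):
    """Computes the charge, row by row."""
    return [p * (1 - d) * (1 + t) for p, d, t in zip(l_extendedprice, l_discount, l_tax)]


def run_column_functions(table, functions):
    """Hypothetical driver stand-in: call each function with the columns its
    parameter names refer to, and return the outputs keyed by function name."""
    outputs = {}
    for fn in functions:
        params = inspect.signature(fn).parameters
        # Parameters resolve against input columns and any earlier outputs.
        available = {**table, **outputs}
        outputs[fn.__name__] = fn(**{name: available[name] for name in params})
    return outputs


table = {
    "l_extendedprice": [100.0, 200.0],
    "l_discount": [0.5, 0.0],
    "l_tax": [0.5, 0.25],
}
result = run_column_functions(table, [disc_price, charge])
print(result)  # {'disc_price': [50.0, 200.0], 'charge': [75.0, 250.0]}
```

Because functions are called in list order and earlier outputs are visible to later ones, a function could also depend on `disc_price` by naming it as a parameter.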
Comment by skrawcz
Tweaking the above slightly:

```python
# Assumed imports; QuokkaGraphAdapter_V2 is the adapter being sketched in this
# issue and is not defined here.
import polars as pl
from hamilton import ad_hoc_utils, driver


def disc_price(l_extendedprice: pl.Series, l_discount: pl.Series) -> pl.Series:
    """Computes the discounted price"""
    return l_extendedprice * (1 - l_discount)


def charge(l_extendedprice: pl.Series, l_discount: pl.Series, l_tax: pl.Series) -> pl.Series:
    """Computes the charge"""
    return l_extendedprice * (1 - l_discount) * (1 + l_tax)


def main(qc, path):
    temp_module = ad_hoc_utils.create_temporary_module(disc_price, charge)
    adapter = QuokkaGraphAdapter_V2()
    lineitem = qc.read_csv(path, sep="|", has_header=True)
    d = lineitem.filter("l_shipdate <= date '1998-12-01' - interval '90' day")
    dr = driver.Driver({}, temp_module, adapter=adapter)
    # Every column name in the schema maps to the same filtered DataStream.
    # Default is to append the computed columns to the passed-in DataStream.
    d = dr.execute(["disc_price", "charge"], inputs={c: d for c in d.schema})
    f = d.groupby(["l_returnflag", "l_linestatus"], orderby=["l_returnflag", "l_linestatus"]).agg(
        {
            "l_quantity": ["sum", "avg"],
            "l_extendedprice": ["sum", "avg"],
            "disc_price": "sum",
            "charge": "sum",
            "l_discount": "avg",
            "*": "count",
        }
    )
    return f.collect()
```

The QuokkaGraphAdapter_V2 adapter then intercepts execution and massages the internals to return a DataStream with the extra columns.
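Appending the computed columns to the input stream differs from a `base.DictResult`-style result, which holds only the requested outputs. A minimal sketch of the two result-building behaviors, again with a dict of lists standing in for a DataStream; `dict_result` and `append_columns_result` are hypothetical names, not part of Hamilton or Quokka.

```python
def dict_result(outputs):
    """Return only the requested outputs, like a DictResult-style builder."""
    return dict(outputs)


def append_columns_result(input_table, outputs):
    """Hypothetical append-style builder: copy the input columns and add each
    computed output as a new column, checking that row counts line up."""
    n_rows = len(next(iter(input_table.values()), []))
    result = dict(input_table)  # shallow copy; the input table is not mutated
    for name, column in outputs.items():
        if len(column) != n_rows:
            raise ValueError(f"column {name!r} has {len(column)} rows, expected {n_rows}")
        result[name] = column
    return result


table = {"l_returnflag": ["A", "N"], "l_extendedprice": [100.0, 200.0]}
outputs = {"disc_price": [50.0, 200.0]}
print(sorted(dict_result(outputs)))                   # ['disc_price']
print(sorted(append_columns_result(table, outputs)))  # ['disc_price', 'l_extendedprice', 'l_returnflag']
```

Appending is what lets the following `groupby` still see `l_returnflag`, `l_linestatus`, and `l_quantity` alongside `disc_price` and `charge`.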
Issue by skrawcz
Thursday Jan 05, 2023 at 00:48 GMT
Originally opened as stitchfix/hamilton#269
Working on how to make Hamilton handle Quokka better.
skrawcz included the following code: https://github.com/stitchfix/hamilton/pull/269/commits