
hamilton + quokka support #58

Closed
HamiltonRepoMigrationBot opened this issue Feb 26, 2023 · 4 comments
Labels
issue stale: revisit when necessary (Closing because this issue is stale as the world has moved. Reopen as necessary.)
migrated-from-old-repo (Migrated from old repository)

Comments

@HamiltonRepoMigrationBot
Collaborator

Issue by skrawcz
Thursday Jan 05, 2023 at 00:48 GMT
Originally opened as stitchfix/hamilton#269


Working on how to make Hamilton handle Quokka better.


skrawcz included the following code: https://github.com/stitchfix/hamilton/pull/269/commits

@HamiltonRepoMigrationBot
Collaborator Author

Comment by skrawcz
Thursday Jan 05, 2023 at 07:25 GMT


Thoughts to think about:

  • Quokka, like Spark, relies on a central object that gets manipulated so that it can perform query optimizations.
  • Adding Hamilton support means we need to manipulate this object correctly under the hood.
  • I believe a default Hamilton function gives us enough information to know what to do; it's just a matter of figuring out how to traverse the DAG to construct the right order of operations.
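One way to get that order of operations is to build a dependency graph from the function signatures and topologically sort it. This is a minimal sketch, assuming Hamilton's convention that a parameter name is the name of the column or node it depends on, and assuming any parameter naming a source column is already provided by the central datastream. `plan_order` and `charge_minus_disc` are hypothetical, not part of Hamilton or Quokka; the sort uses the standard library's `graphlib`.

```python
import inspect
from graphlib import TopologicalSorter
from typing import Callable, Dict, List, Set


def plan_order(funcs: List[Callable], source_columns: List[str]) -> List[str]:
    """Return function names ordered so that every dependency comes first.

    Hypothetical helper: parameters are treated as dependencies; parameters
    that name a source column are assumed to already exist on the stream.
    """
    by_name: Dict[str, Callable] = {f.__name__: f for f in funcs}
    graph: Dict[str, Set[str]] = {}
    for name, f in by_name.items():
        deps = set(inspect.signature(f).parameters)
        unknown = deps - set(by_name) - set(source_columns)
        if unknown:
            raise ValueError(f"{name} depends on unknown inputs: {sorted(unknown)}")
        # Only edges between functions matter; source columns need no computing.
        graph[name] = deps & set(by_name)
    return list(TopologicalSorter(graph).static_order())


def disc_price(l_extendedprice, l_discount):
    return l_extendedprice * (1 - l_discount)


def charge(l_extendedprice, l_discount, l_tax):
    return l_extendedprice * (1 - l_discount) * (1 + l_tax)


def charge_minus_disc(charge, disc_price):
    # Hypothetical extra node, only here to show a function-to-function edge.
    return charge - disc_price


order = plan_order([charge_minus_disc, charge, disc_price],
                   ["l_extendedprice", "l_discount", "l_tax"])
print(order)
```

Each name in `order` could then become one `with_column`-style operation on the central object, applied in that sequence, so the object the optimizer sees is built in a valid order.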

@HamiltonRepoMigrationBot
Collaborator Author

Comment by skrawcz
Thursday Jan 05, 2023 at 07:46 GMT


Sketch of some code converting the hello world example:

import polars as pl
from hamilton.function_modifiers import extract_columns
# pyquokka import paths assumed; adjust to the installed version
from pyquokka.df import QuokkaContext
from pyquokka.datastream import DataStream, GroupedDataStream

# NOTE: `groupby` below is a proposed decorator for this design, not an existing Hamilton API,
# and extract_columns is assumed here to work on a Quokka DataStream.

# Data Loading
# Filtering is part of data loading -- do we also expose columns like this?
@extract_columns("l_quantity", "l_extendedprice", "l_discount", "l_tax", "l_returnflag", "l_linestatus")
def lineitem(qc: QuokkaContext, path: str,
             filter: str = "l_shipdate <= date '1998-12-01' - interval '90' day") -> DataStream:
    """Loads and filters data from the lineitem table"""
    ds: DataStream = qc.read_csv(path, sep="|", has_header=True)
    if filter:
        ds = ds.filter(filter)
    return ds


# transforms we want people to write
def disc_price(l_extendedprice: pl.Series, l_discount: pl.Series) -> pl.Series:
    """Computes the discounted price"""
    return l_extendedprice * (1 - l_discount)


def charge(l_extendedprice: pl.Series, l_discount: pl.Series, l_tax: pl.Series) -> pl.Series:
    """Computes the charge"""
    return l_extendedprice * (1 - l_discount) * (1 + l_tax)


# group-by keys declared on the (proposed) decorator
@groupby("l_returnflag", "l_linestatus", order_by=[...])
def grouped_lineitem(l_quantity: pl.Series, l_extendedprice: pl.Series,
                     disc_price: pl.Series, charge: pl.Series, l_discount: pl.Series,
                     l_returnflag: pl.Series, l_linestatus: pl.Series) -> GroupedDataStream:
    pass


# maybe more subtly (the params before `*` presumably being the group-by keys)
def grouped_lineitem(l_returnflag: pl.Series, l_linestatus: pl.Series, *,
                     l_quantity: pl.Series, l_extendedprice: pl.Series,
                     disc_price: pl.Series, charge: pl.Series, l_discount: pl.Series,
                     ) -> GroupedDataStream:
    pass
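The keyword-only variant would let a framework read the grouping keys straight off the signature. A minimal sketch, assuming the convention suggested by the second grouped_lineitem (regular parameters are group-by keys, keyword-only parameters after `*` are the columns to aggregate); `split_groupby_signature` is hypothetical and not a Hamilton API.

```python
import inspect
from typing import Callable, List, Tuple


def split_groupby_signature(func: Callable) -> Tuple[List[str], List[str]]:
    """Split parameters into (group-by keys, aggregated columns).

    Hypothetical convention: positional params are keys, keyword-only params
    are aggregated. *args / **kwargs are rejected as ambiguous.
    """
    keys: List[str] = []
    values: List[str] = []
    for name, param in inspect.signature(func).parameters.items():
        if param.kind == inspect.Parameter.KEYWORD_ONLY:
            values.append(name)
        elif param.kind in (inspect.Parameter.POSITIONAL_ONLY,
                            inspect.Parameter.POSITIONAL_OR_KEYWORD):
            keys.append(name)
        else:
            raise TypeError(f"*args/**kwargs are not supported: {name}")
    return keys, values


def grouped_lineitem(l_returnflag, l_linestatus, *, l_quantity, l_extendedprice,
                     disc_price, charge, l_discount):
    pass


keys, values = split_groupby_signature(grouped_lineitem)
print(keys)    # ['l_returnflag', 'l_linestatus']
print(values)  # ['l_quantity', 'l_extendedprice', 'disc_price', 'charge', 'l_discount']
```

The keys could feed the groupby/orderby arguments and the value columns the agg dict; which aggregation to apply per column would still need extra metadata (for example a decorator argument), which this sketch does not decide.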

@HamiltonRepoMigrationBot
Collaborator Author

Comment by skrawcz
Saturday Jan 07, 2023 at 20:02 GMT


Parking a thought -- what about just enabling Hamilton-style functions instead of with_column?

Basically, a datastream is the input and the output is another datastream:

from hamilton import ad_hoc_utils, base, driver
# QuokkaGraphAdapter_V2: proposed adapter for this design (not part of Hamilton)


def disc_price(l_extendedprice: pl.Series, l_discount: pl.Series) -> pl.Series:
    """Computes the discounted price"""
    return l_extendedprice * (1 - l_discount)


def charge(l_extendedprice: pl.Series, l_discount: pl.Series, l_tax: pl.Series) -> pl.Series:
    """Computes the charge"""
    return l_extendedprice * (1 - l_discount) * (1 + l_tax)


def main(qc, path):
    temp_module = ad_hoc_utils.create_temporary_module(disc_price, charge)
    adapter = QuokkaGraphAdapter_V2(base.DictResult())

    lineitem = qc.read_csv(path, sep="|", has_header=True)
    d = lineitem.filter("l_shipdate <= date '1998-12-01' - interval '90' day")

    dr = driver.Driver({}, temp_module, adapter=adapter)
    # note: the filtered datastream `d` is not passed to the driver here
    d = dr.execute(["disc_price", "charge"])
    f = d.groupby(["l_returnflag", "l_linestatus"], orderby=["l_returnflag", "l_linestatus"]).agg(
        {
            "l_quantity": ["sum", "avg"],
            "l_extendedprice": ["sum", "avg"],
            "disc_price": "sum",
            "charge": "sum",
            "l_discount": "avg",
            "*": "count",
        }
    )
    return f.collect()
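For reference, this is what the groupby(...).agg({...}) call is asking for, written as plain Python over a few made-up rows so it runs without Quokka. It illustrates the aggregation semantics only (sum/avg per column, a row count per group, ordered by the two keys), not how Quokka executes the query; the output column names and the rows are invented for the sketch.

```python
from collections import defaultdict
from statistics import mean
from typing import Dict, List, Tuple


def _col(rows: List[dict], name: str) -> list:
    return [r[name] for r in rows]


def q1_aggregate(rows: List[dict]) -> List[dict]:
    """Group by (l_returnflag, l_linestatus), aggregate, and order by the keys.

    Output names such as "l_quantity_sum" are made up; Quokka's may differ.
    """
    groups: Dict[Tuple[str, str], List[dict]] = defaultdict(list)
    for r in rows:
        groups[(r["l_returnflag"], r["l_linestatus"])].append(r)

    out = []
    for flag, status in sorted(groups):  # orderby=["l_returnflag", "l_linestatus"]
        g = groups[(flag, status)]
        out.append({
            "l_returnflag": flag,
            "l_linestatus": status,
            "l_quantity_sum": sum(_col(g, "l_quantity")),
            "l_quantity_avg": mean(_col(g, "l_quantity")),
            "l_extendedprice_sum": sum(_col(g, "l_extendedprice")),
            "l_extendedprice_avg": mean(_col(g, "l_extendedprice")),
            "disc_price_sum": sum(_col(g, "disc_price")),
            "charge_sum": sum(_col(g, "charge")),
            "l_discount_avg": mean(_col(g, "l_discount")),
            "count": len(g),  # "*": "count"
        })
    return out


# Made-up rows; disc_price and charge use the same formulas as the functions in the thread.
rows = [
    {"l_returnflag": "A", "l_linestatus": "F", "l_quantity": 10,
     "l_extendedprice": 100.0, "l_discount": 0.5, "l_tax": 0.0},
    {"l_returnflag": "A", "l_linestatus": "F", "l_quantity": 20,
     "l_extendedprice": 200.0, "l_discount": 0.0, "l_tax": 0.5},
    {"l_returnflag": "N", "l_linestatus": "O", "l_quantity": 5,
     "l_extendedprice": 50.0, "l_discount": 0.0, "l_tax": 0.0},
]
for r in rows:
    r["disc_price"] = r["l_extendedprice"] * (1 - r["l_discount"])
    r["charge"] = r["l_extendedprice"] * (1 - r["l_discount"]) * (1 + r["l_tax"])

result = q1_aggregate(rows)
for row in result:
    print(row)
```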

@HamiltonRepoMigrationBot
Collaborator Author

Comment by skrawcz
Tuesday Feb 21, 2023 at 06:40 GMT


Tweaking the above slightly:

from hamilton import ad_hoc_utils, driver
# QuokkaGraphAdapter_V2: proposed adapter for this design (not part of Hamilton)


def disc_price(l_extendedprice: pl.Series, l_discount: pl.Series) -> pl.Series:
    """Computes the discounted price"""
    return l_extendedprice * (1 - l_discount)


def charge(l_extendedprice: pl.Series, l_discount: pl.Series, l_tax: pl.Series) -> pl.Series:
    """Computes the charge"""
    return l_extendedprice * (1 - l_discount) * (1 + l_tax)


def main(qc, path):
    temp_module = ad_hoc_utils.create_temporary_module(disc_price, charge)
    adapter = QuokkaGraphAdapter_V2()

    lineitem = qc.read_csv(path, sep="|", has_header=True)
    d = lineitem.filter("l_shipdate <= date '1998-12-01' - interval '90' day")

    dr = driver.Driver({}, temp_module, adapter=adapter)
    # every column name maps to the same datastream;
    # default is to append columns to the passed-in datastream
    d = dr.execute(["disc_price", "charge"], inputs={c: d for c in d.schema})
    f = d.groupby(["l_returnflag", "l_linestatus"], orderby=["l_returnflag", "l_linestatus"]).agg(
        {
            "l_quantity": ["sum", "avg"],
            "l_extendedprice": ["sum", "avg"],
            "disc_price": "sum",
            "charge": "sum",
            "l_discount": "avg",
            "*": "count",
        }
    )
    return f.collect()

The QuokkaGraphAdapter_V2 adapter then intercepts execution and manipulates the datastream internals so that it returns a datastream with the extra columns appended.
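To make the adapter idea concrete without Quokka, here is a toy sketch. `ToyStream` is a hypothetical stand-in for a DataStream (its `with_column` signature is made up for this example and is not Quokka's), and `execute_on_stream` is a hypothetical guess at what QuokkaGraphAdapter_V2 might do: notice that every input points at the same stream, turn each function into one column append, and return the augmented stream.

```python
import inspect
from typing import Callable, Dict, List


class ToyStream:
    """Hypothetical stand-in for a DataStream: immutable, equal-length columns."""

    def __init__(self, columns: Dict[str, list]):
        self._columns = columns

    @property
    def schema(self) -> List[str]:
        return list(self._columns)

    def column(self, name: str) -> list:
        return self._columns[name]

    def with_column(self, name: str, func: Callable, required: List[str]) -> "ToyStream":
        """Return a new stream with `name` computed row by row from `required`."""
        inputs = [self._columns[c] for c in required]
        values = [func(*row) for row in zip(*inputs)]
        return ToyStream({**self._columns, name: values})


def execute_on_stream(funcs: List[Callable], inputs: Dict[str, object]) -> ToyStream:
    """Hypothetical adapter logic; assumes `funcs` is already in dependency order."""
    if len({id(v) for v in inputs.values()}) != 1:
        raise ValueError("expected every input to point at the same stream")
    stream = next(iter(inputs.values()))
    for f in funcs:
        params = list(inspect.signature(f).parameters)
        # Parameter names double as column names, Hamilton-style.
        stream = stream.with_column(f.__name__, f, params)
    return stream


def disc_price(l_extendedprice, l_discount):
    return l_extendedprice * (1 - l_discount)


def charge(l_extendedprice, l_discount, l_tax):
    return l_extendedprice * (1 - l_discount) * (1 + l_tax)


d = ToyStream({"l_extendedprice": [100.0, 200.0],
               "l_discount": [0.5, 0.0],
               "l_tax": [0.0, 0.5]})
out = execute_on_stream([disc_price, charge], inputs={c: d for c in d.schema})
print(out.schema)             # ['l_extendedprice', 'l_discount', 'l_tax', 'disc_price', 'charge']
print(out.column("charge"))   # [50.0, 300.0]
```

Because each step returns a new stream rather than mutating the input, later functions see the columns added by earlier ones while the original `d` is left untouched; a real Quokka version would instead build up the lazy plan so the optimizer sees every step.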

@elijahbenizzy added the migrated-from-old-repo label Feb 26, 2023
@skrawcz changed the title from "WIP: hamilton + quokka / pyspark" to "hamilton + quokka support" Mar 26, 2024
@skrawcz added the issue stale: revisit when necessary label Jul 18, 2024
@skrawcz closed this as completed Jul 18, 2024
3 participants