Skip to content
This repository has been archived by the owner on Jul 3, 2023. It is now read-only.

Running Hamilton on Flyte #233

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft

Running Hamilton on Flyte #233

wants to merge 3 commits into from

Conversation

ramannanda9
Copy link
Member

[Short description explaining the high-level reason for the pull request]

Changes

How I tested this

Notes

Checklist

  • PR has an informative and human-readable title (this will be pulled into the release notes)
  • Changes are limited to a single goal (no scope creep)
  • Code passed the pre-commit check & code is left cleaner/nicer than when first encountered.
  • Any change in functionality is tested
  • New functions are documented (with a description, list of inputs, and expected output)
  • Placeholder code is flagged / future TODOs are captured in comments
  • Project documentation has been updated if adding/changing functionality.

Copy link
Collaborator

@elijahbenizzy elijahbenizzy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some thoughts -- don't fully understand it -- I need to look into the flytekit API. I think we might want a better toolset for compiling to other frameworks...

class PandasSeriesTransformer(TypeTransformer[pd.Series]):
"""
Creates a transformer to handle PandasSeries, Similar to PandasTransformer
in flight repo https://github.com/flyteorg/flytekit/blob/master/flytekit/types/schema/types_pandas.py
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's different between the one here and that one?

# map inputs
for input_node in node.dependencies:
if input_node.name not in self.workflow.inputs:
self.workflow.add_workflow_input(input_node.name, input_node.type)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we check if they're python approved types?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is fairly limited support for custom types, but yeah a check can be added.

self.workflow.add_workflow_input(input_node.name, input_node.type)
input_kwargs[input_node.name] = self.workflow.inputs[input_node.name]
# add the node to workflow
wf_node = self.workflow.add_entity(task, **input_kwargs)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I might be misreading, but why would it already have outputs/where does it get it from?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, See Imperative Workflow . It happens when you add the entity i.e. task with input_kwargs

if len(outputs) == 1:
self.workflow.add_workflow_output(node.name, list(outputs.values())[0], node.type)
else:
self.workflow.add_workflow_output(node.name, outputs, node.type)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And why might it have multiple outputs?

Flytekit Python is the Python Library for easily authoring, testing, deploying,
and interacting with Flyte tasks, workflows, and launch plans

"""
Copy link
Collaborator

@skrawcz skrawcz Nov 18, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On the right track @ramannanda9 !

Here we'd probably need an explanation here of how we're actually using Flyte. E.g. the code that we map things to, how does it behave/work and are there any caveats, decisions made, etc.



# Register with FlyteKit type engine
TypeEngine.register(PandasSeriesTransformer())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ramannanda9 since Hamilton supports any object type, we need to think how to handle arbitrary python object types here too.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah not the best idea, but they ideally could default to just pickle dumping rather than raising unsupported type.

@ramannanda9 ramannanda9 force-pushed the flyte_integration branch 2 times, most recently from f428ae8 to 25498cc Compare January 6, 2023 00:41
See #139.

The GraphAdapter treats a node as a PythonFunctionTask, it adds the node to an ImperativeWorkflow in flyte.
The execution of workflow is performed during build_output.
This way hamilton functions can be executed in Flyte runtime.

We end up adding a PandasSeriesTransformer as that is required for using PandasSeries as function outputs and inputs of flight task nodes.

Any customtype that is not a dataclass, native type or has support already in flyte will raise a ValueError

Adds FlyteKitGraphAdapter tests

Adds tests to show unsupported types
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants