Running Hamilton on Flyte #233
Some thoughts -- don't fully understand it -- I need to look into the flytekit API. I think we might want a better toolset for compiling to other frameworks...
class PandasSeriesTransformer(TypeTransformer[pd.Series]):
    """
    Creates a transformer to handle PandasSeries, similar to PandasTransformer
    in the flyte repo https://github.com/flyteorg/flytekit/blob/master/flytekit/types/schema/types_pandas.py
What's different between the one here and that one?
hamilton/experimental/h_flytekit.py
Outdated
# map inputs
for input_node in node.dependencies:
    if input_node.name not in self.workflow.inputs:
        self.workflow.add_workflow_input(input_node.name, input_node.type)
Should we check if they're python approved types?
There is fairly limited support for custom types, but yeah a check can be added.
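A minimal sketch of the kind of check discussed above, assuming an illustrative allow-list. `SUPPORTED_NATIVE_TYPES` and `check_supported` are hypothetical names, and the list is not Flyte's actual set of supported types; raising `ValueError` follows the behavior described in the commit message.

```python
import dataclasses
from typing import Any

# Assumption: an illustrative allow-list, NOT Flyte's actual supported types.
SUPPORTED_NATIVE_TYPES = (int, float, str, bool, bytes, list, dict)


def check_supported(name: str, typ: Any) -> None:
    """Raise ValueError unless `typ` is an allow-listed native type or a dataclass."""
    if isinstance(typ, type) and (
        issubclass(typ, SUPPORTED_NATIVE_TYPES) or dataclasses.is_dataclass(typ)
    ):
        return
    raise ValueError(f"Unsupported type for input {name!r}: {typ!r}")


@dataclasses.dataclass
class Point:  # hypothetical dataclass used only for this example
    x: int
    y: int


class Custom:  # hypothetical type with no registered support
    pass


check_supported("count", int)    # passes: native type
check_supported("origin", Point)  # passes: dataclass
```

Such a check could run before each workflow input is added, so an unsupported type fails early with the input's name in the message.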
        self.workflow.add_workflow_input(input_node.name, input_node.type)
    input_kwargs[input_node.name] = self.workflow.inputs[input_node.name]
# add the node to workflow
wf_node = self.workflow.add_entity(task, **input_kwargs)
I might be misreading, but why would it already have outputs/where does it get it from?
Yep, see Imperative Workflow. It happens when you add the entity, i.e. the task, with input_kwargs.
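To illustrate why a node can already carry outputs once it is added, here is a toy model in plain Python. It does not use flytekit; `output_names` is a hypothetical helper, and the `o0`, `o1`, ... naming is assumed to loosely mirror flytekit's default output names. The idea is that outputs can be derived from the function's return annotation alone, before anything runs, and that a tuple return type yields more than one output.

```python
from typing import Callable, Dict, Tuple, get_args, get_origin, get_type_hints


def output_names(fn: Callable) -> Dict[str, type]:
    """Toy model: derive named outputs from a function's return annotation."""
    ret = get_type_hints(fn).get("return")
    if ret is None or ret is type(None):
        return {}  # no return value, so no outputs
    if get_origin(ret) is tuple:
        # One output per tuple element: o0, o1, ...
        return {f"o{i}": t for i, t in enumerate(get_args(ret))}
    return {"o0": ret}  # a single return value becomes a single output


def single(a: int) -> int:
    return a + 1


def pair(a: int) -> Tuple[int, str]:
    return a, str(a)


single_outputs = output_names(single)  # one entry
pair_outputs = output_names(pair)      # two entries
```

Under this model, the `len(outputs) == 1` branch in the adapter corresponds to the `single` case, and the `else` branch to functions like `pair`.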
hamilton/experimental/h_flytekit.py
Outdated
if len(outputs) == 1:
    self.workflow.add_workflow_output(node.name, list(outputs.values())[0], node.type)
else:
    self.workflow.add_workflow_output(node.name, outputs, node.type)
And why might it have multiple outputs?
Flytekit Python is the Python Library for easily authoring, testing, deploying,
and interacting with Flyte tasks, workflows, and launch plans
"""
On the right track, @ramannanda9!
We'd probably need an explanation here of how we're actually using Flyte, e.g. the code that we map things to, how it behaves, and any caveats or decisions made.
# Register with FlyteKit type engine
TypeEngine.register(PandasSeriesTransformer())
@ramannanda9 since Hamilton supports any object type, we need to think about how to handle arbitrary Python object types here too.
Yeah, not the best idea, but ideally they could default to pickle dumping rather than raising an unsupported-type error.
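A minimal sketch of the fallback idea, using only the standard library and not flytekit's type engine. `dump_value` and `load_value` are hypothetical names, and JSON stands in for whatever natively supported format would be tried first.

```python
import json
import pickle
from typing import Any, Tuple


def dump_value(value: Any) -> Tuple[str, bytes]:
    """Serialize with JSON when possible; otherwise fall back to pickle."""
    try:
        return "json", json.dumps(value).encode("utf-8")
    except (TypeError, ValueError):
        # Not JSON-serializable: pickle it instead of raising.
        return "pickle", pickle.dumps(value)


def load_value(fmt: str, payload: bytes) -> Any:
    """Inverse of dump_value. Only unpickle payloads from a trusted source."""
    if fmt == "json":
        return json.loads(payload.decode("utf-8"))
    return pickle.loads(payload)


fmt_native, payload_native = dump_value({"a": 1})    # JSON path
fmt_custom, payload_custom = dump_value({1, 2, 3})   # a set is not JSON-serializable
restored = load_value(fmt_custom, payload_custom)
```

The trade-off is that pickled payloads are opaque and tied to the Python environment that produced them, which is likely why this was described as not the best idea.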
f428ae8 to 25498cc
See #139. The GraphAdapter treats a node as a PythonFunctionTask and adds it to an ImperativeWorkflow in Flyte. The workflow is executed during build_output. This way Hamilton functions can be executed in the Flyte runtime. We add a PandasSeriesTransformer because it is required for using PandasSeries as function outputs and inputs of Flyte task nodes. Any custom type that is not a dataclass, a native type, or already supported in Flyte will raise a ValueError.
Adds FlyteKitGraphAdapter tests
Adds tests to show unsupported types
25498cc to 93f150c