[WIP] cmdline/submitor #754

Draft · wants to merge 3 commits into base: cmdline_orchestration
19 changes: 17 additions & 2 deletions examples/cli/README.md
@@ -13,13 +13,28 @@ Test the installation with

# Features

The CLI currently provides 5 commands.

## Commands
- `build`: creates a Hamilton `Driver` from the specified modules. It's useful for validating the dataflow definition.
- `validate`: calls `Driver.validate_execution()` for a set of `inputs` and `overrides` passed through the `--context` option.
- `view`: calls `dr.display_all_functions()` on the built `Driver`.
- `version`: generates node hashes based on their source code, and a dataflow hash from the collection of node hashes.
- `diff`: gets a diff of added/deleted/edited nodes between the current version of the Python modules and another git reference (`default=HEAD`, i.e., the last committed version). A visualization of the diff is also available.

## Options
- All commands receive `MODULES`, a list of paths to Python modules to assemble into a single dataflow.
- All commands receive `--context` (`-ctx`), a file (`.py` or `.json`) that includes top-level keys (see `config.py` and `config.json` in this repo for examples, and the sample invocation below):
  - `HAMILTON_CONFIG`: `typing.Mapping` passed to `driver.Builder.with_config()`
  - `HAMILTON_FINAL_VARS`: `typing.Sequence` passed to `driver.validate_execution(final_vars=...)`
  - `HAMILTON_INPUTS`: `typing.Mapping` passed to `driver.validate_execution(inputs=...)`
  - `HAMILTON_OVERRIDES`: `typing.Mapping` passed to `driver.validate_execution(overrides=...)`
  - A `.py` context file provides more flexibility than `.json` for defining input and override objects.
- All commands receive `--name` (`-n`), which names the output file (when the command produces one). If unset, a file name is derived from the `MODULES` argument.
- When using a command that generates a file:
  - passing a file path outputs the file with that name at that location;
  - passing a directory outputs the file named by `--name` (explicit or derived from `MODULES`) at that location;
  - passing a file path named `default` outputs the file with `default` replaced by the `--name` value. This is useful when you need to specify a file type via the extension. For example, `hamilton view -o /path/to/default.pdf my_dataflow.py` creates the file `/path/to/my_dataflow.pdf`. (This behavior may change.)
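
A sample invocation (module and output names are illustrative):

```console
# validate the dataflow against the inputs/overrides declared in the context file
hamilton validate --context config.py my_dataflow.py

# render all functions to my_dataflow.pdf
hamilton view -o ./default.pdf my_dataflow.py
```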


See [DOCS.md](./DOCS.md) for the full reference.

# Usage
6 changes: 6 additions & 0 deletions examples/cli/config.json
@@ -0,0 +1,6 @@
{
"HAMILTON_CONFIG": {
"holiday": "halloween"
},
"HAMILTON_FINAL_VARS": ["customers_df", "customer_summary_table"]
}
3 changes: 3 additions & 0 deletions examples/cli/config.py
@@ -0,0 +1,3 @@
HAMILTON_CONFIG = dict(config_exists="true")

HAMILTON_FINAL_VARS = ["config_when", "customer_summary_table"]
11 changes: 9 additions & 2 deletions examples/cli/module_v1.py
@@ -1,9 +1,16 @@
import pandas as pd

from hamilton.function_modifiers import extract_columns
from hamilton.function_modifiers import config, extract_columns


def customers_df(customers_path: str = "customers.csv") -> pd.DataFrame:
@config.when(holiday="halloween")
def customers_df__halloween() -> pd.DataFrame:
"""Example of using @config.when function modifier"""
return pd.read_csv("/path/to/halloween/customers.csv")


@config.when_not(holiday="halloween")
def customers_df__default(customers_path: str = "customers.csv") -> pd.DataFrame:
"""Load the customer dataset."""
return pd.read_csv(customers_path)

67 changes: 67 additions & 0 deletions examples/cmdline_orchestrator/cmdline.py
@@ -0,0 +1,67 @@
import functools
import inspect
import subprocess

from hamilton.execution.executors import DefaultExecutionManager, TaskExecutor
from hamilton.execution.grouping import TaskImplementation
from hamilton.function_modifiers import tag


class CMDLineExecutionManager(DefaultExecutionManager):
def get_executor_for_task(self, task: TaskImplementation) -> TaskExecutor:
"""Simple implementation that returns the local executor for single task executions,

:param task: Task to get executor for
:return: A local task if this is a "single-node" task, a remote task otherwise
"""
is_single_node_task = len(task.nodes) == 1
if not is_single_node_task:
raise ValueError("Only single node tasks supported")
(node,) = task.nodes
if "cmdline" in node.tags: # hard coded for now
return self.remote_executor
return self.local_executor


def cmdline():
"""Decorator to run the result of a function as a command line command."""

def decorator(func):
if not inspect.isgeneratorfunction(func):
raise ValueError("Function must be a generator.")

@functools.wraps(func)
def wrapper(*args, **kwargs):
            # drive the generator protocol: the function first yields the shell
            # command to run, then is resumed with the command's captured stdout
generator = func(*args, **kwargs)
cmd: str = next(generator)
# Run the command and capture the output
process = subprocess.run(cmd, shell=True, capture_output=True, check=True)

try:
process_result = process.stdout.decode("utf-8")
generator.send(process_result)
# ValueError should not be hit because a StopIteration should be raised, unless
# there are multiple yields in the generator.
raise ValueError("Generator cannot have multiple yields.")
except StopIteration as e:
result = e.value

            # the generator's return value becomes the node's result
            return result

        # propagate the wrapped function's return annotation so Hamilton
        # resolves the node's type correctly
        wrapper.__annotations__["return"] = inspect.signature(func).return_annotation
return wrapper

    # NOTE: to route these nodes to the remote executor, tag the node function itself,
    # e.g. `return tag(cmdline="slurm")(wrapper)` inside `decorator` (as done in submit.py)
return decorator
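
For reference, a minimal sketch of the generator protocol that `@cmdline` expects (a hypothetical function, mirroring `echo` in `funcs.py` below):

```python
from cmdline import cmdline


@cmdline()
def whoami() -> str:
    # code before the yield runs before the command executes
    output = yield "whoami"  # yield the shell command; resumed with its captured stdout
    return output.strip()  # the return value becomes the Hamilton node's result
```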
50 changes: 50 additions & 0 deletions examples/cmdline_orchestrator/funcs.py
@@ -0,0 +1,50 @@
from cmdline import cmdline
from submit import submit


@cmdline()
def echo(message: str) -> str:
    # code before the yield runs before the command executes
    result = yield f"echo {message}"
    # resumed here with the command's captured stdout
    print(result)
    return result

@submit(name="local_slurm", type="slurm") # Jichen: initialize new submitor
def echo_slurm(message: str) -> str:

# before
result = yield {
"job_name": "test_slurm1",
"n_cores": 4,
"memory": 1000,
"max_runtime": 100,
"work_dir": ".",
"monitor": True,
"cmd": f"echo {message}"
}
    # resumed here after submission (and after monitoring, if requested)
    return "done"

@submit(name="local_slurm") # Jichen: reuse submitor
def echo_slurm2(message: str) -> str:

# before
result = yield {
"job_name": "test_slurm2",
"n_cores": 4,
"memory": 1000,
"max_runtime": 100,
"work_dir": ".",
"monitor": True,
"cmd": f"echo {message}"
}
    # resumed here after submission (and after monitoring, if requested)
    return "done"

# @submit(name="local_lsf", type="lsf") # Jichen: another submitor
# def echo_lsf(message: str) -> str: ...
41 changes: 41 additions & 0 deletions examples/cmdline_orchestrator/run_cmdline.py
@@ -0,0 +1,41 @@
import os

from hamilton.execution.executors import MultiThreadingExecutor, SynchronousLocalTaskExecutor
from hamilton.experimental.h_cache import CachingGraphAdapter
from hamilton import driver

if __name__ == "__main__":
import funcs
from cmdline import CMDLineExecutionManager
from dagworks import adapters

tracker = adapters.DAGWorksTracker(
project_id=19350,
api_key=os.environ["DAGWORKS_API_KEY"],
username="[email protected]",
dag_name="my_version_of_the_dag",
tags={"environment": "DEV", "team": "MY_TEAM", "version": "X"}
)

dr = (
driver.Builder()
.enable_dynamic_execution(allow_experimental_mode=True)
.with_execution_manager(
CMDLineExecutionManager(SynchronousLocalTaskExecutor(), MultiThreadingExecutor(5))
)
.with_modules(funcs)
.with_adapters(
tracker,
CachingGraphAdapter("./cache"),
# PrintLnHook()
)
.build()
)
dr.display_all_functions("graph.dot")
print(dr.list_available_variables())
# for var in dr.list_available_variables():
# print(dr.execute([var.name], inputs={"start": "hello"}))
result = dr.execute(["echo"], inputs={"message": "hello"})
assert result['echo'] == 'hello\n'
40 changes: 40 additions & 0 deletions examples/cmdline_orchestrator/run_slurm.py
@@ -0,0 +1,40 @@
import os

from hamilton.execution.executors import MultiThreadingExecutor, SynchronousLocalTaskExecutor
from hamilton.experimental.h_cache import CachingGraphAdapter
from hamilton import driver

if __name__ == "__main__":
import funcs
from cmdline import CMDLineExecutionManager
from dagworks import adapters

tracker = adapters.DAGWorksTracker(
project_id=19350,
api_key=os.environ["DAGWORKS_API_KEY"],
username="[email protected]",
dag_name="my_version_of_the_dag",
tags={"environment": "DEV", "team": "MY_TEAM", "version": "X"}
)

dr = (
driver.Builder()
.enable_dynamic_execution(allow_experimental_mode=True)
.with_execution_manager(
CMDLineExecutionManager(SynchronousLocalTaskExecutor(), MultiThreadingExecutor(5))
)
.with_modules(funcs)
.with_adapters(
tracker,
CachingGraphAdapter("./cache"),
# PrintLnHook()
)
.build()
)
dr.display_all_functions("graph.dot")
# print(dr.list_available_variables())
# for var in dr.list_available_variables():
# print(dr.execute([var.name], inputs={"start": "hello"}))
dr.execute(["echo_slurm"], inputs={"message": "hello_slurm"})
57 changes: 57 additions & 0 deletions examples/cmdline_orchestrator/submit.py
@@ -0,0 +1,57 @@
import functools
import inspect

from submitor.cluster import Cluster
from submitor.monitor import Monitor

from hamilton.function_modifiers import tag

cluster = Cluster()


def submit(name: str, type: str = "slurm"):
    """Decorator to submit the job specification yielded by the function to a cluster scheduler.

    ``type`` defaults to "slurm" so an already-initialized submitor can be reused by
    name alone (e.g. ``@submit(name="local_slurm")`` in funcs.py).
    """

submitor = cluster.init(name, type)

def decorator(func):
if not inspect.isgeneratorfunction(func):
raise ValueError("Function must be a generator.")

@functools.wraps(func)
def wrapper(*args, **kwargs):
            # drive the generator protocol: the function first yields a job
            # specification dict, then is resumed with the submission result
generator = func(*args, **kwargs)
arguments: dict = next(generator)
monitor = arguments.pop("monitor", False)
cmd = arguments.pop("cmd")
            # submit the job to the scheduler and keep its id
            # (e.g. Slurm replies with "Submitted batch job 30588834")
            job_id = submitor.submit(cmd, **arguments)

            # TODO: this blocks, but workflows usually run many jobs in parallel,
            # so a single global monitor should poll all task statuses instead
            if monitor:
                job_monitor = Monitor(monitor, job_id)
                job_monitor.wait()
try:
process_result = {"job_id": job_id}
generator.send(process_result)
# ValueError should not be hit because a StopIteration should be raised, unless
# there are multiple yields in the generator.
raise ValueError("Generator cannot have multiple yields.")
except StopIteration as e:
result = e.value

            # the generator's return value becomes the node's result
            return result

        # propagate the wrapped function's return annotation so Hamilton
        # resolves the node's type correctly
        wrapper.__annotations__["return"] = inspect.signature(func).return_annotation
        # tag the node function so CMDLineExecutionManager routes it to the remote executor
        return tag(cmdline="slurm")(wrapper)

    return decorator
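
A minimal sketch of the contract between `@submit` and a decorated function (the job-spec values are illustrative; the keys follow `echo_slurm` in `funcs.py`):

```python
from submit import submit


@submit(name="local_slurm", type="slurm")
def train() -> str:
    info = yield {
        "cmd": "python train.py",  # popped off and passed to submitor.submit()
        "job_name": "train",
        "n_cores": 2,
        "memory": 1000,
        "max_runtime": 100,
        "work_dir": ".",
        "monitor": True,  # popped off; when truthy, blocks until the job completes
    }
    return info["job_id"]  # resumed with {"job_id": <scheduler job id>}
```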
Empty file.
26 changes: 26 additions & 0 deletions examples/cmdline_orchestrator/submitor/adapter.py
@@ -0,0 +1,26 @@
from abc import ABC, abstractmethod
from pathlib import Path
from subprocess import check_output


class Adapter(ABC):

@abstractmethod
def execute(self, cmd: str, work_dir: str | Path | None = None):
pass


class LocalAdapter(Adapter):
    """
    Submit jobs locally: Hamilton executes the jobs on the same machine it is running on.
    """

    def execute(self, cmd: str, work_dir: str | Path | None = None):
        out = check_output(cmd, cwd=work_dir, shell=True, universal_newlines=True)
        return out


class RemoteAdapter(Adapter):
    """
    Submit jobs remotely: Hamilton executes the jobs on a remote machine, e.g. over SSH.
    Not yet implemented.
    """

    def execute(self, cmd: str, work_dir: str | Path | None = None):
        raise NotImplementedError("remote execution is not implemented yet")
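
Usage sketch for `LocalAdapter` (assumes a POSIX shell with `echo` available):

```python
adapter = LocalAdapter()
out = adapter.execute("echo hello", work_dir=".")
assert out == "hello\n"
```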
23 changes: 23 additions & 0 deletions examples/cmdline_orchestrator/submitor/cluster.py
@@ -0,0 +1,23 @@
from .slurm import Slurm
from .submitor import Submitor


class Cluster:
    """
    A registry that manages the different cluster submitors by name.
    """

    def __init__(self):
        self._cluster: dict[str, Submitor] = {}

    def init(self, name: str, type: str) -> Submitor:
        """Return the submitor registered under ``name``, creating it first if needed."""
        if name in self._cluster:
            return self._cluster[name]
        if type == "slurm":
            self._cluster[name] = Slurm(name)
        else:
            raise ValueError(f"Cluster type {type} not supported.")
        return self._cluster[name]
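
Usage sketch: `init` acts as a cache keyed by `name`, so a second call with the same name returns the existing submitor:

```python
cluster = Cluster()
slurm = cluster.init("local_slurm", "slurm")
assert slurm is cluster.init("local_slurm", "slurm")  # cached; the type is ignored on reuse
```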
