-
Notifications
You must be signed in to change notification settings - Fork 2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
adding a new show command, debug operation, and NoOpDestination #89
base: main
Are you sure you want to change the base?
Changes from all commits
f48f1bc
9e199b8
7abb5e1
d5f3187
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,7 +5,6 @@ | |
|
||
from earthmover.earthmover import Earthmover | ||
|
||
|
||
class ExitOnExceptionHandler(logging.StreamHandler): | ||
""" | ||
|
||
|
@@ -50,7 +49,7 @@ def main(argv=None): | |
parser.add_argument('command', | ||
nargs="?", | ||
type=str, | ||
help='the command to run: `run`, `compile`, `visualize`' | ||
help='the command to run: `deps`, `compile`, `run`, `show`, `visualize`' | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think we actually have a |
||
) | ||
parser.add_argument("-c", "--config-file", | ||
nargs="?", | ||
|
@@ -94,12 +93,27 @@ def main(argv=None): | |
type=str, | ||
help='produces a JSON output file with structured information about run results' | ||
) | ||
parser.add_argument("--function", | ||
type=str, | ||
help='what to display with `earthmover show` (head, tail, describe, columns)' | ||
) | ||
parser.add_argument("--rows", | ||
type=int, | ||
help='how many rows of output to display with `earthmover show`' | ||
) | ||
parser.add_argument("--transpose", | ||
action='store_true', | ||
help='transposes the output of `earthmover show`' | ||
) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These three new CLI flags are only used by |
||
|
||
# Set empty defaults in case they've not been populated by the user. | ||
parser.set_defaults(**{ | ||
"selector": "*", | ||
"params": "", | ||
"results_file": "", | ||
"function": "head", | ||
"rows": 10, | ||
"transpose": False, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Default values if the flags are omitted. I think these defaults make sense, but happy to discuss alternatives. |
||
}) | ||
|
||
### Parse the user-inputs and run Earthmover, depending on the command and subcommand passed. | ||
|
@@ -196,6 +210,15 @@ def main(argv=None): | |
logger.exception(e, exc_info=em.state_configs['show_stacktrace']) | ||
raise | ||
|
||
# Subcommand: show (compile + execute only up to one transformation, and display a debug operation) | ||
elif args.command == 'show': | ||
try: | ||
em.show(selector=args.selector, func=args.function, rows=args.rows, transpose=args.transpose) | ||
|
||
except Exception as err: | ||
logger.exception(err, exc_info=em.state_configs['show_stacktrace']) | ||
raise | ||
|
||
# Subcommand: run (compile + execute) | ||
# This is the default if none is specified. | ||
elif args.command == 'run' or not args.command: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,7 +14,7 @@ | |
from earthmover.graph import Graph | ||
from earthmover.package import Package | ||
from earthmover.runs_file import RunsFile | ||
from earthmover.nodes.destination import Destination | ||
from earthmover.nodes.destination import Destination, NoOpDestination | ||
from earthmover.nodes.source import Source | ||
from earthmover.nodes.transformation import Transformation | ||
from earthmover.yaml_parser import JinjaEnvironmentYamlLoader | ||
|
@@ -219,6 +219,35 @@ def filter_graph_on_selector(self, graph: Graph, selector: str) -> Graph: | |
|
||
return active_graph | ||
|
||
def show(self, selector="", func="head", rows=10, transpose=False): | ||
self.compile() | ||
config = { | ||
"source": f"$transformations.{selector}", | ||
"operations": [ | ||
{ | ||
"operation": "debug", | ||
"function": f"{func}", | ||
"rows": rows, | ||
"transpose": transpose | ||
} | ||
] | ||
} | ||
transformation_node = Transformation(name=f"{selector}_show", config=config, earthmover=self) | ||
self.graph.add_node(f"$transformations.{selector}_show", data=transformation_node) | ||
self.graph.add_edge(f"$transformations.{selector}", f"$transformations.{selector}_show") | ||
transformation_node.set_upstream_source(f"$transformations.{selector}", self.graph.ref(f"$transformations.{selector}")) | ||
config = { | ||
"kind": "noop", | ||
"source": f"$transformations.{selector}_show" | ||
} | ||
destination_node = NoOpDestination(name=f"{selector}_destination", config=config, earthmover=self) | ||
self.graph.add_node(f"$destinations.{selector}_destination", data=destination_node) | ||
self.graph.add_edge(f"$transformations.{selector}_show", f"$destinations.{selector}_destination") | ||
destination_node.set_upstream_source(f"$transformations.{selector}_show", self.graph.ref(f"$transformations.{selector}_show")) | ||
active_graph = self.filter_graph_on_selector(self.graph, selector=f"{selector}_destination") | ||
self.execute(active_graph) | ||
|
||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Explaining what happens here; suppose you select the transformation node
The result is that the debug operation will cause information about |
||
def execute(self, graph: Graph): | ||
""" | ||
Iterate subgraphs in `Earthmover.graph` and execute each Node in order. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,6 +9,8 @@ | |
|
||
from typing import Tuple | ||
|
||
from earthmover.yaml_parser import YamlMapping | ||
|
||
|
||
class Destination(Node): | ||
""" | ||
|
@@ -18,8 +20,16 @@ class Destination(Node): | |
mode: str = None # Documents which class was chosen. | ||
allowed_configs: Tuple[str] = ('debug', 'expect', 'show_progress', 'repartition', 'source',) | ||
|
||
def __new__(cls, *args, **kwargs): | ||
return object.__new__(FileDestination) | ||
def __new__(cls, name: str, config, *args, **kwargs): | ||
if (type(config)==dict or type(config)==YamlMapping) and 'kind' in config.keys(): | ||
if config.get("kind","") == 'file': | ||
return object.__new__(FileDestination) | ||
elif config.get("kind","") == 'noop': | ||
return object.__new__(NoOpDestination) | ||
elif (type(config)==dict or type(config)==YamlMapping) and 'kind' not in config.keys(): | ||
# default for backward compatibility | ||
return object.__new__(FileDestination) | ||
# else: throw an error? | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Because I'm adding a new Destination class, I need a way to instantiate it... since In the future, we can extend this to other destination The default destination There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We've started using There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See my comment here: I think overloading |
||
|
||
def __init__(self, *args, **kwargs): | ||
super().__init__(*args, **kwargs) | ||
|
@@ -132,3 +142,37 @@ def render_row(self, row: pd.Series): | |
raise | ||
|
||
return json_string | ||
|
||
|
||
class NoOpDestination(Destination): | ||
""" | ||
This "no-op" destination facilitates | ||
- the new `show` command, which displays info about a partially-transformed dataframe | ||
- using earthmover programmatically as a compute engine | ||
without requiring any data to be output to file(s). | ||
|
||
This is necessary because graph paths not connected to a destination are pruned. | ||
""" | ||
mode: str = 'no-op' | ||
allowed_configs: Tuple[str] = ( | ||
'debug', 'expect', 'show_progress', 'repartition', 'source', | ||
'kind', | ||
) | ||
|
||
|
||
def __init__(self, *args, **kwargs): | ||
super().__init__(*args, **kwargs) | ||
|
||
def execute(self, **kwargs): | ||
""" | ||
There is a bug in Dask where `dd.to_csv(mode='a', single_file=True)` fails. | ||
This is resolved in 2023.8.1: https://docs.dask.org/en/stable/changelog.html#id7 | ||
|
||
:return: | ||
""" | ||
super().execute(**kwargs) | ||
|
||
self.data = ( | ||
self.upstream_sources[self.source].data | ||
.map_partitions(lambda x: x.apply(self.render_row, axis=1), meta=pd.Series('str')) | ||
) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This new destination type does nothing, as the name suggests. |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -36,7 +36,7 @@ def __init__(self, name: str, config: 'YamlMapping', *, earthmover: 'Earthmover' | |
self.error_handler: 'ErrorHandler' = earthmover.error_handler | ||
|
||
self.error_handler.ctx.update( | ||
file=self.config.__file__, line=self.config.__line__, node=self, operation=None | ||
file=self.config.get("__file__",""), line=self.config.get("__line__",0), node=self, operation=None | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To circumvent needing to change these lines, we should just initialize the No-Op config block as a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. But what would those default values be? (when nodes are added programmatically, not from a YAML configuration file) Can |
||
) | ||
|
||
self.upstream_sources: Dict[str, Optional['Node']] = {} | ||
|
@@ -80,7 +80,7 @@ def execute(self, **kwargs): | |
:return: | ||
""" | ||
self.error_handler.ctx.update( | ||
file=self.config.__file__, line=self.config.__line__, node=self, operation=None | ||
file=self.config.get("__file__",""), line=self.config.get("__line__",0), node=self, operation=None | ||
) | ||
|
||
# Turn on the progress bar manually. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -177,3 +177,59 @@ def execute(self, data: 'DataFrame', data_mapping: Dict[str, Node], **kwargs) -> | |
raise | ||
|
||
return data | ||
|
||
|
||
class DebugOperation(Operation): | ||
""" | ||
|
||
""" | ||
allowed_configs: Tuple[str] = ( | ||
'operation', 'function', 'rows', 'transpose', 'skip_columns', 'keep_columns' | ||
) | ||
|
||
def __init__(self, *args, **kwargs): | ||
super().__init__(*args, **kwargs) | ||
self.func = self.error_handler.assert_get_key(self.config, 'function', dtype=str) | ||
self.rows = self.error_handler.assert_get_key(self.config, 'rows', dtype=int, default=10, required=False) | ||
self.skip_columns = self.error_handler.assert_get_key(self.config, 'skip_columns', dtype=list, required=False, default=[]) | ||
self.keep_columns = self.error_handler.assert_get_key(self.config, 'keep_columns', dtype=list, required=False, default=None) | ||
self.transpose = self.error_handler.assert_get_key(self.config, 'transpose', dtype=bool, required=False, default=None) | ||
|
||
def execute(self, data: 'DataFrame', data_mapping: Dict[str, Node], **kwargs) -> 'DataFrame': | ||
""" | ||
|
||
:return: | ||
""" | ||
super().execute(data, data_mapping=data_mapping, **kwargs) | ||
|
||
# subset to desired columns | ||
if not self.keep_columns: self.keep_columns = list(data.columns) | ||
selected_columns = [ c for c in list(data.columns) if c in self.keep_columns and c not in self.skip_columns ] | ||
debug_data = data[selected_columns] | ||
|
||
# construct log message | ||
transformation_name = f"${self.type}s.{self.name.replace('.operations:debug','')}" | ||
rows_str = ' '+str(self.rows) if self.func in ['head','tail'] else '' | ||
transpose_str = ', Transpose' if self.transpose else '' | ||
self.earthmover.logger.info(f"debug ({self.func}{rows_str}{transpose_str}) for {transformation_name}:") | ||
|
||
# call function and display debug info | ||
if self.func == 'head': | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A warning that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I tested this and it doesn't seem to... not sure why, or maybe that's configurable. |
||
debug = debug_data.head(self.rows) | ||
if self.transpose: debug = debug.transpose().reset_index(names="column") | ||
debug = debug.to_string(index=False) | ||
elif self.func == 'tail': | ||
debug = debug_data.tail(self.rows) | ||
if self.transpose: debug = debug.transpose().reset_index(names="column") | ||
debug = debug.to_string(index=False) | ||
elif self.func == 'describe': | ||
debug = debug_data.compute().describe() | ||
if self.transpose: debug = debug.transpose().reset_index(names="column") | ||
debug = debug.to_string(index=False) | ||
elif self.func == 'columns': | ||
debug = list(data.columns) | ||
# else: throw an error? | ||
print(debug) | ||
|
||
# do not actually transform the data | ||
return data |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the default function?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There isn't currently one. What do you think? Does
head
make sense?