Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adding a new show command, debug operation, and NoOpDestination #89

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 50 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,22 @@ Note the difference between `min()`/`max()` and `str_min()`/`str_max()`: given a

</details>

#### Debug operation

<details>
<summary><code>debug</code></summary>

Sort rows by one or more columns.
```yaml
- operation: debug
function: head | tail | describe | columns
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the default function?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There isn't currently one. What do you think? Does head make sense?

rows: 5 # (optional, default=10; ignored if function=describe|columns)
transpose: True # (default=False; ignored when function=columns)
skip_columns: [a, b, c] # to avoid logging PII
keep_columns: [x, y, z] # to look only at specific columns
```
`function=head|tail` displays the `rows` first or last rows of the dataframe, respectively. (Note that on large dataframes, these may not truly be the first/last rows, due to Dask's memory optimizations.) `function=describe` shows statistics about the values in the dataframe. `function=columns` shows the column names in the dataframe. `transpose` can be helpful with very wide dataframes. `keep_columns` defaults to all columns, `skip_columns` defaults to no columns.
</details>


### **`destinations`**
Expand Down Expand Up @@ -649,12 +665,9 @@ Set either the number of bytes, or a text representation (e.g., "100MB") to shuf
(Note: this configuration is advanced, and its use may drastically affect performance.)

# Usage
Once you have the required [setup](#setup) and your source data, run the transformations with
```bash
earthmover run -c path/to/config.yaml
```
If you omit the optional `-c` flag, `earthmover` will look for an `earthmover.yaml` in the current directory.
Once you have the required [setup](#setup) and your source data, you can use the following commands in earthmover:

## Top-level commands
See a help message with
```bash
earthmover -h
Expand All @@ -667,6 +680,38 @@ earthmover -v
earthmover --version
```

## `deps`
If you reference `packages` in your `earthmover.yaml` (see [Project Composition](#project-composition)), before you can `run` earthmover you should
```bash
earthmover deps
```
which downloads remote packages or copy local packages into a `packages/` folder in your project directory.

## `compile`
Optionally compile your project with
```bash
earthmover compile
```
This
* builds the transformation graph
* does some basic validation, like making sure that references `sources` exist, and that the graph is acyclic
* if `config.show_graph` is True, produces the graph visualization images
* outputs `earthmover_compiled.yaml` in your current directory, which is a stitched-together YAML document that earthmover would execute on `earthmover run` - with packages composed, variables replaced, Jinja rendered, etc.

## `show`
While building your transformation instructions, you may find it helpful to look at the results of a particular transformation step. This can be done with
```bash
earthmover show -s my_transformation --function head --rows 3 --transpose
```
which will log output from the selected transformation node (`my_transformation`) in the graph. The flags `function`, `rows`, and `transpose` correspond to the options for a [debug operation](#debug-operation). Note that, unlike the behavior of [selectors](#selectors) with `earthmover run` which allow multiple (comma-separated) nodes to be specified, with `earthmover show` you should select exactly one transformation node. The output will reflect the dataframe after that transformation node has been executed.

## `run`
Run the transformations you've defined with
```bash
earthmover run -c path/to/config.yaml
```
If you omit the optional `-c` flag, `earthmover` will look for an `earthmover.yaml` in the current directory.


# Features
This tool includes several special features:
Expand Down
27 changes: 25 additions & 2 deletions earthmover/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

from earthmover.earthmover import Earthmover


class ExitOnExceptionHandler(logging.StreamHandler):
"""

Expand Down Expand Up @@ -50,7 +49,7 @@ def main(argv=None):
parser.add_argument('command',
nargs="?",
type=str,
help='the command to run: `run`, `compile`, `visualize`'
help='the command to run: `deps`, `compile`, `run`, `show`, `visualize`'
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we actually have a visualize command, maybe that should be removed.

)
parser.add_argument("-c", "--config-file",
nargs="?",
Expand Down Expand Up @@ -94,12 +93,27 @@ def main(argv=None):
type=str,
help='produces a JSON output file with structured information about run results'
)
parser.add_argument("--function",
type=str,
help='what to display with `earthmover show` (head, tail, describe, columns)'
)
parser.add_argument("--rows",
type=int,
help='how many rows of output to display with `earthmover show`'
)
parser.add_argument("--transpose",
action='store_true',
help='transposes the output of `earthmover show`'
)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These three new CLI flags are only used by earthmover show.


# Set empty defaults in case they've not been populated by the user.
parser.set_defaults(**{
"selector": "*",
"params": "",
"results_file": "",
"function": "head",
"rows": 10,
"transpose": False,
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Default values if the flags are omitted. I think these defaults make sense, but happy to discuss alternatives.

})

### Parse the user-inputs and run Earthmover, depending on the command and subcommand passed.
Expand Down Expand Up @@ -196,6 +210,15 @@ def main(argv=None):
logger.exception(e, exc_info=em.state_configs['show_stacktrace'])
raise

# Subcommand: show (compile + execute only up to one transformation, and display a debug operation)
elif args.command == 'show':
try:
em.show(selector=args.selector, func=args.function, rows=args.rows, transpose=args.transpose)

except Exception as err:
logger.exception(err, exc_info=em.state_configs['show_stacktrace'])
raise

# Subcommand: run (compile + execute)
# This is the default if none is specified.
elif args.command == 'run' or not args.command:
Expand Down
30 changes: 30 additions & 0 deletions earthmover/earthmover.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from earthmover.nodes.destination import Destination
from earthmover.nodes.source import Source
from earthmover.nodes.transformation import Transformation
from earthmover.nodes.destination import *
tomreitz marked this conversation as resolved.
Show resolved Hide resolved
from earthmover.yaml_parser import JinjaEnvironmentYamlLoader
from earthmover import util

Expand Down Expand Up @@ -219,6 +220,35 @@ def filter_graph_on_selector(self, graph: Graph, selector: str) -> Graph:

return active_graph

def show(self, selector="", func="head", rows=10, transpose=False):
self.compile()
config = {
"source": f"$transformations.{selector}",
"operations": [
{
"operation": "debug",
"function": f"{func}",
"rows": rows,
"transpose": transpose
}
]
}
transformation_node = Transformation(name=f"{selector}_show", config=config, earthmover=self)
self.graph.add_node(f"$transformations.{selector}_show", data=transformation_node)
self.graph.add_edge(f"$transformations.{selector}", f"$transformations.{selector}_show")
transformation_node.set_upstream_source(f"$transformations.{selector}", self.graph.ref(f"$transformations.{selector}"))
config = {
"kind": "noop",
"source": f"$transformations.{selector}_show"
}
destination_node = NoOpDestination(name=f"{selector}_destination", config=config, earthmover=self)
self.graph.add_node(f"$destinations.{selector}_destination", data=destination_node)
self.graph.add_edge(f"$transformations.{selector}_show", f"$destinations.{selector}_destination")
destination_node.set_upstream_source(f"$transformations.{selector}_show", self.graph.ref(f"$transformations.{selector}_show"))
active_graph = self.filter_graph_on_selector(self.graph, selector=f"{selector}_destination")
self.execute(active_graph)


Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Explaining what happens here; suppose you select the transformation node my_node (with earthmover show -s my_node):

  • we create a transformation node my_node_show with source=my_node which contains just one debug operation (per the CLI inputs)
  • we connect the new transformation node my_node_show to a new NoOpDestination node my_node_destination so earthmover wouldn't prune off our dangling transformation node
  • we filter down the graph to just the selected my_node (and upstream and downstream nodes)
  • we execute this subgraph

The result is that the debug operation will cause information about my_node to be output to the console.

def execute(self, graph: Graph):
"""
Iterate subgraphs in `Earthmover.graph` and execute each Node in order.
Expand Down
48 changes: 46 additions & 2 deletions earthmover/nodes/destination.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

from typing import Tuple

from earthmover.yaml_parser import YamlMapping


class Destination(Node):
"""
Expand All @@ -18,8 +20,16 @@ class Destination(Node):
mode: str = None # Documents which class was chosen.
allowed_configs: Tuple[str] = ('debug', 'expect', 'show_progress', 'repartition', 'source',)

def __new__(cls, *args, **kwargs):
return object.__new__(FileDestination)
def __new__(cls, name: str, config, *args, **kwargs):
if (type(config)==dict or type(config)==YamlMapping) and 'kind' in config.keys():
if config.get("kind","") == 'file':
return object.__new__(FileDestination)
elif config.get("kind","") == 'noop':
return object.__new__(NoOpDestination)
elif (type(config)==dict or type(config)==YamlMapping) and 'kind' not in config.keys():
# default for backward compatibility
return object.__new__(FileDestination)
# else: throw an error?
Copy link
Collaborator Author

@tomreitz tomreitz May 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because I'm adding a new Destination class, I need a way to instantiate it... since type is already a property of the Node superclass, and class is a reserved Python keyword, I landed on kind as the property one can use (in a destination's config) to pick a specific destination.

In the future, we can extend this to other destination kinds, such as file.jsonl, file.csv, file.tsv, file.parquet, perhaps even database.snowflake or database.postgres (with additional column typing config).

The default destination kind (when unspecified by the user) is the existing FileDestination, for backward compatibility.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We've started using extension as this keyword. Maybe we can set extension to "debug" in this new model, instead of adding a new keyword field.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my comment here: I think overloading extension is not a good solution. A file's extension is not one-to-one with it's type.


def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
Expand Down Expand Up @@ -132,3 +142,37 @@ def render_row(self, row: pd.Series):
raise

return json_string


class NoOpDestination(Destination):
"""
This "no-op" destination facilitates
- the new `show` command, which displays info about a partially-transformed dataframe
- using earthmover programmatically as a compute engine
without requiring any data to be output to file(s).

This is necessary because graph paths not connected to a destination are pruned.
"""
mode: str = 'no-op'
allowed_configs: Tuple[str] = (
'debug', 'expect', 'show_progress', 'repartition', 'source',
'kind',
)


def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

def execute(self, **kwargs):
"""
There is a bug in Dask where `dd.to_csv(mode='a', single_file=True)` fails.
This is resolved in 2023.8.1: https://docs.dask.org/en/stable/changelog.html#id7

:return:
"""
super().execute(**kwargs)

self.data = (
self.upstream_sources[self.source].data
.map_partitions(lambda x: x.apply(self.render_row, axis=1), meta=pd.Series('str'))
)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This new destination type does nothing, as the name suggests.

4 changes: 2 additions & 2 deletions earthmover/nodes/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def __init__(self, name: str, config: 'YamlMapping', *, earthmover: 'Earthmover'
self.error_handler: 'ErrorHandler' = earthmover.error_handler

self.error_handler.ctx.update(
file=self.config.__file__, line=self.config.__line__, node=self, operation=None
file=self.config.get("__file__",""), line=self.config.get("__line__",0), node=self, operation=None
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since earthmover show injects nodes into the graph which didn't come from a file, I had to modify the context update in a few places like so.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To circumvent needing to change these lines, we should just initialize the No-Op config block as a YamlMapping and set default values for the __file__ and __line__ attributes in the class.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But what would those default values be? (when nodes are added programmatically, not from a YAML configuration file)

Can __file__ and __line__ be None? That's the only default value I think would make sense, but I don't know how it would come through in the logging messages, or if it would cause errors.

)

self.upstream_sources: Dict[str, Optional['Node']] = {}
Expand Down Expand Up @@ -80,7 +80,7 @@ def execute(self, **kwargs):
:return:
"""
self.error_handler.ctx.update(
file=self.config.__file__, line=self.config.__line__, node=self, operation=None
file=self.config.get("__file__",""), line=self.config.get("__line__",0), node=self, operation=None
)

# Turn on the progress bar manually.
Expand Down
2 changes: 1 addition & 1 deletion earthmover/nodes/transformation.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def __init__(self, *args, **kwargs):

# Force error-handler reset before graph is built.
self.error_handler.ctx.update(
file=self.config.__file__, line=self.config.__line__, node=self, operation=None
file=self.config.get("__file__",""), line=self.config.get("__line__",0), node=self, operation=None
)

def execute(self):
Expand Down
51 changes: 51 additions & 0 deletions earthmover/operations/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,3 +177,54 @@ def execute(self, data: 'DataFrame', data_mapping: Dict[str, Node], **kwargs) ->
raise

return data


class DebugOperation(Operation):
"""

"""
allowed_configs: Tuple[str] = (
'operation', 'function', 'rows', 'transpose', 'skip_columns', 'keep_columns'
)

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.func = self.error_handler.assert_get_key(self.config, 'function', dtype=str)
self.rows = self.error_handler.assert_get_key(self.config, 'rows', dtype=int, default=5, required=False)
self.skip_columns = self.error_handler.assert_get_key(self.config, 'skip_columns', dtype=list, required=False, default=[])
self.keep_columns = self.error_handler.assert_get_key(self.config, 'keep_columns', dtype=list, required=False, default=None)
self.transpose = self.error_handler.assert_get_key(self.config, 'transpose', dtype=bool, required=False, default=None)

def execute(self, data: 'DataFrame', data_mapping: Dict[str, Node], **kwargs) -> 'DataFrame':
"""

:return:
"""
super().execute(data, data_mapping=data_mapping, **kwargs)

# subset to desired columns
if not self.keep_columns: self.keep_columns = list(data.columns)
selected_columns = [ c for c in list(data.columns) if c in self.keep_columns and c not in self.skip_columns ]
debug_data = data[selected_columns]

# construct log message
transformation_name = f"${self.type}s.{self.name.replace('.operations:debug','')}"
rows_str = ' '+str(self.rows) if self.func in ['head','tail'] else ''
transpose_str = ', Transpose' if self.transpose else ''
self.earthmover.logger.info(f"debug ({self.func}{rows_str}{transpose_str}) for {transformation_name}:")

# call function and display debug info
if self.func == 'head':
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A warning that .head() will raise a warning if the number of rows in the dataframe are less than those specified. We should emulate the head() behavior in the current debug logic above the conditional.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tested this and it doesn't seem to... not sure why, or maybe that's configurable.

if self.transpose: print(debug_data.head(self.rows).transpose().reset_index(names="column").to_string(index=False))
tomreitz marked this conversation as resolved.
Show resolved Hide resolved
else: print(debug_data.head(self.rows).to_string(index=False))
elif self.func == 'tail':
if self.transpose: print(debug_data.tail(self.rows).transpose().reset_index(names="column").to_string(index=False))
else: print(debug_data.tail(self.rows).to_string(index=False))
elif self.func == 'describe':
if self.transpose: print(debug_data.compute().describe().transpose().reset_index(names="column").to_string(index=False))
else: print(debug_data.compute().describe())
elif self.func == 'columns':
print(list(data.columns))

# do not actually transform the data
return data
3 changes: 2 additions & 1 deletion earthmover/operations/operation.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ def __new__(cls, name: str, config: 'YamlMapping', *, earthmover: 'Earthmover'):
operation_mapping = {
'join': dataframe_operations.JoinOperation,
'union': dataframe_operations.UnionOperation,
'debug': dataframe_operations.DebugOperation,

'add_columns': column_operations.AddColumnsOperation,
'modify_columns': column_operations.ModifyColumnsOperation,
Expand Down Expand Up @@ -79,7 +80,7 @@ def execute(self, data: 'DataFrame', *, data_mapping: Dict[str, Node], **kwargs)
:return:
"""
self.error_handler.ctx.update(
file=self.config.__file__, line=self.config.__line__, node=self, operation=None
file=self.config.get("__file__",""), line=self.config.get("__line__",0), node=self, operation=None
)

pass
Expand Down