Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,46 @@ earthmover run
earthmover run -p '{"DB_USERNAME":"myuser", "DB_PASSWORD":"mypa$$w0rd"}'
```

#### Optional Fields
Optional fields are columns that are added to a source's DataFrame as null columns when they are not already present in the data. You can specify optional fields in two ways:

1. Directly in the configuration:
```yaml
sources:
  mydata:
    file: ./data/mydata.csv
    optional_fields:
      - optional_field1
      - optional_field2
```

2. Through an external YAML file:
```yaml
sources:
  mydata:
    file: ./data/mydata.csv
    optional_fields: ./optional_fields.yml
```

The external YAML file must follow this structure:
```yaml
version: 2

optional_fields:
  source1:
    - optional_field1
    - optional_field2
  source2:
    - optional_field3
    - optional_field4
```

The YAML file must:
- Define optional fields under the `optional_fields` key
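- Group fields by source name (each key must match the name of a source in the earthmover configuration; a source with no entry gets no optional fields)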
- List fields under each source

The path to the external file can be absolute or relative to the location of the earthmover configuration file, and the file must have a `.yml` or `.yaml` extension.

### `transformations`

Expand Down
71 changes: 70 additions & 1 deletion earthmover/nodes/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import os
import pandas as pd
import re
import yaml

from earthmover.nodes.node import Node
from earthmover import util
Expand Down Expand Up @@ -61,7 +62,75 @@ def __init__(self, *args, **kwargs):
self.optional: bool = self.config.get('optional', False)

# Optional fields can be defined to be added as null columns if not present in the DataFrame.
self.optional_fields: List[str] = self.config.get('optional_fields', [])
# This allows for them to be specified directly in the config or referenced from an external file (yaml file)
optional_fields_config = self.config.get('optional_fields', [])
# If the optional_fields_config is a string, assume a file path and load the optional fields from the yaml file.
if isinstance(optional_fields_config, str):
self.optional_fields = self.load_optional_fields_from_file(optional_fields_config)
# Otherwise, assume a list of field names and proceed with original logic.
else:
self.optional_fields = optional_fields_config

def load_optional_fields_from_file(self, file_path: str) -> List[str]:
    """
    Load this source's optional fields from an external YAML file.

    The YAML file should have the following structure:
        version: 2
        optional_fields:
          source_name:
            - field1
            - field2

    Only the list stored under the key equal to `self.name` is returned;
    a file with no entry for this source yields an empty list.

    Every problem (missing file, wrong extension, unreadable or malformed
    YAML, unexpected structure) is reported through
    `self.error_handler.throw`.

    :param file_path: Absolute path, or a path relative to the directory of
        the file recorded in `self.config.__file__`.
    :return: List of optional field names for this source.
    """
    # Resolve relative paths against the directory of the originating config file.
    if not os.path.isabs(file_path):
        # When no originating file is recorded on the config, resolve against the
        # current working directory instead of failing inside os.path.dirname(None).
        config_file = getattr(self.config, '__file__', None) or ''
        file_path = os.path.abspath(os.path.join(os.path.dirname(config_file), file_path))

    # NOTE(review): error_handler.throw presumably raises on its own; the bare
    # `raise` statements that follow each call are kept as the existing fallback
    # so execution never continues past a reported error.

    # Check if the file exists.
    if not os.path.exists(file_path):
        self.error_handler.throw(f"Optional fields file not found: {file_path}")
        raise

    # Check if the file is a yaml file. splitext only looks at the final path
    # component, so dots in directory names are never mistaken for an extension.
    file_ext = os.path.splitext(file_path)[1].lstrip('.').lower()
    if file_ext not in ('yml', 'yaml'):
        self.error_handler.throw(
            f"Unsupported file extension for optional fields: {file_ext or '(none)'}. Use .yml or .yaml"
        )
        raise

    # Load the yaml file. Only the read/parse step is guarded, so the structural
    # errors reported below are not caught and re-reported as read errors.
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            yaml_content = yaml.safe_load(f)
    except Exception as e:
        self.error_handler.throw(f"Error reading optional fields from {file_path}: {str(e)}")
        raise

    # Validate YAML structure (an empty file parses to None and is rejected here).
    if not isinstance(yaml_content, dict):
        self.error_handler.throw(f"Optional fields YAML file must be a dictionary, found {type(yaml_content)}")
        raise

    if 'optional_fields' not in yaml_content:
        self.error_handler.throw("Optional fields YAML file must have an 'optional_fields' key")
        raise

    optional_fields = yaml_content['optional_fields']
    if not isinstance(optional_fields, dict):
        self.error_handler.throw(f"'optional_fields' must be a dictionary, found {type(optional_fields)}")
        raise

    # Get fields for this source; sources not listed in the file have none.
    fields = optional_fields.get(self.name, [])
    if not isinstance(fields, list):
        self.error_handler.throw(f"Optional fields for source '{self.name}' must be a list, found {type(fields)}")
        raise

    return fields

def post_execute(self, **kwargs):
"""
Expand Down