Skip to content

Commit

Permalink
feat(low-code): add DpathFlattenFields (#227)
Browse files Browse the repository at this point in the history
Co-authored-by: octavia-squidington-iii <[email protected]>
  • Loading branch information
darynaishchenko and octavia-squidington-iii authored Jan 20, 2025
1 parent 34a978d commit 17dd71f
Show file tree
Hide file tree
Showing 5 changed files with 235 additions and 0 deletions.
29 changes: 29 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1316,6 +1316,7 @@ definitions:
- "$ref": "#/definitions/KeysToLower"
- "$ref": "#/definitions/KeysToSnakeCase"
- "$ref": "#/definitions/FlattenFields"
- "$ref": "#/definitions/DpathFlattenFields"
- "$ref": "#/definitions/KeysReplace"
state_migrations:
title: State Migrations
Expand Down Expand Up @@ -1866,6 +1867,7 @@ definitions:
- "$ref": "#/definitions/KeysToLower"
- "$ref": "#/definitions/KeysToSnakeCase"
- "$ref": "#/definitions/FlattenFields"
- "$ref": "#/definitions/DpathFlattenFields"
- "$ref": "#/definitions/KeysReplace"
schema_type_identifier:
"$ref": "#/definitions/SchemaTypeIdentifier"
Expand Down Expand Up @@ -1970,6 +1972,33 @@ definitions:
$parameters:
type: object
additionalProperties: true
DpathFlattenFields:
title: Dpath Flatten Fields
description: A transformation that flatten field values to the to top of the record.
type: object
required:
- type
- field_path
properties:
type:
type: string
enum: [DpathFlattenFields]
field_path:
title: Field Path
description: A path to field that needs to be flattened.
type: array
items:
- type: string
examples:
- ["data"]
- ["data", "*", "field"]
delete_origin_value:
title: Delete Origin Value
description: Whether to delete the origin value or keep it. Default is False.
type: boolean
$parameters:
type: object
additionalProperties: true
KeysReplace:
title: Keys Replace
description: A transformation that replaces symbols in keys.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -792,6 +792,25 @@ class FlattenFields(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class DpathFlattenFields(BaseModel):
type: Literal["DpathFlattenFields"]
field_path: List[str] = Field(
...,
description="A path to field that needs to be flattened.",
examples=[
["data"],
["data", "*", "field"],
],
title="Field Path",
)
delete_origin_value: Optional[bool] = Field(
False,
description="Whether to delete the origin value or keep it. Default is False.",
title="Delete Origin Value",
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class KeysReplace(BaseModel):
type: Literal["KeysReplace"]
old: str = Field(
Expand Down Expand Up @@ -1819,6 +1838,7 @@ class Config:
KeysToLower,
KeysToSnakeCase,
FlattenFields,
DpathFlattenFields,
KeysReplace,
]
]
Expand Down Expand Up @@ -1994,6 +2014,7 @@ class DynamicSchemaLoader(BaseModel):
KeysToLower,
KeysToSnakeCase,
FlattenFields,
DpathFlattenFields,
KeysReplace,
]
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
DpathExtractor as DpathExtractorModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
DpathFlattenFields as DpathFlattenFieldsModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
DynamicSchemaLoader as DynamicSchemaLoaderModel,
)
Expand Down Expand Up @@ -434,6 +437,9 @@
RemoveFields,
)
from airbyte_cdk.sources.declarative.transformations.add_fields import AddedFieldDefinition
from airbyte_cdk.sources.declarative.transformations.dpath_flatten_fields import (
DpathFlattenFields,
)
from airbyte_cdk.sources.declarative.transformations.flatten_fields import (
FlattenFields,
)
Expand Down Expand Up @@ -542,6 +548,7 @@ def _init_mappings(self) -> None:
KeysToSnakeCaseModel: self.create_keys_to_snake_transformation,
KeysReplaceModel: self.create_keys_replace_transformation,
FlattenFieldsModel: self.create_flatten_fields,
DpathFlattenFieldsModel: self.create_dpath_flatten_fields,
IterableDecoderModel: self.create_iterable_decoder,
XmlDecoderModel: self.create_xml_decoder,
JsonFileSchemaLoaderModel: self.create_json_file_schema_loader,
Expand Down Expand Up @@ -677,6 +684,19 @@ def create_flatten_fields(
flatten_lists=model.flatten_lists if model.flatten_lists is not None else True
)

def create_dpath_flatten_fields(
self, model: DpathFlattenFieldsModel, config: Config, **kwargs: Any
) -> DpathFlattenFields:
model_field_path: List[Union[InterpolatedString, str]] = [x for x in model.field_path]
return DpathFlattenFields(
config=config,
field_path=model_field_path,
delete_origin_value=model.delete_origin_value
if model.delete_origin_value is not None
else False,
parameters=model.parameters or {},
)

@staticmethod
def _json_schema_type_name_to_type(value_type: Optional[ValueType]) -> Optional[Type[Any]]:
if not value_type:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
from dataclasses import InitVar, dataclass
from typing import Any, Dict, List, Mapping, Optional, Union

import dpath

from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState


@dataclass
class DpathFlattenFields(RecordTransformation):
"""
Flatten fields only for provided path.
field_path: List[Union[InterpolatedString, str]] path to the field to flatten.
delete_origin_value: bool = False whether to delete origin field or keep it. Default is False.
"""

config: Config
field_path: List[Union[InterpolatedString, str]]
parameters: InitVar[Mapping[str, Any]]
delete_origin_value: bool = False

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._field_path = [
InterpolatedString.create(path, parameters=parameters) for path in self.field_path
]
for path_index in range(len(self.field_path)):
if isinstance(self.field_path[path_index], str):
self._field_path[path_index] = InterpolatedString.create(
self.field_path[path_index], parameters=parameters
)

def transform(
self,
record: Dict[str, Any],
config: Optional[Config] = None,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
) -> None:
path = [path.eval(self.config) for path in self._field_path]
if "*" in path:
matched = dpath.values(record, path)
extracted = matched[0] if matched else None
else:
extracted = dpath.get(record, path, default=[])

if isinstance(extracted, dict):
conflicts = set(extracted.keys()) & set(record.keys())
if not conflicts:
if self.delete_origin_value:
dpath.delete(record, path)
record.update(extracted)
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import pytest

from airbyte_cdk.sources.declarative.transformations.dpath_flatten_fields import DpathFlattenFields

_ANY_VALUE = -1
_DELETE_ORIGIN_VALUE = True
_DO_NOT_DELETE_ORIGIN_VALUE = False


@pytest.mark.parametrize(
[
"input_record",
"config",
"field_path",
"delete_origin_value",
"expected_record",
],
[
pytest.param(
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}},
{},
["field2"],
_DO_NOT_DELETE_ORIGIN_VALUE,
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE},
id="flatten by dpath, don't delete origin value",
),
pytest.param(
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}},
{},
["field2"],
_DELETE_ORIGIN_VALUE,
{"field1": _ANY_VALUE, "field3": _ANY_VALUE},
id="flatten by dpath, delete origin value",
),
pytest.param(
{
"field1": _ANY_VALUE,
"field2": {"field3": {"field4": {"field5": _ANY_VALUE}}},
},
{},
["field2", "*", "field4"],
_DO_NOT_DELETE_ORIGIN_VALUE,
{
"field1": _ANY_VALUE,
"field2": {"field3": {"field4": {"field5": _ANY_VALUE}}},
"field5": _ANY_VALUE,
},
id="flatten by dpath with *, don't delete origin value",
),
pytest.param(
{
"field1": _ANY_VALUE,
"field2": {"field3": {"field4": {"field5": _ANY_VALUE}}},
},
{},
["field2", "*", "field4"],
_DELETE_ORIGIN_VALUE,
{"field1": _ANY_VALUE, "field2": {"field3": {}}, "field5": _ANY_VALUE},
id="flatten by dpath with *, delete origin value",
),
pytest.param(
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}},
{"field_path": "field2"},
["{{ config['field_path'] }}"],
_DO_NOT_DELETE_ORIGIN_VALUE,
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE},
id="flatten by dpath from config, don't delete origin value",
),
pytest.param(
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}},
{},
["non-existing-field"],
_DO_NOT_DELETE_ORIGIN_VALUE,
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}},
id="flatten by non-existing dpath, don't delete origin value",
),
pytest.param(
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}},
{},
["*", "non-existing-field"],
_DO_NOT_DELETE_ORIGIN_VALUE,
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}},
id="flatten by non-existing dpath with *, don't delete origin value",
),
pytest.param(
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE},
{},
["field2"],
_DO_NOT_DELETE_ORIGIN_VALUE,
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE},
id="flatten by dpath, not to update when record has field conflicts, don't delete origin value",
),
pytest.param(
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE},
{},
["field2"],
_DO_NOT_DELETE_ORIGIN_VALUE,
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE},
id="flatten by dpath, not to update when record has field conflicts, delete origin value",
),
],
)
def test_dpath_flatten_lists(
input_record, config, field_path, delete_origin_value, expected_record
):
flattener = DpathFlattenFields(
field_path=field_path, parameters={}, config=config, delete_origin_value=delete_origin_value
)
flattener.transform(input_record)
assert input_record == expected_record

0 comments on commit 17dd71f

Please sign in to comment.