Skip to content

Commit

Permalink
feat(low-code): added condition to TypesMap of DynamicSchemaLoader (#224
Browse files Browse the repository at this point in the history
)

Co-authored-by: octavia-squidington-iii <[email protected]>
  • Loading branch information
darynaishchenko and octavia-squidington-iii authored Jan 16, 2025
1 parent d08f1ae commit c55fbbe
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1788,6 +1788,10 @@ definitions:
- type: array
items:
type: string
condition:
type: string
interpolation_context:
- raw_schema
SchemaTypeIdentifier:
title: Schema Type Identifier
description: (This component is experimental. Use at your own risk.) Identifies schema details for dynamic schema extraction and processing.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,7 @@ class HttpResponseFilter(BaseModel):
class TypesMap(BaseModel):
target_type: Union[str, List[str]]
current_type: Union[str, List[str]]
condition: Optional[str]


class SchemaTypeIdentifier(BaseModel):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1696,7 +1696,11 @@ def create_inline_schema_loader(

@staticmethod
def create_types_map(model: TypesMapModel, **kwargs: Any) -> TypesMap:
return TypesMap(target_type=model.target_type, current_type=model.current_type)
return TypesMap(
target_type=model.target_type,
current_type=model.current_type,
condition=model.condition if model.condition is not None else "True",
)

def create_schema_type_identifier(
self, model: SchemaTypeIdentifierModel, config: Config, **kwargs: Any
Expand Down
16 changes: 13 additions & 3 deletions airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import dpath
from typing_extensions import deprecated

from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
Expand Down Expand Up @@ -53,6 +54,7 @@ class TypesMap:

target_type: Union[List[str], str]
current_type: Union[List[str], str]
condition: Optional[str]


@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
Expand Down Expand Up @@ -177,7 +179,7 @@ def _get_type(
if field_type_path
else "string"
)
mapped_field_type = self._replace_type_if_not_valid(raw_field_type)
mapped_field_type = self._replace_type_if_not_valid(raw_field_type, raw_schema)
if (
isinstance(mapped_field_type, list)
and len(mapped_field_type) == 2
Expand All @@ -194,14 +196,22 @@ def _get_type(
)

def _replace_type_if_not_valid(
self, field_type: Union[List[str], str]
self,
field_type: Union[List[str], str],
raw_schema: MutableMapping[str, Any],
) -> Union[List[str], str]:
"""
Replaces a field type if it matches a type mapping in `types_map`.
"""
if self.schema_type_identifier.types_mapping:
for types_map in self.schema_type_identifier.types_mapping:
if field_type == types_map.current_type:
# conditional is optional param, setting to true if not provided
condition = InterpolatedBoolean(
condition=types_map.condition if types_map.condition is not None else "True",
parameters={},
).eval(config=self.config, raw_schema=raw_schema)

if field_type == types_map.current_type and condition:
return types_map.target_type
return field_type

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#

import json
from copy import deepcopy
from unittest.mock import MagicMock

import pytest
Expand Down Expand Up @@ -286,3 +287,93 @@ def test_dynamic_schema_loader_manifest_flow():

assert len(actual_catalog.streams) == 1
assert actual_catalog.streams[0].json_schema == expected_schema


def test_dynamic_schema_loader_with_type_conditions():
_MANIFEST_WITH_TYPE_CONDITIONS = deepcopy(_MANIFEST)
_MANIFEST_WITH_TYPE_CONDITIONS["definitions"]["party_members_stream"]["schema_loader"][
"schema_type_identifier"
]["types_mapping"].append(
{
"target_type": "number",
"current_type": "formula",
"condition": "{{ raw_schema['result']['type'] == 'number' }}",
}
)
_MANIFEST_WITH_TYPE_CONDITIONS["definitions"]["party_members_stream"]["schema_loader"][
"schema_type_identifier"
]["types_mapping"].append(
{
"target_type": "number",
"current_type": "formula",
"condition": "{{ raw_schema['result']['type'] == 'currency' }}",
}
)
_MANIFEST_WITH_TYPE_CONDITIONS["definitions"]["party_members_stream"]["schema_loader"][
"schema_type_identifier"
]["types_mapping"].append({"target_type": "array", "current_type": "formula"})

expected_schema = {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"id": {"type": ["null", "integer"]},
"first_name": {"type": ["null", "string"]},
"description": {"type": ["null", "string"]},
"static_field": {"type": ["null", "string"]},
"currency": {"type": ["null", "number"]},
"salary": {"type": ["null", "number"]},
"working_days": {"type": ["null", "array"]},
},
}
source = ConcurrentDeclarativeSource(
source_config=_MANIFEST_WITH_TYPE_CONDITIONS, config=_CONFIG, catalog=None, state=None
)
with HttpMocker() as http_mocker:
http_mocker.get(
HttpRequest(url="https://api.test.com/party_members"),
HttpResponse(
body=json.dumps(
[
{
"id": 1,
"first_name": "member_1",
"description": "First member",
"salary": 20000,
"currency": 10.4,
"working_days": ["Monday", "Tuesday"],
},
{
"id": 2,
"first_name": "member_2",
"description": "Second member",
"salary": 22000,
"currency": 10.4,
"working_days": ["Tuesday", "Wednesday"],
},
]
)
),
)
http_mocker.get(
HttpRequest(url="https://api.test.com/party_members/schema"),
HttpResponse(
body=json.dumps(
{
"fields": [
{"name": "Id", "type": "integer"},
{"name": "FirstName", "type": "string"},
{"name": "Description", "type": "singleLineText"},
{"name": "Salary", "type": "formula", "result": {"type": "number"}},
{"name": "Currency", "type": "formula", "result": {"type": "currency"}},
{"name": "WorkingDays", "type": "formula"},
]
}
)
),
)

actual_catalog = source.discover(logger=source.logger, config=_CONFIG)

assert len(actual_catalog.streams) == 1
assert actual_catalog.streams[0].json_schema == expected_schema

0 comments on commit c55fbbe

Please sign in to comment.