Skip to content

Commit

Permalink
feat(low-code): add check dynamic stream (#223)
Browse files Browse the repository at this point in the history
Co-authored-by: octavia-squidington-iii <[email protected]>
  • Loading branch information
lazebnyi and octavia-squidington-iii authored Jan 16, 2025
1 parent 2185bd9 commit d08f1ae
Show file tree
Hide file tree
Showing 8 changed files with 272 additions and 10 deletions.
20 changes: 18 additions & 2 deletions airbyte_cdk/sources/declarative/checks/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,24 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

from typing import Mapping

from pydantic.v1 import BaseModel

from airbyte_cdk.sources.declarative.checks.check_dynamic_stream import CheckDynamicStream
from airbyte_cdk.sources.declarative.checks.check_stream import CheckStream
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
from airbyte_cdk.sources.declarative.models import (
CheckDynamicStream as CheckDynamicStreamModel,
)
from airbyte_cdk.sources.declarative.models import (
CheckStream as CheckStreamModel,
)

# Maps the `type` string of a manifest's `check` component to the declarative model class
# that represents it. Callers index this mapping by `check["type"]` to pick the model to build.
COMPONENTS_CHECKER_TYPE_MAPPING: Mapping[str, type[BaseModel]] = {
    "CheckStream": CheckStreamModel,
    "CheckDynamicStream": CheckDynamicStreamModel,
}

__all__ = ["CheckStream", "ConnectionChecker"]
__all__ = ["CheckStream", "CheckDynamicStream", "ConnectionChecker"]
51 changes: 51 additions & 0 deletions airbyte_cdk/sources/declarative/checks/check_dynamic_stream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

import logging
import traceback
from dataclasses import InitVar, dataclass
from typing import Any, List, Mapping, Tuple

from airbyte_cdk import AbstractSource
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy


@dataclass
class CheckDynamicStream(ConnectionChecker):
    """
    Checks the connection by checking the availability of one or more dynamic streams.

    Attributes:
        stream_count (int): maximum number of streams to check. Streams are taken in the
            order returned by `source.streams()`; when the source exposes fewer streams
            than `stream_count`, all of them are checked.
    """

    stream_count: int
    parameters: InitVar[Mapping[str, Any]]

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        # `parameters` is an InitVar, so it only exists here; keep a reference on the instance.
        self._parameters = parameters

    def check_connection(
        self, source: AbstractSource, logger: logging.Logger, config: Mapping[str, Any]
    ) -> Tuple[bool, Any]:
        """
        Check the first `stream_count` streams of `source` for availability.

        :param source: source whose streams are resolved with `config` and then probed
        :param logger: logger handed to the availability strategy and used to report errors
        :param config: connector configuration passed to `source.streams`
        :return: `(True, None)` when every probed stream is available; otherwise
            `(False, reason)` for the first stream that is unavailable or whose check raised.
            `(False, ...)` is also returned when the source exposes no streams at all.

        Note: when `stream_count` is zero or negative no stream is probed and the check
        succeeds, provided the source exposes at least one stream. Errors raised by
        `source.streams` itself are not caught here.
        """
        streams = source.streams(config=config)
        if not streams:
            return False, f"No streams to connect to from source {source}"

        # The strategy is built without any stream-specific argument, so create it once
        # instead of once per probed stream.
        availability_strategy = HttpAvailabilityStrategy()

        # Clamp at zero so that a negative `stream_count` selects no stream rather than
        # being interpreted as a "from the end" slice bound.
        for stream in streams[: max(self.stream_count, 0)]:
            try:
                stream_is_available, reason = availability_strategy.check_availability(
                    stream, logger
                )
            except Exception as error:
                logger.error(
                    f"Encountered an error trying to connect to stream {stream.name}. Error: \n {traceback.format_exc()}"
                )
                return False, f"Unable to connect to stream {stream.name} - {error}"
            if not stream_is_available:
                return False, reason
        return True, None
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ properties:
type: string
enum: [DeclarativeSource]
check:
"$ref": "#/definitions/CheckStream"
anyOf:
- "$ref": "#/definitions/CheckStream"
- "$ref": "#/definitions/CheckDynamicStream"
streams:
type: array
items:
Expand Down Expand Up @@ -303,6 +305,21 @@ definitions:
examples:
- ["users"]
- ["users", "contacts"]
CheckDynamicStream:
title: Dynamic Streams to Check
description: (This component is experimental. Use at your own risk.) Defines the dynamic streams to try reading when running a check operation.
type: object
required:
- type
- stream_count
properties:
type:
type: string
enum: [CheckDynamicStream]
stream_count:
title: Stream Count
description: Number of streams to try reading from when running a check operation.
type: integer
CompositeErrorHandler:
title: Composite Error Handler
description: Error handler that sequentially iterates over a list of error handlers.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
ConnectorSpecification,
FailureType,
)
from airbyte_cdk.sources.declarative.checks import COMPONENTS_CHECKER_TYPE_MAPPING
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
Expand Down Expand Up @@ -107,7 +108,7 @@ def connection_checker(self) -> ConnectionChecker:
if "type" not in check:
check["type"] = "CheckStream"
check_stream = self._constructor.create_component(
CheckStreamModel,
COMPONENTS_CHECKER_TYPE_MAPPING[check["type"]],
check,
dict(),
emit_connector_builder_messages=self._emit_connector_builder_messages,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,15 @@ class CheckStream(BaseModel):
)


# Manifest model for a `check` component whose `type` is "CheckDynamicStream".
# NOTE(review): appears to mirror the `CheckDynamicStream` definition of the declarative
# component schema (required `type` and `stream_count`) - confirm both stay in sync.
class CheckDynamicStream(BaseModel):
    # Discriminator: only the literal "CheckDynamicStream" is accepted.
    type: Literal["CheckDynamicStream"]
    # Required field (`...` means no default): how many streams the check should try to read.
    stream_count: int = Field(
        ...,
        description="Numbers of the streams to try reading from when running a check operation.",
        title="Stream Count",
    )


class ConcurrencyLevel(BaseModel):
type: Optional[Literal["ConcurrencyLevel"]] = None
default_concurrency: Union[int, str] = Field(
Expand Down Expand Up @@ -1661,7 +1670,7 @@ class Config:
extra = Extra.forbid

type: Literal["DeclarativeSource"]
check: CheckStream
check: Union[CheckStream, CheckDynamicStream]
streams: List[DeclarativeStream]
dynamic_streams: Optional[List[DynamicDeclarativeStream]] = None
version: str = Field(
Expand All @@ -1687,7 +1696,7 @@ class Config:
extra = Extra.forbid

type: Literal["DeclarativeSource"]
check: CheckStream
check: Union[CheckStream, CheckDynamicStream]
streams: Optional[List[DeclarativeStream]] = None
dynamic_streams: List[DynamicDeclarativeStream]
version: str = Field(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
SessionTokenProvider,
TokenProvider,
)
from airbyte_cdk.sources.declarative.checks import CheckStream
from airbyte_cdk.sources.declarative.checks import CheckDynamicStream, CheckStream
from airbyte_cdk.sources.declarative.concurrency_level import ConcurrencyLevel
from airbyte_cdk.sources.declarative.datetime import MinMaxDatetime
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
Expand Down Expand Up @@ -123,6 +123,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
BearerAuthenticator as BearerAuthenticatorModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
CheckDynamicStream as CheckDynamicStreamModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
CheckStream as CheckStreamModel,
)
Expand Down Expand Up @@ -493,6 +496,7 @@ def _init_mappings(self) -> None:
BasicHttpAuthenticatorModel: self.create_basic_http_authenticator,
BearerAuthenticatorModel: self.create_bearer_authenticator,
CheckStreamModel: self.create_check_stream,
CheckDynamicStreamModel: self.create_check_dynamic_stream,
CompositeErrorHandlerModel: self.create_composite_error_handler,
CompositeRawDecoderModel: self.create_composite_raw_decoder,
ConcurrencyLevelModel: self.create_concurrency_level,
Expand Down Expand Up @@ -846,6 +850,12 @@ def create_bearer_authenticator(
def create_check_stream(model: CheckStreamModel, config: Config, **kwargs: Any) -> CheckStream:
return CheckStream(stream_names=model.stream_names, parameters={})

@staticmethod
def create_check_dynamic_stream(
    model: CheckDynamicStreamModel, config: Config, **kwargs: Any
) -> CheckDynamicStream:
    """
    Build the runtime `CheckDynamicStream` checker from its parsed manifest model.

    Only `model.stream_count` is read. `config` and `kwargs` are accepted but not used,
    and the checker is always created with empty `parameters`.
    """
    streams_to_check = model.stream_count
    return CheckDynamicStream(stream_count=streams_to_check, parameters={})

def create_composite_error_handler(
self, model: CompositeErrorHandlerModel, config: Config, **kwargs: Any
) -> CompositeErrorHandler:
Expand Down
5 changes: 2 additions & 3 deletions airbyte_cdk/sources/declarative/requesters/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@
- Components marked as optional are not required and can be ignored.
- if `url_requester` is not provided, `urls_extractor` will get urls from the `polling_job_response`
- interpolation_context, e.g. `create_job_response` or `polling_job_response` can be obtained from stream_slice


```mermaid
---
title: AsyncHttpJobRepository Sequence Diagram
---
sequenceDiagram
sequenceDiagram
participant AsyncHttpJobRepository as AsyncOrchestrator
participant CreationRequester as creation_requester
participant PollingRequester as polling_requester
Expand Down Expand Up @@ -54,4 +53,4 @@ sequenceDiagram
DeleteRequester -->> AsyncHttpJobRepository: Confirmation
```
```
159 changes: 159 additions & 0 deletions unit_tests/sources/declarative/checks/test_check_dynamic_stream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

import json
import logging

import pytest

from airbyte_cdk.sources.declarative.concurrent_declarative_source import (
ConcurrentDeclarativeSource,
)
from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse

logger = logging.getLogger("test")

_CONFIG = {"start_date": "2024-07-01T00:00:00.000Z"}

# Declarative manifest used by the test below. It declares no static streams: every stream
# is generated from `stream_template` by the HTTP components resolver, and the connection
# check is a `CheckDynamicStream` limited to a single stream.
# NOTE(review): both authenticators interpolate `config['api_key']`, but `_CONFIG` only
# defines `start_date` - presumably the token resolves to an empty value; confirm this is intended.
_MANIFEST = {
    "version": "6.7.0",
    "type": "DeclarativeSource",
    # Only the first generated stream is probed during `check`.
    "check": {"type": "CheckDynamicStream", "stream_count": 1},
    "dynamic_streams": [
        {
            "type": "DynamicDeclarativeStream",
            # Template for each generated stream; `name` and the requester's `item_id`
            # parameter are placeholders overwritten by `components_mapping` below.
            "stream_template": {
                "type": "DeclarativeStream",
                "name": "",
                "primary_key": [],
                "schema_loader": {
                    "type": "InlineSchemaLoader",
                    "schema": {
                        "$schema": "http://json-schema.org/schema#",
                        "properties": {
                            "ABC": {"type": "number"},
                            "AED": {"type": "number"},
                        },
                        "type": "object",
                    },
                },
                "retriever": {
                    "type": "SimpleRetriever",
                    "requester": {
                        "type": "HttpRequester",
                        "$parameters": {"item_id": ""},
                        "url_base": "https://api.test.com",
                        # Each generated stream reads /items/<item_id>.
                        "path": "/items/{{parameters['item_id']}}",
                        "http_method": "GET",
                        "authenticator": {
                            "type": "ApiKeyAuthenticator",
                            "header": "apikey",
                            "api_token": "{{ config['api_key'] }}",
                        },
                    },
                    "record_selector": {
                        "type": "RecordSelector",
                        "extractor": {"type": "DpathExtractor", "field_path": []},
                    },
                    "paginator": {"type": "NoPagination"},
                },
            },
            # Resolver: reads the `items` endpoint and maps values from each returned
            # record into the template.
            "components_resolver": {
                "type": "HttpComponentsResolver",
                "retriever": {
                    "type": "SimpleRetriever",
                    "requester": {
                        "type": "HttpRequester",
                        "url_base": "https://api.test.com",
                        "path": "items",
                        "http_method": "GET",
                        "authenticator": {
                            "type": "ApiKeyAuthenticator",
                            "header": "apikey",
                            "api_token": "{{ config['api_key'] }}",
                        },
                    },
                    "record_selector": {
                        "type": "RecordSelector",
                        "extractor": {"type": "DpathExtractor", "field_path": []},
                    },
                    "paginator": {"type": "NoPagination"},
                },
                "components_mapping": [
                    # The record's `name` becomes the stream name.
                    {
                        "type": "ComponentMappingDefinition",
                        "field_path": ["name"],
                        "value": "{{components_values['name']}}",
                    },
                    # The record's `id` becomes the `item_id` parameter used in the stream's path.
                    {
                        "type": "ComponentMappingDefinition",
                        "field_path": [
                            "retriever",
                            "requester",
                            "$parameters",
                            "item_id",
                        ],
                        "value": "{{components_values['id']}}",
                    },
                ],
            },
        }
    ],
}


@pytest.mark.parametrize(
    "response_code, available_expectation, expected_messages",
    [
        pytest.param(
            404,
            False,
            ["Not found. The requested resource was not found on the server."],
            id="test_stream_unavailable_unhandled_error",
        ),
        pytest.param(
            403,
            False,
            ["Forbidden. You don't have permission to access this resource."],
            id="test_stream_unavailable_handled_error",
        ),
        pytest.param(200, True, [], id="test_stream_available"),
        pytest.param(
            401,
            False,
            ["Unauthorized. Please ensure you are authenticated correctly."],
            id="test_stream_unauthorized_error",
        ),
    ],
)
def test_check_dynamic_stream(response_code, available_expectation, expected_messages):
    """
    Run `check_connection` against a manifest that uses `CheckDynamicStream`.

    The resolver endpoint always answers with two items; the endpoint of the first generated
    stream answers with `response_code`. The check outcome must equal `available_expectation`
    and every entry of `expected_messages` must appear in the returned reason.
    """
    # Records returned by the components resolver: one dynamic stream per item.
    resolver_payload = json.dumps(
        [
            {"id": 1, "name": "item_1"},
            {"id": 2, "name": "item_2"},
        ]
    )
    # The body of the stream response simply echoes the expected messages.
    stream_payload = json.dumps(expected_messages)

    with HttpMocker() as http_mocker:
        http_mocker.get(
            HttpRequest(url="https://api.test.com/items"),
            HttpResponse(body=resolver_payload),
        )
        # Only item 1 is mocked: the manifest's check has `stream_count` set to 1.
        http_mocker.get(
            HttpRequest(url="https://api.test.com/items/1"),
            HttpResponse(body=stream_payload, status_code=response_code),
        )

        source = ConcurrentDeclarativeSource(
            source_config=_MANIFEST,
            config=_CONFIG,
            catalog=None,
            state=None,
        )
        stream_is_available, reason = source.check_connection(logger, _CONFIG)

        assert stream_is_available == available_expectation
        for message in expected_messages:
            assert message in reason

0 comments on commit d08f1ae

Please sign in to comment.