diff --git a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py index fb6385c21..ab667c655 100644 --- a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py +++ b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py @@ -22,6 +22,9 @@ ) from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, Cursor, CursorField from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition +from airbyte_cdk.sources.streams.concurrent.state_converters.abstract_stream_state_converter import ( + AbstractStreamStateConverter, +) from airbyte_cdk.sources.types import Record, StreamSlice, StreamState logger = logging.getLogger("airbyte") @@ -72,6 +75,7 @@ def __init__( stream_state: Any, message_repository: MessageRepository, connector_state_manager: ConnectorStateManager, + connector_state_converter: AbstractStreamStateConverter, cursor_field: CursorField, ) -> None: self._global_cursor: Optional[StreamState] = {} @@ -79,6 +83,7 @@ def __init__( self._stream_namespace = stream_namespace self._message_repository = message_repository self._connector_state_manager = connector_state_manager + self._connector_state_converter = connector_state_converter self._cursor_field = cursor_field self._cursor_factory = cursor_factory @@ -301,8 +306,7 @@ def _set_initial_state(self, stream_state: StreamState) -> None: ): # We assume that `stream_state` is in a global format that can be applied to all partitions. 
# Example: {"global_state_format_key": "global_state_format_value"} - self._global_cursor = deepcopy(stream_state) - self._new_global_cursor = deepcopy(stream_state) + self._set_global_state(stream_state) else: self._use_global_cursor = stream_state.get("use_global_cursor", False) @@ -319,8 +323,7 @@ def _set_initial_state(self, stream_state: StreamState) -> None: # set default state for missing partitions if it is per partition with fallback to global if self._GLOBAL_STATE_KEY in stream_state: - self._global_cursor = deepcopy(stream_state[self._GLOBAL_STATE_KEY]) - self._new_global_cursor = deepcopy(stream_state[self._GLOBAL_STATE_KEY]) + self._set_global_state(stream_state[self._GLOBAL_STATE_KEY]) # Set initial parent state if stream_state.get("parent_state"): @@ -329,6 +332,27 @@ def _set_initial_state(self, stream_state: StreamState) -> None: # Set parent state for partition routers based on parent streams self._partition_router.set_initial_state(stream_state) + def _set_global_state(self, stream_state: Mapping[str, Any]) -> None: + """ + Initializes the global cursor state from the provided stream state. + + If `stream_state` contains the cursor field key, its value is parsed and re-emitted in the + state converter's output format, so the stored global state has one representation; the + result is assigned to `_global_cursor` and `_new_global_cursor`. Otherwise nothing changes.
+ """ + if self.cursor_field.cursor_field_key in stream_state: + global_state_value = stream_state[self.cursor_field.cursor_field_key] + final_format_global_state_value = self._connector_state_converter.output_format( + self._connector_state_converter.parse_value(global_state_value) + ) + + fixed_global_state = { + self.cursor_field.cursor_field_key: final_format_global_state_value + } + + self._global_cursor = deepcopy(fixed_global_state) + self._new_global_cursor = deepcopy(fixed_global_state) + def observe(self, record: Record) -> None: if not self._use_global_cursor and self.limit_reached(): self._use_global_cursor = True diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index ab950cf89..a8736986e 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -1210,6 +1210,22 @@ def create_concurrent_cursor_from_perpartition_cursor( ) cursor_field = CursorField(interpolated_cursor_field.eval(config=config)) + datetime_format = datetime_based_cursor_model.datetime_format + + cursor_granularity = ( + parse_duration(datetime_based_cursor_model.cursor_granularity) + if datetime_based_cursor_model.cursor_granularity + else None + ) + + connector_state_converter: DateTimeStreamStateConverter + connector_state_converter = CustomFormatConcurrentStreamStateConverter( + datetime_format=datetime_format, + input_datetime_formats=datetime_based_cursor_model.cursor_datetime_formats, + is_sequential_state=True, # ConcurrentPerPartitionCursor only works with sequential state + cursor_granularity=cursor_granularity, + ) + # Create the cursor factory cursor_factory = ConcurrentCursorFactory( partial( @@ -1233,6 +1249,7 @@ def create_concurrent_cursor_from_perpartition_cursor( stream_state=stream_state, message_repository=self._message_repository, # type: ignore 
connector_state_manager=state_manager, + connector_state_converter=connector_state_converter, cursor_field=cursor_field, ) diff --git a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py index 3d7f98124..cca92ae46 100644 --- a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py +++ b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py @@ -65,7 +65,7 @@ }, "cursor_incremental_sync": { "type": "DatetimeBasedCursor", - "cursor_datetime_formats": ["%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%dT%H:%M:%S%z"], + "cursor_datetime_formats": ["%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%dT%H:%M:%S%z", "%ms"], "datetime_format": "%Y-%m-%dT%H:%M:%SZ", "cursor_field": "{{ parameters.get('cursor_field', 'updated_at') }}", "start_datetime": {"datetime": "{{ config.get('start_date')}}"}, @@ -399,13 +399,16 @@ def _run_read( VOTE_200_CREATED_AT = "2024-01-12T00:00:00Z" # Latest vote in partition 20 VOTE_210_CREATED_AT = "2024-01-12T00:00:15Z" # Latest vote in partition 21 VOTE_300_CREATED_AT = "2024-01-10T00:00:00Z" # Latest vote in partition 30 +VOTE_300_CREATED_AT_TIMESTAMP = 1704844800000 # Latest vote in partition 30 # Initial State Constants PARENT_COMMENT_CURSOR_PARTITION_1 = "2023-01-04T00:00:00Z" # Parent comment cursor (partition) PARENT_POSTS_CURSOR = "2024-01-05T00:00:00Z" # Parent posts cursor (expected in state) INITIAL_STATE_PARTITION_10_CURSOR = "2024-01-02T00:00:01Z" +INITIAL_STATE_PARTITION_10_CURSOR_TIMESTAMP = 1704153601000 INITIAL_STATE_PARTITION_11_CURSOR = "2024-01-03T00:00:02Z" +INITIAL_STATE_PARTITION_11_CURSOR_TIMESTAMP = 1704240002000 INITIAL_GLOBAL_CURSOR = INITIAL_STATE_PARTITION_11_CURSOR INITIAL_GLOBAL_CURSOR_DATE = datetime.fromisoformat( INITIAL_STATE_PARTITION_11_CURSOR.replace("Z", "") @@ -596,7 +599,7 @@ def _run_read( { "id": 300, "comment_id": 30, - "created_at": VOTE_300_CREATED_AT, + "created_at": 
VOTE_300_CREATED_AT_TIMESTAMP, } ] }, @@ -637,7 +640,7 @@ def _run_read( { "comment_id": 30, "comment_updated_at": COMMENT_30_UPDATED_AT, - "created_at": VOTE_300_CREATED_AT, + "created_at": str(VOTE_300_CREATED_AT_TIMESTAMP), "id": 300, }, ], @@ -662,7 +665,7 @@ def _run_read( "id": 10, "parent_slice": {"id": 1, "parent_slice": {}}, }, - "cursor": {"created_at": INITIAL_STATE_PARTITION_10_CURSOR}, + "cursor": {"created_at": INITIAL_STATE_PARTITION_10_CURSOR_TIMESTAMP}, }, { "partition": { @@ -672,7 +675,7 @@ def _run_read( "cursor": {"created_at": INITIAL_STATE_PARTITION_11_CURSOR}, }, ], - "state": {"created_at": INITIAL_STATE_PARTITION_11_CURSOR}, + "state": {"created_at": INITIAL_STATE_PARTITION_11_CURSOR_TIMESTAMP}, "lookback_window": 86400, }, # Expected state @@ -981,7 +984,15 @@ def run_incremental_parent_state_test( # Fetch the first page of votes for comment 30 of post 3 ( f"https://api.example.com/community/posts/3/comments/30/votes?per_page=100&start_time={LOOKBACK_DATE}", - {"votes": [{"id": 300, "comment_id": 30, "created_at": VOTE_300_CREATED_AT}]}, + { + "votes": [ + { + "id": 300, + "comment_id": 30, + "created_at": VOTE_300_CREATED_AT_TIMESTAMP, + } + ] + }, ), # Requests with intermediate states # Fetch votes for comment 10 of post 1 @@ -1018,7 +1029,15 @@ def run_incremental_parent_state_test( # Fetch votes for comment 30 of post 3 ( f"https://api.example.com/community/posts/3/comments/30/votes?per_page=100&start_time={VOTE_300_CREATED_AT}", - {"votes": [{"id": 300, "comment_id": 30, "created_at": VOTE_300_CREATED_AT}]}, + { + "votes": [ + { + "id": 300, + "comment_id": 30, + "created_at": VOTE_300_CREATED_AT_TIMESTAMP, + } + ] + }, ), ], # Expected records @@ -1056,7 +1075,7 @@ def run_incremental_parent_state_test( { "comment_id": 30, "comment_updated_at": COMMENT_30_UPDATED_AT, - "created_at": VOTE_300_CREATED_AT, + "created_at": str(VOTE_300_CREATED_AT_TIMESTAMP), "id": 300, }, ], @@ -1344,7 +1363,15 @@ def test_incremental_parent_state( ( 
f"https://api.example.com/community/posts/3/comments/30/votes" f"?per_page=100&start_time={PARTITION_SYNC_START_TIME}", - {"votes": [{"id": 300, "comment_id": 30, "created_at": VOTE_300_CREATED_AT}]}, + { + "votes": [ + { + "id": 300, + "comment_id": 30, + "created_at": VOTE_300_CREATED_AT_TIMESTAMP, + } + ] + }, ), ], # Expected records @@ -1382,7 +1409,7 @@ def test_incremental_parent_state( { "comment_id": 30, "comment_updated_at": COMMENT_30_UPDATED_AT, - "created_at": VOTE_300_CREATED_AT, + "created_at": str(VOTE_300_CREATED_AT_TIMESTAMP), "id": 300, }, ], @@ -1896,7 +1923,15 @@ def test_incremental_parent_state_no_records( ( f"https://api.example.com/community/posts/3/comments/30/votes" f"?per_page=100&start_time={LOOKBACK_DATE}", - {"votes": [{"id": 300, "comment_id": 30, "created_at": VOTE_300_CREATED_AT}]}, + { + "votes": [ + { + "id": 300, + "comment_id": 30, + "created_at": VOTE_300_CREATED_AT_TIMESTAMP, + } + ] + }, ), ], # Expected records @@ -1928,7 +1963,7 @@ def test_incremental_parent_state_no_records( { "comment_id": 30, "comment_updated_at": COMMENT_30_UPDATED_AT, - "created_at": VOTE_300_CREATED_AT, + "created_at": str(VOTE_300_CREATED_AT_TIMESTAMP), "id": 300, }, ],