Skip to content

Commit cd74ed2

Browse files
committed
Fix cursor comparison error
1 parent 3af96dc commit cd74ed2

File tree

3 files changed

+73
-15
lines changed

3 files changed

+73
-15
lines changed

airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py

Lines changed: 13 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -22,6 +22,9 @@
2222
)
2323
from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, Cursor, CursorField
2424
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
25+
from airbyte_cdk.sources.streams.concurrent.state_converters.abstract_stream_state_converter import (
26+
AbstractStreamStateConverter,
27+
)
2528
from airbyte_cdk.sources.types import Record, StreamSlice, StreamState
2629

2730
logger = logging.getLogger("airbyte")
@@ -72,13 +75,15 @@ def __init__(
7275
stream_state: Any,
7376
message_repository: MessageRepository,
7477
connector_state_manager: ConnectorStateManager,
78+
connector_state_converter: AbstractStreamStateConverter,
7579
cursor_field: CursorField,
7680
) -> None:
7781
self._global_cursor: Optional[StreamState] = {}
7882
self._stream_name = stream_name
7983
self._stream_namespace = stream_namespace
8084
self._message_repository = message_repository
8185
self._connector_state_manager = connector_state_manager
86+
self._connector_state_converter = connector_state_converter
8287
self._cursor_field = cursor_field
8388

8489
self._cursor_factory = cursor_factory
@@ -144,11 +149,9 @@ def close_partition(self, partition: Partition) -> None:
144149
partition_key in self._finished_partitions
145150
and self._semaphore_per_partition[partition_key]._value == 0
146151
):
147-
if (
148-
self._new_global_cursor is None
149-
or self._new_global_cursor[self.cursor_field.cursor_field_key]
150-
< cursor.state[self.cursor_field.cursor_field_key]
151-
):
152+
if self._new_global_cursor is None or self._extract_cursor_value_from_state(
153+
self._new_global_cursor
154+
) < self._extract_cursor_value_from_state(cursor.state):
152155
self._new_global_cursor = copy.deepcopy(cursor.state)
153156
if not self._use_global_cursor:
154157
self._emit_state_message()
@@ -372,5 +375,10 @@ def _get_cursor(self, record: Record) -> ConcurrentCursor:
372375
cursor = self._cursor_per_partition[partition_key]
373376
return cursor
374377

378+
def _extract_cursor_value_from_state(self, state: StreamState) -> Any:
379+
return self._connector_state_converter.parse_value(
380+
state[self.cursor_field.cursor_field_key]
381+
)
382+
375383
def limit_reached(self) -> bool:
376384
return self._over_limit > self.DEFAULT_MAX_PARTITIONS_NUMBER

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 17 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1202,6 +1202,22 @@ def create_concurrent_cursor_from_perpartition_cursor(
12021202
)
12031203
cursor_field = CursorField(interpolated_cursor_field.eval(config=config))
12041204

1205+
datetime_format = datetime_based_cursor_model.datetime_format
1206+
1207+
cursor_granularity = (
1208+
parse_duration(datetime_based_cursor_model.cursor_granularity)
1209+
if datetime_based_cursor_model.cursor_granularity
1210+
else None
1211+
)
1212+
1213+
connector_state_converter: DateTimeStreamStateConverter
1214+
connector_state_converter = CustomFormatConcurrentStreamStateConverter(
1215+
datetime_format=datetime_format,
1216+
input_datetime_formats=datetime_based_cursor_model.cursor_datetime_formats,
1217+
is_sequential_state=True, # ConcurrentPerPartitionCursor only works with sequential state
1218+
cursor_granularity=cursor_granularity,
1219+
)
1220+
12051221
# Create the cursor factory
12061222
cursor_factory = ConcurrentCursorFactory(
12071223
partial(
@@ -1225,6 +1241,7 @@ def create_concurrent_cursor_from_perpartition_cursor(
12251241
stream_state=stream_state,
12261242
message_repository=self._message_repository, # type: ignore
12271243
connector_state_manager=state_manager,
1244+
connector_state_converter=connector_state_converter,
12281245
cursor_field=cursor_field,
12291246
)
12301247

unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py

Lines changed: 43 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -65,7 +65,7 @@
6565
},
6666
"cursor_incremental_sync": {
6767
"type": "DatetimeBasedCursor",
68-
"cursor_datetime_formats": ["%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%dT%H:%M:%S%z"],
68+
"cursor_datetime_formats": ["%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%dT%H:%M:%S%z", "%ms"],
6969
"datetime_format": "%Y-%m-%dT%H:%M:%SZ",
7070
"cursor_field": "{{ parameters.get('cursor_field', 'updated_at') }}",
7171
"start_datetime": {"datetime": "{{ config.get('start_date')}}"},
@@ -399,6 +399,7 @@ def _run_read(
399399
VOTE_200_CREATED_AT = "2024-01-12T00:00:00Z" # Latest vote in partition 20
400400
VOTE_210_CREATED_AT = "2024-01-12T00:00:15Z" # Latest vote in partition 21
401401
VOTE_300_CREATED_AT = "2024-01-10T00:00:00Z" # Latest vote in partition 30
402+
VOTE_300_CREATED_AT_TIMESTAMP = 1704844800000 # Latest vote in partition 30
402403

403404
# Initial State Constants
404405
PARENT_COMMENT_CURSOR_PARTITION_1 = "2023-01-04T00:00:00Z" # Parent comment cursor (partition)
@@ -596,7 +597,7 @@ def _run_read(
596597
{
597598
"id": 300,
598599
"comment_id": 30,
599-
"created_at": VOTE_300_CREATED_AT,
600+
"created_at": VOTE_300_CREATED_AT_TIMESTAMP,
600601
}
601602
]
602603
},
@@ -637,7 +638,7 @@ def _run_read(
637638
{
638639
"comment_id": 30,
639640
"comment_updated_at": COMMENT_30_UPDATED_AT,
640-
"created_at": VOTE_300_CREATED_AT,
641+
"created_at": str(VOTE_300_CREATED_AT_TIMESTAMP),
641642
"id": 300,
642643
},
643644
],
@@ -981,7 +982,15 @@ def run_incremental_parent_state_test(
981982
# Fetch the first page of votes for comment 30 of post 3
982983
(
983984
f"https://api.example.com/community/posts/3/comments/30/votes?per_page=100&start_time={LOOKBACK_DATE}",
984-
{"votes": [{"id": 300, "comment_id": 30, "created_at": VOTE_300_CREATED_AT}]},
985+
{
986+
"votes": [
987+
{
988+
"id": 300,
989+
"comment_id": 30,
990+
"created_at": VOTE_300_CREATED_AT_TIMESTAMP,
991+
}
992+
]
993+
},
985994
),
986995
# Requests with intermediate states
987996
# Fetch votes for comment 10 of post 1
@@ -1018,7 +1027,15 @@ def run_incremental_parent_state_test(
10181027
# Fetch votes for comment 30 of post 3
10191028
(
10201029
f"https://api.example.com/community/posts/3/comments/30/votes?per_page=100&start_time={VOTE_300_CREATED_AT}",
1021-
{"votes": [{"id": 300, "comment_id": 30, "created_at": VOTE_300_CREATED_AT}]},
1030+
{
1031+
"votes": [
1032+
{
1033+
"id": 300,
1034+
"comment_id": 30,
1035+
"created_at": VOTE_300_CREATED_AT_TIMESTAMP,
1036+
}
1037+
]
1038+
},
10221039
),
10231040
],
10241041
# Expected records
@@ -1056,7 +1073,7 @@ def run_incremental_parent_state_test(
10561073
{
10571074
"comment_id": 30,
10581075
"comment_updated_at": COMMENT_30_UPDATED_AT,
1059-
"created_at": VOTE_300_CREATED_AT,
1076+
"created_at": str(VOTE_300_CREATED_AT_TIMESTAMP),
10601077
"id": 300,
10611078
},
10621079
],
@@ -1344,7 +1361,15 @@ def test_incremental_parent_state(
13441361
(
13451362
f"https://api.example.com/community/posts/3/comments/30/votes"
13461363
f"?per_page=100&start_time={PARTITION_SYNC_START_TIME}",
1347-
{"votes": [{"id": 300, "comment_id": 30, "created_at": VOTE_300_CREATED_AT}]},
1364+
{
1365+
"votes": [
1366+
{
1367+
"id": 300,
1368+
"comment_id": 30,
1369+
"created_at": VOTE_300_CREATED_AT_TIMESTAMP,
1370+
}
1371+
]
1372+
},
13481373
),
13491374
],
13501375
# Expected records
@@ -1382,7 +1407,7 @@ def test_incremental_parent_state(
13821407
{
13831408
"comment_id": 30,
13841409
"comment_updated_at": COMMENT_30_UPDATED_AT,
1385-
"created_at": VOTE_300_CREATED_AT,
1410+
"created_at": str(VOTE_300_CREATED_AT_TIMESTAMP),
13861411
"id": 300,
13871412
},
13881413
],
@@ -1896,7 +1921,15 @@ def test_incremental_parent_state_no_records(
18961921
(
18971922
f"https://api.example.com/community/posts/3/comments/30/votes"
18981923
f"?per_page=100&start_time={LOOKBACK_DATE}",
1899-
{"votes": [{"id": 300, "comment_id": 30, "created_at": VOTE_300_CREATED_AT}]},
1924+
{
1925+
"votes": [
1926+
{
1927+
"id": 300,
1928+
"comment_id": 30,
1929+
"created_at": VOTE_300_CREATED_AT_TIMESTAMP,
1930+
}
1931+
]
1932+
},
19001933
),
19011934
],
19021935
# Expected records
@@ -1928,7 +1961,7 @@ def test_incremental_parent_state_no_records(
19281961
{
19291962
"comment_id": 30,
19301963
"comment_updated_at": COMMENT_30_UPDATED_AT,
1931-
"created_at": VOTE_300_CREATED_AT,
1964+
"created_at": str(VOTE_300_CREATED_AT_TIMESTAMP),
19321965
"id": 300,
19331966
},
19341967
],

0 commit comments

Comments (0)