@@ -149,9 +149,11 @@ def close_partition(self, partition: Partition) -> None:
149
149
partition_key in self ._finished_partitions
150
150
and self ._semaphore_per_partition [partition_key ]._value == 0
151
151
):
152
- if self ._new_global_cursor is None or self ._extract_cursor_value_from_state (
153
- self ._new_global_cursor
154
- ) < self ._extract_cursor_value_from_state (cursor .state ):
152
+ if (
153
+ self ._new_global_cursor is None
154
+ or self ._new_global_cursor [self .cursor_field .cursor_field_key ]
155
+ < cursor .state [self .cursor_field .cursor_field_key ]
156
+ ):
155
157
self ._new_global_cursor = copy .deepcopy (cursor .state )
156
158
if not self ._use_global_cursor :
157
159
self ._emit_state_message ()
@@ -304,8 +306,7 @@ def _set_initial_state(self, stream_state: StreamState) -> None:
304
306
):
305
307
# We assume that `stream_state` is in a global format that can be applied to all partitions.
306
308
# Example: {"global_state_format_key": "global_state_format_value"}
307
- self ._global_cursor = deepcopy (stream_state )
308
- self ._new_global_cursor = deepcopy (stream_state )
309
+ self ._set_global_state (stream_state )
309
310
310
311
else :
311
312
self ._use_global_cursor = stream_state .get ("use_global_cursor" , False )
@@ -322,8 +323,7 @@ def _set_initial_state(self, stream_state: StreamState) -> None:
322
323
323
324
# set default state for missing partitions if it is per partition with fallback to global
324
325
if self ._GLOBAL_STATE_KEY in stream_state :
325
- self ._global_cursor = deepcopy (stream_state [self ._GLOBAL_STATE_KEY ])
326
- self ._new_global_cursor = deepcopy (stream_state [self ._GLOBAL_STATE_KEY ])
326
+ self ._set_global_state (stream_state [self ._GLOBAL_STATE_KEY ])
327
327
328
328
# Set initial parent state
329
329
if stream_state .get ("parent_state" ):
@@ -332,6 +332,27 @@ def _set_initial_state(self, stream_state: StreamState) -> None:
332
332
# Set parent state for partition routers based on parent streams
333
333
self ._partition_router .set_initial_state (stream_state )
334
334
335
+ def _set_global_state (self , stream_state : Mapping [str , Any ]) -> None :
336
+ """
337
+ Initializes the global cursor state from the provided stream state.
338
+
339
+ If the cursor field key is present in the stream state, its value is parsed,
340
+ formatted, and stored as the global cursor. This ensures consistency in state
341
+ representation across partitions.
342
+ """
343
+ if self .cursor_field .cursor_field_key in stream_state :
344
+ global_state_value = stream_state [self .cursor_field .cursor_field_key ]
345
+ final_format_global_state_value = self ._connector_state_converter .output_format (
346
+ self ._connector_state_converter .parse_value (global_state_value )
347
+ )
348
+
349
+ fixed_global_state = {
350
+ self .cursor_field .cursor_field_key : final_format_global_state_value
351
+ }
352
+
353
+ self ._global_cursor = deepcopy (fixed_global_state )
354
+ self ._new_global_cursor = deepcopy (fixed_global_state )
355
+
335
356
def observe (self , record : Record ) -> None :
336
357
if not self ._use_global_cursor and self .limit_reached ():
337
358
self ._use_global_cursor = True
0 commit comments