Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Source Intercom: Update with latest CDK features, remove custom incremental sync components #53187

Open
wants to merge 18 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ acceptance_tests:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/incremental_catalog.json"
future_state:
future_state_path: "integration_tests/abnormal_state.json"
bypass_reason: "Starting from CDK version 6+, the cursor value type has changed from integers to strings."
# future_state_path: "integration_tests/abnormal_state.json"
full_refresh:
tests:
- config_path: "secrets/config.json"
Expand Down
226 changes: 2 additions & 224 deletions airbyte-integrations/connectors/source-intercom/components.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,242 +2,20 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from dataclasses import InitVar, dataclass, field
from dataclasses import dataclass
from functools import wraps
from time import sleep
from typing import Any, Iterable, List, Mapping, Optional, Union
from typing import Mapping, Optional, Union

import requests

from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.declarative.incremental import DeclarativeCursor
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import ParentStreamConfig
from airbyte_cdk.sources.declarative.requesters.error_handlers import DefaultErrorHandler
from airbyte_cdk.sources.declarative.requesters.request_option import RequestOptionType
from airbyte_cdk.sources.declarative.types import Config, Record, StreamSlice, StreamState
from airbyte_cdk.sources.streams.core import Stream
from airbyte_cdk.sources.streams.http.error_handlers.response_models import ErrorResolution


RequestInput = Union[str, Mapping[str, str]]


@dataclass
class IncrementalSingleSliceCursor(DeclarativeCursor):
    """
    Declarative cursor that tracks one global cursor value for a stream read
    as a single slice.

    The cursor value lives in ``self._state`` under the evaluated
    ``cursor_field`` key; a copy of the incoming checkpoint is preserved under
    the ``"prior_state"`` key so the original state remains recoverable.
    """

    # Interpolatable name of the record field that carries the cursor value.
    cursor_field: Union[InterpolatedString, str]
    config: Config
    parameters: InitVar[Mapping[str, Any]]

    def __post_init__(self, parameters: Mapping[str, Any]):
        self._state: Mapping[str, Any] = {}
        self._cursor: Optional[Any] = None
        # Normalize the configured cursor field into an InterpolatedString so
        # it can be evaluated against the connector config at runtime.
        self.cursor_field = InterpolatedString.create(self.cursor_field, parameters=parameters)

    def get_request_params(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        # This cursor contributes nothing to request parameters; returns {}.
        return self._get_request_option(RequestOptionType.request_parameter, stream_slice)

    def get_request_headers(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        # This cursor contributes nothing to request headers; returns {}.
        return self._get_request_option(RequestOptionType.header, stream_slice)

    def get_request_body_data(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        # This cursor contributes nothing to the request body data; returns {}.
        return self._get_request_option(RequestOptionType.body_data, stream_slice)

    def get_request_body_json(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Optional[Mapping]:
        # This cursor contributes nothing to the request body JSON; returns {}.
        return self._get_request_option(RequestOptionType.body_json, stream_slice)

    def _get_request_option(self, option_type: RequestOptionType, stream_slice: StreamSlice) -> Mapping[str, Any]:
        # Single hook for per-request-option customization; intentionally empty.
        return {}

    def get_stream_state(self) -> StreamState:
        # Copy so callers cannot mutate internal state in place.
        return self._state.copy()

    def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
        # There is only one global slice, so the full stream state is the slice state.
        return self.get_stream_state()

    def set_initial_state(self, stream_state: StreamState):
        """Seed the cursor from a previously checkpointed stream state."""
        cursor_field = self.cursor_field.eval(self.config)
        cursor_value = stream_state.get(cursor_field)
        # NOTE(review): a falsy cursor value (0, "") is treated as absent here,
        # matching the original behavior — confirm that is intended for this
        # connector's timestamp cursors.
        if cursor_value:
            self._state[cursor_field] = cursor_value
            # Keep a snapshot of the state as it was before this sync started.
            self._state["prior_state"] = self._state.copy()
            self._cursor = cursor_value

    def observe(self, stream_slice: StreamSlice, record: Record) -> None:
        """
        Register a record with the cursor; the cursor instance can then use it to manage the state of the in-progress stream read.

        :param stream_slice: The current slice, which may or may not contain the most recently observed record
        :param record: the most recently-read record, which the cursor can use to update the stream state. Outwardly-visible changes to the
            stream state may need to be deferred depending on whether the source reliably orders records by the cursor field.
        """
        record_cursor_value = record.get(self.cursor_field.eval(self.config))
        if not record_cursor_value:
            return

        if self.is_greater_than_or_equal(record, self._state):
            self._cursor = record_cursor_value

    def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None:
        """Persist the most recently observed cursor value into the stream state."""
        # Fix: only persist when a cursor was actually observed. The original
        # wrote ``self._cursor`` unconditionally, which could clobber a valid
        # initial state with ``None`` after a sync that yielded no records.
        if self._cursor is not None:
            self._state[self.cursor_field.eval(self.config)] = self._cursor

    def stream_slices(self) -> Iterable[Mapping[str, Any]]:
        # The stream is read as a single global slice.
        yield StreamSlice(partition={}, cursor_slice={})

    def should_be_synced(self, record: Record) -> bool:
        """
        Evaluating if a record should be synced allows for filtering and stop condition on pagination
        """
        # Records lacking a (truthy) cursor value are skipped.
        return bool(record.get(self.cursor_field.eval(self.config)))

    def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
        """
        Evaluating which record is greater in terms of cursor. This is used to avoid having to capture all the records to close a slice
        """
        cursor_field = self.cursor_field.eval(self.config)
        first_cursor_value = first.get(cursor_field) if first else None
        second_cursor_value = second.get(cursor_field) if second else None
        if first_cursor_value and second_cursor_value:
            # Fix: use ``>=`` to match the method's documented contract; the
            # original used strict ``>``. In ``observe`` this is behaviorally
            # neutral (re-assigning an equal cursor value is a no-op).
            return first_cursor_value >= second_cursor_value
        # Only the first record has a cursor value -> treat it as greater.
        return bool(first_cursor_value)


@dataclass
class IncrementalSubstreamSlicerCursor(IncrementalSingleSliceCursor):
    """
    Cursor for substreams: yields one child slice per parent-stream record
    while tracking both the child cursor and the parent stream's cursor.

    Inherits ``close_slice`` from the base cursor — the original pass-through
    override added no behavior and has been removed.
    """

    parent_stream_configs: List[ParentStreamConfig]
    # When True, all parent slices are materialized before the child read starts.
    parent_complete_fetch: bool = field(default=False)

    def __post_init__(self, parameters: Mapping[str, Any]):
        super().__post_init__(parameters)

        if not self.parent_stream_configs:
            raise ValueError("IncrementalSubstreamSlicer needs at least 1 parent stream")

        # Only the first configured parent stream is used.
        self.parent_config: ParentStreamConfig = self.parent_stream_configs[0]
        self.parent_stream: Stream = self.parent_config.stream
        self.parent_stream_name: str = self.parent_stream.name
        self.parent_cursor_field: str = self.parent_stream.cursor_field
        # Read the parent incrementally only when it supports it.
        self.parent_sync_mode: SyncMode = SyncMode.incremental if self.parent_stream.supports_incremental is True else SyncMode.full_refresh
        self.substream_slice_field: str = self.parent_config.partition_field.eval(self.config)
        self.parent_field: str = self.parent_config.parent_key.eval(self.config)
        self._parent_cursor: Optional[str] = None

    def set_initial_state(self, stream_state: StreamState):
        """Seed both the child cursor and the nested parent-stream state."""
        super().set_initial_state(stream_state=stream_state)
        if self.parent_stream_name in stream_state and stream_state.get(self.parent_stream_name, {}).get(self.parent_cursor_field):
            parent_stream_state = {
                self.parent_cursor_field: stream_state[self.parent_stream_name][self.parent_cursor_field],
            }
            self._state[self.parent_stream_name] = parent_stream_state
            # Mirror the parent state into the preserved prior-state snapshot.
            if "prior_state" in self._state:
                self._state["prior_state"][self.parent_stream_name] = parent_stream_state

    def observe(self, stream_slice: StreamSlice, record: Record) -> None:
        """
        Extended the default method to be able to track the parent STATE.
        """

        # save parent cursor value (STATE) from slice
        parent_cursor = stream_slice.get(self.parent_stream_name)
        if parent_cursor:
            self._parent_cursor = parent_cursor.get(self.parent_cursor_field)

        # observe the substream
        super().observe(stream_slice, record)

    def stream_slices(self) -> Iterable[Mapping[str, Any]]:
        parent_state = (self._state or {}).get(self.parent_stream_name, {})
        slices_generator: Iterable[StreamSlice] = self.read_parent_stream(self.parent_sync_mode, self.parent_cursor_field, parent_state)
        # ``list(...)`` replaces the original ``[slice for slice in ...]``,
        # which also shadowed the ``slice`` builtin.
        yield from list(slices_generator) if self.parent_complete_fetch else slices_generator

    def track_parent_cursor(self, parent_record: dict) -> None:
        """
        Tracks the Parent Stream Cursor, using `parent_cursor_field`.
        """
        self._parent_cursor = parent_record.get(self.parent_cursor_field)
        if self._parent_cursor:
            self._state[self.parent_stream_name] = {self.parent_cursor_field: self._parent_cursor}

    def read_parent_stream(
        self,
        sync_mode: SyncMode,
        cursor_field: Optional[str],
        stream_state: Mapping[str, Any],
    ) -> Iterable[Mapping[str, Any]]:
        """Read the parent stream and yield one child slice per parent record that carries the parent key."""
        self.parent_stream.state = stream_state

        # Fix: evaluate the CHILD's cursor field once, into its own variable.
        # The original reassigned the ``cursor_field`` parameter (the PARENT's
        # cursor field) inside the record loop, so every ``read_records`` call
        # after the first matching record was issued with the child's cursor
        # field instead of the parent's.
        child_cursor_field = self.cursor_field.eval(self.config)

        parent_stream_slices_gen = self.parent_stream.stream_slices(
            sync_mode=sync_mode,
            cursor_field=cursor_field,
            stream_state=stream_state,
        )

        for parent_slice in parent_stream_slices_gen:
            parent_records_gen = self.parent_stream.read_records(
                sync_mode=sync_mode,
                cursor_field=cursor_field,
                stream_slice=parent_slice,
                stream_state=stream_state,
            )

            for parent_record in parent_records_gen:
                # update parent cursor
                self.track_parent_cursor(parent_record)
                substream_slice_value = parent_record.get(self.parent_field)
                if substream_slice_value:
                    substream_cursor_value = self._state.get(child_cursor_field)
                    parent_cursor_value = self._state.get(self.parent_stream_name, {}).get(self.parent_cursor_field)
                    yield StreamSlice(
                        partition={
                            self.substream_slice_field: substream_slice_value,
                        },
                        cursor_slice={
                            child_cursor_field: substream_cursor_value,
                            self.parent_stream_name: {
                                self.parent_cursor_field: parent_cursor_value,
                            },
                        },
                    )


@dataclass
class IntercomRateLimiter:
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"name": "segments"
},
"stream_state": {
"updated_at": 7626086649
"updated_at": "7626086649"
}
}
},
Expand All @@ -17,7 +17,7 @@
"name": "companies"
},
"stream_state": {
"updated_at": 7626086649
"updated_at": "7626086649"
}
}
},
Expand All @@ -28,9 +28,9 @@
"name": "company_segments"
},
"stream_state": {
"updated_at": 7626086649,
"updated_at": "7626086649",
"companies": {
"updated_at": 1676517777
"updated_at": "1676517777"
}
}
}
Expand All @@ -42,7 +42,7 @@
"name": "conversations"
},
"stream_state": {
"updated_at": 7626086649
"updated_at": "7626086649"
}
}
},
Expand All @@ -53,9 +53,9 @@
"name": "conversation_parts"
},
"stream_state": {
"updated_at": 7626086649,
"updated_at": "7626086649",
"conversations": {
"updated_at": 1676462031
"updated_at": "1676462031"
}
}
}
Expand All @@ -67,7 +67,7 @@
"name": "contacts"
},
"stream_state": {
"updated_at": 7626086649
"updated_at": "7626086649"
}
}
}
Expand Down
Loading
Loading