Skip to content

Commit f974042

Browse files
authored
šŸ› Source HubSpot: fix infinite loop when iterating through search results (airbytehq#44899)
1 parent 7d0f590 commit f974042

File tree

6 files changed

+111
-13
lines changed

6 files changed

+111
-13
lines changed

.secrets

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
airbyte-integrations/connectors/source-hubspot/.secrets

airbyte-integrations/connectors/source-hubspot/metadata.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ data:
1010
connectorSubtype: api
1111
connectorType: source
1212
definitionId: 36c891d9-4bd9-43ac-bad2-10e12756272c
13-
dockerImageTag: 4.2.20
13+
dockerImageTag: 4.2.21
1414
dockerRepository: airbyte/source-hubspot
1515
documentationUrl: https://docs.airbyte.com/integrations/sources/hubspot
1616
erdUrl: https://dbdocs.io/airbyteio/source-hubspot?view=relationships

airbyte-integrations/connectors/source-hubspot/pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
33
build-backend = "poetry.core.masonry.api"
44

55
[tool.poetry]
6-
version = "4.2.20"
6+
version = "4.2.21"
77
name = "source-hubspot"
88
description = "Source implementation for HubSpot."
99
authors = [ "Airbyte <[email protected]>",]

airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py

Lines changed: 37 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1128,13 +1128,31 @@ def _process_search(
11281128
stream_slice: Mapping[str, Any] = None,
11291129
stream_state: Mapping[str, Any] = None,
11301130
next_page_token: Mapping[str, Any] = None,
1131+
last_id=None,
11311132
) -> Tuple[List, requests.Response]:
11321133
stream_records = {}
11331134
properties_list = list(self.properties.keys())
1135+
if last_id == None:
1136+
last_id = 0
1137+
# The search query below uses the following criteria:
1138+
# - Last modified >= timestamp of previous sync
1139+
# - Last modified <= timestamp of current sync to avoid open ended queries
1140+
# - Object primary key >= last_id with initial value 0, then max(last_id) returned from previous pagination loop
1141+
# - Sort results by primary key ASC
1142+
# Note: Although results return out of chronological order, sorting on primary key ensures retrieval of *all* records
1143+
# once the final pagination loop completes. This is preferable to sorting by a non-unique value, such as
1144+
# last modified date, which may result in an infinite loop in some edge cases.
1145+
key = self.primary_key
1146+
if key == "id":
1147+
key = "hs_object_id"
11341148
payload = (
11351149
{
1136-
"filters": [{"value": int(self._state.timestamp() * 1000), "propertyName": self.last_modified_field, "operator": "GTE"}],
1137-
"sorts": [{"propertyName": self.last_modified_field, "direction": "ASCENDING"}],
1150+
"filters": [
1151+
{"value": int(self._state.timestamp() * 1000), "propertyName": self.last_modified_field, "operator": "GTE"},
1152+
{"value": int(self._init_sync.timestamp() * 1000), "propertyName": self.last_modified_field, "operator": "LTE"},
1153+
{"value": last_id, "propertyName": key, "operator": "GTE"},
1154+
],
1155+
"sorts": [{"propertyName": key, "direction": "ASCENDING"}],
11381156
"properties": properties_list,
11391157
"limit": 100,
11401158
}
@@ -1168,6 +1186,16 @@ def _read_associations(self, records: Iterable) -> Iterable[Mapping[str, Any]]:
11681186
current_record[_slice] = associations_list
11691187
return records_by_pk.values()
11701188

1189+
def get_max(self, val1, val2):
1190+
try:
1191+
# Try to convert both values to integers
1192+
int_val1 = int(val1)
1193+
int_val2 = int(val2)
1194+
return max(int_val1, int_val2)
1195+
except ValueError:
1196+
# If conversion fails, fall back to string comparison
1197+
return max(str(val1), str(val2))
1198+
11711199
def read_records(
11721200
self,
11731201
sync_mode: SyncMode,
@@ -1178,14 +1206,13 @@ def read_records(
11781206
stream_state = stream_state or {}
11791207
pagination_complete = False
11801208
next_page_token = None
1209+
last_id = None
1210+
max_last_id = None
11811211

1182-
latest_cursor = None
11831212
while not pagination_complete:
11841213
if self.state:
11851214
records, raw_response = self._process_search(
1186-
next_page_token=next_page_token,
1187-
stream_state=stream_state,
1188-
stream_slice=stream_slice,
1215+
next_page_token=next_page_token, stream_state=stream_state, stream_slice=stream_slice, last_id=max_last_id
11891216
)
11901217
if self.associations:
11911218
records = self._read_associations(records)
@@ -1200,8 +1227,7 @@ def read_records(
12001227
records = self.record_unnester.unnest(records)
12011228

12021229
for record in records:
1203-
cursor = self._field_to_datetime(record[self.updated_at_field])
1204-
latest_cursor = max(cursor, latest_cursor) if latest_cursor else cursor
1230+
last_id = self.get_max(record[self.primary_key], last_id) if last_id else record[self.primary_key]
12051231
yield record
12061232

12071233
next_page_token = self.next_page_token(raw_response)
@@ -1211,13 +1237,13 @@ def read_records(
12111237
# Hubspot documentation states that the search endpoints are limited to 10,000 total results
12121238
# for any given query. Attempting to page beyond 10,000 will result in a 400 error.
12131239
# https://developers.hubspot.com/docs/api/crm/search. We stop getting data at 10,000 and
1214-
# start a new search query with the latest state that has been collected.
1215-
self._update_state(latest_cursor=latest_cursor)
1240+
# start a new search query with the latest id that has been collected.
1241+
max_last_id = self.get_max(max_last_id, last_id) if max_last_id else last_id
12161242
next_page_token = None
12171243

12181244
# Since Search stream does not have slices is safe to save the latest
12191245
# state as the initial sync date
1220-
self._update_state(latest_cursor=latest_cursor, is_last_record=True)
1246+
self._update_state(latest_cursor=self._init_sync, is_last_record=True)
12211247
# Always return an empty generator just in case no records were ever yielded
12221248
yield from []
12231249

airbyte-integrations/connectors/source-hubspot/unit_tests/test_source.py

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55

66
import logging
7+
import random
78
from datetime import timedelta
89
from http import HTTPStatus
910
from unittest.mock import MagicMock
@@ -487,6 +488,75 @@ def test_search_based_stream_should_not_attempt_to_get_more_than_10k_records(req
487488
assert len(records) == 11000
488489
assert test_stream.state["updatedAt"] == test_stream._init_sync.to_iso8601_string()
489490

491+
def test_search_based_incremental_stream_should_sort_by_id(requests_mock, common_params, fake_properties_list):
492+
"""
493+
If there are more than 10,000 records that would be returned by the Hubspot search endpoint,
494+
the CRMSearchStream instance should return results sorted by primary key and restart the search query (resetting "after") once the 10Kth record is reached
495+
"""
496+
# Create test_stream instance with some state
497+
test_stream = Companies(**common_params)
498+
test_stream._init_sync = pendulum.parse("2022-02-24T16:43:11Z")
499+
test_stream.state = {"updatedAt": "2022-01-24T16:43:11Z"}
500+
test_stream.associations = []
501+
502+
def random_date(start, end):
503+
return pendulum.from_timestamp(random.randint(start, end)/1000).to_iso8601_string()
504+
505+
after = 0
506+
507+
# Custom callback to mock search endpoint filter and sort behavior, returns 100 records per request.
508+
# See _process_search in streams.py for details on the structure of the filter and sort parameters.
509+
# The generated records will have an id that is the sum of the current id and the current "after" value
510+
# and the updatedAt field will be a random date between min_time and max_time.
511+
# Store "after" value in the record to check if it resets after 10k records.
512+
def custom_callback(request, context):
513+
post_data = request.json() # Access JSON data from the request body
514+
after = int(post_data.get("after", 0))
515+
filters = post_data.get("filters", [])
516+
min_time = int(filters[0].get("value", 0))
517+
max_time = int(filters[1].get("value", 0))
518+
id = int(filters[2].get("value", 0))
519+
next = int(after) + 100
520+
results = [
521+
{
522+
"id": f"{y + id}",
523+
"updatedAt": random_date(min_time, max_time),
524+
"after": after
525+
} for y in range(int(after) + 1, next + 1)
526+
]
527+
context.status_code = 200
528+
if ((id + next) < 11000):
529+
return {"results": results, "paging": {"next": {"after": f"{next}"}}}
530+
else:
531+
return {"results": results, "paging": {}} # Last page
532+
533+
properties_response = [
534+
{
535+
"json": [],
536+
"status_code": 200,
537+
}
538+
]
539+
540+
# Mocking Request
541+
test_stream._sync_mode = SyncMode.incremental
542+
requests_mock.register_uri("POST", test_stream.url, json=custom_callback)
543+
# test_stream._sync_mode = None
544+
requests_mock.register_uri("GET", "/properties/v2/company/properties", properties_response)
545+
records, _ = read_incremental(test_stream, {})
546+
# The stream should not attempt to get more than 10K records.
547+
# Instead, it should use the new state to start a new search query.
548+
assert len(records) == 11000
549+
# Check that the records are sorted by id and that "after" resets after 10k records
550+
assert records[0]["id"] == "1"
551+
assert records[0]["after"] == 0
552+
assert records[10000 - 1]["id"] == "10000"
553+
assert records[10000 - 1]["after"] == 9900
554+
assert records[10000]["id"] == "10001"
555+
assert records[10000]["after"] == 0
556+
assert records[-1]["id"] == "11000"
557+
assert records[-1]["after"] == 900
558+
assert test_stream.state["updatedAt"] == test_stream._init_sync.to_iso8601_string()
559+
490560

491561
def test_engagements_stream_pagination_works(requests_mock, common_params):
492562
"""

docs/integrations/sources/hubspot.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,7 @@ The connector is restricted by normal HubSpot [rate limitations](https://legacyd
336336

337337
| Version | Date | Pull Request | Subject |
338338
|:--------|:-----------|:---------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
339+
| 4.2.21 | 2024-09-23 | [44899](https://github.com/airbytehq/airbyte/pull/44899) | Fix incremental search to use primary key as placeholder instead of lastModifiedDate |
339340
| 4.2.20 | 2024-09-21 | [45753](https://github.com/airbytehq/airbyte/pull/45753) | Update dependencies |
340341
| 4.2.19 | 2024-09-14 | [45018](https://github.com/airbytehq/airbyte/pull/45018) | Update dependencies |
341342
| 4.2.18 | 2024-08-24 | [43762](https://github.com/airbytehq/airbyte/pull/43762) | Update dependencies |

0 commit comments

Comments
(0)