[source-tiktok-marketing] - Removes stream_state interpolation, custom cursor #53645

Conversation
end_datetime:
  type: MinMaxDatetime
- datetime: "{{ config.get('end_date', today_utc()) }}"
- datetime_format: "%Y-%m-%d"
+ datetime: "{{ format_datetime((str_to_datetime(config.get('end_date')) if config.get('end_date') else now_utc()) + duration('PT23H'), '%Y-%m-%d %H:%M:%S') }}"
@maxi297 @brianjlai The duration added here (PT23H) differs from the one in the GH issue (P1D), but I think it solved the issue we ran into with the mock server tests.
I noticed the partitions that were generated looked like this, assuming a start date of 2024-01-01 and an end date of 2024-01-02:
{'advertiser_id': '872746382648', 'parent_slice': {}, 'start_time': '2024-01-01 00:00:00', 'end_time': '2024-01-01 23:00:00'}
{'advertiser_id': '872746382648', 'parent_slice': {}, 'start_time': '2024-01-02 00:00:00', 'end_time': '2024-01-03 00:00:00'}
After changing it to "PT23H", the partitions changed to the following, and the mock server test's second request matched its mock correctly:
{'advertiser_id': '872746382648', 'parent_slice': {}, 'start_time': '2024-01-01 00:00:00', 'end_time': '2024-01-01 23:00:00'}
{'advertiser_id': '872746382648', 'parent_slice': {}, 'start_time': '2024-01-02 00:00:00', 'end_time': '2024-01-02 23:00:00'}
I'll need to dive a bit deeper into the concurrent code but I think it's because the end date of the last partition was out of range relative to the actual end date and therefore the last request was not made. That's why the mock server test was previously trying to match the same request to two separate mocked requests.
The cursor granularity is not applied on the last slice. I'm not sure why exactly. I would have assumed it was to keep the same behavior as the datetime-based cursor, but this is not what I see here.
Note: This is outdated and irrelevant. It turns out I was experiencing this issue only in the mock server tests because I failed to pass state when initializing the source, which is the only way to generate slices with concurrent streams for a concurrent source. Once I correctly passed state, the stream slices were created as expected with the "P1D" duration.
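For anyone hitting the same thing, this is a rough sketch of the kind of per-partition state payload I mean, reusing the partition fields from the slices above; the cursor timestamp is illustrative and the exact shape the connector expects may differ:

    states:
      - partition:
          advertiser_id: "872746382648"
          parent_slice: {}
        cursor:
          stat_time_hour: "2024-01-01 10:00:00"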
The overall direction looks good, but I would like to confirm the changes to the tests, see the CATs pass, and have regression test results as well.
@@ -4314,7 +4319,7 @@ spec:
   # If time allows it, we can definitely try to scope this number a bit better empirically.
   concurrency_level:
     type: ConcurrencyLevel
-    default_concurrency: 3
+    default_concurrency: 1
default_concurrency == 1 is dangerous because threads are shared between the partition generator and the partition readers, with a limit on the number of partition generators so that partitions get read and don't just grow over time. With only one thread, there would be no partition reader, and this might lead to a deadlock where we can't enqueue partitions anymore but nobody reads from the queue to drain it. Can we put 2 there?
Thanks for the catch -- I set this just for testing purposes (when I hypothesized that concurrency was causing rate limit issues). Will revert!
@maxi297 I've updated this to be
default_concurrency: "{{ config.get('concurrency_level', 3) }}"
This is because the mock server tests fail only when the concurrency level is > 1. I've set the mock server config to concurrency_level = 1.
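For context, this is roughly how the block reads in the manifest after the change; the max_concurrency cap shown here is illustrative and not part of this PR:

    concurrency_level:
      type: ConcurrencyLevel
      default_concurrency: "{{ config.get('concurrency_level', 3) }}"
      max_concurrency: 10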
@@ -1,6 +1,8 @@
 # Copyright (c) 2024 Airbyte, Inc., all rights reserved.

+import asyncio
nit: what are those imports? They don't seem used
Ah some failed experimentation on my part. Removed.
@@ -167,7 +169,7 @@ def test_read_with_state(self, http_mocker: HttpMocker):
         state=self.state(),
     )

-    assert len(output.records) == 1
+    assert len(output.records) == 2
Why do we have more records being returned? It would seem to indicate that the filtering is not working correctly, or that the setup/given part of this test is no longer valid.
Great catch. I didn't initialize the source w/ state in the mock server tests.
condition: "{{ record['dimensions']['stat_time_hour'] >= stream_state.get('stat_time_hour', config.get('start_date', '')) }}"
$parameters:
  partition_field: advertiser_id
type: RecordFilter
Can we use is_client_side_incremental instead?
This filter actually only applies to the very "first" slice. We request a day at a time and 24 reports are returned (one for each hour). If our state value was 01-01-2025 @ 10AM, then we would request 01-01-2025 but only return the reports from 10AM onward. For every requested day, we would return all 24 reports, or as many reports as there have been hours in the day. Therefore I don't actually think client-side incremental is necessary.
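For reference, my understanding is that the suggested alternative would be a flag on the cursor rather than an explicit filter; a rough sketch with illustrative field values, not the connector's actual cursor definition:

    incremental_sync:
      type: DatetimeBasedCursor
      cursor_field: stat_time_hour
      datetime_format: "%Y-%m-%d %H:%M:%S"
      start_datetime: "{{ config['start_date'] }}"
      is_client_side_incremental: true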
Hey @pnilan, the issues referenced here aren't accessible. We've been encountering problems with this connector where the daily report and daily country report are constantly missing data, so we resorted to using the hourly report instead, which seemed to be reliable. I am curious whether this PR resolves the issue with daily reports missing data, thanks!
@pnilan To whom it may concern: I've created a variation of this connector that solves the daily report issues 100%, unaware that you were already working on fixing it. Should this not work, I'll gladly contribute my fix.
Hey @sheinbergon, it looks like your changes are for the Apple Search Ads source, not TikTok: master...sheinbergon:airbyte:sheinbergon/apple-search-ads-add-missing-paginators. Or am I misunderstanding something here?
This was 1 of 3 fixes meant to make the Apple Search Ads connector function as expected (and it now does; it's also being used in production). I'm talking about a fix I applied locally to the TikTok Marketing connector in order to get valid daily sync results. I also came across issues with the current version(s) of the connector and was able to solve them by tampering with the incremental stream / end date settings, to the point where my results are now on par with what the platform reports. I'm using a locally built version of the connector in a local K8s Airbyte deployment.
Understood, could you share those local changes by any chance, or submit a PR to this repo? 👀
@jaskaAd Of course. Are you an Airbyte maintainer, BTW?
Nope, just a mere spectator who ran into the issue of missing data from daily TikTok reports.
Here you go (changes in the You'll need to:
What
Removes stream_state as an interpolation variable. Now references stream_interval.
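An illustrative before/after of the interpolation change (the request field name below is hypothetical, not copied from the manifest):

    # Before: the value was resolved from the mutable stream state
    start_date: "{{ stream_state.get('start_time', config['start_date']) }}"
    # After: the value is resolved from the slice boundaries handed out by the cursor
    start_date: "{{ stream_interval.start_time }}"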
How
Review guide
manifest.yaml
unit_tests/integration/test_reports_hourly.py
User Impact
Can this PR be safely reverted and rolled back?