Skip to content

refactor common sources to work with dlt+ #624

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
418 changes: 200 additions & 218 deletions sources/asana_dlt/__init__.py

Large diffs are not rendered by default.

17 changes: 10 additions & 7 deletions sources/bing_webmaster/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,31 @@
"""

import time
from typing import Iterable, Iterator, List, Sequence
from typing import Dict, Iterator, List, Sequence

import dlt
from dlt.common import logger
from dlt.common.typing import DictStrAny, DictStrStr
from dlt.common.typing import DictStrAny
from dlt.sources import DltResource

from .helpers import get_stats_with_retry, parse_response


@dlt.source(name="bing_webmaster")
def source(
site_urls: List[str] = None, site_url_pages: Iterable[DictStrStr] = None
site_urls: List[str] = dlt.config.value,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these should be "optional", no?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so. If they were not present, what page would you want to get the traffic statistics for then?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, you are right. But we have to update the docstring of this source then, it makes it seem as though they are optional.

site_url_pages: List[Dict[str, str]] = dlt.config.value,
) -> Sequence[DltResource]:
"""
A dlt source for the Bing Webmaster api.
It groups resources for the APIs which return organic search traffic statistics
Args:
site_urls: List[str]: A list of site_urls, e.g, ["dlthub.com", "dlthub.de"]. Use this if you need the weekly traffic per site_url and page
site_url_pages: Iterable[DictStrStr]: A list of pairs of site_url and page. Use this if you need the weekly traffic per site_url, page, and query
site_urls: List[str]: A list of site_urls to get weekly traffic statistics for
site_url_pages: List[Dict[str, str]]: A list of pairs of site_url and page for which to return weekly traffic per site_url, page, and query
Returns:
Sequence[DltResource]: A sequence of resources that can be selected from including page_stats and page_query_stats.
Examples:
>>> source(site_urls=["dlthub.de", "dlthub.com"], site_url_pages=[{"site_url": "dlthub.com", "page": "https://www.dlthub.com/docs"}])
"""
return (
page_stats(site_urls),
Expand Down Expand Up @@ -70,7 +73,7 @@ def page_stats(
table_name="bing_page_query_stats",
)
def page_query_stats(
site_url_pages: Iterable[DictStrStr],
site_url_pages: List[Dict[str, str]],
api_key: str = dlt.secrets.value,
) -> Iterator[Iterator[DictStrAny]]:
"""
Expand All @@ -80,7 +83,7 @@ def page_query_stats(
https://learn.microsoft.com/en-us/dotnet/api/microsoft.bing.webmaster.api.interfaces.iwebmasterapi.getpagequerystats

Args:
site_url_page (Iterable[DictStrStr]): Iterable of site_url and pages to retrieve statistics for. Can be result of a SQL query, a parsed sitemap, etc.
site_url_pages (List[Dict[str,str]]): Iterable of site_url and pages to retrieve statistics for. Can be result of a SQL query, a parsed sitemap, etc.
Yields:
Iterator[Dict[str, Any]]: An iterator over list of organic traffic statistics.
"""
Expand Down
4 changes: 3 additions & 1 deletion sources/chess/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@

@dlt.source(name="chess")
def source(
players: List[str], start_month: str = None, end_month: str = None
players: List[str] = dlt.config.value,
start_month: str = None,
end_month: str = None,
) -> Sequence[DltResource]:
"""
A dlt source for the chess.com api. It groups several resources (in this case chess.com API endpoints) containing
Expand Down
12 changes: 7 additions & 5 deletions sources/github/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@

@dlt.source
def github_reactions(
owner: str,
name: str,
owner: str = dlt.config.value,
name: str = dlt.config.value,
access_token: str = dlt.secrets.value,
items_per_page: int = 100,
max_items: Optional[int] = None,
Expand Down Expand Up @@ -67,7 +67,9 @@ def github_reactions(

@dlt.source(max_table_nesting=2)
def github_repo_events(
owner: str, name: str, access_token: Optional[str] = None
owner: str = dlt.config.value,
name: str = dlt.config.value,
access_token: Optional[str] = None,
) -> DltResource:
"""Gets events for repository `name` with owner `owner` incrementally.

Expand Down Expand Up @@ -112,8 +114,8 @@ def repo_events(

@dlt.source
def github_stargazers(
owner: str,
name: str,
owner: str = dlt.config.value,
name: str = dlt.config.value,
access_token: str = dlt.secrets.value,
items_per_page: int = 100,
max_items: Optional[int] = None,
Expand Down
4 changes: 2 additions & 2 deletions sources/hubspot/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -477,8 +477,8 @@ def fetch_props(

@dlt.resource
def hubspot_events_for_objects(
object_type: THubspotObjectType,
object_ids: List[str],
object_type: THubspotObjectType = dlt.config.value,
object_ids: List[str] = dlt.config.value,
api_key: str = dlt.secrets.value,
start_date: pendulum.DateTime = STARTDATE,
) -> DltResource:
Expand Down
5 changes: 1 addition & 4 deletions sources/kafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
standalone=True,
)
def kafka_consumer(
topics: Union[str, List[str]],
topics: List[str] = dlt.config.value,
credentials: Union[KafkaCredentials, Consumer] = dlt.secrets.value,
msg_processor: Optional[
Callable[[Message], Dict[str, Any]]
Expand Down Expand Up @@ -60,9 +60,6 @@ def kafka_consumer(
Yields:
Iterable[TDataItem]: Kafka messages.
"""
if not isinstance(topics, list):
topics = [topics]

if isinstance(credentials, Consumer):
consumer = credentials
elif isinstance(credentials, KafkaCredentials):
Expand Down
4 changes: 2 additions & 2 deletions sources/kafka_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def custom_msg_processor(msg: confluent_kafka.Message) -> Dict[str, Any]:
"data": msg.value().decode("utf-8"),
}

data = kafka_consumer("books", msg_processor=custom_msg_processor)
data = kafka_consumer(["books"], msg_processor=custom_msg_processor)

info = pipeline.run(data)
print(info)
Expand All @@ -63,7 +63,7 @@ def load_starting_from_date() -> None:
)

from_date = datetime(2023, 12, 15)
data = kafka_consumer("books", start_from=from_date)
data = kafka_consumer(["books"], start_from=from_date)

info = pipeline.run(data)
print(info)
Expand Down
7 changes: 4 additions & 3 deletions tests/bing_webmaster/test_bing_webmaster_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def _make_pipeline(destination_name: str) -> dlt.Pipeline:
@pytest.mark.parametrize("destination_name", ["duckdb"])
def test_load_unauthorized_domain(destination_name: str) -> None:
pipeline = _make_pipeline(destination_name)
data = source(site_urls=["wikipedia.org"])
data = source(site_urls=["wikipedia.org"], site_url_pages=[])
with pytest.raises(Exception):
pipeline.run(data.with_resources("page_stats"))

Expand All @@ -28,7 +28,7 @@ def test_load_page_stats(destination_name: str) -> None:
table_name = "bing_page_stats"

# Note: If this test fails: replace this with a site_url you can access
data = source(site_urls=["dlthub.com"])
data = source(site_urls=["dlthub.com"], site_url_pages=[])

info = pipeline.run(data.with_resources("page_stats"))
assert_load_info(info)
Expand Down Expand Up @@ -63,11 +63,12 @@ def test_load_page_query_stats(destination_name: str) -> None:
table_name = "bing_page_query_stats"

data = source(
site_urls=["dlthub.com"],
# Note: If this test fails: replace this with site_url and pages you can access
site_url_pages=[
{"site_url": "dlthub.com", "page": "https://dlthub.com/docs/intro"},
{"site_url": "dlthub.com", "page": "https://dlthub.com/why/"},
]
],
)
info = pipeline.run(data.with_resources("page_query_stats"))
assert_load_info(info)
Expand Down
6 changes: 3 additions & 3 deletions tests/github/test_github_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,10 @@ def test_github_events(destination_name: str) -> None:
pipeline = dlt.pipeline(
"github_events",
destination=destination_name,
dataset_name="airflow_events",
dataset_name="duckdb_events",
dev_mode=True,
)
data = github_repo_events("apache", "airflow")
data = github_repo_events("duckdb", "duckdb")
load_info = pipeline.run(data)
assert_load_info(load_info)
# all tables must end with event (auto created from event types) or contain "_event__" (child tables)
Expand All @@ -67,7 +67,7 @@ def test_github_events(destination_name: str) -> None:
)
load_table_counts(pipeline, *table_names)
# load again. it could happen that a single event got loaded but surely numbers should not double and events must be unique
data = github_repo_events("apache", "airflow")
data = github_repo_events("duckdb", "duckdb")
pipeline.run(data)
table_names = [
t["name"]
Expand Down
Loading