diff --git a/sources/asana_dlt/__init__.py b/sources/asana_dlt/__init__.py index ebf9d10a6..fcb9acdb1 100644 --- a/sources/asana_dlt/__init__.py +++ b/sources/asana_dlt/__init__.py @@ -7,10 +7,11 @@ """ import typing as t -from typing import Any, Iterable +from typing import Any, Iterable, Sequence import dlt from dlt.common.typing import TDataItem +from dlt.extract.resource import DltResource from .helpers import get_client from .settings import ( @@ -28,237 +29,218 @@ @dlt.source -def asana_source() -> Any: # should be Sequence[DltResource]: +def asana_source(access_token: str = dlt.secrets.value) -> Sequence[DltResource]: """ The main function that runs all the other functions to fetch data from Asana. - Returns: - Sequence[DltResource]: A sequence of DltResource objects containing the fetched data. - """ - return [ - workspaces, - projects, - sections, - tags, - tasks, - stories, - teams, - users, - ] - - -@dlt.resource(write_disposition="replace") -def workspaces( - access_token: str = dlt.secrets.value, fields: Iterable[str] = WORKSPACE_FIELDS -) -> Iterable[TDataItem]: - """ - Fetches and returns a list of workspaces from Asana. - Args: - access_token (str): The access token to authenticate the Asana API client, provided in the secrets file - fields (Iterable[str]): The list of workspace fields to be retrieved from Asana API. - Yields: - dict: The workspace data. - """ - yield from get_client(access_token).workspaces.find_all(opt_fields=",".join(fields)) - - -@dlt.transformer( - data_from=workspaces, - write_disposition="replace", -) -@dlt.defer -def projects( - workspace: TDataItem, - access_token: str = dlt.secrets.value, - fields: Iterable[str] = PROJECT_FIELDS, -) -> Iterable[TDataItem]: - """ - Fetches and returns a list of projects for a given workspace from Asana. Args: - workspace (dict): The workspace data. 
access_token (str): The access token to authenticate the Asana API client, provided in the secrets file - fields (Iterable[str]): The list of workspace fields to be retrieved from Asana API. Returns: - list[dict]: The project data for the given workspace. - """ - return list( - get_client(access_token).projects.find_all( - workspace=workspace["gid"], - timeout=REQUEST_TIMEOUT, - opt_fields=",".join(fields), - ) - ) - - -@dlt.transformer( - data_from=projects, - write_disposition="replace", -) -@dlt.defer -def sections( - project_array: t.List[TDataItem], - access_token: str = dlt.secrets.value, - fields: Iterable[str] = SECTION_FIELDS, -) -> Iterable[TDataItem]: - """ - Fetches all sections for a given project from Asana. - Args: - project_array (list): The project data. - access_token (str): The access token to authenticate the Asana API client, provided in the secrets file - fields (Iterable[str]): The list of workspace fields to be retrieved from Asana API. - Returns: - list[dict]: The sections data for the given project. + Sequence[DltResource]: A sequence of DltResource objects containing the fetched data. """ - return [ - section - for project in project_array - for section in get_client(access_token).sections.get_sections_for_project( - project_gid=project["gid"], - timeout=REQUEST_TIMEOUT, - opt_fields=",".join(fields), - ) - ] - -@dlt.transformer(data_from=workspaces, write_disposition="replace") -@dlt.defer -def tags( - workspace: TDataItem, - access_token: str = dlt.secrets.value, - fields: Iterable[str] = TAG_FIELDS, -) -> Iterable[TDataItem]: - """ - Fetches all tags for a given workspace from Asana. - Args: - workspace (dict): The workspace data. - access_token (str): The access token to authenticate the Asana API client, provided in the secrets file - fields (Iterable[str]): The list of workspace fields to be retrieved from Asana API. - Returns: - list[dict]: The tags data for the given workspace. 
- """ - return [ - tag - for tag in get_client(access_token).tags.find_all( - workspace=workspace["gid"], - timeout=REQUEST_TIMEOUT, - opt_fields=",".join(fields), + @dlt.resource(write_disposition="replace") + def workspaces(fields: Iterable[str] = WORKSPACE_FIELDS) -> Iterable[TDataItem]: + """ + Fetches and returns a list of workspaces from Asana. + Args: + fields (Iterable[str]): The list of workspace fields to be retrieved from Asana API. + Yields: + dict: The workspace data. + """ + yield from get_client(access_token).workspaces.find_all( + opt_fields=",".join(fields) ) - ] - -@dlt.transformer(data_from=projects, write_disposition="merge", primary_key="gid") -def tasks( - project_array: t.List[TDataItem], - access_token: str = dlt.secrets.value, - modified_at: dlt.sources.incremental[str] = dlt.sources.incremental( - "modified_at", initial_value=DEFAULT_START_DATE - ), - fields: Iterable[str] = TASK_FIELDS, -) -> Iterable[TDataItem]: - """ - Fetches all tasks for a given project from Asana. - Args: - project_array (list): The project data. - access_token (str): The access token to authenticate the Asana API client, provided in the secrets file - - modified_at (str): The date from which to fetch modified tasks. - fields (Iterable[str]): The list of workspace fields to be retrieved from Asana API. - Yields: - dict: The task data for the given project. - """ - yield from ( - task - for project in project_array - for task in get_client(access_token).tasks.find_all( - project=project["gid"], - timeout=REQUEST_TIMEOUT, - modified_since=modified_at.start_value, - opt_fields=",".join(fields), - ) + @dlt.transformer( + data_from=workspaces, + write_disposition="replace", ) - - -@dlt.transformer( - data_from=tasks, - write_disposition="append", -) -@dlt.defer -def stories( - task: TDataItem, - access_token: str = dlt.secrets.value, - fields: Iterable[str] = STORY_FIELDS, -) -> Iterable[TDataItem]: - """ - Fetches stories for a task from Asana. 
- Args: - task (dict): The task data. - access_token (str): The access token to authenticate the Asana API client, provided in the secrets file - fields (Iterable[str]): The list of workspace fields to be retrieved from Asana API. - Returns: - list[dict]: The stories data for the given task. - """ - return [ - story - for story in get_client(access_token).stories.get_stories_for_task( - task_gid=task["gid"], - timeout=REQUEST_TIMEOUT, - opt_fields=",".join(fields), + @dlt.defer + def projects( + workspace: TDataItem, + fields: Iterable[str] = PROJECT_FIELDS, + ) -> Iterable[TDataItem]: + """ + Fetches and returns a list of projects for a given workspace from Asana. + Args: + workspace (dict): The workspace data. + fields (Iterable[str]): The list of workspace fields to be retrieved from Asana API. + Returns: + list[dict]: The project data for the given workspace. + """ + return list( + get_client(access_token).projects.find_all( + workspace=workspace["gid"], + timeout=REQUEST_TIMEOUT, + opt_fields=",".join(fields), + ) ) - ] - -@dlt.transformer( - data_from=workspaces, - write_disposition="replace", -) -@dlt.defer -def teams( - workspace: TDataItem, - access_token: str = dlt.secrets.value, - fields: Iterable[str] = TEAMS_FIELD, -) -> Iterable[TDataItem]: - """ - Fetches all teams for a given workspace from Asana. - Args: - workspace (dict): The workspace data. - access_token (str): The access token to authenticate the Asana API client, provided in the secrets file - fields (Iterable[str]): The list of workspace fields to be retrieved from Asana API. - Returns: - list[dict]: The teams data for the given workspace. 
- """ - return [ - team - for team in get_client(access_token).teams.find_by_organization( - organization=workspace["gid"], - timeout=REQUEST_TIMEOUT, - opt_fields=",".join(fields), + @dlt.transformer( + data_from=projects, + write_disposition="replace", + ) + @dlt.defer + def sections( + project_array: t.List[TDataItem], + fields: Iterable[str] = SECTION_FIELDS, + ) -> Iterable[TDataItem]: + """ + Fetches all sections for a given project from Asana. + Args: + project_array (list): The project data. + fields (Iterable[str]): The list of workspace fields to be retrieved from Asana API. + Returns: + list[dict]: The sections data for the given project. + """ + return [ + section + for project in project_array + for section in get_client(access_token).sections.get_sections_for_project( + project_gid=project["gid"], + timeout=REQUEST_TIMEOUT, + opt_fields=",".join(fields), + ) + ] + + @dlt.transformer(data_from=workspaces, write_disposition="replace") + @dlt.defer + def tags( + workspace: TDataItem, + fields: Iterable[str] = TAG_FIELDS, + ) -> Iterable[TDataItem]: + """ + Fetches all tags for a given workspace from Asana. + Args: + workspace (dict): The workspace data. + fields (Iterable[str]): The list of workspace fields to be retrieved from Asana API. + Returns: + list[dict]: The tags data for the given workspace. + """ + return [ + tag + for tag in get_client(access_token).tags.find_all( + workspace=workspace["gid"], + timeout=REQUEST_TIMEOUT, + opt_fields=",".join(fields), + ) + ] + + @dlt.transformer(data_from=projects, write_disposition="merge", primary_key="gid") + def tasks( + project_array: t.List[TDataItem], + modified_at: dlt.sources.incremental[str] = dlt.sources.incremental( + "modified_at", initial_value=DEFAULT_START_DATE + ), + fields: Iterable[str] = TASK_FIELDS, + ) -> Iterable[TDataItem]: + """ + Fetches all tasks for a given project from Asana. + Args: + project_array (list): The project data. 
+ + modified_at (str): The date from which to fetch modified tasks. + fields (Iterable[str]): The list of workspace fields to be retrieved from Asana API. + Yields: + dict: The task data for the given project. + """ + yield from ( + task + for project in project_array + for task in get_client(access_token).tasks.find_all( + project=project["gid"], + timeout=REQUEST_TIMEOUT, + modified_since=modified_at.start_value, + opt_fields=",".join(fields), + ) ) - ] + @dlt.transformer( + data_from=tasks, + write_disposition="append", + ) + @dlt.defer + def stories( + task: TDataItem, + fields: Iterable[str] = STORY_FIELDS, + ) -> Iterable[TDataItem]: + """ + Fetches stories for a task from Asana. + Args: + task (dict): The task data. + fields (Iterable[str]): The list of workspace fields to be retrieved from Asana API. + Returns: + list[dict]: The stories data for the given task. + """ + return [ + story + for story in get_client(access_token).stories.get_stories_for_task( + task_gid=task["gid"], + timeout=REQUEST_TIMEOUT, + opt_fields=",".join(fields), + ) + ] + + @dlt.transformer( + data_from=workspaces, + write_disposition="replace", + ) + @dlt.defer + def teams( + workspace: TDataItem, + fields: Iterable[str] = TEAMS_FIELD, + ) -> Iterable[TDataItem]: + """ + Fetches all teams for a given workspace from Asana. + Args: + workspace (dict): The workspace data. + fields (Iterable[str]): The list of workspace fields to be retrieved from Asana API. + Returns: + list[dict]: The teams data for the given workspace. + """ + return [ + team + for team in get_client(access_token).teams.find_by_organization( + organization=workspace["gid"], + timeout=REQUEST_TIMEOUT, + opt_fields=",".join(fields), + ) + ] + + @dlt.transformer( + data_from=workspaces, + write_disposition="replace", + ) + @dlt.defer + def users( + workspace: TDataItem, + fields: Iterable[str] = USER_FIELDS, + ) -> Iterable[TDataItem]: + """ + Fetches all users for a given workspace from Asana. 
+            Args: +                workspace (dict): The workspace data. +                Note: the Asana client authenticates with the access_token captured from the enclosing asana_source. +                fields (Iterable[str]): The list of workspace fields to be retrieved from Asana API. +            Returns: +                list[dict]: The user data for the given workspace. +            """ +            return [ +                user +                for user in get_client(access_token).users.find_all( +                    workspace=workspace["gid"], +                    timeout=REQUEST_TIMEOUT, +                    opt_fields=",".join(fields), +                ) +            ] 

-@dlt.transformer( -    data_from=workspaces, -    write_disposition="replace", -) -@dlt.defer -def users( -    workspace: TDataItem, -    access_token: str = dlt.secrets.value, -    fields: Iterable[str] = USER_FIELDS, -) -> Iterable[TDataItem]: -    """ -    Fetches all users for a given workspace from Asana. -    Args: -        workspace (dict): The workspace data. -        access_token (str): The access token to authenticate the Asana API client, provided in the secrets file -        fields (Iterable[str]): The list of workspace fields to be retrieved from Asana API. -    Returns: -        list[dict]: The user data for the given workspace. 
- """ return [ - user - for user in get_client(access_token).users.find_all( - workspace=workspace["gid"], - timeout=REQUEST_TIMEOUT, - opt_fields=",".join(fields), - ) + workspaces, + projects, + sections, + tags, + tasks, + stories, + teams, + users, ] diff --git a/sources/bing_webmaster/__init__.py b/sources/bing_webmaster/__init__.py index 8025e6fba..0946b79e8 100644 --- a/sources/bing_webmaster/__init__.py +++ b/sources/bing_webmaster/__init__.py @@ -6,11 +6,11 @@ """ import time -from typing import Iterable, Iterator, List, Sequence +from typing import Dict, Iterator, List, Sequence import dlt from dlt.common import logger -from dlt.common.typing import DictStrAny, DictStrStr +from dlt.common.typing import DictStrAny from dlt.sources import DltResource from .helpers import get_stats_with_retry, parse_response @@ -18,16 +18,19 @@ @dlt.source(name="bing_webmaster") def source( - site_urls: List[str] = None, site_url_pages: Iterable[DictStrStr] = None + site_urls: List[str] = dlt.config.value, + site_url_pages: List[Dict[str, str]] = dlt.config.value, ) -> Sequence[DltResource]: """ A dlt source for the Bing Webmaster api. It groups resources for the APIs which return organic search traffic statistics Args: - site_urls: List[str]: A list of site_urls, e.g, ["dlthub.com", "dlthub.de"]. Use this if you need the weekly traffic per site_url and page - site_url_pages: Iterable[DictStrStr]: A list of pairs of site_url and page. Use this if you need the weekly traffic per site_url, page, and query + site_urls: List[str]: A list of site_urls to get weekly traffic statistics for + site_url_pages: Iterable[Dict[str, str]]: A list of pairs of site_url and page for which to return weekly traffic per site_url, page, and query Returns: Sequence[DltResource]: A sequence of resources that can be selected from including page_stats and page_query_stats. 
+ Examples: + >>> source(site_urls=["dlthub.de", "dlthub.com"], site_url_pages=[{"site_url": "dlthub.com", "page": "https://www.dlthub.com/docs"}]) """ return ( page_stats(site_urls), @@ -70,7 +73,7 @@ def page_stats( table_name="bing_page_query_stats", ) def page_query_stats( - site_url_pages: Iterable[DictStrStr], + site_url_pages: List[Dict[str, str]], api_key: str = dlt.secrets.value, ) -> Iterator[Iterator[DictStrAny]]: """ @@ -80,7 +83,7 @@ def page_query_stats( https://learn.microsoft.com/en-us/dotnet/api/microsoft.bing.webmaster.api.interfaces.iwebmasterapi.getpagequerystats Args: - site_url_page (Iterable[DictStrStr]): Iterable of site_url and pages to retrieve statistics for. Can be result of a SQL query, a parsed sitemap, etc. + site_url_page (List[Dict[str,str]]): Iterable of site_url and pages to retrieve statistics for. Can be result of a SQL query, a parsed sitemap, etc. Yields: Iterator[Dict[str, Any]]: An iterator over list of organic traffic statistics. """ diff --git a/sources/chess/__init__.py b/sources/chess/__init__.py index 3915abe5c..f0004f053 100644 --- a/sources/chess/__init__.py +++ b/sources/chess/__init__.py @@ -14,7 +14,9 @@ @dlt.source(name="chess") def source( - players: List[str], start_month: str = None, end_month: str = None + players: List[str] = dlt.config.value, + start_month: str = None, + end_month: str = None, ) -> Sequence[DltResource]: """ A dlt source for the chess.com api. 
It groups several resources (in this case chess.com API endpoints) containing diff --git a/sources/github/__init__.py b/sources/github/__init__.py index 4a1834528..b893dfeb7 100644 --- a/sources/github/__init__.py +++ b/sources/github/__init__.py @@ -12,8 +12,8 @@ @dlt.source def github_reactions( - owner: str, - name: str, + owner: str = dlt.config.value, + name: str = dlt.config.value, access_token: str = dlt.secrets.value, items_per_page: int = 100, max_items: Optional[int] = None, @@ -67,7 +67,9 @@ def github_reactions( @dlt.source(max_table_nesting=2) def github_repo_events( - owner: str, name: str, access_token: Optional[str] = None + owner: str = dlt.config.value, + name: str = dlt.config.value, + access_token: Optional[str] = None, ) -> DltResource: """Gets events for repository `name` with owner `owner` incrementally. @@ -112,8 +114,8 @@ def repo_events( @dlt.source def github_stargazers( - owner: str, - name: str, + owner: str = dlt.config.value, + name: str = dlt.config.value, access_token: str = dlt.secrets.value, items_per_page: int = 100, max_items: Optional[int] = None, diff --git a/sources/hubspot/__init__.py b/sources/hubspot/__init__.py index 1edbc1591..16cdac90c 100644 --- a/sources/hubspot/__init__.py +++ b/sources/hubspot/__init__.py @@ -477,8 +477,8 @@ def fetch_props( @dlt.resource def hubspot_events_for_objects( - object_type: THubspotObjectType, - object_ids: List[str], + object_type: THubspotObjectType = dlt.config.value, + object_ids: List[str] = dlt.config.value, api_key: str = dlt.secrets.value, start_date: pendulum.DateTime = STARTDATE, ) -> DltResource: diff --git a/sources/kafka/__init__.py b/sources/kafka/__init__.py index 846b35621..5036fd20e 100644 --- a/sources/kafka/__init__.py +++ b/sources/kafka/__init__.py @@ -27,7 +27,7 @@ standalone=True, ) def kafka_consumer( - topics: Union[str, List[str]], + topics: List[str] = dlt.config.value, credentials: Union[KafkaCredentials, Consumer] = dlt.secrets.value, msg_processor: Optional[ 
Callable[[Message], Dict[str, Any]] @@ -60,9 +60,6 @@ def kafka_consumer( Yields: Iterable[TDataItem]: Kafka messages. """ - if not isinstance(topics, list): - topics = [topics] - if isinstance(credentials, Consumer): consumer = credentials elif isinstance(credentials, KafkaCredentials): diff --git a/sources/kafka_pipeline.py b/sources/kafka_pipeline.py index af9476cad..0dc08b304 100644 --- a/sources/kafka_pipeline.py +++ b/sources/kafka_pipeline.py @@ -47,7 +47,7 @@ def custom_msg_processor(msg: confluent_kafka.Message) -> Dict[str, Any]: "data": msg.value().decode("utf-8"), } - data = kafka_consumer("books", msg_processor=custom_msg_processor) + data = kafka_consumer(["books"], msg_processor=custom_msg_processor) info = pipeline.run(data) print(info) @@ -63,7 +63,7 @@ def load_starting_from_date() -> None: ) from_date = datetime(2023, 12, 15) - data = kafka_consumer("books", start_from=from_date) + data = kafka_consumer(["books"], start_from=from_date) info = pipeline.run(data) print(info) diff --git a/tests/bing_webmaster/test_bing_webmaster_source.py b/tests/bing_webmaster/test_bing_webmaster_source.py index fce979ae8..6eaddac8f 100644 --- a/tests/bing_webmaster/test_bing_webmaster_source.py +++ b/tests/bing_webmaster/test_bing_webmaster_source.py @@ -17,7 +17,7 @@ def _make_pipeline(destination_name: str) -> dlt.Pipeline: @pytest.mark.parametrize("destination_name", ["duckdb"]) def test_load_unauthorized_domain(destination_name: str) -> None: pipeline = _make_pipeline(destination_name) - data = source(site_urls=["wikipedia.org"]) + data = source(site_urls=["wikipedia.org"], site_url_pages=[]) with pytest.raises(Exception): pipeline.run(data.with_resources("page_stats")) @@ -28,7 +28,7 @@ def test_load_page_stats(destination_name: str) -> None: table_name = "bing_page_stats" # Note: If this test fails: replace this with a site_url you can access - data = source(site_urls=["dlthub.com"]) + data = source(site_urls=["dlthub.com"], site_url_pages=[]) info = 
pipeline.run(data.with_resources("page_stats")) assert_load_info(info) @@ -63,11 +63,12 @@ def test_load_page_query_stats(destination_name: str) -> None: table_name = "bing_page_query_stats" data = source( + site_urls=["dlthub.com"], # Note: If this test fails: replace this with site_url and pages you can access site_url_pages=[ {"site_url": "dlthub.com", "page": "https://dlthub.com/docs/intro"}, {"site_url": "dlthub.com", "page": "https://dlthub.com/why/"}, - ] + ], ) info = pipeline.run(data.with_resources("page_query_stats")) assert_load_info(info) diff --git a/tests/github/test_github_source.py b/tests/github/test_github_source.py index 5533de201..53afdd5e7 100644 --- a/tests/github/test_github_source.py +++ b/tests/github/test_github_source.py @@ -54,10 +54,10 @@ def test_github_events(destination_name: str) -> None: pipeline = dlt.pipeline( "github_events", destination=destination_name, - dataset_name="airflow_events", + dataset_name="duckdb_events", dev_mode=True, ) - data = github_repo_events("apache", "airflow") + data = github_repo_events("duckdb", "duckdb") load_info = pipeline.run(data) assert_load_info(load_info) # all tables must end with event (auto created from event types) or contain "_event__" (child tables) @@ -67,7 +67,7 @@ def test_github_events(destination_name: str) -> None: ) load_table_counts(pipeline, *table_names) # load again. it could happen that a single event got loaded but surely numbers should not double and events must be unique - data = github_repo_events("apache", "airflow") + data = github_repo_events("duckdb", "duckdb") pipeline.run(data) table_names = [ t["name"]