def _content_hash(records: list[dict]) -> str:
    """Return the SHA-256 hex digest of ``records`` serialized as JSON."""
    return hashlib.sha256(json.dumps(records).encode("utf-8")).hexdigest()


def _with_retrieval_time(records: list[dict], retrieved_at: str) -> list[dict]:
    """Return copies of ``records`` with ``retrieved_at`` added as the last key."""
    return [{**record, "retrieved_at": retrieved_at} for record in records]


@multi_asset(
    group_name="sloan_executive_education",
    outs={
        "course_metadata": AssetOut(
            description="The metadata for courses extracted from sloan course API",
            io_manager_key="s3file_io_manager",
            key=AssetKey(("sloan_executive_education", "course_metadata")),
        ),
        "course_offering_metadata": AssetOut(
            description="The metadata for course offerings extracted from sloan course "
            "offering API",
            io_manager_key="s3file_io_manager",
            key=AssetKey(("sloan_executive_education", "course_offering_metadata")),
        ),
    },
)
def sloan_course_metadata(
    context: AssetExecutionContext, sloan_api: OAuthApiClientFactory
):
    """Extract Sloan course and course-offering metadata into JSON-lines files.

    Calls the courses and course-offerings endpoints through
    ``sloan_api.client.fetch_with_auth``, renames the response fields to
    snake_case keys, writes each record set to a local JSON-lines file in the
    current working directory, and yields one ``Output`` per asset.

    The data version of each output is the SHA-256 of the extracted records
    *without* the ``retrieved_at`` timestamp, so an unchanged API response
    produces the same data version (and the same object key) on every run.
    The written records still carry ``retrieved_at``.

    :param context: Dagster execution context, used for logging and for
        resolving the asset key of each output.
    :param sloan_api: Factory exposing an authenticated API client as ``.client``.

    :yield: ``Output`` for ``course_metadata`` followed by ``Output`` for
        ``course_offering_metadata``; each value is a
        ``(local_file_path, object_key)`` tuple.
    :raises KeyError: If a response record lacks one of the expected fields.
    """
    retrieved_at = datetime.now(tz=UTC).isoformat()

    sloan_courses = sloan_api.client.fetch_with_auth(
        "https://mit-unified-portal-prod-78eeds.43d8q2.usa-e2.cloudhub.io/api/courses"
    )
    course_records = [
        {
            "course_id": course["Course_Id"],
            "title": course["Title"],
            "description": course["Description"],
            "course_url": course["URL"],
            "certification_type": course["Certification_Type"],
            "topics": course["Topics"],
            "image_url": course["Image_Src"],
            "created": course["SourceCreateDate"],
            "modified": course["SourceLastModifiedDate"],
        }
        for course in sloan_courses
    ]
    context.log.info("Total extracted %d Sloan courses....", len(course_records))

    sloan_course_offerings = sloan_api.client.fetch_with_auth(
        "https://mit-unified-portal-prod-78eeds.43d8q2.usa-e2.cloudhub.io/api/course-offerings"
    )
    offering_records = [
        {
            "title": course_offering["CO_Title"],
            "course_id": course_offering["Course_Id"],
            "start_date": course_offering["Start_Date"],
            "end_date": course_offering["End_Date"],
            "delivery": course_offering["Delivery"],
            "duration": course_offering["Duration"],
            "price": course_offering["Price"],
            "continuing_ed_credits": course_offering["Continuing_Ed_Credits"],
            "time_commitment": course_offering["Time_Commitment"],
            "location": course_offering["Location"],
            "tuition_cost_non_usd": course_offering["Tuition_Cost(non-USD)"],
            "currency": course_offering["Currency"],
            "faculty": course_offering["Faculty_Name"],
        }
        for course_offering in sloan_course_offerings
    ]
    context.log.info(
        "Total extracted %d Sloan course offerings....", len(offering_records)
    )

    # Hash the content BEFORE stamping it with the retrieval time: including
    # the per-run timestamp would make every run look like new data.
    course_data_version = _content_hash(course_records)
    course_offering_data_version = _content_hash(offering_records)

    courses = _with_retrieval_time(course_records, retrieved_at)
    course_offerings = _with_retrieval_time(offering_records, retrieved_at)

    course_file = Path(f"course_{course_data_version}.json")
    course_offering_file = Path(f"course_offering_{course_offering_data_version}.json")

    # Object keys are "<asset key path joined by '/'>/<data version>.json".
    course_key_prefix = "/".join(context.asset_key_for_output("course_metadata").path)
    course_object_key = f"{course_key_prefix}/{course_data_version}.json"
    offering_key_prefix = "/".join(
        context.asset_key_for_output("course_offering_metadata").path
    )
    course_offering_object_key = (
        f"{offering_key_prefix}/{course_offering_data_version}.json"
    )

    with (
        jsonlines.open(course_file, mode="w") as course_writer,
        jsonlines.open(course_offering_file, mode="w") as offering_writer,
    ):
        course_writer.write_all(courses)
        offering_writer.write_all(course_offerings)

    yield Output(
        (course_file, course_object_key),
        output_name="course_metadata",
        data_version=DataVersion(course_data_version),
        metadata={"object_key": course_object_key},
    )

    yield Output(
        (course_offering_file, course_offering_object_key),
        output_name="course_offering_metadata",
        data_version=DataVersion(course_offering_data_version),
        metadata={"object_key": course_offering_object_key},
    )
def authenticate_vault(dagster_env: str, vault_address: str) -> Vault:
    """
    Build a Vault client and log it in using the method chosen by environment.

    The "dev" environment uses GitHub authentication; every other environment
    uses AWS IAM authentication with the "dagster-server" role on the "aws"
    auth mount.

    Parameters:
        dagster_env (str): The environment in which the Dagster service is running.
        vault_address (str): The address of the Vault server.

    Returns:
        Vault: The Vault client after its authentication method has been called.
    """
    if dagster_env != "dev":
        iam_vault = Vault(
            vault_addr=vault_address, vault_role="dagster-server", aws_auth_mount="aws"
        )
        iam_vault._auth_aws_iam()  # noqa: SLF001
        return iam_vault

    github_vault = Vault(vault_addr=vault_address, vault_auth_type="github")
    github_vault._auth_github()  # noqa: SLF001
    return github_vault
def _retry_delay_seconds(retry_after: str | None, default: int = 60) -> int:
    """Return how long to wait after an HTTP 429 response.

    :param retry_after: Raw ``Retry-After`` header value, or ``None`` when the
        header is absent.
    :param default: Delay to use when the header is missing or is not a plain
        non-negative integer (for example an HTTP date).
    :returns: Number of seconds to sleep before retrying.
    """
    if retry_after is not None and retry_after.isdigit():
        return int(retry_after)
    return default


class OAuthApiClient(ConfigurableResource):
    """HTTP client that authenticates requests with an OAuth2 client-credentials token.

    The access token is requested lazily from ``token_url`` and cached on the
    instance until the expiry reported by the token endpoint has passed.
    """

    client_id: str = Field(description="OAUTH2 client ID")
    client_secret: str = Field(description="OAUTH2 client secret")
    token_type: str = Field(
        default="JWT",
        description="Token type to generate for use with authenticated requests",
    )
    token_url: str = Field(
        description="URL to request token. e.g. https://lms.mitx.mit.edu/oauth2/access_token",
    )
    base_url: str = Field(
        description="Base URL of OAuth API client being queries. "
        "e.g. https://lms.mitx.mit.edu/",
    )
    http_timeout: int = Field(
        default=60,
        description=(
            "Time (in seconds) to allow for requests to complete before timing out."
        ),
    )
    # Cached token and its expiry; populated by _fetch_access_token.
    _access_token: Optional[str] = PrivateAttr(default=None)
    _access_token_expires: Optional[datetime] = PrivateAttr(default=None)
    # Shared httpx client; created once in _initialize_client.
    _http_client: httpx.Client = PrivateAttr(default=None)

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._initialize_client()

    def _initialize_client(self) -> None:
        """Create the underlying ``httpx.Client`` unless one already exists."""
        if self._http_client is not None:
            return
        timeout = httpx.Timeout(self.http_timeout, connect=10)
        self._http_client = httpx.Client(timeout=timeout)

    @validator("token_type")
    def validate_token_type(cls, token_type):  # noqa: N805
        """Accept only "jwt" or "bearer" (case-insensitive) as the token type.

        :raises ValueError: If ``token_type`` is anything else. A ValueError is
            raised (rather than pydantic's ``ValidationError``, which cannot be
            constructed without arguments) so that pydantic reports it as a
            normal field validation failure.
        """
        if token_type.lower() not in ["jwt", "bearer"]:
            msg = f"token_type must be 'JWT' or 'Bearer', got {token_type!r}"
            raise ValueError(msg)
        return token_type

    def _fetch_access_token(self) -> Optional[str]:
        """Return a valid access token, requesting a new one when needed.

        A new token is requested when none is cached or when the cached
        token's expiry time is not in the future.

        :raises httpx.HTTPStatusError: If the token endpoint returns an error status.
        """
        now = datetime.now(tz=UTC)
        if self._access_token is None or (self._access_token_expires or now) <= now:
            payload = {
                "grant_type": "client_credentials",
                "client_id": self.client_id,
                "client_secret": self.client_secret,
                "token_type": self.token_type,
            }
            response = self._http_client.post(self.token_url, data=payload)
            response.raise_for_status()
            token_data = response.json()
            self._access_token = token_data["access_token"]
            self._access_token_expires = now + timedelta(
                seconds=token_data["expires_in"]
            )
        return self._access_token

    @property
    def _username(self) -> str:
        """Username of the authenticated client, read from ``/api/user/v1/me``.

        Performs an HTTP request on every access.
        """
        response = self._http_client.get(
            f"{self.base_url}/api/user/v1/me",
            headers={"Authorization": f"JWT {self._fetch_access_token()}"},
        )
        response.raise_for_status()
        return response.json()["username"]

    def fetch_with_auth(
        self,
        request_url: str,
        page_size: int = 100,
        extra_params: dict[str, Any] | None = None,
    ) -> dict[Any, Any]:
        """GET ``request_url`` with an authorization header and return the JSON body.

        When ``token_url`` is ``{base_url}/oauth2/access_token`` the request
        also carries ``username`` and ``page_size`` query parameters.
        ``extra_params`` are merged on top of those defaults. Responses with
        status 429 are retried after the delay given by the ``Retry-After``
        header (60 seconds when it is missing or not an integer).

        :param request_url: Absolute URL to request.
        :param page_size: Page size sent when the default parameters apply.
        :param extra_params: Additional query parameters; they override the
            defaults on key collision.
        :returns: The decoded JSON response.
        :raises httpx.HTTPStatusError: For any error status other than 429.
        """
        if self.token_url == f"{self.base_url}/oauth2/access_token":
            request_params = {"username": self._username, "page_size": page_size}
        else:
            request_params = {}
        # Without this merge, pagination cursors and other caller-supplied
        # parameters would be silently dropped from the request.
        request_params.update(extra_params or {})

        while True:
            response = self._http_client.get(
                request_url,
                # NOTE(review): the scheme is always "JWT" even though
                # token_type may be "bearer" -- confirm this is intended.
                headers={"Authorization": f"JWT {self._fetch_access_token()}"},
                params=httpx.QueryParams(**request_params),
            )
            if response.status_code != TOO_MANY_REQUESTS:
                break
            time.sleep(_retry_delay_seconds(response.headers.get("Retry-After")))

        response.raise_for_status()
        return response.json()


class OAuthApiClientFactory(ConfigurableResource):
    """Resource that builds an ``OAuthApiClient`` from credentials stored in Vault.

    The client is created when Dagster enters ``yield_for_execution``; the
    ``client`` property is only usable after that point.
    """

    deployment: str = Field(description="The name of the deployment")
    _client: OAuthApiClient = PrivateAttr()
    vault: ResourceDependency[Vault]

    def _initialize_client(self) -> OAuthApiClient:
        """Read the OAuth client secrets for ``deployment`` and build the client.

        Secrets are read from the KV v1 mount ``secret-data`` at
        ``pipelines/{deployment}/oauth-client``. The token URL defaults to
        ``{url}/oauth2/access_token`` when the secret has no ``token_url``.
        """
        client_secrets = self.vault.client.secrets.kv.v1.read_secret(
            mount_point="secret-data",
            path=f"pipelines/{self.deployment}/oauth-client",
        )["data"]

        self._client = OAuthApiClient(
            client_id=client_secrets["id"],
            client_secret=client_secrets["secret"],
            base_url=client_secrets["url"],
            token_url=client_secrets.get(
                "token_url", f"{client_secrets['url']}/oauth2/access_token"
            ),
        )
        return self._client

    @property
    def client(self) -> OAuthApiClient:
        """The client built by ``_initialize_client``."""
        return self._client

    @contextmanager
    def yield_for_execution(self, context: InitResourceContext) -> Generator[Self]:  # noqa: ARG002
        """Initialize the API client, then yield this resource for the run."""
        self._initialize_client()
        yield self
parse_qs -import httpx from dagster import ConfigurableResource, InitResourceContext, ResourceDependency -from pydantic import Field, PrivateAttr, ValidationError, validator +from pydantic import Field, PrivateAttr +from ol_orchestrate.resources.oauth import OAuthApiClient from ol_orchestrate.resources.secrets.vault import Vault -TOO_MANY_REQUESTS = 429 - -class OpenEdxApiClient(ConfigurableResource): - client_id: str = Field(description="OAUTH2 client ID for Open edX installation") - client_secret: str = Field( - description="OAUTH2 client secret for Open edX installation" - ) - lms_url: str = Field( - description="Base URL of edX instance LMS being queried, including protocol. " - "e.g. https://lms.mitx.mit.edu" - ) +class OpenEdxApiClient(OAuthApiClient): studio_url: Optional[str] = Field( default=None, description="Base URL of edx instance Studio being queried, including protocol." " e.g. https://studio.mitx.mit.edu", ) - token_type: str = Field( - default="JWT", - description="Token type to generate for use with authenticated requests", - ) - token_url: str = Field( - description="URL to request token. e.g. https://lms.mitx.mit.edu/oauth2/access_token", - ) - http_timeout: int = Field( - default=60, - description=( - "Time (in seconds) to allow for requests to complete before timing out." 
- ), - ) - _access_token: Optional[str] = PrivateAttr(default=None) - _access_token_expires: Optional[datetime] = PrivateAttr(default=None) - _http_client: httpx.Client = PrivateAttr(default=None) def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - self._initialize_client() - - @validator("token_type") - def validate_token_type(cls, token_type): # noqa: N805 - if token_type.lower() not in ["jwt", "bearer"]: - raise ValidationError - return token_type - - def _initialize_client(self) -> None: - if self._http_client is not None: - return - timeout = httpx.Timeout(self.http_timeout, connect=10) - self._http_client = httpx.Client(timeout=timeout) - - def _fetch_access_token(self) -> Optional[str]: - now = datetime.now(tz=UTC) - if self._access_token is None or (self._access_token_expires or now) <= now: - payload = { - "grant_type": "client_credentials", - "client_id": self.client_id, - "client_secret": self.client_secret, - "token_type": self.token_type, - } - response = self._http_client.post(self.token_url, data=payload) - response.raise_for_status() - self._access_token = response.json()["access_token"] - self._access_token_expires = now + timedelta( - seconds=response.json()["expires_in"] - ) - return self._access_token - - @property - def _username(self) -> str: - response = self._http_client.get( - f"{self.lms_url}/api/user/v1/me", - headers={"Authorization": f"JWT {self._fetch_access_token()}"}, - ) - response.raise_for_status() - return response.json()["username"] - - def _fetch_with_auth( - self, - request_url: str, - page_size: int = 100, - extra_params: dict[str, Any] | None = None, - ) -> dict[Any, Any]: - if self.token_url == f"{self.lms_url}/oauth2/access_token": - request_params = {"username": self._username, "page_size": page_size} - else: - request_params = {} - - request_params.update(**(extra_params or {})) - - response = self._http_client.get( - request_url, - headers={"Authorization": f"JWT {self._fetch_access_token()}"}, - 
params=httpx.QueryParams(**request_params), - ) - - try: - response.raise_for_status() - except httpx.HTTPStatusError as error_response: - if error_response.response.status_code == TOO_MANY_REQUESTS: - retry_after = error_response.response.headers.get("Retry-After", 60) - delay = int(retry_after) if retry_after.isdigit() else 60 - time.sleep(delay) - return self._fetch_with_auth( - request_url, page_size=page_size, extra_params=extra_params - ) - raise - return response.json() def get_edx_course_ids( self, page_size: int = 100 @@ -130,13 +31,13 @@ def get_edx_course_ids( :yield: A generator for walking the paginated list of courses returned from the API """ - request_url = f"{self.lms_url}/api/courses/v1/courses/" - response_data = self._fetch_with_auth(request_url, page_size=page_size) + request_url = f"{self.base_url}/api/courses/v1/courses/" + response_data = self.fetch_with_auth(request_url, page_size=page_size) course_data = response_data["results"] next_page = response_data["pagination"].get("next") yield course_data while next_page: - response_data = self._fetch_with_auth( + response_data = self.fetch_with_auth( request_url, page_size=page_size, extra_params=parse_qs(next_page) ) next_page = response_data["pagination"].get("next") @@ -165,7 +66,7 @@ def check_course_export_status( self, course_id: str, task_id: str ) -> dict[str, str]: request_url = f"{self.studio_url}/api/courses/v0/export/{course_id}/" - return self._fetch_with_auth(request_url, extra_params={"task_id": task_id}) + return self.fetch_with_auth(request_url, extra_params={"task_id": task_id}) def get_course_structure_document(self, course_id: str): """Retrieve the course structure for an active course as JSON. @@ -175,14 +76,14 @@ def get_course_structure_document(self, course_id: str): :returns: JSON document representing the hierarchical structure of the target course. 
""" - request_url = f"{self.lms_url}/api/course-structure/v0/{course_id}/" - return self._fetch_with_auth(request_url) + request_url = f"{self.base_url}/api/course-structure/v0/{course_id}/" + return self.fetch_with_auth(request_url) def get_course_outline(self, course_id: str): request_url = ( - f"{self.lms_url}/api/learning_sequences/v1/course_outline/{course_id}" + f"{self.base_url}/api/learning_sequences/v1/course_outline/{course_id}" ) - return self._fetch_with_auth(request_url) + return self.fetch_with_auth(request_url) def get_edxorg_programs(self): """ @@ -194,13 +95,13 @@ def get_edxorg_programs(self): """ request_url = "https://discovery.edx.org/api/v1/programs/" - response_data = self._fetch_with_auth(request_url) + response_data = self.fetch_with_auth(request_url) results = response_data["results"] next_page = response_data["next"] count = response_data["count"] yield count, results while next_page: - response_data = self._fetch_with_auth( + response_data = self.fetch_with_auth( request_url, extra_params=parse_qs(next_page) ) next_page = response_data["next"] @@ -214,13 +115,13 @@ def get_edxorg_mitx_courses(self): Yield: A generator for walking the paginated list of courses """ course_catalog_url = "https://discovery.edx.org/api/v1/catalogs/10/courses/" - response_data = self._fetch_with_auth(course_catalog_url) + response_data = self.fetch_with_auth(course_catalog_url) results = response_data["results"] next_page = response_data["next"] count = response_data["count"] yield count, results while next_page: - response_data = self._fetch_with_auth( + response_data = self.fetch_with_auth( course_catalog_url, extra_params=parse_qs(next_page) ) next_page = response_data["next"] @@ -244,7 +145,7 @@ def _initialize_client(self) -> OpenEdxApiClient: self._client = OpenEdxApiClient( client_id=client_secrets["id"], client_secret=client_secrets["secret"], - lms_url=client_secrets["url"], + base_url=client_secrets["url"], studio_url=client_secrets["studio_url"], 
token_url=client_secrets.get( "token_url", f"{client_secrets['url']}/oauth2/access_token" diff --git a/src/ol_orchestrate/workspace.yaml b/src/ol_orchestrate/workspace.yaml index 34f9daa9a..23a0acaf2 100644 --- a/src/ol_orchestrate/workspace.yaml +++ b/src/ol_orchestrate/workspace.yaml @@ -4,6 +4,7 @@ load_from: - python_module: ol_orchestrate.definitions.edx.openedx_data_extract - python_module: ol_orchestrate.definitions.edx.retrieve_edxorg_raw_data - python_module: ol_orchestrate.definitions.edx.sync_program_credential_reports +- python_module: ol_orchestrate.definitions.learning_resource.extract_api_data - python_module: ol_orchestrate.definitions.lakehouse.elt - python_module: ol_orchestrate.definitions.platform.notification - python_module: ol_orchestrate.repositories.edx_gcs_courses