Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ingest sloan courses via api #1487

Merged
merged 5 commits into from
Feb 25, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/ol_orchestrate/assets/sloan_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
multi_asset,
)

from ol_orchestrate.resources.openedx import OpenEdxApiClientFactory
from ol_orchestrate.resources.oauth import OAuthApiClientFactory


@multi_asset(
Expand All @@ -35,7 +35,7 @@
},
)
def sloan_course_metadata(
context: AssetExecutionContext, sloan_api: OpenEdxApiClientFactory
context: AssetExecutionContext, sloan_api: OAuthApiClientFactory
):
data_retrieval_timestamp = datetime.now(tz=UTC).isoformat()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from ol_orchestrate.lib.constants import DAGSTER_ENV, VAULT_ADDRESS
from ol_orchestrate.lib.dagster_helpers import default_io_manager
from ol_orchestrate.lib.utils import authenticate_vault, s3_uploads_bucket
from ol_orchestrate.resources.openedx import OpenEdxApiClientFactory
from ol_orchestrate.resources.oauth import OAuthApiClientFactory

vault = authenticate_vault(DAGSTER_ENV, VAULT_ADDRESS)

Expand All @@ -30,9 +30,7 @@
),
"vault": vault,
"s3": S3Resource(),
"sloan_api": OpenEdxApiClientFactory(
deployment="sloan-executive-education", vault=vault
),
"sloan_api": OAuthApiClientFactory(deployment="sloan", vault=vault),
},
assets=[sloan_course_metadata],
schedules=[extract_api_daily_schedule],
Expand Down
2 changes: 1 addition & 1 deletion src/ol_orchestrate/repositories/open_edx.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def open_edx_export_irx_job_config(
edx_resource_config = {
"client_id": edx_creds["id"],
"client_secret": edx_creds["secret"],
"lms_url": edx_creds["url"],
"base_url": edx_creds["url"],
"studio_url": edx_creds["studio_url"],
"token_type": "JWT",
"token_url": f"{edx_creds['url']}/oauth2/access_token",
Expand Down
158 changes: 158 additions & 0 deletions src/ol_orchestrate/resources/oauth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
import time
from collections.abc import Generator
from contextlib import contextmanager
from datetime import UTC, datetime, timedelta
from typing import Any, Optional, Self

import httpx
from dagster import ConfigurableResource, InitResourceContext, ResourceDependency
from pydantic import Field, PrivateAttr, ValidationError, validator

from ol_orchestrate.resources.secrets.vault import Vault

TOO_MANY_REQUESTS = 429


class OAuthApiClient(ConfigurableResource):
client_id: str = Field(description="OAUTH2 client ID")
client_secret: str = Field(description="OAUTH2 client secret")
token_type: str = Field(
default="JWT",
description="Token type to generate for use with authenticated requests",
)
token_url: str = Field(
description="URL to request token. e.g. https://lms.mitx.mit.edu/oauth2/access_token",
)
base_url: str = Field(
description="Base URL of OAuth API client being queries. e.g. https://lms.mitx.mit.edu/",
)
http_timeout: int = Field(
default=60,
description=(
"Time (in seconds) to allow for requests to complete before timing out."
),
)
_access_token: Optional[str] = PrivateAttr(default=None)
_access_token_expires: Optional[datetime] = PrivateAttr(default=None)
_http_client: httpx.Client = PrivateAttr(default=None)

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._initialize_client()

def _initialize_client(self) -> None:
if self._http_client is not None:
return
timeout = httpx.Timeout(self.http_timeout, connect=10)
self._http_client = httpx.Client(timeout=timeout)

@validator("token_type")
def validate_token_type(cls, token_type): # noqa: N805
if token_type.lower() not in ["jwt", "bearer"]:
raise ValidationError
return token_type

def _fetch_access_token(self) -> Optional[str]:
now = datetime.now(tz=UTC)
if self._access_token is None or (self._access_token_expires or now) <= now:
payload = {
"grant_type": "client_credentials",
"client_id": self.client_id,
"client_secret": self.client_secret,
"token_type": self.token_type,
}
response = self._http_client.post(self.token_url, data=payload)
response.raise_for_status()
self._access_token = response.json()["access_token"]
self._access_token_expires = now + timedelta(
seconds=response.json()["expires_in"]
)
return self._access_token

@property
def _username(self) -> str:
response = self._http_client.get(
f"{self.base_url}/api/user/v1/me",
headers={"Authorization": f"JWT {self._fetch_access_token()}"},
)
response.raise_for_status()
return response.json()["username"]

def _fetch_with_auth(
self,
request_url: str,
page_size: int = 100,
extra_params: dict[str, Any] | None = None,
) -> dict[Any, Any]:
if self.token_url == f"{self.base_url}/oauth2/access_token":
request_params = {"username": self._username, "page_size": page_size}
else:
request_params = {}

response = self._http_client.get(
request_url,
headers={"Authorization": f"JWT {self._fetch_access_token()}"},
params=httpx.QueryParams(**request_params),
)

try:
response.raise_for_status()
except httpx.HTTPStatusError as error_response:
if error_response.response.status_code == TOO_MANY_REQUESTS:
retry_after = error_response.response.headers.get("Retry-After", 60)
delay = int(retry_after) if retry_after.isdigit() else 60
time.sleep(delay)
return self._fetch_with_auth(
request_url, page_size=page_size, extra_params=extra_params
)
raise
return response.json()

def get_sloan_courses(self):
"""
Retrieve the course data from their API as JSON

returns: JSON document representing an array of course objects
"""
course_url = "https://mit-unified-portal-prod-78eeds.43d8q2.usa-e2.cloudhub.io/api/courses"
return self._fetch_with_auth(course_url)

def get_sloan_course_offerings(self):
"""
Retrieve the course offerings data from their API as JSON

returns: JSON document representing an array of course offering objects
"""
course_offering_url = "https://mit-unified-portal-prod-78eeds.43d8q2.usa-e2.cloudhub.io/api/course-offerings"
return self._fetch_with_auth(course_offering_url)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These method seem like they're unnecessary in this context. If we wanted to make a Sloan API specific resource we could put them there, but I think it's easy enough to just use the fetch_with_auth and pass the URL in question. Another reason to not have these methods here is that they will then be inherited by the edX resource (which won't break anything, but is a leaky abstraction)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I realized that they don't belong here. I moved them to the asset file so we don't need to create a new resource file for these two simple methods.



class OAuthApiClientFactory(ConfigurableResource):
deployment: str = Field(description="The name of the deployment")
_client: OAuthApiClient = PrivateAttr()
vault: ResourceDependency[Vault]

def _initialize_client(self) -> OAuthApiClient:
client_secrets = self.vault.client.secrets.kv.v1.read_secret(
mount_point="secret-data",
path=f"pipelines/{self.deployment}/oauth-client",
)["data"]

self._client = OAuthApiClient(
client_id=client_secrets["id"],
client_secret=client_secrets["secret"],
base_url=client_secrets["url"],
token_url=client_secrets.get(
"token_url", f"{client_secrets['url']}/oauth2/access_token"
),
)
return self._client

@property
def client(self) -> OAuthApiClient:
return self._client

@contextmanager
def yield_for_execution(self, context: InitResourceContext) -> Generator[Self]: # noqa: ARG002
self._initialize_client()
yield self
131 changes: 8 additions & 123 deletions src/ol_orchestrate/resources/openedx.py
Original file line number Diff line number Diff line change
@@ -1,123 +1,26 @@
import time
from collections.abc import Generator
from contextlib import contextmanager
from datetime import UTC, datetime, timedelta
from typing import Any, Optional, Self
from typing import Optional, Self
from urllib.parse import parse_qs

import httpx
from dagster import ConfigurableResource, InitResourceContext, ResourceDependency
from pydantic import Field, PrivateAttr, ValidationError, validator
from pydantic import Field, PrivateAttr

from ol_orchestrate.resources.oauth import OAuthApiClient
from ol_orchestrate.resources.secrets.vault import Vault

TOO_MANY_REQUESTS = 429


class OpenEdxApiClient(ConfigurableResource):
client_id: str = Field(description="OAUTH2 client ID for Open edX installation")
client_secret: str = Field(
description="OAUTH2 client secret for Open edX installation"
)
lms_url: str = Field(
description="Base URL of edX instance LMS being queried, including protocol. "
"e.g. https://lms.mitx.mit.edu"
)
class OpenEdxApiClient(OAuthApiClient):
studio_url: Optional[str] = Field(
default=None,
description="Base URL of edx instance Studio being queried, including protocol."
" e.g. https://studio.mitx.mit.edu",
)
token_type: str = Field(
default="JWT",
description="Token type to generate for use with authenticated requests",
)
token_url: str = Field(
description="URL to request token. e.g. https://lms.mitx.mit.edu/oauth2/access_token",
)
http_timeout: int = Field(
default=60,
description=(
"Time (in seconds) to allow for requests to complete before timing out."
),
)
_access_token: Optional[str] = PrivateAttr(default=None)
_access_token_expires: Optional[datetime] = PrivateAttr(default=None)
_http_client: httpx.Client = PrivateAttr(default=None)

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._initialize_client()

@validator("token_type")
def validate_token_type(cls, token_type): # noqa: N805
if token_type.lower() not in ["jwt", "bearer"]:
raise ValidationError
return token_type

def _initialize_client(self) -> None:
if self._http_client is not None:
return
timeout = httpx.Timeout(self.http_timeout, connect=10)
self._http_client = httpx.Client(timeout=timeout)

def _fetch_access_token(self) -> Optional[str]:
now = datetime.now(tz=UTC)
if self._access_token is None or (self._access_token_expires or now) <= now:
payload = {
"grant_type": "client_credentials",
"client_id": self.client_id,
"client_secret": self.client_secret,
"token_type": self.token_type,
}
response = self._http_client.post(self.token_url, data=payload)
response.raise_for_status()
self._access_token = response.json()["access_token"]
self._access_token_expires = now + timedelta(
seconds=response.json()["expires_in"]
)
return self._access_token

@property
def _username(self) -> str:
response = self._http_client.get(
f"{self.lms_url}/api/user/v1/me",
headers={"Authorization": f"JWT {self._fetch_access_token()}"},
)
response.raise_for_status()
return response.json()["username"]

def _fetch_with_auth(
self,
request_url: str,
page_size: int = 100,
extra_params: dict[str, Any] | None = None,
) -> dict[Any, Any]:
if self.token_url == f"{self.lms_url}/oauth2/access_token":
request_params = {"username": self._username, "page_size": page_size}
else:
request_params = {}

request_params.update(**(extra_params or {}))

response = self._http_client.get(
request_url,
headers={"Authorization": f"JWT {self._fetch_access_token()}"},
params=httpx.QueryParams(**request_params),
)

try:
response.raise_for_status()
except httpx.HTTPStatusError as error_response:
if error_response.response.status_code == TOO_MANY_REQUESTS:
retry_after = error_response.response.headers.get("Retry-After", 60)
delay = int(retry_after) if retry_after.isdigit() else 60
time.sleep(delay)
return self._fetch_with_auth(
request_url, page_size=page_size, extra_params=extra_params
)
raise
return response.json()

def get_edx_course_ids(
self, page_size: int = 100
Expand All @@ -130,7 +33,7 @@ def get_edx_course_ids(
:yield: A generator for walking the paginated list of courses returned from
the API
"""
request_url = f"{self.lms_url}/api/courses/v1/courses/"
request_url = f"{self.base_url}/api/courses/v1/courses/"
response_data = self._fetch_with_auth(request_url, page_size=page_size)
course_data = response_data["results"]
next_page = response_data["pagination"].get("next")
Expand Down Expand Up @@ -175,12 +78,12 @@ def get_course_structure_document(self, course_id: str):
:returns: JSON document representing the hierarchical structure of the target
course.
"""
request_url = f"{self.lms_url}/api/course-structure/v0/{course_id}/"
request_url = f"{self.base_url}/api/course-structure/v0/{course_id}/"
return self._fetch_with_auth(request_url)

def get_course_outline(self, course_id: str):
request_url = (
f"{self.lms_url}/api/learning_sequences/v1/course_outline/{course_id}"
f"{self.base_url}/api/learning_sequences/v1/course_outline/{course_id}"
)
return self._fetch_with_auth(request_url)

Expand Down Expand Up @@ -226,24 +129,6 @@ def get_edxorg_mitx_courses(self):
next_page = response_data["next"]
yield response_data["results"]

def get_sloan_courses(self):
"""
Retrieve the course data from their API as JSON

returns: JSON document representing an array of course objects
"""
course_url = "https://mit-unified-portal-prod-78eeds.43d8q2.usa-e2.cloudhub.io/api/courses"
return self._fetch_with_auth(course_url)

def get_sloan_course_offerings(self):
"""
Retrieve the course offerings data from their API as JSON

returns: JSON document representing an array of course offering objects
"""
course_offering_url = "https://mit-unified-portal-prod-78eeds.43d8q2.usa-e2.cloudhub.io/api/course-offerings"
return self._fetch_with_auth(course_offering_url)


class OpenEdxApiClientFactory(ConfigurableResource):
deployment: str = Field(
Expand All @@ -262,7 +147,7 @@ def _initialize_client(self) -> OpenEdxApiClient:
self._client = OpenEdxApiClient(
client_id=client_secrets["id"],
client_secret=client_secrets["secret"],
lms_url=client_secrets["url"],
base_url=client_secrets["url"],
studio_url=client_secrets["studio_url"],
token_url=client_secrets.get(
"token_url", f"{client_secrets['url']}/oauth2/access_token"
Expand Down