|
1 | | -from typing import Any, Callable, Dict, Mapping, Optional, TypedDict |
| 1 | +from typing import Any, Callable, Dict, Mapping, Optional, TypedDict, Union |
2 | 2 | from uuid import uuid4 |
3 | 3 |
|
4 | 4 | import aiocache # type: ignore |
@@ -37,21 +37,32 @@ def key_builder( |
37 | 37 | return key |
38 | 38 |
|
39 | 39 |
|
40 | | -class OpenIDConfigurationTypeDef(TypedDict): |
| 40 | +class JwksUriConfigurationTypeDef(TypedDict): |
41 | 41 | """ |
42 | | - Type definition for the OpenID configuration values relevant to JWT validation. |
| 42 | + Type definition for an OpenID Connect compatible configuration with a jwks_uri. |
43 | 43 | """ |
44 | 44 |
|
45 | 45 | jwks_uri: str |
46 | 46 |
|
47 | 47 |
|
| 48 | +class JwksUrlConfigurationTypeDef(TypedDict): |
| 49 | + """ |
| 50 | + Type definition for a configuration using jwks_url instead of jwks_uri. |
| 51 | + """ |
| 52 | + |
| 53 | + jwks_url: str |
| 54 | + |
| 55 | + |
| 56 | +ConfigurationTypeDef = Union[JwksUriConfigurationTypeDef, JwksUrlConfigurationTypeDef] |
| 57 | + |
| 58 | + |
48 | 59 | class Provider: |
49 | 60 | def __init__( |
50 | 61 | self, |
51 | 62 | iss: str, |
52 | 63 | http_client: HTTPClient, |
53 | 64 | config_path: str = "/.well-known/openid-configuration", |
54 | | - static_config: Optional[OpenIDConfigurationTypeDef] = None, |
| 65 | + static_config: Optional[ConfigurationTypeDef] = None, |
55 | 66 | ) -> None: |
56 | 67 | self.iss = iss |
57 | 68 | self.http_client = http_client |
@@ -85,17 +96,24 @@ async def get_configuration(self) -> Mapping[str, Any]: |
85 | 96 |
|
86 | 97 | async def _get_jwks_uri(self) -> str: |
87 | 98 | """ |
88 | | - Retrieve the uri to JWKs. |
| 99 | + Retrieve the uri/url to JWKs. |
89 | 100 |
|
90 | | - :return: The uri to the JWKs. |
| 101 | + :return: The uri/url to the JWKs. |
91 | 102 | :raise JWTHTTPFetchError: If there's a problem fetching the data. |
92 | | - :raise JWTProviderConfigError: If the config doesn't contain "jwks_uri". |
| 103 | + :raise JWTProviderConfigError: If the config doesn't contain "jwks_uri" or |
| 104 | + "jwks_url". |
93 | 105 | """ |
94 | 106 | conf = await self.get_configuration() |
| 107 | + jwks_uri: str |
95 | 108 | try: |
96 | | - jwks_uri: str = conf["jwks_uri"] |
| 109 | + jwks_uri = conf["jwks_uri"] |
97 | 110 | except KeyError as e: |
98 | | - raise JWTProviderConfigError("Missing 'jwks_uri' in configuration") from e |
| 111 | + try: |
| 112 | + jwks_uri = conf["jwks_url"] |
| 113 | + except KeyError: |
| 114 | + raise JWTProviderConfigError( |
| 115 | + "Missing 'jwks_uri' and 'jwks_url' in configuration" |
| 116 | + ) from e |
99 | 117 | return jwks_uri |
100 | 118 |
|
101 | 119 | @aiocache.cached(ttl=300, key_builder=key_builder) |
|
0 commit comments