Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AzureAD] Added configuration: admin_role_ids, allowed_user_role_ids, role_claim #488

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 47 additions & 1 deletion oauthenticator/azuread.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from jupyterhub.auth import LocalAuthenticator
from tornado.httpclient import HTTPRequest
from traitlets import default
from traitlets import List
from traitlets import Unicode

from .oauth2 import OAuthenticator
Expand All @@ -28,6 +29,20 @@ class AzureAdOAuthenticator(OAuthenticator):

tenant_id = Unicode(config=True, help="The Azure Active Directory Tenant ID")

admin_role_ids = List(
Unicode(),
default_value=[],
config=True,
help="The GUIDs of the Azure Active Directory Groups or Application Roles containing admin users",
)

allowed_user_role_ids = List(
Unicode(),
default_value=[],
config=True,
help="The GUIDs of the Azure Active Direcetory Groups or Application Roles containing allowed users",
)

@default('tenant_id')
def _tenant_id_default(self):
return os.environ.get('AAD_TENANT_ID', '')
Expand All @@ -50,6 +65,23 @@ def _token_url_default(self):
self.tenant_id
)

@default('scope')
def _scope_default(self):
return ['openid']

role_claim = Unicode(config=True)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be documented with a help string. I'm trying to review this code atm with little knowledge of AzureAD, so having a sentence or two to describe what something is about is critical.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@consideRatio are you referring to the scope part or the role_claim part?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment was about te role_claim part, you can pass help="documentation here" to the Unicode() constructor. It will end up in the documentation website then.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Disregard.. it's obvious you mean the role_claim trait.


@default("role_claim")
def _role_claim_default(self):
return 'roles'

def _claim_has_role(self, token, role_ids):
if self.role_claim in token.keys():
for role_id in role_ids:
if role_id in token[self.role_claim]:
return True
return False

async def authenticate(self, handler, data=None):
code = handler.get_argument("code")

Expand All @@ -75,6 +107,8 @@ async def authenticate(self, handler, data=None):

resp_json = await self.fetch(req)

self.log.debug("Azure AD Token Response: %s", resp_json)

access_token = resp_json['access_token']
id_token = resp_json['id_token']

Expand All @@ -94,7 +128,19 @@ async def authenticate(self, handler, data=None):
# results in a decoded JWT for the user data
auth_state['user'] = decoded

return userdict
all_roles = list(self.allowed_user_role_ids)
all_roles.extend(self.admin_role_ids)
if self._claim_has_role(decoded, all_roles) or self.allowed_user_role_ids == []:
self.log.debug("Access to Azure AD User %s is permitted.", userdict["name"])
if self._claim_has_role(decoded, self.admin_role_ids):
userdict["admin"] = True
self.log.debug(
"Azure AD User %s has been granted admin privileges",
userdict["name"],
)
return userdict

return None


class LocalAzureAdOAuthenticator(LocalAuthenticator, AzureAdOAuthenticator):
Expand Down
159 changes: 140 additions & 19 deletions oauthenticator/tests/test_azuread.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,28 +20,28 @@ def test_tenant_id_from_env():
assert aad.tenant_id == tenant_id


def user_model(tenant_id, client_id, name):
def user_model(tenant_id, client_id, name, roles=None):
"""Return a user model"""
# model derived from https://docs.microsoft.com/en-us/azure/active-directory/develop/id-tokens#v20
now = int(time.time())
id_token = jwt.encode(
{
"ver": "2.0",
"iss": f"https://login.microsoftonline.com/{tenant_id}/v2.0",
"sub": "AAAAAAAAAAAAAAAAAAAAAIkzqFVrSaSaFHy782bbtaQ",
"aud": client_id,
"exp": now + 3600,
"iat": now,
"nbf": now,
"name": name,
"preferred_username": name,
"oid": str(uuid.uuid1()),
"tid": tenant_id,
"nonce": "123523",
"aio": "Df2UVXL1ix!lMCWMSOJBcFatzcGfvFGhjKv8q5g0x732dR5MB5BisvGQO7YWByjd8iQDLq!eGbIDakyp5mnOrcdqHeYSnltepQmRp6AIZ8jY",
},
os.urandom(5),
)
token_body = {
"ver": "2.0",
"iss": f"https://login.microsoftonline.com/{tenant_id}/v2.0",
"sub": "AAAAAAAAAAAAAAAAAAAAAIkzqFVrSaSaFHy782bbtaQ",
"aud": client_id,
"exp": now + 3600,
"iat": now,
"nbf": now,
"name": name,
"preferred_username": name,
"oid": str(uuid.uuid1()),
"tid": tenant_id,
"nonce": "123523",
"aio": "Df2UVXL1ix!lMCWMSOJBcFatzcGfvFGhjKv8q5g0x732dR5MB5BisvGQO7YWByjd8iQDLq!eGbIDakyp5mnOrcdqHeYSnltepQmRp6AIZ8jY",
}
if roles:
token_body["roles"] = roles
id_token = jwt.encode(token_body, os.urandom(5))
if not PYJWT_2:
id_token = id_token.decode("ascii")

Expand Down Expand Up @@ -98,3 +98,124 @@ async def test_azuread(username_claim, azure_client):

name = user_info['name']
assert name == jwt_user[authenticator.username_claim]


@pytest.mark.parametrize(
'is_admin',
[
True,
False,
],
)
async def test_azuread_admin(is_admin, azure_client):
authenticator = AzureAdOAuthenticator(
tenant_id=str(uuid.uuid1()),
client_id=str(uuid.uuid1()),
client_secret=str(uuid.uuid1()),
admin_role_ids=[str(uuid.uuid1())],
)

roles = []

if is_admin:
roles.extend(authenticator.admin_role_ids)

handler = azure_client.handler_for_user(
user_model(
tenant_id=authenticator.tenant_id,
client_id=authenticator.client_id,
name="somebody",
roles=(roles, None)[roles == []],
)
)

user_info = await authenticator.authenticate(handler)
auth_state = user_info['auth_state']
has_admin_role = False if 'admin' not in user_info.keys() else user_info["admin"]

assert (
sorted(user_info) == ['admin', 'auth_state', 'name']
if is_admin
else sorted(user_info) == ['auth_state', 'name']
)
assert is_admin == has_admin_role
assert is_admin if has_admin_role else not is_admin
assert not is_admin if not has_admin_role else is_admin


@pytest.mark.parametrize(
'is_allowed',
[
True,
False,
],
)
@pytest.mark.parametrize(
'is_admin',
[
True,
False,
],
)
@pytest.mark.parametrize(
'allowed_role_ids',
[
[],
["somevalue"],
["somevalue", "someothervalue"],
],
)
@pytest.mark.parametrize(
'admin_role_ids',
[
[],
["somevalue"],
["somevalue", "someothervalue"],
],
)
async def test_azuread_allowed(
is_allowed, is_admin, allowed_role_ids, admin_role_ids, azure_client
):
authenticator = AzureAdOAuthenticator(
tenant_id=str(uuid.uuid1()),
client_id=str(uuid.uuid1()),
client_secret=str(uuid.uuid1()),
allowed_user_role_ids=allowed_role_ids,
admin_role_ids=admin_role_ids,
)

roles = []

if is_allowed and allowed_role_ids != []:
roles.append(authenticator.allowed_user_role_ids)

if is_admin and admin_role_ids != []:
roles.append(authenticator.admin_role_ids)

handler = azure_client.handler_for_user(
user_model(
tenant_id=authenticator.tenant_id,
client_id=authenticator.client_id,
name="somebody",
roles=(roles, None)[roles == []],
)
)

user_info = await authenticator.authenticate(handler)
authenticated = user_info != None
auth_state = [] if not authenticated else user_info["auth_state"]
user = [] if not authenticated else auth_state["user"]
user_roles = (
[] if not authenticated or 'roles' not in user.keys() else user["roles"]
)

has_allowed_role = [r for r in allowed_role_ids if r in user_roles] != []
has_admin_role = [r for r in admin_role_ids if r in user_roles] != []
allow_required = authenticator.allowed_user_role_ids != []
allowed_as_admin = (allow_required and has_admin_role) or not allow_required
allowed_as_user = (allow_required and has_allowed_role) or not allow_required

if allowed_as_admin or allowed_as_user:
assert authenticated
elif not allowed_as_admin and not allowed_as_user:
assert not authenticated