Skip to content

Commit

Permalink
[task] add option to check allowed groups using the ldap connection o…
Browse files Browse the repository at this point in the history
…f the search user instead of the ldap connection of the user being authenticated, see jupyterhub#183
  • Loading branch information
tobias haubold committed Jan 26, 2022
1 parent 31d70f8 commit 702b475
Show file tree
Hide file tree
Showing 2 changed files with 189 additions and 26 deletions.
89 changes: 63 additions & 26 deletions ldapauthenticator/ldapauthenticator.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,20 @@ def _server_port_default(self):
""",
)

use_search_user_to_check_groups = Bool(
False,
config=True,
help="""
If set to `True` the search user is used to check if the user to be authenticated is in
one of the `allowed_groups`.
By default the ldap connection of the user to be authenticated is used. This option is
useful in ldap environments where the users itself don't have the permission to access the
ldap groups in which they are members. The configured search user needs the permission to
access ldap groups though.
"""
)

# FIXME: Use something other than this? THIS IS LAME, akin to websites restricting things you
# can use in usernames / passwords to protect from SQL injection!
valid_username_regex = Unicode(
Expand Down Expand Up @@ -229,19 +243,17 @@ def _server_port_default(self):
""",
)

def resolve_username(self, username_supplied_by_user):
search_dn = self.lookup_dn_search_user
if self.escape_userdn:
search_dn = escape_filter_chars(search_dn)
conn = self.get_connection(
userdn=search_dn, password=self.lookup_dn_search_password
)
is_bound = conn.bind()
if not is_bound:
msg = "Failed to connect to LDAP server with search user '{search_dn}'"
self.log.warning(msg.format(search_dn=search_dn))
return (None, None)

def resolve_username(self, connection, username_supplied_by_user):
"""Resolves the user name and user dn of the user being authenticated.
Args:
connection: the ldap connection to use
username_supplied_by_user: the username supplied by the user
Returns:
tuple: value of ldap attribute `lookup_dn_user_dn_attribute` on user entry and the user dn
"""
search_filter = self.lookup_dn_search_filter.format(
login_attr=self.user_attribute, login=username_supplied_by_user
)
Expand All @@ -260,13 +272,13 @@ def resolve_username(self, username_supplied_by_user):
attributes=self.user_attribute,
)
)
conn.search(
connection.search(
search_base=self.user_search_base,
search_scope=ldap3.SUBTREE,
search_filter=search_filter,
attributes=[self.lookup_dn_user_dn_attribute],
)
response = conn.response
response = connection.response
if len(response) == 0 or "attributes" not in response[0].keys():
msg = (
"No entry found for user '{username}' "
Expand Down Expand Up @@ -357,8 +369,21 @@ def authenticate(self, handler, data):
)
return None

# connect to ldap with search user
search_dn = self.lookup_dn_search_user
if self.escape_userdn:
search_dn = escape_filter_chars(search_dn)
connection_search = self.get_connection(
userdn=search_dn, password=self.lookup_dn_search_password
)
if not connection_search.bind():
msg = "Failed to connect to LDAP server with search user '{search_dn}'"
self.log.warning(msg.format(search_dn=search_dn))
return None

# lookup dn of user to be authenticated
if self.lookup_dn:
username, resolved_dn = self.resolve_username(username)
username, resolved_dn = self.resolve_username(connection_search, username)
if not username:
return None
if str(self.lookup_dn_user_dn_attribute).upper() == "CN":
Expand All @@ -367,6 +392,7 @@ def authenticate(self, handler, data):
if not bind_dn_template:
bind_dn_template = [resolved_dn]

# try to authenticate user
is_bound = False
for dn in bind_dn_template:
if not dn:
Expand All @@ -379,15 +405,15 @@ def authenticate(self, handler, data):
self.log.debug(msg.format(username=username, userdn=userdn))
msg = "Status of user bind {username} with {userdn} : {is_bound}"
try:
conn = self.get_connection(userdn, password)
connection_user = self.get_connection(userdn, password)
except ldap3.core.exceptions.LDAPBindError as exc:
is_bound = False
msg += "\n{exc_type}: {exc_msg}".format(
exc_type=exc.__class__.__name__,
exc_msg=exc.args[0] if exc.args else "",
)
else:
is_bound = True if conn.bound else conn.bind()
is_bound = True if connection_user.bound else connection_user.bind()
msg = msg.format(username=username, userdn=userdn, is_bound=is_bound)
self.log.debug(msg)
if is_bound:
Expand All @@ -398,17 +424,18 @@ def authenticate(self, handler, data):
self.log.warning(msg.format(username=username))
return None

# validate user access by applying the search filter
if self.search_filter:
search_filter = self.search_filter.format(
userattr=self.user_attribute, username=username
)
conn.search(
connection_user.search(
search_base=self.user_search_base,
search_scope=ldap3.SUBTREE,
search_filter=search_filter,
attributes=self.attributes,
)
n_users = len(conn.response)
n_users = len(connection_user.response)
if n_users == 0:
msg = "User with '{userattr}={username}' not found in directory"
self.log.warning(
Expand All @@ -427,6 +454,7 @@ def authenticate(self, handler, data):
)
return None

# check if user is member in any of the allowed ldap groups
if self.allowed_groups:
self.log.debug("username:%s Using dn %s", username, userdn)
found = False
Expand All @@ -440,12 +468,21 @@ def authenticate(self, handler, data):
)
group_filter = group_filter.format(userdn=userdn, uid=username)
group_attributes = ["member", "uniqueMember", "memberUid"]
found = conn.search(
group,
search_scope=ldap3.BASE,
search_filter=group_filter,
attributes=group_attributes,
)
# check which ldap connection to use: user (default) or search user
if self.use_search_user_to_check_groups is True:
found = connection_search.search(
group,
search_scope=ldap3.BASE,
search_filter=group_filter,
attributes=group_attributes,
)
else:
found = connection_user.search(
group,
search_scope=ldap3.BASE,
search_filter=group_filter,
attributes=group_attributes,
)
if found:
break
if not found:
Expand All @@ -457,7 +494,7 @@ def authenticate(self, handler, data):
if not self.use_lookup_dn_username:
username = data["username"]

user_info = self.get_user_attributes(conn, userdn)
user_info = self.get_user_attributes(connection_user, userdn)
if user_info:
self.log.debug("username:%s attributes:%s", username, user_info)
return {"name": username, "auth_state": user_info}
Expand Down
126 changes: 126 additions & 0 deletions ldapauthenticator/tests/test_ldapconnection_usage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
"""Tests the usage of both ldap connections of the ldap authenticator.
"""
import os
from unittest.mock import MagicMock, call, ANY
import pytest

from ..ldapauthenticator import LDAPAuthenticator


@pytest.fixture
def authenticator_setup():
"""Provides a configured and mocked authenticator as well as a mocked search and user connection.
Note: don't be confused with the connection settings here, they just serve as valid configuration and are not used.
"""
# configure authenticator
authenticator = LDAPAuthenticator()
authenticator.server_address = "localhost"
authenticator.lookup_dn = False
authenticator.bind_dn_template = "cn={username},ou=people,dc=planetexpress,dc=com"
authenticator.user_search_base = "ou=people,dc=planetexpress,dc=com"
authenticator.user_attribute = "uid"
authenticator.lookup_dn_user_dn_attribute = "cn"
authenticator.escape_userdn = True
authenticator.attributes = ["uid", "cn", "mail", "ou"]
authenticator.use_lookup_dn_username = False
# leela is being authenticated, she's member of that group
authenticator.allowed_groups = [
"cn=ship_crew,ou=people,dc=planetexpress,dc=com",
]
# search user is 'hermes'
authenticator.lookup_dn_search_user = 'hermes'
authenticator.lookup_dn_search_password = 'hermes'

# mock ldap connections: return either the one for the search user or for the user to be authenticated
connection_search_mock = MagicMock()
connection_user_mock = MagicMock()
def connection_mock(*args, **kwargs):
if 'userdn' in kwargs and kwargs['userdn'] == 'hermes':
return connection_search_mock
else:
return connection_user_mock
authenticator.get_connection = MagicMock( side_effect = connection_mock )

# 1) search: bind method should return True
connection_search_mock.bind = MagicMock( return_value = True )

# 2) search: lookup dn of user to be authenticated » deactivated

# 3) user: bound should be False so that bind method is called returning True
connection_user_mock.bound = False
connection_user_mock.bind = MagicMock( return_value = True)

# 4) user: search filter are empty » deactivated

# 5) search or user: allowed groups » configured in test methods

# 6) user: get_user_attributes(connection, userdn) » returns dict with entry attributes
authenticator.get_user_attributes = MagicMock( return_value = { 'uid': 'leela', 'cn': 'Turanga Leela' } )

return authenticator, connection_search_mock, connection_user_mock


async def test_allowed_groups_check_with_user_connection(authenticator_setup):
"""Checks the method calls on both ldap connections when the `allowed_groups` are check with the
connection of the user being authenticated (default).
"""
# unpack + assert object setup
authenticator, connection_search_mock, connection_user_mock = authenticator_setup
assert authenticator is not None and connection_search_mock is not None and connection_user_mock is not None
assert authenticator.lookup_dn is False
assert not authenticator.search_filter

# assert default value
assert authenticator.use_search_user_to_check_groups is False

# 5) user: allowed groups » simply return True for the one group
connection_user_mock.search = MagicMock( return_value = True )

# authenticate leela
result = await authenticator.authenticate( None, { 'username': 'leela', 'password': 'leela' } )
assert 'name' in result
assert result['name'] == 'leela'

# assert method calls on mocks
expected_search_mock_calls = [
call.bind(),
]
assert connection_search_mock.mock_calls == expected_search_mock_calls
expected_user_mock_calls = [
call.bind(),
call.search('cn=ship_crew,ou=people,dc=planetexpress,dc=com', search_scope = ANY, search_filter = ANY, attributes = ANY)
]
assert connection_user_mock.mock_calls == expected_user_mock_calls

async def test_allowed_groups_check_with_search_connection(authenticator_setup):
"""Checks the method calls on both ldap connections when the `allowed_groups` are check with the
connection of the configured search user.
"""
# unpack + assert object setup
authenticator, connection_search_mock, connection_user_mock = authenticator_setup
assert authenticator is not None and connection_search_mock is not None and connection_user_mock is not None
assert authenticator.lookup_dn is False
assert not authenticator.search_filter

# enable allowed groups check using the search user connection
authenticator.use_search_user_to_check_groups = True

# 5) search: allowed groups » simply return True for the one group
connection_search_mock.search = MagicMock( return_value = True )

# authenticate leela
result = await authenticator.authenticate( None, { 'username': 'leela', 'password': 'leela' } )
assert 'name' in result
assert result['name'] == 'leela'

# assert method calls on mocks
expected_search_mock_calls = [
call.bind(),
call.search('cn=ship_crew,ou=people,dc=planetexpress,dc=com', search_scope = ANY, search_filter = ANY, attributes = ANY)
]
assert connection_search_mock.mock_calls == expected_search_mock_calls
expected_user_mock_calls = [
call.bind(),
]
assert connection_user_mock.mock_calls == expected_user_mock_calls

0 comments on commit 702b475

Please sign in to comment.