diff --git a/ldapauthenticator/ldapauthenticator.py b/ldapauthenticator/ldapauthenticator.py index 4e3a809..1a6844f 100644 --- a/ldapauthenticator/ldapauthenticator.py +++ b/ldapauthenticator/ldapauthenticator.py @@ -86,6 +86,20 @@ def _server_port_default(self): """, ) + use_search_user_to_check_groups = Bool( + False, + config=True, + help=""" + If set to `True` the search user is used to check if the user to be authenticated is in + one of the `allowed_groups`. + + By default the ldap connection of the user to be authenticated is used. This option is + useful in ldap environments where the users itself don't have the permission to access the + ldap groups in which they are members. The configured search user needs the permission to + access ldap groups though. + """ + ) + # FIXME: Use something other than this? THIS IS LAME, akin to websites restricting things you # can use in usernames / passwords to protect from SQL injection! valid_username_regex = Unicode( @@ -229,19 +243,17 @@ def _server_port_default(self): """, ) - def resolve_username(self, username_supplied_by_user): - search_dn = self.lookup_dn_search_user - if self.escape_userdn: - search_dn = escape_filter_chars(search_dn) - conn = self.get_connection( - userdn=search_dn, password=self.lookup_dn_search_password - ) - is_bound = conn.bind() - if not is_bound: - msg = "Failed to connect to LDAP server with search user '{search_dn}'" - self.log.warning(msg.format(search_dn=search_dn)) - return (None, None) + def resolve_username(self, connection, username_supplied_by_user): + """Resolves the user name and user dn of the user being authenticated. + + Args: + connection: the ldap connection to use + username_supplied_by_user: the username supplied by the user + + Returns: + tuple: value of ldap attribute `lookup_dn_user_dn_attribute` on user entry and the user dn + """ search_filter = self.lookup_dn_search_filter.format( login_attr=self.user_attribute, login=username_supplied_by_user ) @@ -260,13 +272,13 @@ def resolve_username(self, username_supplied_by_user): attributes=self.user_attribute, ) ) - conn.search( + connection.search( search_base=self.user_search_base, search_scope=ldap3.SUBTREE, search_filter=search_filter, attributes=[self.lookup_dn_user_dn_attribute], ) - response = conn.response + response = connection.response if len(response) == 0 or "attributes" not in response[0].keys(): msg = ( "No entry found for user '{username}' " @@ -357,8 +369,21 @@ def authenticate(self, handler, data): ) return None + # connect to ldap with search user + search_dn = self.lookup_dn_search_user + if self.escape_userdn: + search_dn = escape_filter_chars(search_dn) + connection_search = self.get_connection( + userdn=search_dn, password=self.lookup_dn_search_password + ) + if not connection_search.bind(): + msg = "Failed to connect to LDAP server with search user '{search_dn}'" + self.log.warning(msg.format(search_dn=search_dn)) + return None + + # lookup dn of user to be authenticated if self.lookup_dn: - username, resolved_dn = self.resolve_username(username) + username, resolved_dn = self.resolve_username(connection_search, username) if not username: return None if str(self.lookup_dn_user_dn_attribute).upper() == "CN": @@ -367,6 +392,7 @@ def authenticate(self, handler, data): if not bind_dn_template: bind_dn_template = [resolved_dn] + # try to authenticate user is_bound = False for dn in bind_dn_template: if not dn: @@ -379,7 +405,7 @@ def authenticate(self, handler, data): self.log.debug(msg.format(username=username, userdn=userdn)) msg = "Status of user bind {username} with {userdn} : {is_bound}" try: - conn = self.get_connection(userdn, password) + connection_user = self.get_connection(userdn, password) except ldap3.core.exceptions.LDAPBindError as exc: is_bound = False msg += "\n{exc_type}: {exc_msg}".format( @@ -387,7 +413,7 @@ def authenticate(self, handler, data): exc_msg=exc.args[0] if exc.args else "", ) else: - is_bound = True if conn.bound else conn.bind() + is_bound = True if connection_user.bound else connection_user.bind() msg = msg.format(username=username, userdn=userdn, is_bound=is_bound) self.log.debug(msg) if is_bound: @@ -398,17 +424,18 @@ def authenticate(self, handler, data): self.log.warning(msg.format(username=username)) return None + # validate user access by applying the search filter if self.search_filter: search_filter = self.search_filter.format( userattr=self.user_attribute, username=username ) - conn.search( + connection_user.search( search_base=self.user_search_base, search_scope=ldap3.SUBTREE, search_filter=search_filter, attributes=self.attributes, ) - n_users = len(conn.response) + n_users = len(connection_user.response) if n_users == 0: msg = "User with '{userattr}={username}' not found in directory" self.log.warning( @@ -427,6 +454,7 @@ def authenticate(self, handler, data): ) return None + # check if user is member in any of the allowed ldap groups if self.allowed_groups: self.log.debug("username:%s Using dn %s", username, userdn) found = False @@ -440,12 +468,21 @@ def authenticate(self, handler, data): ) group_filter = group_filter.format(userdn=userdn, uid=username) group_attributes = ["member", "uniqueMember", "memberUid"] - found = conn.search( - group, - search_scope=ldap3.BASE, - search_filter=group_filter, - attributes=group_attributes, - ) + # check which ldap connection to use: user (default) or search user + if self.use_search_user_to_check_groups is True: + found = connection_search.search( + group, + search_scope=ldap3.BASE, + search_filter=group_filter, + attributes=group_attributes, + ) + else: + found = connection_user.search( + group, + search_scope=ldap3.BASE, + search_filter=group_filter, + attributes=group_attributes, + ) if found: break if not found: @@ -457,7 +494,7 @@ def authenticate(self, handler, data): if not self.use_lookup_dn_username: username = data["username"] - user_info = self.get_user_attributes(conn, userdn) + user_info = self.get_user_attributes(connection_user, userdn) if user_info: self.log.debug("username:%s attributes:%s", username, user_info) return {"name": username, "auth_state": user_info} diff --git a/ldapauthenticator/tests/test_ldapconnection_usage.py b/ldapauthenticator/tests/test_ldapconnection_usage.py new file mode 100755 index 0000000..ca68cdf --- /dev/null +++ b/ldapauthenticator/tests/test_ldapconnection_usage.py @@ -0,0 +1,126 @@ +"""Tests the usage of both ldap connections of the ldap authenticator. +""" +import os +from unittest.mock import MagicMock, call, ANY +import pytest + +from ..ldapauthenticator import LDAPAuthenticator + + +@pytest.fixture +def authenticator_setup(): + """Provides a configured and mocked authenticator as well as a mocked search and user connection. + + Note: don't be confused with the connection settings here, they just serve as valid configuration and are not used. + """ + # configure authenticator + authenticator = LDAPAuthenticator() + authenticator.server_address = "localhost" + authenticator.lookup_dn = False + authenticator.bind_dn_template = "cn={username},ou=people,dc=planetexpress,dc=com" + authenticator.user_search_base = "ou=people,dc=planetexpress,dc=com" + authenticator.user_attribute = "uid" + authenticator.lookup_dn_user_dn_attribute = "cn" + authenticator.escape_userdn = True + authenticator.attributes = ["uid", "cn", "mail", "ou"] + authenticator.use_lookup_dn_username = False + # leela is being authenticated, she's member of that group + authenticator.allowed_groups = [ + "cn=ship_crew,ou=people,dc=planetexpress,dc=com", + ] + # search user is 'hermes' + authenticator.lookup_dn_search_user = 'hermes' + authenticator.lookup_dn_search_password = 'hermes' + + # mock ldap connections: return either the one for the search user or for the user to be authenticated + connection_search_mock = MagicMock() + connection_user_mock = MagicMock() + def connection_mock(*args, **kwargs): + if 'userdn' in kwargs and kwargs['userdn'] == 'hermes': + return connection_search_mock + else: + return connection_user_mock + authenticator.get_connection = MagicMock( side_effect = connection_mock ) + + # 1) search: bind method should return True + connection_search_mock.bind = MagicMock( return_value = True ) + + # 2) search: lookup dn of user to be authenticated » deactivated + + # 3) user: bound should be False so that bind method is called returning True + connection_user_mock.bound = False + connection_user_mock.bind = MagicMock( return_value = True) + + # 4) user: search filter are empty » deactivated + + # 5) search or user: allowed groups » configured in test methods + + # 6) user: get_user_attributes(connection, userdn) » returns dict with entry attributes + authenticator.get_user_attributes = MagicMock( return_value = { 'uid': 'leela', 'cn': 'Turanga Leela' } ) + + return authenticator, connection_search_mock, connection_user_mock + + +async def test_allowed_groups_check_with_user_connection(authenticator_setup): + """Checks the method calls on both ldap connections when the `allowed_groups` are check with the + connection of the user being authenticated (default). + """ + # unpack + assert object setup + authenticator, connection_search_mock, connection_user_mock = authenticator_setup + assert authenticator is not None and connection_search_mock is not None and connection_user_mock is not None + assert authenticator.lookup_dn is False + assert not authenticator.search_filter + + # assert default value + assert authenticator.use_search_user_to_check_groups is False + + # 5) user: allowed groups » simply return True for the one group + connection_user_mock.search = MagicMock( return_value = True ) + + # authenticate leela + result = await authenticator.authenticate( None, { 'username': 'leela', 'password': 'leela' } ) + assert 'name' in result + assert result['name'] == 'leela' + + # assert method calls on mocks + expected_search_mock_calls = [ + call.bind(), + ] + assert connection_search_mock.mock_calls == expected_search_mock_calls + expected_user_mock_calls = [ + call.bind(), + call.search('cn=ship_crew,ou=people,dc=planetexpress,dc=com', search_scope = ANY, search_filter = ANY, attributes = ANY) + ] + assert connection_user_mock.mock_calls == expected_user_mock_calls + +async def test_allowed_groups_check_with_search_connection(authenticator_setup): + """Checks the method calls on both ldap connections when the `allowed_groups` are check with the + connection of the configured search user. + """ + # unpack + assert object setup + authenticator, connection_search_mock, connection_user_mock = authenticator_setup + assert authenticator is not None and connection_search_mock is not None and connection_user_mock is not None + assert authenticator.lookup_dn is False + assert not authenticator.search_filter + + # enable allowed groups check using the search user connection + authenticator.use_search_user_to_check_groups = True + + # 5) search: allowed groups » simply return True for the one group + connection_search_mock.search = MagicMock( return_value = True ) + + # authenticate leela + result = await authenticator.authenticate( None, { 'username': 'leela', 'password': 'leela' } ) + assert 'name' in result + assert result['name'] == 'leela' + + # assert method calls on mocks + expected_search_mock_calls = [ + call.bind(), + call.search('cn=ship_crew,ou=people,dc=planetexpress,dc=com', search_scope = ANY, search_filter = ANY, attributes = ANY) + ] + assert connection_search_mock.mock_calls == expected_search_mock_calls + expected_user_mock_calls = [ + call.bind(), + ] + assert connection_user_mock.mock_calls == expected_user_mock_calls