diff --git a/ldapauthenticator/ldapauthenticator.py b/ldapauthenticator/ldapauthenticator.py index 3d26b4d..969cf02 100644 --- a/ldapauthenticator/ldapauthenticator.py +++ b/ldapauthenticator/ldapauthenticator.py @@ -455,38 +455,49 @@ def resolve_username(self, username_supplied_by_user): search_filter=search_filter, attributes=[self.lookup_dn_user_dn_attribute], ) - response = conn.response - if len(response) == 0: - self.log.warning( - f"Failed to lookup a DN for username '{username_supplied_by_user}'" - ) - return (None, None) - if len(response) > 1: - self.log.warning( - f"Failed to lookup a unique DN for username '{username_supplied_by_user}'" - ) + + # identify unique search response entry + n_entries = len(conn.entries) + if n_entries == 0: + self.log.warning(f"No response looking up '{username_supplied_by_user}'") return (None, None) - if "attributes" not in response[0].keys(): - self.log.warning( - f"Failed to lookup attribute '{self.lookup_dn_user_dn_attribute}' for username '{username_supplied_by_user}'" + if n_entries > 1: + self.log.error( + f"Looking up '{username_supplied_by_user}' gave multiple entries, " + f"expected 0 or 1 search response entries but received {n_entries}. " + "Is lookup_dn_search_filter and user_attribute configured to get a " + "unique match?" ) return (None, None) + entry = conn.entries[0] - userdn = response[0]["dn"] - username = response[0]["attributes"][self.lookup_dn_user_dn_attribute] - if isinstance(username, list): - if len(username) == 0: - return (None, None) - elif len(username) == 1: - username = username[0] + # identify unique attribute value within the entry + attribute_values = entry.entry_attributes_as_dict.get( + self.lookup_dn_user_dn_attribute + ) + if not attribute_values: + if attribute_values is None: + self.log.error( + f"No attribute '{self.lookup_dn_user_dn_attribute}' found. " + "Is lookup_dn_user_dn_attribute configured correctly?" + ) else: self.log.error( - f"A lookup of the username '{username_supplied_by_user}' returned a list " - f"of entries for the attribute '{self.lookup_dn_user_dn_attribute}': " - f"({', '.join(username)})" + f"No attribute values for '{self.lookup_dn_user_dn_attribute}'. " + "Is lookup_dn_user_dn_attribute configured correctly?" ) - return None, None + return (None, None) + if len(attribute_values) > 1: + self.log.error( + f"Attribute '{self.lookup_dn_user_dn_attribute}' had multiple values, " + f"expected one attribute value but it had {len(attribute_values)} " + f"({';'.join(attribute_values)}). " + "Is lookup_dn_user_dn_attribute configured correctly?" + ) + return None, None + userdn = entry.entry_dn + username = attribute_values[0] return (username, userdn) def get_connection(self, userdn, password): @@ -539,17 +550,24 @@ def get_connection(self, userdn, password): return conn def get_user_attributes(self, conn, userdn): - attrs = {} if self.auth_state_attributes: - found = conn.search( + conn.search( search_base=userdn, search_scope=ldap3.SUBTREE, search_filter="(objectClass=*)", attributes=self.auth_state_attributes, ) - if found: - attrs = conn.entries[0].entry_attributes_as_dict - return attrs + + # identify unique search response entry + n_entries = len(conn.entries) + if n_entries == 1: + return conn.entries[0].entry_attributes_as_dict + self.log.error( + f"Expected 1 but got {n_entries} search response entries for DN '{userdn}' " + "when looking up attributes configured via auth_state_attributes. The user's " + "auth state will not include any attributes." + ) + return {} async def authenticate(self, handler, data): """ @@ -584,6 +602,9 @@ async def authenticate(self, handler, data): if self.lookup_dn: resolved_username, resolved_dn = self.resolve_username(login_username) if not resolved_dn: + self.log.warning( + "username:%s Login denied for failed lookup", login_username + ) return None if not bind_dn_template: bind_dn_template = [resolved_dn] @@ -632,18 +653,13 @@ async def authenticate(self, handler, data): ), attributes=self.attributes, ) - n_users = len(conn.response) - if n_users == 0: - self.log.warning( - "Configured search_filter found no user associated with " - f"userattr='{self.user_attribute}' and username='{resolved_username}'" - ) - return None - if n_users > 1: + n_entries = len(conn.entries) + if n_entries != 1: self.log.warning( - "Configured search_filter found multiple users associated with " + f"Login of '{login_username}' denied. Configured search_filter " + f"found {n_entries} users associated with " f"userattr='{self.user_attribute}' and username='{resolved_username}', " - "a unique match is required." + "and a unique match is required." ) return None