Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix parsing of search response #294

Merged
merged 8 commits into from
Nov 5, 2024
86 changes: 47 additions & 39 deletions ldapauthenticator/ldapauthenticator.py
Original file line number Diff line number Diff line change
Expand Up @@ -455,38 +455,44 @@ def resolve_username(self, username_supplied_by_user):
search_filter=search_filter,
attributes=[self.lookup_dn_user_dn_attribute],
)
response = conn.response
if len(response) == 0:

# identify unique search response entry
n_entries = len(conn.entries)
if n_entries == 0:
self.log.warning(
f"Failed to lookup a DN for username '{username_supplied_by_user}'"
f"Login of '{username_supplied_by_user}' denied, failed to lookup a DN"
)
return (None, None)
if len(response) > 1:
self.log.warning(
f"Failed to lookup a unique DN for username '{username_supplied_by_user}'"
if n_entries > 1:
self.log.error(
f"Login of '{username_supplied_by_user}' denied, expected 0 or 1 "
f"search response entries but received {n_entries}. Is lookup_dn_search_filter "
"and user_attribute configured to uniquely match against a DN?"
)
return (None, None)
if "attributes" not in response[0].keys():
self.log.warning(
f"Failed to lookup attribute '{self.lookup_dn_user_dn_attribute}' for username '{username_supplied_by_user}'"
entry = conn.entries[0]

# identify unique attribute value within the entry
attribute_values = entry.entry_attributes_as_dict.get(
self.lookup_dn_user_dn_attribute
)
if not attribute_values:
self.log.error(
f"Login of '{username_supplied_by_user}' denied, failed to lookup attribute "
f"'{self.lookup_dn_user_dn_attribute}'. Is lookup_dn_user_dn_attribute "
"configured correctly?"
)
return (None, None)
if len(attribute_values) > 1:
self.log.error(
f"Login of '{username_supplied_by_user}' denied, lookup of attribute "
f"'{self.lookup_dn_user_dn_attribute}' gave multiple values but only "
"one is expected. Is lookup_dn_user_dn_attribute configured correctly?"
)
return None, None

userdn = response[0]["dn"]
username = response[0]["attributes"][self.lookup_dn_user_dn_attribute]
if isinstance(username, list):
if len(username) == 0:
return (None, None)
elif len(username) == 1:
username = username[0]
else:
self.log.error(
f"A lookup of the username '{username_supplied_by_user}' returned a list "
f"of entries for the attribute '{self.lookup_dn_user_dn_attribute}': "
f"({', '.join(username)})"
)
return None, None

userdn = entry.entry_dn
username = attribute_values[0]
return (username, userdn)

def get_connection(self, userdn, password):
Expand Down Expand Up @@ -539,17 +545,24 @@ def get_connection(self, userdn, password):
return conn

def get_user_attributes(self, conn, userdn):
attrs = {}
if self.auth_state_attributes:
found = conn.search(
conn.search(
search_base=userdn,
search_scope=ldap3.SUBTREE,
search_filter="(objectClass=*)",
attributes=self.auth_state_attributes,
)
if found:
attrs = conn.entries[0].entry_attributes_as_dict
return attrs

# identify unique search response entry
n_entries = len(conn.entries)
if n_entries == 1:
return conn.entries[0].entry_attributes_as_dict
self.log.error(
f"Expected 1 but got {n_entries} search response entries for DN '{userdn}' "
"when looking up attributes configured via auth_state_attributes. The user's "
"auth state will not include any attributes."
)
return {}

async def authenticate(self, handler, data):
"""
Expand Down Expand Up @@ -632,18 +645,13 @@ async def authenticate(self, handler, data):
),
attributes=self.attributes,
)
n_users = len(conn.response)
if n_users == 0:
self.log.warning(
"Configured search_filter found no user associated with "
f"userattr='{self.user_attribute}' and username='{resolved_username}'"
)
return None
if n_users > 1:
n_entries = len(conn.entries)
if n_entries != 1:
self.log.warning(
"Configured search_filter found multiple users associated with "
f"Login of '{login_username}' denied. Configured search_filter "
f"found {n_entries} users associated with "
f"userattr='{self.user_attribute}' and username='{resolved_username}', "
"a unique match is required."
"and a unique match is required."
)
return None

Expand Down