Skip to content

Commit

Permalink
Add a login button and allow_bypass_login config value in configurati…
Browse files Browse the repository at this point in the history
…on.yaml (#325)

* Added allow_bypass_login configuration

* Update README.md

* Restored default behavior
  • Loading branch information
mpedziwiatr02 authored Aug 14, 2024
1 parent f3b5365 commit 7677712
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 23 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ http:
auth_header:
# Optionally set this if you're not using authentik proxy or oauth2_proxy
# username_header: X-Forwarded-Preferred-Username
# Optionally set this if you don't want to bypass the login prompt
# allow_bypass_login: false
# Optionally enable debug mode to see the headers Home-Assistant gets
# debug: false
# Optionally, if something is not working right, add this block below to get more information
Expand Down
5 changes: 2 additions & 3 deletions custom_components/auth_header/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@
{
DOMAIN: vol.Schema(
{
vol.Optional(
"username_header", default="X-Forwarded-Preferred-Username"
): cv.string,
vol.Optional("username_header", default="X-Forwarded-Preferred-Username"): cv.string,
vol.Optional("allow_bypass_login", default=True): cv.boolean,
vol.Optional("debug", default=False): cv.boolean,
}
)
Expand Down
55 changes: 35 additions & 20 deletions custom_components/auth_header/headers.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from homeassistant.core import callback

CONF_USERNAME_HEADER = "username_header"
CONF_ALLOW_BYPASS_LOGIN = "allow_bypass_login"
_LOGGER = logging.getLogger(__name__)


Expand Down Expand Up @@ -49,6 +50,7 @@ async def async_login_flow(self, context: Optional[Dict]) -> LoginFlow:
None,
[],
cast(IPAddress, context.get("conn_ip_address")),
self.config[CONF_ALLOW_BYPASS_LOGIN],
)
remote_user = request.headers[header_name].casefold()
# Translate username to id
Expand All @@ -61,6 +63,7 @@ async def async_login_flow(self, context: Optional[Dict]) -> LoginFlow:
remote_user,
available_users,
cast(IPAddress, context.get("conn_ip_address")),
self.config[CONF_ALLOW_BYPASS_LOGIN],
)

async def async_user_meta_for_credentials(
Expand Down Expand Up @@ -119,33 +122,45 @@ def __init__(
remote_user: str,
available_users: List[User],
ip_address: IPAddress,
allow_bypass_login: bool,
) -> None:
"""Initialize the login flow."""
super().__init__(auth_provider)
self._available_users = available_users
self._remote_user = remote_user
self._ip_address = ip_address
self._allow_bypass_login = allow_bypass_login

async def async_step_init(
self, user_input: Optional[Dict[str, str]] = None
) -> Dict[str, Any]:
async def async_step_init(self, user_input=None) -> Dict[str, Any]:
"""Handle the step of the form."""
try:
cast(HeaderAuthProvider, self._auth_provider).async_validate_access(
self._ip_address
)

except InvalidAuthError as exc:
_LOGGER.debug("invalid auth", exc_info=exc)
if user_input is not None or self._allow_bypass_login:
try:
_LOGGER.debug("Validating access for IP: %s", self._ip_address)
cast(HeaderAuthProvider, self._auth_provider).async_validate_access(
self._ip_address
)
except InvalidAuthError as exc:
_LOGGER.debug("Invalid auth: %s", exc)
return self.async_abort(reason="not_allowed")

for user in self._available_users:
_LOGGER.debug("Checking user: %s", user.name)
for cred in user.credentials:
if "username" in cred.data:
_LOGGER.debug("Found username in credentials: %s", cred.data["username"])
if cred.data["username"] == self._remote_user:
_LOGGER.debug("Username match found, finishing login flow")
return await self.async_finish({"user": user.id})
if user.name == self._remote_user:
_LOGGER.debug("User name match found, finishing login flow")
return await self.async_finish({"user": user.id})

_LOGGER.debug("No matching user found")
return self.async_abort(reason="not_allowed")

for user in self._available_users:
for cred in user.credentials:
if "username" in cred.data:
if cred.data["username"] == self._remote_user:
return await self.async_finish({"user": user.id})
if user.name == self._remote_user:
return await self.async_finish({"user": user.id})

_LOGGER.debug("no matching user found")
return self.async_abort(reason="not_allowed")

_LOGGER.debug("Showing login form with remote_user: %s", self._remote_user)
return self.async_show_form(
step_id="init",
data_schema=None,
)

0 comments on commit 7677712

Please sign in to comment.