From d62f6933003887449686610967111d2a3fd115aa Mon Sep 17 00:00:00 2001 From: Patrick Titzler Date: Mon, 8 Nov 2021 09:47:47 -0800 Subject: [PATCH] Improve KF authentication handling (#2257) Enables the user to choose an authentication type for Kubeflow Runtime configurations Closes #2240 Closes #2107 Closes #2108 --- docs/source/user_guide/runtime-conf.md | 19 +- elyra/metadata/schemas/kfp.json | 34 +- elyra/metadata/schemasproviders.py | 14 + elyra/pipeline/kfp/kfp_authentication.py | 772 +++++++++++++++++++++++ elyra/pipeline/kfp/kfp_metadata.py | 46 ++ elyra/pipeline/kfp/processor_kfp.py | 139 +--- elyra/pipeline/runtimes_metadata.py | 20 + elyra/templates/kfp/kfp_template.jinja2 | 126 +--- 8 files changed, 921 insertions(+), 249 deletions(-) create mode 100644 elyra/pipeline/kfp/kfp_authentication.py create mode 100644 elyra/pipeline/kfp/kfp_metadata.py create mode 100644 elyra/pipeline/runtimes_metadata.py diff --git a/docs/source/user_guide/runtime-conf.md b/docs/source/user_guide/runtime-conf.md index ba61e35f9..04bdb0a22 100644 --- a/docs/source/user_guide/runtime-conf.md +++ b/docs/source/user_guide/runtime-conf.md @@ -92,6 +92,7 @@ To create a runtime configuration for a Kubeflow Pipelines deployment: elyra-metadata install runtimes \ --display_name="My Kubeflow Pipelines Runtime" \ --api_endpoint=https://kubernetes-service.ibm.com/pipeline \ + --auth_type="DEX_STATIC_PASSWORDS" \ --api_username=username@email.com \ --api_password=mypassword \ --engine=Argo \ @@ -115,6 +116,7 @@ elyra-metadata install runtimes \ --name="my_kubeflow_pipelines_runtime" \ --display_name="My Kubeflow Pipelines Runtime" \ --api_endpoint=https://kubernetes-service.ibm.com/pipeline \ + --auth_type="DEX_STATIC_PASSWORDS" \ --api_username=username@email.com \ --api_password=mynewpassword \ --engine=Argo \ @@ -181,16 +183,23 @@ The namespace used to run your pipeline in Kubeflow Pipelines. This setting is r Example: `anonymous` +##### Kubeflow authentication type (auth_type) +Authentication type Elyra uses to gain access to Kubeflow Pipelines. This setting is required. Supported types are: +- No authentication (`NO_AUTHENTICATION`). +- Kubernetes service account token (`KUBERNETES_SERVICE _ACCOUNT_TOKEN`). This authentication type is only supported if Elyra runs as a pod in Kubernetes, e.g. as a Kubeflow notebook server. You must configure a service account token in Kubernetes, as outlined [here](https://www.kubeflow.org/docs/components/pipelines/sdk/connect-api/#multi-user-mode). +- DEX configured for static password authentication (`DEX_STATIC_PASSWORDS`). This authentication requires a username and a password. +- DEX configured for LDAP authentication (`DEX_LDAP`). This authentication requires a username and a password. +- DEX (`DEX_LEGACY`). Use this type only if none of the other authentication types applies or if your Kubeflow deployment is not configured for any other listed type. This authentication requires a username and a password. + ##### Kubeflow Pipelines API endpoint username (api_username) -Username used to access your KubeFlow Pipelines API endpoint. This setting is required if the Kubeflow Pipelines deployment is multi-user, auth enabled. -Currently, only Dex `staticPasswords` and `LDAP Connector` authentication types are supported. -(NOTE: if multiple Dex authentication types are enabled, we will try to use `staticPasswords`) +A username is required for most authentication types. Refer to the Kubeflow authentication type setting for details. Example: `user@example.com` -##### Kubeflow Pipelines API endpoint (api_password) -Password used to access your KubeFlow Pipelines API endpoint. This setting is required if the Kubeflow Pipelines deployment is multi-user, auth enabled. +##### Kubeflow Pipelines API endpoint password (api_password) + +A password is required for most authentication types. Refer to the Kubeflow authentication type setting for details. Example: `mypassword` diff --git a/elyra/metadata/schemas/kfp.json b/elyra/metadata/schemas/kfp.json index f5138697c..a4b5a76f4 100644 --- a/elyra/metadata/schemas/kfp.json +++ b/elyra/metadata/schemas/kfp.json @@ -6,6 +6,7 @@ "display_name": "Kubeflow Pipelines", "schemaspace": "runtimes", "schemaspace_id": "130b8e00-de7c-4b32-b553-b4a52824a3b5", + "metadata_class_name": "elyra.pipeline.kfp.kfp_metadata.KfpMetadata", "uihints": { "title": "Kubeflow Pipelines runtimes", "icon": "elyra:runtimes", @@ -53,6 +54,28 @@ "category": "Kubeflow Pipelines" } }, + "engine": { + "title": "Kubeflow Pipelines engine", + "description": "The Kubeflow Pipelines engine in use", + "type": "string", + "enum": ["Argo", "Tekton"], + "default": "Argo", + "uihints": { + "field_type": "dropdown", + "category": "Kubeflow Pipelines" + } + }, + "auth_type": { + "title": "Authentication Type", + "description": "Authentication type Elyra uses to authenticate with Kubeflow", + "type": "string", + "enum": ["{AUTH_PROVIDER_PLACEHOLDERS}"], + "default": "{DEFAULT_AUTH_PROVIDER_PLACEHOLDER}", + "uihints": { + "field_type": "dropdown", + "category": "Kubeflow Pipelines" + } + }, "api_username": { "title": "Kubeflow Pipelines API Endpoint Username", "description": "The Kubeflow Pipelines API endpoint username", @@ -70,17 +93,6 @@ "category": "Kubeflow Pipelines" } }, - "engine": { - "title": "Kubeflow Pipelines engine", - "description": "The Kubeflow Pipelines engine in use", - "type": "string", - "enum": ["Argo", "Tekton"], - "default": "Argo", - "uihints": { - "field_type": "dropdown", - "category": "Kubeflow Pipelines" - } - }, "cos_endpoint": { "title": "Cloud Object Storage Endpoint", "description": "The Cloud Object Storage endpoint", diff --git a/elyra/metadata/schemasproviders.py b/elyra/metadata/schemasproviders.py index 0a30e518f..5b9877279 100644 --- a/elyra/metadata/schemasproviders.py +++ b/elyra/metadata/schemasproviders.py @@ -33,6 +33,7 @@ from elyra.metadata.schemaspaces import ComponentRegistries from elyra.metadata.schemaspaces import RuntimeImages from elyra.metadata.schemaspaces import Runtimes +from elyra.pipeline.kfp.kfp_authentication import SupportedAuthProviders class ElyraSchemasProvider(SchemasProvider, metaclass=ABCMeta): @@ -96,6 +97,19 @@ def get_schemas(self) -> List[Dict]: if 'Tekton' in engine_enum: engine_enum.remove('Tekton') schema['properties']['metadata']['properties']['engine']['enum'] = engine_enum + + # For KFP schemas replace placeholders: + # - properties.metadata.properties.auth_type.enum ({AUTH_PROVIDER_PLACEHOLDERS}) + # - properties.metadata.properties.auth_type.default ({DEFAULT_AUTH_PROVIDER_PLACEHOLDER}) + auth_type_enum = SupportedAuthProviders.get_provider_names() + auth_type_default = SupportedAuthProviders.get_default_provider().name + + for schema in runtime_schemas: + if schema['name'] == 'kfp': + if schema['properties']['metadata']['properties'].get('auth_type') is not None: + schema['properties']['metadata']['properties']['auth_type']['enum'] = auth_type_enum + schema['properties']['metadata']['properties']['auth_type']['default'] = auth_type_default + return runtime_schemas diff --git a/elyra/pipeline/kfp/kfp_authentication.py b/elyra/pipeline/kfp/kfp_authentication.py new file mode 100644 index 000000000..5b5d33ccc --- /dev/null +++ b/elyra/pipeline/kfp/kfp_authentication.py @@ -0,0 +1,772 @@ +# +# Copyright 2018-2021 Elyra Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from abc import ABC +from abc import abstractmethod +from enum import Enum +from http import HTTPStatus +import os +import re +from typing import Any +from typing import Dict +from typing import List +from typing import Optional +from typing import Tuple +from urllib.parse import urlsplit + +from kfp.auth import KF_PIPELINES_SA_TOKEN_ENV +from kfp.auth import KF_PIPELINES_SA_TOKEN_PATH +import requests + + +def _empty_or_whitespaces_only(a_string: str) -> bool: + """ + Utility function: evaluates whether a_string is None or contains + only whitespaces. + + :param a_string: string to be evaluated + :type string: str + :return: True if a_string is None or contains only whitespaces + :rtype: Boolean + """ + if a_string is None or len(a_string.strip()) == 0: + return True + return False + + +class SupportedAuthProviders(Enum): + """ + List of supported authentication providers that is defined + in this module. Each entry in this list must be associated + with an implementation of AbstractAuthenticator. + """ + + # KF is not secured + # (See NoAuthenticationAuthenticator) + NO_AUTHENTICATION = 'No authentication' + # KF is secured using KUBERNETES_SERVICE_ACCOUNT_TOKEN + # (See K8sServiceAccountTokenAuthenticator implementation) + KUBERNETES_SERVICE_ACCOUNT_TOKEN = 'Kubernetes service account token' + # KF is secured using DEX with static id/password + # (See StaticPasswordKFPAuthenticator implementation) + DEX_STATIC_PASSWORDS = 'DEX (static passwords)' + # Supports DEX with LDAP authentication + # (See DEXLDAPAuthenticator implementation) + DEX_LDAP = 'DEX (LDAP)' + # Supports multiple authentication mechanisms + # (See DEXLegacyAuthenticator implementation) + DEX_LEGACY = 'DEX (legacy)' + + @staticmethod + def get_default_provider() -> 'SupportedAuthProviders': + """ + Returns the "default" enum member (provider) + :return: default enum member + :rtype: str + """ + return SupportedAuthProviders.NO_AUTHENTICATION + + @staticmethod + def get_provider_names() -> List[str]: + """ + Returns all enum member (provider) names + :return: List of provider names + :rtype: List[str] + """ + return list(map(lambda c: c.name, SupportedAuthProviders)) + + @staticmethod + def get_instance_by_name(name: str) -> 'SupportedAuthProviders': + """ + Returns an enumeration member of SupportedAuthProviders + corresponding to the given name. + :raises ValueError: name is not a valid enum member name + :return: An enum member of SupportedAuthProviders + :rtype: SupportedAuthProviders + """ + try: + return SupportedAuthProviders[name] + except KeyError: + raise ValueError(f'\'{name}\' is not a valid {SupportedAuthProviders.__name__}') + + @staticmethod + def get_instance_by_value(value: str) -> 'SupportedAuthProviders': + """ + Returns an enumeration member of SupportedAuthProviders + corresponding to the given value. + :raises ValueError: value is not a valid enum member value + :return: An enum member of SupportedAuthProviders + :rtype: SupportedAuthProviders + """ + return SupportedAuthProviders(value) + + @staticmethod + def to_dict() -> Dict: + """ + Convert the enum into a dictionary. Keys are the member + names (internal authentication type id) and values are + the associated user-friendly member values. + + :return: dictionary, comprising all members of the enum + :rtype: Dict + """ + enum_member_dict = {} + for member in SupportedAuthProviders: + enum_member_dict[member.name] = member.value + return enum_member_dict + + +class AuthenticationError(Exception): + """ + Indicates that an error occurred while an authentication request + was being processed. + """ + + def __init__(self, + message: str, + provider: Optional[str] = None, + request_history: Optional[List[Tuple[str, requests.Response]]] = None): + """ + Create a new AuthenticationError exception. The throw-er should + populate the request_history to allow for troubleshooting. List entry key is the (HTTP) + request URL, the value the response object. + + :param message: a user friendly error message + :type message: str + :param provider: if the error is raised by an implementation of AbstractAuthenticator, + use the value of _type; optional, defaults to None + :type provider: Optional[str], optional + :param request_history: , defaults to None + :type request_history: Optional[List[Dict[str, requests.Response]]], optional + """ + self._message = message + self._provider = provider + self._request_history = request_history + + def get_request_history(self) -> Optional[List[Tuple[str, requests.Response]]]: + """ + Returns the HTTP request history that led to this exception. + + :return: A list of tuples, comprising HTTP URL and the response object + :rtype: Optional[List[Tuple[str, requests.Response]]] + """ + return self._request_history + + def request_history_to_string(self) -> Optional[str]: + """ + Dump key HTTP request history into a string for logging purposes + + :return: Formatted HTTP request history, which led to the failure. + :rtype: Optional[str] + """ + output = None + for request_entry in self._request_history or []: + if output is None: + output = f'Request URL: {request_entry[0]} '\ + f'HTTP status code: {request_entry[1].status_code} '\ + f'response URL: {request_entry[1].url}' + else: + output = f'{output}\n'\ + f'Request URL: {request_entry[0]} '\ + f'HTTP status code: {request_entry[1].status_code} '\ + f'response URL: {request_entry[1].url}' + return output + + +class KFPAuthenticator(): + """ + Use this class to authenticate with Kubeflow Pipelines. The authenticate + method delegates the actual authentication to an implementation of the + AbstractAuthenticator class. + """ + + def authenticate(self, + api_endpoint: str, + auth_type_str: str, + runtime_config_name: str, + auth_parm_1: Optional[str] = None, + auth_parm_2: Optional[str] = None) -> Dict[str, Any]: + """ + Try to authenticate with Kubeflow using the provided information. + + :param api_endpoint: Kubeflow Pipelines endpoint URL, as specified in the runtime configuration + :type api_endpoint: str + :param auth_type_str Identifies the authentication type to be performed. If the provided value + is in the SupportedAuthProviders enum, authentication is performed. + :type auth_type: str + :param runtime_config_name: Runtime configuration name where kf_endpoint is specified. + :type runtime_config_name: str + :param auth_parm_1: First authorization parameter from the runtime config, defaults to None + :type auth_parm_1: Optional[str], optional + :param auth_parm_2: Second authorization parameter from the runtime config, defaults to None + :type auth_parm_2: Optional[str], optional + :raises AuthenticationError: Authentication failed due to the provided reason. + :return: A data structure containing information that enables kfp.Client to connect to api_endpoint + :rtype: Dict[str, str] + """ + + kf_url = urlsplit(api_endpoint)._replace(path='').geturl() + + # return data structure for successful requests + auth_info = { + 'api_endpoint': kf_url, # KF API endpoint, source: runtime config + 'auth_type': None, # Authentication type, source: runtime config + 'kf_secured': False, # Indicates whether KF API is secured + 'cookies': None, # passed to KFP SDK client as "cookies" param value + 'existing_token': None # passed to KFP SDK client as "existing_token" param value + } + + try: + auth_type = SupportedAuthProviders.get_instance_by_name(auth_type_str) + auth_info['auth_type'] = auth_type.value + except ValueError: + # the provided authentication type is not supported + raise AuthenticationError(f'Authentication type \'{auth_type_str}\' is not supported. ' + f'Update runtime configuration \'{runtime_config_name}\' and try again.') + + try: + # Process the authentication request using the appropriate authenticator + # implementation. Refer to the class definitions for information how + # the request is processed + + if auth_type == SupportedAuthProviders.NO_AUTHENTICATION: + # No authentication is performed. The authenticator returns None + NoAuthenticationAuthenticator().authenticate(kf_url, + runtime_config_name) + elif auth_type == SupportedAuthProviders.DEX_STATIC_PASSWORDS: + # static id/password checking; the authenticator returns + # a cookie value + auth_info['cookies'] =\ + DEXStaticPasswordAuthenticator().authenticate(kf_url, + runtime_config_name, + username=auth_parm_1, + password=auth_parm_2) + auth_info['kf_secured'] = True + elif auth_type == SupportedAuthProviders.DEX_LEGACY: + # see implementation for details; the authenticator returns + # a cookie value + auth_info['cookies'] =\ + DEXLegacyAuthenticator().authenticate(kf_url, + runtime_config_name, + username=auth_parm_1, + password=auth_parm_2) + if auth_info.get('cookies') is not None: + auth_info['kf_secured'] = True + + elif auth_type == SupportedAuthProviders.DEX_LDAP: + # DEX/LDAP authentication; the authenticator returns + # a cookie value + auth_info['cookies'] =\ + DEXLDAPAuthenticator().authenticate(kf_url, + runtime_config_name, + username=auth_parm_1, + password=auth_parm_2) + if auth_info.get('cookies') is not None: + auth_info['kf_secured'] = True + elif auth_type == SupportedAuthProviders.KUBERNETES_SERVICE_ACCOUNT_TOKEN: + # see implementation for details; the authenticator returns None + K8sServiceAccountTokenAuthenticator().authenticate(kf_url, + runtime_config_name) + auth_info['kf_secured'] = True + else: + # SupportedAuthProviders contains a member that is not yet + # associated with an implementation of AbstractAuthenticator + raise AuthenticationError(f'Support for authentication type \'{auth_type.name}\' is not implemented.') + except AuthenticationError: + raise + except Exception as ex: + raise AuthenticationError(f'Authentication using authentication type ' + f'\'{auth_info["auth_type"]}\' failed: {ex}') + + # sanity check: upon completion auth_info must not contain + # incomplete or conflicting information + if auth_info.get('auth_type') is None or\ + (auth_info.get('cookies') is not None and auth_info.get('existing_token') is not None): + raise AuthenticationError('A potential authentication implementation problem was detected. ' + 'Please create an issue.') + return auth_info + + +class AbstractAuthenticator(ABC): + """ + Abstract base class for authenticator implementations + """ + + _type = None # unique authenticator id + + @abstractmethod + def authenticate(self, + kf_endpoint: str, + runtime_config_name: str) -> Optional[str]: + """ + Attempt to authenticate with the specified Kubeflow endpoint. The caller + expects the implementing method to behave as follows: + - if authentication fails (for any reason), AuthenticationError is raised + - an entity (e.g. a cookie) is returned that kfp.Client can use to access the endpoint + - special case: for authenticators that support unsecured endpoints, None must be returned + + :param kf_endpoint: Kubeflow endpoint URL + :type kf_endpoint: str + :param runtime_config_name: Runtime configuration name where kf_endpoint is specified. + :type runtime_config_name: str + :raises NotImplementedError: This method needs to be implemented. + :raises AuthenticationError: Authentication failed. Details are in the exception. + :return: an entity that provides the Kubeflow Pipelines SDK client access to the specified endpoint + :rtype: Optional[str] + """ + raise NotImplementedError('Method AbstractAuthenticator.authenticate must be implemented.') + + +class NoAuthenticationAuthenticator(AbstractAuthenticator): + """ + Authenticator for Kubeflow servers that are not secured. + """ + + _type = SupportedAuthProviders.NO_AUTHENTICATION + + def authenticate(self, + kf_endpoint: str, + runtime_config_name: str) -> Optional[str]: + """ + Confirms that the specified kf_endpoint can be accessed + without authentication. + + :param kf_endpoint: Kubeflow API endpoint to verify + :type kf_endpoint: str + :param runtime_config_name: Runtime configuration name where kf_endpoint is specified + :type runtime_config_name: str + :raises AuthenticationError: the endpoint is secured or an error occurred during processing. + :return: None if the endpoint is unsecured + :rtype: Optional[str] + """ + # verify that the endpoint is unsecured + get_response = requests.get(kf_endpoint, allow_redirects=True) + + if len(get_response.history) > 0: + raise AuthenticationError(f'Authentication is required for Kubeflow at {kf_endpoint}. ' + f'Update the authentication type setting in runtime configuration ' + f'\'{runtime_config_name}\' and try again.', + provider=self._type) + return None + + +class DEXStaticPasswordAuthenticator(AbstractAuthenticator): + """ + Authenticator for DEX/static passwords + """ + + _type = SupportedAuthProviders.DEX_STATIC_PASSWORDS + + def authenticate(self, + kf_endpoint: str, + runtime_config_name: str, + username: str, + password: str) -> Optional[str]: + """ + Authenticate using static password authentication. An AuthenticationError is raised + if (1) kf_endpoint is unsecured (2) kf_endpoint does not + support static password authentication (3) the credentials are invalid + + :param kf_endpoint: Kubeflow API endpoint to verify + :type kf_endpoint: str + :param runtime_config_name: Runtime configuration name where kf_endpoint is specified + :type runtime_config_name: str + :param username: Id to be used for authentication + :type username: str + :param password: Password to be used for authentication + :type password: str + :raises AuthenticationError: Authentication failed due to the specified error. + :return: A cookie value + """ + + # This code can be removed after the kfp runtime schema enforces that the values + # for username and password are valid + if _empty_or_whitespaces_only(username) or\ + _empty_or_whitespaces_only(password): + raise AuthenticationError(f'Credentials are required to perform this type of authentication. ' + f'Update runtime configuration \'{runtime_config_name}\' and try again.', + provider=self._type) + + with requests.Session() as s: + + request_history = [] + + ################ + # Determine if Endpoint is Secured + ################ + resp = s.get(kf_endpoint, allow_redirects=True) + request_history.append((kf_endpoint, resp)) + if resp.status_code != HTTPStatus.OK: + raise AuthenticationError(f'Error detecting whether Kubeflow server at {kf_endpoint} is secured: ' + f'HTTP status code {resp.status_code}' + f'Update runtime configuration \'{runtime_config_name}\' and try again.', + provider=self._type, + request_history=request_history) + + if len(resp.history) == 0: + # if we were NOT redirected, then the endpoint is UNSECURED + # treat this as an error. + raise AuthenticationError(f'The Kubeflow server at {kf_endpoint} is not secured ' + 'using DEX static password. ' + f'Update runtime configuration \'{runtime_config_name}\' and try again.', + provider=self._type, + request_history=request_history) + + ################ + # Get Dex Login URL + ################ + redirect_url_obj = urlsplit(resp.url) + + # if we are at `/auth?=xxxx` path, we need to select the + # static password auth type + if re.search(r"/auth$", redirect_url_obj.path): + redirect_url_obj = redirect_url_obj._replace( + path=re.sub(r"/auth$", "/auth/local", redirect_url_obj.path) + ) + else: + # verify that KF is secured by static passwords + m = re.search(r"/auth/([^/]*)/?", redirect_url_obj.path) + if m and m.group(1) != 'local': + raise AuthenticationError( + f'The Kubeflow server at {kf_endpoint} redirected to an unexpected HTTP path ' + f'(\'{redirect_url_obj.path}\'). Verify that Kubeflow is secured using \'{self._type.name}\'' + f' and, if necessary, update the authentication type in runtime configuration ' + f'\'{runtime_config_name}\'.', + provider=self._type, + request_history=request_history) + + # if we are at `/auth/local/login` path, then no further action is needed + # (we can use it for login POST) + if re.search(r"/auth/local/login$", redirect_url_obj.path): + dex_login_url = redirect_url_obj.geturl() + else: + # else, we need to be redirected to the actual login page + # this GET should redirect us to the `/auth/local/login` path + resp = s.get(redirect_url_obj.geturl(), allow_redirects=True) + request_history.append((redirect_url_obj.geturl(), resp)) + if resp.status_code != HTTPStatus.OK: + raise AuthenticationError('Error redirecting to the DEX static password login page: ' + f'HTTP status code {resp.status_code}.', + provider=self._type, + request_history=request_history) + # set the login url + dex_login_url = resp.url + + ################ + # Attempt Dex Login + ################ + resp = s.post( + dex_login_url, + data={"login": username, "password": password}, + allow_redirects=True + ) + request_history.append((dex_login_url, resp)) + + if len(resp.history) == 0: + raise AuthenticationError('The credentials are probably invalid. ' + f'Update runtime configuration \'{runtime_config_name}\' and try again.', + provider=self._type, + request_history=request_history) + + # store the session cookies in a "key1=value1; key2=value2" string + return "; ".join([f"{c.name}={c.value}" for c in s.cookies]) + + # this code should never be reached; raise an error + raise AuthenticationError('An implementation problem was detected for DEX static password authentication. ' + 'Please create an issue.', + provider=self._type, + request_history=request_history) + + +class DEXLDAPAuthenticator(AbstractAuthenticator): + """ + Authenticator for DEX/LDAP. + """ + + _type = SupportedAuthProviders.DEX_LDAP + + def authenticate(self, + kf_endpoint: str, + runtime_config_name: str, + username: str, + password: str) -> Optional[str]: + """ + Authenticate using LDAP. An AuthenticationError is raised + if (1) kf_endpoint is unsecured (2) kf_endpoint does not + support LDAP authentication (3) the credentials are invalid + + :param kf_endpoint: Kubeflow API endpoint to verify + :type kf_endpoint: str + :param runtime_config_name: Runtime configuration name where kf_endpoint is specified + :type runtime_config_name: str + :param username: Id to be used for authentication + :type username: str + :param password: Password to be used for authentication + :type password: str + :raises AuthenticationError: Authentication failed due to the specified error. + :return: A cookie value + """ + + # This code can be removed after the kfp runtime schema enforces that the values + # for username and password are valid + if _empty_or_whitespaces_only(username) or\ + _empty_or_whitespaces_only(password): + raise AuthenticationError(f'Credentials are required to perform this type of authentication. ' + f'Update runtime configuration \'{runtime_config_name}\' and try again.', + provider=self._type) + + with requests.Session() as s: + + request_history = [] + + ################ + # Determine if Endpoint is Secured + ################ + resp = s.get(kf_endpoint, allow_redirects=True) + request_history.append((kf_endpoint, resp)) + if resp.status_code != HTTPStatus.OK: + raise AuthenticationError(f'Error detecting whether Kubeflow server at {kf_endpoint} is secured: ' + f'HTTP status code {resp.status_code}' + f'Update runtime configuration \'{runtime_config_name}\' and try again.', + provider=self._type, + request_history=request_history) + + if len(resp.history) == 0: + # if we were NOT redirected, then the endpoint is UNSECURED + # treat this as an error. + raise AuthenticationError(f'The Kubeflow server at {kf_endpoint} is not secured using DEX with LDAP. ' + f'Update the authentication type in runtime configuration ' + f'\'{runtime_config_name}\' and try again.', + provider=self._type, + request_history=request_history) + + ################ + # Get Dex Login URL + ################ + redirect_url_obj = urlsplit(resp.url) + + # if we are at `/auth?=xxxx` path, we need to select + # the LDAP auth type + if re.search(r"/auth$", redirect_url_obj.path): + redirect_url_obj = redirect_url_obj._replace( + path=re.sub(r"/auth$", "/auth/ldap", redirect_url_obj.path) + ) + else: + # verify that KF is secured by LDAP + m = re.search(r"/auth/([^/]*)/?", redirect_url_obj.path) + if m and m.group(1) != 'ldap': + raise AuthenticationError( + f'The Kubeflow server at {kf_endpoint} redirected to an unexpected HTTP path ' + f'(\'{redirect_url_obj.path}\'). Verify that Kubeflow is configured for \'{self._type.name}\'' + f' and, if necessary, update the authentication type in runtime configuration ' + f'\'{runtime_config_name}\'.', + provider=self._type, + request_history=request_history) + + # if we are at `/auth/ldap/login` path, then no further action is needed + # (we can use it for login POST) + if re.search(r"/auth/ldap/login$", redirect_url_obj.path): + dex_login_url = redirect_url_obj.geturl() + else: + # else, we need to be redirected to the actual login page + # this GET should redirect us to the `/auth/ldap/login` path + resp = s.get(redirect_url_obj.geturl(), allow_redirects=True) + request_history.append((redirect_url_obj.geturl(), resp)) + if resp.status_code != HTTPStatus.OK: + raise AuthenticationError('Error redirecting to the DEX LDAP login page: ' + f'HTTP status code {resp.status_code}.', + provider=self._type, + request_history=request_history) + # set the login url + dex_login_url = resp.url + + ################ + # Attempt Dex Login + ################ + resp = s.post( + dex_login_url, + data={"login": username, "password": password}, + allow_redirects=True + ) + request_history.append((dex_login_url, resp)) + + if len(resp.history) == 0: + raise AuthenticationError('The DEX LDAP credentials are probably invalid. ' + f'Update runtime configuration \'{runtime_config_name}\' and try again.', + provider=self._type, + request_history=request_history) + + # store the session cookies in a "key1=value1; key2=value2" string + return "; ".join([f"{c.name}={c.value}" for c in s.cookies]) + + # this code should never be reached; raise an error + raise AuthenticationError('An implementation problem was detected for LDAP authentication. ' + 'Please create an issue.', + provider=self._type, + request_history=request_history) + + +class K8sServiceAccountTokenAuthenticator(AbstractAuthenticator): + """ + Authenticator for Service Account Tokens on Kubernetes. + """ + + _type = SupportedAuthProviders.KUBERNETES_SERVICE_ACCOUNT_TOKEN + + def authenticate(self, + kf_endpoint: str, + runtime_config_name: str) -> None: + """ + Verify that service account token authentication can be performed. + An AuthenticationError is raised if a problem is encountered that + would likely prevent the KFP client from authenticating successfully. + + :param kf_endpoint: Kubeflow API endpoint to verify + :type kf_endpoint: str + :param runtime_config_name: Runtime configuration name where kf_endpoint is specified + :type runtime_config_name: str + :raises AuthenticationError: A potential issue was detected that will + likely cause a KFP client failure. + """ + + request_history = [] + + """ + Disable connectivity test to avoid false positives. + + # Verify connectivity for the API endpoint + resp = requests.get(kf_endpoint, allow_redirects=True) + request_history.append((kf_endpoint, resp)) + if resp.status_code != HTTPStatus.OK: + raise AuthenticationError(f'Error detecting whether Kubeflow server at {kf_endpoint} is secured: ' + 'HTTP status code {resp.status_code}' + f'Update runtime configuration \'{runtime_config_name}\' and try again.', + provider=self._type, + request_history=request_history) + # If redirected, KF cannot be accessed using service account token. + # This is a likely mismatch between selected Kubeflow auth type and configured auth type. + if len(resp.history) > 0: + raise AuthenticationError(f'Kubeflow server at {kf_endpoint} redirected to an unexpected ' + f'URL \'{resp.url}\'. Service account token access cannot be used ' + 'for authentication. ' + f'Update runtime configuration \'{runtime_config_name}\' and try again.', + provider=self._type, + request_history=request_history) + """ + + # Running in a Kubernetes pod, kfp.Client can use a service account token + # for authentication. Verify that a token file exists in the current environment. + service_account_token_path = os.environ.get(KF_PIPELINES_SA_TOKEN_ENV, + KF_PIPELINES_SA_TOKEN_PATH) + + try: + with open(service_account_token_path, 'r') as token_file: + if len(token_file.read()) == 0: + raise AuthenticationError(f'Kubernetes service account token file ' + f'{service_account_token_path} is empty.', + provider=self._type, + request_history=request_history) + except AuthenticationError: + raise + except Exception as ex: + raise AuthenticationError(f'Kubernetes service account token could not be read ' + f'from {service_account_token_path}: {ex}.', + provider=self._type, + request_history=request_history) + + # Nothing needs to be passed to the KFP client + return None + + +class DEXLegacyAuthenticator(AbstractAuthenticator): + """ + Authenticator for generic/legacy DEX authentication. + """ + + _type = SupportedAuthProviders.DEX_LEGACY + + def authenticate(self, + kf_endpoint: str, + runtime_config_name: str, + username: Optional[str], + password: Optional[str]) -> Optional[str]: + """ + Authentication using the following flow: + - detect wether Kubeflow endpoint is secured + - if endpoint is secured, try to authenticate if a username and password were provided + + :param kf_endpoint: Kubeflow API endpoint to verify + :type kf_endpoint: str + :param runtime_config_name: Runtime configuration name where kf_endpoint is specified + :type runtime_config_name: str + :param username: Id to be used for authentication + :type username: Optional[str] + :param password: Password to be used for authentication + :type password: Optional[str] + :raises AuthenticationError: Authentication failed due to the specified error. + :return: None if kf_endpoint is not secured, a cookie value otherwise + :rtype: Optional[str] + """ + + # keep history of all HTTP requests and responses for troubleshooting + request_history = [] + + # Obtain redirect URL + resp = requests.get(kf_endpoint) + request_history.append((kf_endpoint, resp)) + if resp.status_code != HTTPStatus.OK: + raise AuthenticationError(f'Error detecting whether Kubeflow server at {kf_endpoint} is secured: ' + 'HTTP status code {resp.status_code}' + f'Update runtime configuration \'{runtime_config_name}\' and try again.', + provider=self._type, + request_history=request_history) + + # If KF redirected to '/dex/auth/... + # try to authenticate using the provided credentials + if 'dex/auth' in resp.url: + + if _empty_or_whitespaces_only(username) or _empty_or_whitespaces_only(password): + raise AuthenticationError(f'Kubeflow server at {kf_endpoint} is secured: ' + 'username and password are required. ' + f'Update runtime configuration \'{runtime_config_name}\' and try again.', + provider=self._type, + request_history=request_history) + + # Try to authenticate user by sending a request to the + # redirect URL + session = requests.Session() + auth_url = resp.url + resp = session.post(auth_url, + data={'login': username, + 'password': password}) + request_history.append((auth_url, resp)) + + if resp.status_code != HTTPStatus.OK: + raise AuthenticationError(f'Authentication {auth_url} failed: ' + f'HTTP status code {resp.status_code}' + f'Update runtime configuration \'{runtime_config_name}\' and try again.', + provider=self._type, + request_history=request_history) + # Capture authservice_session cookie, if one was returned + # in the response + cookie_auth_key = 'authservice_session' + cookie_auth_value = session.cookies.get(cookie_auth_key) + + if cookie_auth_value: + return f'{cookie_auth_key}={cookie_auth_value}' + + # The endpoint is not secured. + return None diff --git a/elyra/pipeline/kfp/kfp_metadata.py b/elyra/pipeline/kfp/kfp_metadata.py new file mode 100644 index 000000000..b0f71ea69 --- /dev/null +++ b/elyra/pipeline/kfp/kfp_metadata.py @@ -0,0 +1,46 @@ +# +# Copyright 2018-2021 Elyra Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from typing import Any + +from elyra.metadata.manager import MetadataManager +from elyra.pipeline.kfp.kfp_authentication import SupportedAuthProviders +from elyra.pipeline.runtimes_metadata import RuntimesMetadata + + +class KfpMetadata(RuntimesMetadata): + """ + Applies changes specific to the kfp schema + """ + + def on_load(self, **kwargs: Any) -> None: + + if self.metadata.get('auth_type') is None: + # Inject auth_type property for metadata persisted using Elyra < 3.3: + # - api_username and api_password present -> use DEX Legacy + # - otherwise -> use no authentication type + if self.metadata.get('api_username') is None or\ + len(self.metadata.get('api_username').strip()) == 0 or\ + self.metadata.get('api_password') is None or\ + len(self.metadata.get('api_password').strip()) == 0: + self.metadata['auth_type'] = SupportedAuthProviders.NO_AUTHENTICATION.name + else: + self.metadata['auth_type'] = SupportedAuthProviders.DEX_LEGACY.name + + # save changes + MetadataManager(schemaspace="runtimes").update(self.name, self, for_migration=True) + + return None diff --git a/elyra/pipeline/kfp/processor_kfp.py b/elyra/pipeline/kfp/processor_kfp.py index ef3a14665..dd169c0e0 100644 --- a/elyra/pipeline/kfp/processor_kfp.py +++ b/elyra/pipeline/kfp/processor_kfp.py @@ -31,7 +31,7 @@ from kfp.dsl import PipelineConf from kfp.aws import use_aws_secret # noqa H306 from kubernetes import client as k8s_client -import requests + try: from kfp_tekton import compiler as kfp_tekton_compiler from kfp_tekton import TektonClient @@ -45,6 +45,8 @@ from elyra.metadata.schemaspaces import RuntimeImages from elyra.metadata.schemaspaces import Runtimes from elyra.pipeline.kfp.component_parser_kfp import KfpComponentParser +from elyra.pipeline.kfp.kfp_authentication import AuthenticationError +from elyra.pipeline.kfp.kfp_authentication import KFPAuthenticator from elyra.pipeline.pipeline import GenericOperation from elyra.pipeline.pipeline import Operation from elyra.pipeline.processor import PipelineProcessor @@ -102,37 +104,39 @@ def process(self, pipeline): cos_endpoint = runtime_configuration.metadata['cos_endpoint'] cos_bucket = runtime_configuration.metadata['cos_bucket'] - ################ - # Istio Auth Session - ################ - try: - auth_session = self._get_istio_auth_session( - url=api_endpoint, - username=api_username, - password=api_password - ) - except Exception as ex: - raise RuntimeError( - f"Failed to create istio auth session for Kubeflow endpoint: '{api_endpoint}' - " - f"Check Kubeflow Pipelines runtime configuration: '{pipeline.runtime_config}'" - ) from ex + # Determine which provider to use to authenticate with Kubeflow + auth_type = runtime_configuration.metadata.get('auth_type') - self.log.debug(f"Kubeflow istio `auth_session` dict: {auth_session}") + try: + auth_info = \ + KFPAuthenticator().authenticate(api_endpoint, + auth_type_str=auth_type, + runtime_config_name=pipeline.runtime_config, + auth_parm_1=api_username, + auth_parm_2=api_password) + self.log.debug(f'Authenticator returned {auth_info}') + except AuthenticationError as ae: + if ae.get_request_history() is not None: + self.log.info('An authentication error was raised. Diagnostic information follows.') + self.log.info(ae.request_history_to_string()) + raise RuntimeError(f'Kubeflow authentication failed: {ae}') ############# - # Kubeflow Client + # Create Kubeflow Client ############# try: if engine == "Tekton": client = TektonClient( host=api_endpoint, - cookies=auth_session['session_cookie'], + cookies=auth_info.get('cookies', None), + existing_token=auth_info.get('existing_token', None), namespace=user_namespace ) else: client = ArgoClient( host=api_endpoint, - cookies=auth_session['session_cookie'], + cookies=auth_info.get('cookies', None), + existing_token=auth_info.get('existing_token', None), namespace=user_namespace ) except Exception as ex: @@ -346,6 +350,14 @@ def process(self, pipeline): f"Check Kubeflow Pipelines runtime configuration: '{pipeline.runtime_config}'" ) from ex + if run is None: + # client.run_pipeline seemed to have encountered an issue + # but didn't raise an exception + raise RuntimeError( + f"Failed to create Kubeflow pipeline run: '{job_name}' - " + f"Check Kubeflow Pipelines runtime configuration: '{pipeline.runtime_config}'" + ) + self.log_pipeline_info( pipeline_name, f"pipeline submitted: {api_endpoint}/#/runs/details/{run.id}", @@ -720,95 +732,6 @@ def _sanitize_operation_name(name: str) -> str: """ return re.sub('-+', '-', re.sub('[^-_0-9A-Za-z ]+', '-', name)).lstrip('-').rstrip('-') - @staticmethod - def _get_istio_auth_session(url: str, username: str, password: str) -> dict: - """ - Determine if the specified URL is secured by Dex and try to obtain a session cookie. - WARNING: only Dex `staticPasswords` and `LDAP` authentication are currently supported - (we default default to using `staticPasswords` if both are enabled) - - :param url: Kubeflow server URL, including protocol - :param username: Dex `staticPasswords` or `LDAP` username - :param password: Dex `staticPasswords` or `LDAP` password - :return: auth session information - """ - # define the default return object - auth_session = { - "endpoint_url": url, # KF endpoint URL - "redirect_url": None, # KF redirect URL, if applicable - "dex_login_url": None, # Dex login URL (for POST of credentials) - "is_secured": None, # True if KF endpoint is secured - "session_cookie": None # Resulting session cookies in the form "key1=value1; key2=value2" - } - - # use a persistent session (for cookies) - with requests.Session() as s: - - ################ - # Determine if Endpoint is Secured - ################ - resp = s.get(url, allow_redirects=True) - if resp.status_code != 200: - raise RuntimeError( - f"HTTP status code '{resp.status_code}' for GET against: {url}" - ) - - auth_session["redirect_url"] = resp.url - - # if we were NOT redirected, then the endpoint is UNSECURED - if len(resp.history) == 0: - auth_session["is_secured"] = False - return auth_session - else: - auth_session["is_secured"] = True - - ################ - # Get Dex Login URL - ################ - redirect_url_obj = urlsplit(auth_session["redirect_url"]) - - # if we are at `/auth?=xxxx` path, we need to select an auth type - if re.search(r"/auth$", redirect_url_obj.path): - # default to "staticPasswords" auth type - redirect_url_obj = redirect_url_obj._replace( - path=re.sub(r"/auth$", "/auth/local", redirect_url_obj.path) - ) - - # if we are at `/auth/xxxx/login` path, then no further action is needed (we can use it for login POST) - if re.search(r"/auth/.*/login$", redirect_url_obj.path): - auth_session["dex_login_url"] = redirect_url_obj.geturl() - - # else, we need to be redirected to the actual login page - else: - # this GET should redirect us to the `/auth/xxxx/login` path - resp = s.get(redirect_url_obj.geturl(), allow_redirects=True) - if resp.status_code != 200: - raise RuntimeError( - f"HTTP status code '{resp.status_code}' for GET against: {redirect_url_obj.geturl()}" - ) - - # set the login url - auth_session["dex_login_url"] = resp.url - - ################ - # Attempt Dex Login - ################ - resp = s.post( - auth_session["dex_login_url"], - data={"login": username, "password": password}, - allow_redirects=True - ) - if len(resp.history) == 0: - raise RuntimeError( - f"Login credentials were probably invalid - " - f"No redirect after POST to: {auth_session['dex_login_url']}" - ) - - # store the session cookies in a "key1=value1; key2=value2" string - auth_session["session_cookie"] = "; ".join([f"{c.name}={c.value}" for c in s.cookies]) - - return auth_session - class KfpPipelineProcessorResponse(PipelineProcessorResponse): diff --git a/elyra/pipeline/runtimes_metadata.py b/elyra/pipeline/runtimes_metadata.py new file mode 100644 index 000000000..cf30cc445 --- /dev/null +++ b/elyra/pipeline/runtimes_metadata.py @@ -0,0 +1,20 @@ +# +# Copyright 2018-2021 Elyra Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +from elyra.metadata.metadata import Metadata + + +class RuntimesMetadata(Metadata): + pass diff --git a/elyra/templates/kfp/kfp_template.jinja2 b/elyra/templates/kfp/kfp_template.jinja2 index 50aa329ba..ce1b5626c 100644 --- a/elyra/templates/kfp/kfp_template.jinja2 +++ b/elyra/templates/kfp/kfp_template.jinja2 @@ -2,12 +2,6 @@ import kfp {% if engine == "Tekton" %} import kfp_tekton {% endif %} -{% if kf_secured %} -import re -import requests -import sys -from urllib.parse import urlsplit -{% endif %} {% if cos_secret %} from kfp.aws import use_aws_secret {% endif %} @@ -66,128 +60,10 @@ def create_pipeline(): {% endfor %} -{% if kf_secured %} -def get_istio_auth_session(url: str, username: str, password: str) -> dict: - """ - Determine if the specified URL is secured by Dex and try to obtain a session cookie. - WARNING: only Dex `staticPasswords` and `LDAP` authentication are currently supported - (we default default to using `staticPasswords` if both are enabled) - - :param url: Kubeflow server URL, including protocol - :param username: Dex `staticPasswords` or `LDAP` username - :param password: Dex `staticPasswords` or `LDAP` password - :return: auth session information - """ - # define the default return object - auth_session = { - "endpoint_url": url, # KF endpoint URL - "redirect_url": None, # KF redirect URL, if applicable - "dex_login_url": None, # Dex login URL (for POST of credentials) - "is_secured": None, # True if KF endpoint is secured - "session_cookie": None # Resulting session cookies in the form "key1=value1; key2=value2" - } - - # use a persistent session (for cookies) - with requests.Session() as s: - - ################ - # Determine if Endpoint is Secured - ################ - resp = s.get(url, allow_redirects=True) - if resp.status_code != 200: - raise RuntimeError( - f"HTTP status code '{resp.status_code}' for GET against: {url}" - ) - - auth_session["redirect_url"] = resp.url - - # if we were NOT redirected, then the endpoint is UNSECURED - if len(resp.history) == 0: - auth_session["is_secured"] = False - return auth_session - else: - auth_session["is_secured"] = True - - ################ - # Get Dex Login URL - ################ - redirect_url_obj = urlsplit(auth_session["redirect_url"]) - - # if we are at `/auth?=xxxx` path, we need to select an auth type - if re.search(r"/auth$", redirect_url_obj.path): - # default to "staticPasswords" auth type - redirect_url_obj = redirect_url_obj._replace( - path=re.sub(r"/auth$", "/auth/local", redirect_url_obj.path) - ) - - # if we are at `/auth/xxxx/login` path, then no further action is needed (we can use it for login POST) - if re.search(r"/auth/.*/login$", redirect_url_obj.path): - auth_session["dex_login_url"] = redirect_url_obj.geturl() - - # else, we need to be redirected to the actual login page - else: - # this GET should redirect us to the `/auth/xxxx/login` path - resp = s.get(redirect_url_obj.geturl(), allow_redirects=True) - if resp.status_code != 200: - raise RuntimeError( - f"HTTP status code '{resp.status_code}' for GET against: {redirect_url_obj.geturl()}" - ) - - # set the login url - auth_session["dex_login_url"] = resp.url - - ################ - # Attempt Dex Login - ################ - resp = s.post( - auth_session["dex_login_url"], - data={"login": username, "password": password}, - allow_redirects=True - ) - if len(resp.history) == 0: - raise RuntimeError( - f"Login credentials were probably invalid - " - f"No redirect after POST to: {auth_session['dex_login_url']}" - ) - - # store the session cookies in a "key1=value1; key2=value2" string - auth_session["session_cookie"] = "; ".join([f"{c.name}={c.value}" for c in s.cookies]) - - return auth_session -{% endif %} - - if __name__ == "__main__": -{% if kf_secured %} - # user id and password are required parameters - if(len(sys.argv) < 3): - print(f'Invocation: python {sys.argv[0]} ' - ' ') - sys.exit(1) - - print('Trying to authenticate with Kubeflow server ...') - - api_endpoint = '{{ api_endpoint }}'.rstrip('/') - api_username = sys.argv[1] - api_password = sys.argv[2] - - try: - # attempt to create a session cookies using the provided `api_username` and `api_password` - auth_session = get_istio_auth_session( - url=api_endpoint, - username=api_username, - password=api_password - ) - except Exception as ex: - raise RuntimeError( - f"Failed to create istio auth session for Kubeflow endpoint: '{api_endpoint}'" - ) from ex - - auth_cookie = auth_session['session_cookie'] -{% else %} + # TODO authentication auth_cookie = None -{% endif %} pipeline_func = create_pipeline pipeline_name = '{{ pipeline_name }}'