1616import os
1717import tempfile
1818import time
19+ import warnings
1920
2021import datetime
2122from typing import Mapping , Callable , Optional
2728
2829from .compiler import TektonCompiler
2930from .compiler .pipeline_utils import TektonPipelineConf
31+ from kfp ._auth import get_auth_token , get_gcp_access_token
3032
3133import json
3234import logging
@@ -106,7 +108,8 @@ def __init__(self,
106108 ssl_ca_cert = None ,
107109 kube_context = None ,
108110 credentials = None ,
109- ui_host = None ):
111+ ui_host = None ,
112+ verify_ssl = None ):
110113 """Create a new instance of kfp client."""
111114 host = host or os .environ .get (KF_PIPELINES_ENDPOINT_ENV )
112115 self ._uihost = os .environ .get (KF_PIPELINES_UI_ENDPOINT_ENV , ui_host or
@@ -120,7 +123,7 @@ def __init__(self,
120123
121124 config = self ._load_config (host , client_id , namespace , other_client_id ,
122125 other_client_secret , existing_token , proxy ,
123- ssl_ca_cert , kube_context , credentials )
126+ ssl_ca_cert , kube_context , credentials , verify_ssl )
124127 # Save the loaded API client configuration, as a reference if update is
125128 # needed.
126129 self ._load_context_setting_or_default ()
@@ -162,7 +165,106 @@ def __init__(self,
162165 except FileNotFoundError :
163166 logging .info (
164167 'Failed to automatically set namespace.' , exc_info = False )
165-
168+
169+ def _load_config (self , host , client_id , namespace , other_client_id ,
170+ other_client_secret , existing_token , proxy , ssl_ca_cert ,
171+ kube_context , credentials , verify_ssl ):
172+ config = kfp_server_api .configuration .Configuration ()
173+
174+ if proxy :
175+ # https://github.com/kubeflow/pipelines/blob/c6ac5e0b1fd991e19e96419f0f508ec0a4217c29/backend/api/python_http_client/kfp_server_api/rest.py#L100
176+ config .proxy = proxy
177+ if verify_ssl is not None :
178+ config .verify_ssl = verify_ssl
179+
180+ if ssl_ca_cert :
181+ config .ssl_ca_cert = ssl_ca_cert
182+
183+ host = host or ''
184+
185+ # Defaults to 'https' if host does not contain 'http' or 'https' protocol.
186+ if host and not host .startswith ('http' ):
187+ warnings .warn (
188+ 'The host %s does not contain the "http" or "https" protocol.'
189+ ' Defaults to "https".' % host )
190+ host = 'https://' + host
191+
192+ # Preprocess the host endpoint to prevent some common user mistakes.
193+ if not client_id :
194+ # always preserving the protocol (http://localhost requires it)
195+ host = host .rstrip ('/' )
196+
197+ if host :
198+ config .host = host
199+
200+ token = None
201+
202+ # "existing_token" is designed to accept token generated outside of SDK. Here is an example.
203+ #
204+ # https://cloud.google.com/functions/docs/securing/function-identity
205+ # https://cloud.google.com/endpoints/docs/grpc/service-account-authentication
206+ #
207+ # import requests
208+ # import kfp
209+ #
210+ # def get_access_token():
211+ # url = 'http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token'
212+ # r = requests.get(url, headers={'Metadata-Flavor': 'Google'})
213+ # r.raise_for_status()
214+ # access_token = r.json()['access_token']
215+ # return access_token
216+ #
217+ # client = kfp.Client(host='<KFPHost>', existing_token=get_access_token())
218+ #
219+ if existing_token :
220+ token = existing_token
221+ self ._is_refresh_token = False
222+ elif client_id :
223+ token = get_auth_token (client_id , other_client_id ,
224+ other_client_secret )
225+ self ._is_refresh_token = True
226+ elif self ._is_inverse_proxy_host (host ):
227+ token = get_gcp_access_token ()
228+ self ._is_refresh_token = False
229+ elif credentials :
230+ config .api_key ['authorization' ] = 'placeholder'
231+ config .api_key_prefix ['authorization' ] = 'Bearer'
232+ config .refresh_api_key_hook = credentials .refresh_api_key_hook
233+
234+ if token :
235+ config .api_key ['authorization' ] = token
236+ config .api_key_prefix ['authorization' ] = 'Bearer'
237+ return config
238+
239+ if host :
240+ # if host is explicitly set with auth token, it's probably a port forward address.
241+ return config
242+
243+ import kubernetes as k8s
244+ in_cluster = True
245+ try :
246+ k8s .config .load_incluster_config ()
247+ except :
248+ in_cluster = False
249+ pass
250+
251+ if in_cluster :
252+ config .host = TektonClient .IN_CLUSTER_DNS_NAME .format (namespace )
253+ config = self ._get_config_with_default_credentials (config )
254+ return config
255+
256+ try :
257+ k8s .config .load_kube_config (
258+ client_configuration = config , context = kube_context )
259+ except :
260+ print ('Failed to load kube config.' )
261+ return config
262+
263+ if config .host :
264+ config .host = config .host + '/' + TektonClient .KUBE_PROXY_PATH .format (
265+ namespace )
266+ return config
267+
166268 def wait_for_run_completion (self , run_id : str , timeout : int ):
167269 """Waits for a run to complete.
168270
0 commit comments