-
Notifications
You must be signed in to change notification settings - Fork 1
Open
Description
from contextlib import contextmanager
from typing import Optional, TypeAlias, Union
import kubernetes as k8s
import yaml
from azure.identity import (
AuthorizationCodeCredential,
AzurePowerShellCredential,
InteractiveBrowserCredential,
CertificateCredential,
ChainedTokenCredential,
ClientSecretCredential,
DefaultAzureCredential,
EnvironmentCredential,
ManagedIdentityCredential,
OnBehalfOfCredential,
SharedTokenCacheCredential,
AzureCliCredential,
DeviceCodeCredential,
UsernamePasswordCredential,
VisualStudioCodeCredential,
ClientAssertionCredential,
# WorkloadIdentityCredential,
)
from azure.mgmt.containerservice import ContainerServiceClient
from kubernetes.config.kube_config import KubeConfigLoader
__all__ = (
'get_kubeconfig',
'get_api_client',
)
AzureCredential: TypeAlias = Union[
AuthorizationCodeCredential,
AzurePowerShellCredential,
InteractiveBrowserCredential,
CertificateCredential,
ChainedTokenCredential,
ClientSecretCredential,
DefaultAzureCredential,
EnvironmentCredential,
ManagedIdentityCredential,
OnBehalfOfCredential,
SharedTokenCacheCredential,
AzureCliCredential,
DeviceCodeCredential,
UsernamePasswordCredential,
VisualStudioCodeCredential,
ClientAssertionCredential,
# WorkloadIdentityCredential,
]
_clusters: dict[str, dict] = dict()
def get_clusters(
*,
subscription_id: str,
credential: Optional[AzureCredential] = None,
) -> dict[str, dict]:
credential = credential or DefaultAzureCredential()
aks_client = ContainerServiceClient(
credential, subscription_id,
)
clusters = {
cluster.name: cluster.as_dict()
for cluster in aks_client.managed_clusters.list()
}
return clusters
def get_cluster(
cluster_name: str,
*,
subscription_id: str,
credential: Optional[AzureCredential] = None,
) -> dict:
global _clusters
try:
return _clusters[cluster_name]
except Exception as exc:
_clusters = get_clusters(
subscription_id=subscription_id,
credential=credential,
)
try:
return _clusters[cluster_name]
except KeyError as exc:
msg = (
f"Could not find a cluster named '{cluster_name}'!\n"
f"clusters = {list(_clusters)!r}"
)
raise RuntimeError(msg) from exc
def get_kubeconfig(
cluster_name: str,
*,
subscription_id: str,
credential: Optional[AzureCredential] = None,
) -> k8s.client.Configuration:
credential = credential or DefaultAzureCredential()
aks_client = ContainerServiceClient(
credential, subscription_id,
)
cluster = get_cluster(
cluster_name,
subscription_id=subscription_id,
credential=credential,
)
resource_group = cluster["id"].split('/')[4]
aks_credentials = aks_client.managed_clusters.list_cluster_admin_credentials(
resource_group, cluster_name,
)
kubeconfig = aks_credentials.kubeconfigs[0].value.decode('utf-8')
cfg_dict = yaml.safe_load(kubeconfig)
loader = KubeConfigLoader(cfg_dict)
config = k8s.client.Configuration()
loader.load_and_set(config)
k8s.client.Configuration.set_default(config)
return config
def get_api_client(
cluster_name: str,
*,
subscription_id: str,
credential: Optional[AzureCredential] = None,
) -> k8s.client.Configuration:
config = get_kubeconfig(
cluster_name,
subscription_id=subscription_id,
credential=credential,
)
return k8s.client.ApiClient(config)Metadata
Metadata
Assignees
Labels
No labels