Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Keycloak service account auth capability to ansible-galaxy #83145

Draft
wants to merge 3 commits into
base: devel
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
@@ -0,0 +1,2 @@
minor_changes:
- ansible-galaxy - Add support for Keycloak service accounts
22 changes: 13 additions & 9 deletions lib/ansible/cli/galaxy.py
Expand Up @@ -76,6 +76,7 @@
('api_version', False, 'int'),
('validate_certs', False, 'bool'),
('client_id', False, 'str'),
('client_secret', False, 'str'),
('timeout', False, 'int'),
]

Expand Down Expand Up @@ -662,6 +663,7 @@ def server_config_def(section, key, required, option_type):
# it doesn't need to be passed as kwarg to GalaxyApi, same for others we pop here
auth_url = server_options.pop('auth_url')
client_id = server_options.pop('client_id')
client_secret = server_options.pop('client_secret')
token_val = server_options['token'] or NoTokenSentinel
username = server_options['username']
api_version = server_options.pop('api_version')
Expand All @@ -687,15 +689,17 @@ def server_config_def(section, key, required, option_type):
if username:
server_options['token'] = BasicAuthToken(username, server_options['password'])
else:
if token_val:
if auth_url:
server_options['token'] = KeycloakToken(access_token=token_val,
auth_url=auth_url,
validate_certs=validate_certs,
client_id=client_id)
else:
# The galaxy v1 / github / django / 'Token'
server_options['token'] = GalaxyToken(token=token_val)
if auth_url:
server_options['token'] = KeycloakToken(
access_token=token_val,
auth_url=auth_url,
validate_certs=validate_certs,
client_id=client_id,
client_secret=client_secret,
)
elif token_val:
# The galaxy v1 / github / django / 'Token'
server_options['token'] = GalaxyToken(token=token_val)

server_options.update(galaxy_options)
config_servers.append(GalaxyAPI(
Expand Down
56 changes: 36 additions & 20 deletions lib/ansible/galaxy/token.py
Expand Up @@ -21,11 +21,15 @@
from __future__ import annotations

import base64
import os
import json
import os
import time
from stat import S_IRUSR, S_IWUSR
from urllib.error import HTTPError
from urllib.parse import urlencode

from ansible import constants as C
from ansible.galaxy.api import GalaxyError
from ansible.galaxy.user_agent import user_agent
from ansible.module_utils.common.text.converters import to_bytes, to_native, to_text
from ansible.module_utils.common.yaml import yaml_dump, yaml_load
Expand All @@ -49,45 +53,57 @@ class KeycloakToken(object):

token_type = 'Bearer'

def __init__(self, access_token=None, auth_url=None, validate_certs=True, client_id=None):
def __init__(self, access_token=None, auth_url=None, validate_certs=True, client_id=None, client_secret=None):
self.access_token = access_token
self.auth_url = auth_url
self._token = None
self.validate_certs = validate_certs
self.client_id = client_id
if self.client_id is None:
self.client_id = 'cloud-services'
self.client_secret = client_secret
self._expiration = None

def _form_payload(self):
return 'grant_type=refresh_token&client_id=%s&refresh_token=%s' % (self.client_id,
self.access_token)
payload = {
'client_id': self.client_id,
}
if self.client_secret:
payload['client_secret'] = self.client_secret
payload['scope'] = 'openid api.iam.service_accounts'
payload['grant_type'] = 'client_credentials'
else:
payload['refresh_token'] = self.access_token
payload['grant_type'] = 'refresh_token'
return urlencode(payload)

def get(self):
if self._expiration and time.time() >= self._expiration:
self._token = None

if self._token:
return self._token

# - build a request to POST to auth_url
# - body is form encoded
# - 'refresh_token' is the offline token stored in ansible.cfg
# - 'grant_type' is 'refresh_token'
# - 'client_id' is 'cloud-services'
# - should probably be based on the contents of the
# offline_ticket's JWT payload 'aud' (audience)
# or 'azp' (Authorized party - the party to which the ID Token was issued)
payload = self._form_payload()

resp = open_url(to_native(self.auth_url),
data=payload,
validate_certs=self.validate_certs,
method='POST',
http_agent=user_agent())
try:
resp = open_url(to_native(self.auth_url),
data=payload,
validate_certs=self.validate_certs,
method='POST',
http_agent=user_agent())
except HTTPError as e:
raise GalaxyError(e, 'Unable to get access token')

# TODO: handle auth errors
data = json.load(resp)

data = json.loads(to_text(resp.read(), errors='surrogate_or_strict'))
# So that we have a buffer, expire the token in ~2/3 the given value
expires_in = data['expires_in'] // 3 * 2
self._expiration = time.time() + expires_in

# - extract 'access_token'
self._token = data.get('access_token')
if token_type := data.get('token_type'):
self.token_type = token_type

return self._token

Expand Down