forked from interTwin-eu/teapot
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathalise.py
More file actions
137 lines (119 loc) · 4.61 KB
/
alise.py
File metadata and controls
137 lines (119 loc) · 4.61 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
import configparser
import hashlib
import logging
from urllib.parse import quote_plus
import requests
config = configparser.ConfigParser(interpolation=configparser.ExtendedInterpolation())
config.read("/etc/teapot/config.ini")
logging.basicConfig(
filename=config["Teapot"]["log_location"],
encoding="utf-8",
filemode="a",
format="%(asctime)s %(levelname)s %(message)s",
datefmt="%Y-%m-%d %H:%M",
level=config["Teapot"]["log_level"],
)
logger = logging.getLogger(__name__)
class Alise:
def __init__(self):
self.apikey = config["ALISE"]["APIKEY"]
self.alise_url = config["ALISE"]["INSTANCE"]
self.site = config["ALISE"]["COMPUTING_CENTRE"]
@staticmethod
def hashencode(iss):
hash_method = "sha1"
hash_function = getattr(hashlib, hash_method)()
if not iss:
logger.error("Error: input string for issuer is empty.")
return None
hash_function.update(iss.encode())
hash = hash_function.hexdigest()
return hash
@staticmethod
def urlencode(sub):
"""
URL-encode the given subject claim string.
This function takes an input string sub, encodes it using quote_plus,
and returns the encoded result. If the input is empty, it logs an error
and returns None.
"""
if not sub:
logger.error("Error: input string for subject claim is empty.")
return None
else:
return quote_plus(sub)
@staticmethod
def extract_external_identities(response_json):
"""
Extract all external (sub, iss) pairs from an ALISE API response.
Returns a list of dicts with 'sub' and 'iss' keys, or None on failure.
"""
try:
external_identities = [
{"sub": entry["sub"], "iss": entry["iss"]}
for entry in response_json.get("external", [])
]
logger.debug("External identities found: %s", external_identities)
return external_identities
except (ValueError, KeyError) as e:
logger.error(
"Failed to parse external identities from ALISE response: %s", e
)
return None
except Exception as e:
logger.error("Unexpected error parsing ALISE response: %s", e)
return None
def get_local_username(self, subject_claim, issuer):
"""
Retrieve the local username from the ALISE API.
This function constructs a request to the ALISE API by hashing the issuer
and URL-encoding the subject_claim. It then sends a GET request to retrieve
the corresponding local username. If the request fails or the response is
invalid, it logs an error and returns None.
"""
hash1 = Alise.hashencode(issuer)
hash2 = Alise.urlencode(subject_claim)
link = (
self.alise_url
+ "/api/v1/target/"
+ self.site
+ "/mapping/issuer/"
+ hash1
+ "/user/"
+ hash2
+ "?apikey="
+ self.apikey
)
logger.debug("Assembled ALISE API URL is %s", link)
try:
response = requests.get(link, timeout=20)
response.raise_for_status() # Raise an HTTPError for 4xx/5xx responses
except requests.ConnectionError as e:
logger.error(
"Can't connect to ALISE API. Network-related error was raised: %s", e
)
return None, None
except requests.Timeout as e:
logger.error("Can't connect to ALISE API. Request timed out: %s", e)
return None, None
except requests.RequestException as e:
logger.error("An error occured during request to ALISE API: %s", e)
return None, None
except Exception as e:
logger.error("Request to ALISE API raised an unexpected error: %s", e)
return None, None
try:
response_json = response.json()
logger.debug(
"This is the json of the response received from the ALISE API: %s.",
response_json,
)
local_username = response_json["internal"]["username"]
external_identities = Alise.extract_external_identities(response_json)
except ValueError as e:
logger.error("Decoding JSON has failed: %s", e)
return None, None
except Exception as e:
logger.error("Local username not found in the json response: %s", e)
return None, None
return local_username, external_identities