Skip to content

Commit

Permalink
Purge DeliveryCfg
Browse files Browse the repository at this point in the history
Was mved to delivery-service in order to (further) decouple
ocm-gear from cc-utils.
  • Loading branch information
zkdev committed Dec 10, 2024
1 parent df075a9 commit 0020c5b
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 169 deletions.
23 changes: 0 additions & 23 deletions delivery/jwt.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import dacite
import jwt

import model.delivery

logger = logging.getLogger(__name__)
JWT_KEY = 'bearer_token'
Expand Down Expand Up @@ -66,28 +65,6 @@ def from_dict(data: dict) -> typing.Self:
),
)

@staticmethod
def from_signing_cfg(signing_cfg: model.delivery.SigningCfg) -> typing.Self:
algorithm = Algorithm(signing_cfg.algorithm().upper())
use = Use.SIGNATURE
kid = signing_cfg.id()

if algorithm == Algorithm.RS256:
public_key = Crypto.PublicKey.RSA.import_key(signing_cfg.public_key())

return RSAPublicKey(
use=use,
kid=kid,
n=encodeBase64urlUInt(public_key.n),
e=encodeBase64urlUInt(public_key.e),
)
elif algorithm == Algorithm.HS256:
return SymmetricKey(
use=use,
kid=kid,
k=encodeBase64url(signing_cfg.secret().encode('utf-8')),
)


@dataclasses.dataclass(frozen=True, kw_only=True)
class RSAPublicKey(JSONWebKey):
Expand Down
128 changes: 2 additions & 126 deletions model/delivery.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,134 +3,10 @@
# SPDX-License-Identifier: Apache-2.0


import collections.abc
import enum
import model.base

from model.base import (
ModelBase,
NamedModelElement,
)


class DeliveryConfig(NamedModelElement):
'''
Not intended to be instantiated by users of this module
'''
def auth(self):
return Auth(self.raw.get('auth'))

def service(self):
return DeliverySvcCfg(self.raw.get('service'))

def dashboard(self):
return DeliveryDashboardCfg(self.raw.get('dashboard'))

def db_cfg_name(self):
return self.raw.get('db_cfg_name')

def _optional_attributes(self):
yield from super()._optional_attributes()
yield from [
'db_cfg_name',
]

def _required_attributes(self):
yield from super()._required_attributes()
yield from [
'service',
]


class OAuthType(enum.Enum):
GITHUB = 'github'


class Auth(ModelBase):
def oauth_cfgs(self):
return [OAuth(raw) for raw in self.raw.get('oauth_cfgs')]

def oauth_cfg(self, github_cfg):
for oc in self.oauth_cfgs():
if oc.github_cfg() == github_cfg:
return oc
raise KeyError(f'no oauth cfg for {github_cfg}')


class OAuth(ModelBase):
def name(self) -> str:
return self.raw.get('name')

def type(self):
return OAuthType(self.raw.get('type'))

def github_cfg(self):
return self.raw.get('github_cfg')

def oauth_url(self):
return self.raw.get('oauth_url')

def token_url(self):
return self.raw.get('token_url')

def client_id(self):
return self.raw.get('client_id')

def client_secret(self):
return self.raw.get('client_secret')

def scope(self):
return self.raw.get('scope')

def role_bindings(self):
return self.raw.get('role_bindings', [])


class SigningCfg(ModelBase):
def id(self) -> str:
return self.raw.get('id')

def algorithm(self):
return self.raw.get('algorithm', 'HS256')

def secret(self):
return self.raw.get('secret')

def public_key(self):
return self.raw.get('public_key')

def purpose_labels(self) -> list[str]:
return self.raw.get('purpose_labels', [])


class DeliveryDashboardCfg(ModelBase):
def deployment_name(self):
return self.raw.get('deployment_name', 'delivery-dashboard')


class DeliverySvcCfg(ModelBase):
def deployment_name(self):
return self.raw.get('deployment_name', 'delivery-service')

def signing_cfgs(
self,
purpose_label: str = None,
) -> collections.abc.Generator[SigningCfg, None, None]:
cfgs = self.raw.get('signing')

if not purpose_label:
yield from [
SigningCfg(raw) for raw in cfgs
]
return

for raw_signing_cfg in cfgs:
signing_cfg = SigningCfg(raw_signing_cfg)

if purpose_label in signing_cfg.purpose_labels():
yield signing_cfg


class DeliveryEndpointsCfg(NamedModelElement):
class DeliveryEndpointsCfg(model.base.NamedModelElement):
def service_host(self):
return self.raw['service_host']

Expand Down
38 changes: 18 additions & 20 deletions test/delivery/jwt_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,28 @@
import jwt

import delivery.jwt
import model.delivery


ISSUER = 'test-issuer'
private_key = Crypto.PublicKey.RSA.generate(4096)
public_key = private_key.public_key()


@pytest.fixture
def signing_cfg() -> model.delivery.SigningCfg:
private_key = Crypto.PublicKey.RSA.generate(4096)

return model.delivery.SigningCfg({
'id': '0',
'algorithm': delivery.jwt.Algorithm.RS256,
'secret': private_key.export_key(format='PEM'),
'public_key': private_key.public_key().export_key(format='PEM'),
'purpose_labels': [],
})
def json_web_key() -> delivery.jwt.JSONWebKey:
return delivery.jwt.JSONWebKey.from_dict(
data={
'use': delivery.jwt.Use.SIGNATURE,
'kid': 'foo',
'alg': delivery.jwt.Algorithm.RS256,
'n': delivery.jwt.encodeBase64urlUInt(public_key.n),
'e': delivery.jwt.encodeBase64urlUInt(private_key.e)
}
)


@pytest.fixture
def token(signing_cfg) -> str:
def token() -> str:
now = datetime.datetime.now(tz=datetime.timezone.utc)
time_delta = datetime.timedelta(days=730) # 2 years

Expand All @@ -39,21 +41,17 @@ def token(signing_cfg) -> str:
'email_address': '[email protected]',
},
'exp': int((now + time_delta).timestamp()),
'key_id': signing_cfg.id(),
'key_id': '0',
}

return jwt.encode(
payload=token,
key=signing_cfg.secret(),
algorithm=signing_cfg.algorithm(),
key=private_key.export_key(format='PEM'),
algorithm=delivery.jwt.Algorithm.RS256,
)


def test_jwt(signing_cfg, token):
json_web_key = delivery.jwt.JSONWebKey.from_signing_cfg(
signing_cfg=signing_cfg,
)

def test_jwt(json_web_key, token):
# token was just created and thus is not expired yet
assert not delivery.jwt.is_jwt_token_expired(
token=token,
Expand Down

0 comments on commit 0020c5b

Please sign in to comment.