From 43733b3012299361dcf643fbd57cf02c30c64029 Mon Sep 17 00:00:00 2001 From: Mohit Gupta Date: Wed, 6 Dec 2023 00:11:30 +0000 Subject: [PATCH] starts migrating to computed_fields --- icekube/models/base.py | 7 +- icekube/models/clusterrole.py | 18 ++- icekube/models/clusterrolebinding.py | 31 ++--- icekube/models/pod.py | 113 +++++++------------ icekube/models/role.py | 18 ++- icekube/models/rolebinding.py | 32 +++--- icekube/models/secret.py | 41 ++++--- icekube/models/securitycontextconstraints.py | 38 +++---- icekube/models/serviceaccount.py | 23 ++-- 9 files changed, 130 insertions(+), 191 deletions(-) diff --git a/icekube/models/base.py b/icekube/models/base.py index a21a9da..271c376 100644 --- a/icekube/models/base.py +++ b/icekube/models/base.py @@ -3,7 +3,8 @@ import json import logging import traceback -from typing import Any, Dict, List, Optional, Tuple, Type, Union +from functools import cached_property +from typing import Any, Dict, List, Optional, Tuple, Type, Union, cast from icekube.models._helpers import load, save from icekube.relationships import Relationship @@ -51,6 +52,10 @@ def __eq__(self, other) -> bool: return all(getattr(self, x) == getattr(other, x) for x in comparison_points) + @cached_property + def data(self) -> Dict[str, Any]: + return cast(Dict[str, Any], json.loads(self.raw or "{}")) + @model_validator(mode="before") def inject_missing_required_fields(cls, values): if not all(load(values, x) for x in ["apiVersion", "kind", "plural"]): diff --git a/icekube/models/clusterrole.py b/icekube/models/clusterrole.py index 1dc5c6b..93fc813 100644 --- a/icekube/models/clusterrole.py +++ b/icekube/models/clusterrole.py @@ -1,30 +1,26 @@ from __future__ import annotations -import json +from functools import cached_property from typing import List -from icekube.models._helpers import load, save from icekube.models.base import Resource from icekube.models.policyrule import PolicyRule -from pydantic import model_validator -from pydantic.fields import Field +from pydantic import computed_field class ClusterRole(Resource): - rules: List[PolicyRule] = Field(default_factory=list) supported_api_groups: List[str] = [ "rbac.authorization.k8s.io", "authorization.openshift.io", ] - @model_validator(mode="before") - def inject_rules(cls, values): - data = json.loads(load(values, "raw", "{}")) - - raw_rules = data.get("rules") or [] + @computed_field + @cached_property + def rules(self) -> List[PolicyRule]: rules = [] + raw_rules = self.data.get("rules") or [] for rule in raw_rules: rules.append(PolicyRule(**rule)) - return save(values, "rules", rules) + return rules diff --git a/icekube/models/clusterrolebinding.py b/icekube/models/clusterrolebinding.py index a19c6de..1c2e480 100644 --- a/icekube/models/clusterrolebinding.py +++ b/icekube/models/clusterrolebinding.py @@ -1,9 +1,8 @@ from __future__ import annotations -import json +from functools import cached_property from typing import Any, Dict, List, Optional, Union -from icekube.models._helpers import load, save from icekube.models.base import RELATIONSHIP, Resource from icekube.models.clusterrole import ClusterRole from icekube.models.group import Group @@ -11,8 +10,7 @@ from icekube.models.serviceaccount import ServiceAccount from icekube.models.user import User from icekube.relationships import Relationship -from pydantic import model_validator -from pydantic.fields import Field +from pydantic import computed_field def get_role( @@ -62,29 +60,24 @@ def get_subjects( class ClusterRoleBinding(Resource): - role: Union[ClusterRole, Role] - subjects: List[Union[ServiceAccount, User, Group]] = Field(default_factory=list) supported_api_groups: List[str] = [ "rbac.authorization.k8s.io", "authorization.openshift.io", ] - @model_validator(mode="before") - def inject_role_and_subjects(cls, values): - data = json.loads(load(values, "raw", "{}")) - - role_ref = data.get("roleRef") + @computed_field + @cached_property + def role(self) -> Union[ClusterRole, Role]: + role_ref = self.data.get("roleRef") if role_ref: - role = get_role(role_ref) + return get_role(role_ref) else: - role = ClusterRole(name="") - - subjects = get_subjects(data.get("subjects", [])) - - values = save(values, "role", role) - values = save(values, "subjects", subjects) + return ClusterRole(name="") - return values + @computed_field + @cached_property + def subjects(self) -> List[Union[ServiceAccount, User, Group]]: + return get_subjects(self.data.get("subjects", [])) def relationships( self, diff --git a/icekube/models/pod.py b/icekube/models/pod.py index cd8021b..bef47a0 100644 --- a/icekube/models/pod.py +++ b/icekube/models/pod.py @@ -1,17 +1,17 @@ from __future__ import annotations import json +from functools import cached_property from itertools import product from pathlib import Path from typing import Any, Dict, List, Optional, cast -from icekube.models._helpers import load, save from icekube.models.base import RELATIONSHIP, Resource from icekube.models.node import Node from icekube.models.secret import Secret from icekube.models.serviceaccount import ServiceAccount from icekube.relationships import Relationship -from pydantic import model_validator +from pydantic import computed_field CAPABILITIES = [ "AUDIT_CONTROL", @@ -59,60 +59,40 @@ class Pod(Resource): - service_account: Optional[ServiceAccount] - node: Optional[Node] - containers: List[Dict[str, Any]] - capabilities: List[str] - host_path_volumes: List[str] - privileged: bool - hostPID: bool - hostNetwork: bool supported_api_groups: List[str] = [""] - @model_validator(mode="before") - def inject_service_account(cls, values): - data = json.loads(load(values, "raw", "{}")) - sa = data.get("spec", {}).get("serviceAccountName") + @computed_field + @cached_property + def service_account(self) -> Optional[ServiceAccount]: + sa = self.data.get("spec", {}).get("serviceAccountName") + if sa: - values = save( - values, - "service_account", - ServiceAccount( - name=sa, - namespace=values.get("namespace"), - ), - ) + return ServiceAccount(name=sa, namespace=self.namespace) else: - values = save(values, "service_account", None) + return None - return values + @computed_field + @cached_property + def node(self) -> Optional[Node]: + node = self.data.get("spec", {}).get("nodeName") - @model_validator(mode="before") - def inject_node(cls, values): - data = json.loads(load(values, "raw", "{}")) - node = data.get("spec", {}).get("nodeName") if node: - values = save(values, "node", Node(name=node)) + return Node(name=node) else: - values = save(values, "node", None) - - return values - - @model_validator(mode="before") - def inject_containers(cls, values): - data = json.loads(load(values, "raw", "{}")) - - return save(values, "containers", data.get("spec", {}).get("containers", [])) + return None - @model_validator(mode="before") - def inject_capabilities(cls, values): - data = json.loads(load(values, "raw", "{}")) + @computed_field + @cached_property + def containers(self) -> List[Dict[str, Any]]: + return self.data.get("spec", {}).get("containers", []) - containers = data.get("spec", {}).get("containers", []) + @computed_field + @cached_property + def capabilities(self) -> List[str]: capabilities = set() - for container in containers: - security_context = container.get("securityContext") or {} + for container in self.containers: + security_context = container.get("security_context") or {} caps = security_context.get("capabilities") or {} addl = caps.get("add") or [] addl = [x.upper() for x in addl] @@ -124,13 +104,12 @@ def inject_capabilities(cls, values): capabilities.update(add) - return save(values, "capabilities", list(capabilities)) - - @model_validator(mode="before") - def inject_privileged(cls, values): - data = json.loads(load(values, "raw", "{}")) + return list(capabilities) - containers = data.get("spec", {}).get("containers", []) + @computed_field + @cached_property + def privileged(self) -> bool: + containers = self.data.get("spec", {}).get("containers", []) privileged = False for container in containers: context = container.get("securityContext") or {} @@ -138,31 +117,25 @@ def inject_privileged(cls, values): if context.get("privileged", False): privileged = True - return save(values, "privileged", privileged) + return privileged - @model_validator(mode="before") - def inject_host_path_volumes(cls, values): - data = json.loads(load(values, "raw", "{}")) - volumes = data.get("spec", {}).get("volumes") or [] + @computed_field + @cached_property + def host_path_volumes(self) -> List[str]: + volumes = self.data.get("spec", {}).get("volumes") or [] host_volumes = [x for x in volumes if "hostPath" in x and x["hostPath"]] - return save( - values, "host_path_volumes", [x["hostPath"]["path"] for x in host_volumes] - ) - - @model_validator(mode="before") - def inject_host_pid(cls, values): - data = json.loads(load(values, "raw", "{}")) - - return save(values, "hostPID", data.get("spec", {}).get("hostPID") or False) + return [x["hostPath"]["path"] for x in host_volumes] - @model_validator(mode="before") - def inject_host_network(cls, values): - data = json.loads(load(values, "raw", "{}")) + @computed_field + @cached_property + def host_pid(self) -> bool: + return self.data.get("spec", {}).get("hostPID") or False - return save( - values, "hostNetwork", data.get("spec", {}).get("hostNetwork") or False - ) + @computed_field + @cached_property + def host_network(self) -> bool: + return self.data.get("spec", {}).get("hostNetwork") or False @property def dangerous_host_path(self) -> bool: diff --git a/icekube/models/role.py b/icekube/models/role.py index b26b6af..e80b23e 100644 --- a/icekube/models/role.py +++ b/icekube/models/role.py @@ -1,30 +1,26 @@ from __future__ import annotations -import json +from functools import cached_property from typing import List -from icekube.models._helpers import load, save from icekube.models.base import Resource from icekube.models.policyrule import PolicyRule -from pydantic import model_validator -from pydantic.fields import Field +from pydantic import computed_field class Role(Resource): - rules: List[PolicyRule] = Field(default_factory=list) supported_api_groups: List[str] = [ "rbac.authorization.k8s.io", "authorization.openshift.io", ] - @model_validator(mode="before") - def inject_role(cls, values): - data = json.loads(load(values, "raw", "{}")) - + @computed_field + @cached_property + def rules(self) -> List[PolicyRule]: rules = [] - raw_rules = data.get("rules") or [] + raw_rules = self.data.get("rules") or [] for rule in raw_rules: rules.append(PolicyRule(**rule)) - return save(values, "rules", rules) + return rules diff --git a/icekube/models/rolebinding.py b/icekube/models/rolebinding.py index 4b270ad..ee7b9e2 100644 --- a/icekube/models/rolebinding.py +++ b/icekube/models/rolebinding.py @@ -1,9 +1,8 @@ from __future__ import annotations -import json +from functools import cached_property from typing import List, Union -from icekube.models._helpers import load, save from icekube.models.base import RELATIONSHIP, Resource from icekube.models.clusterrole import ClusterRole from icekube.models.clusterrolebinding import get_role, get_subjects @@ -12,35 +11,30 @@ from icekube.models.serviceaccount import ServiceAccount from icekube.models.user import User from icekube.relationships import Relationship -from pydantic import model_validator -from pydantic.fields import Field +from pydantic import computed_field class RoleBinding(Resource): - role: Union[ClusterRole, Role] - subjects: List[Union[ServiceAccount, User, Group]] = Field(default_factory=list) + subjects: List[Union[ServiceAccount, User, Group]] supported_api_groups: List[str] = [ "rbac.authorization.k8s.io", "authorization.openshift.io", ] - @model_validator(mode="before") - def inject_role_and_subjects(cls, values): - data = json.loads(load(values, "raw", "{}")) - - ns = values.get("namespace") - - role_ref = data.get("roleRef") + @computed_field + @cached_property + def role(self) -> Union[ClusterRole, Role]: + role_ref = self.data.get("roleRef") if role_ref: - role = get_role(role_ref, ns) + return get_role(role_ref, self.namespace) else: - role = ClusterRole(name="") - - save(values, "subjects", get_subjects(data.get("subjects", []), ns)) - save(values, "role", role) + return ClusterRole(name="") - return values + @computed_field + @cached_property + def subjects(self) -> List[Union[ServiceAccount, User, Group]]: + return get_subjects(self.data.get("subjects", []), self.namespace) def relationships( self, diff --git a/icekube/models/secret.py b/icekube/models/secret.py index ba05730..88395d4 100644 --- a/icekube/models/secret.py +++ b/icekube/models/secret.py @@ -1,40 +1,39 @@ from __future__ import annotations import json -from typing import Any, Dict, List, cast +from functools import cached_property +from typing import Any, Dict, List, cast, Optional -from icekube.models._helpers import load, save from icekube.models.base import RELATIONSHIP, Resource from icekube.relationships import Relationship -from pydantic import model_validator +from pydantic import computed_field, field_validator class Secret(Resource): - secret_type: str - annotations: Dict[str, Any] supported_api_groups: List[str] = [""] - @model_validator(mode="before") - def remove_secret_data(cls, values): - data = json.loads(load(values, "raw", "{}")) - if "data" in data: - del data["data"] + @field_validator("raw") + @classmethod + def remove_secret_data(cls, v: Optional[str]) -> Optional[str]: + if v: + data = json.loads(v) - return save(values, "raw", json.dumps(data)) + if "data" in data: + del data["data"] - @model_validator(mode="before") - def extract_type(cls, values): - data = json.loads(load(values, "raw", "{}")) + return json.dumps(data) - return save(values, "secret_type", data.get("type", "")) + return v - @model_validator(mode="before") - def extract_annotations(cls, values): - data = json.loads(load(values, "raw", "{}")) + @computed_field + @cached_property + def secret_type(self) -> str: + return cast(str, self.data.get("type", "")) - return save( - values, "annotations", data.get("metadata", {}).get("annotations") or {} - ) + @computed_field + @cached_property + def annotations(self) -> Dict[str, Any]: + return self.data.get("metadata", {}).get("annotations") or {} def relationships(self, initial: bool = True) -> List[RELATIONSHIP]: relationships = super().relationships() diff --git a/icekube/models/securitycontextconstraints.py b/icekube/models/securitycontextconstraints.py index 5bf4e22..9ee4236 100644 --- a/icekube/models/securitycontextconstraints.py +++ b/icekube/models/securitycontextconstraints.py @@ -1,51 +1,41 @@ from __future__ import annotations -import json +from functools import cached_property from typing import List, Union -from icekube.models._helpers import load, save from icekube.models.base import RELATIONSHIP, Resource from icekube.models.group import Group from icekube.models.serviceaccount import ServiceAccount from icekube.models.user import User -from pydantic import model_validator -from pydantic.fields import Field +from pydantic import computed_field class SecurityContextConstraints(Resource): plural: str = "securitycontextconstraints" - users: List[Union[User, ServiceAccount]] = Field(default_factory=list) groups: List[Group] supported_api_groups: List[str] = ["security.openshift.io"] - @model_validator(mode="before") - def inject_users_and_groups(cls, values): - data = json.loads(load(values, "raw", "{}")) + @computed_field + @cached_property + def users(self) -> List[Union[User, ServiceAccount]]: + users = [] + raw_users = self.data.get("users", []) - raw_users = data.get("users", []) - users: List[Union[User, ServiceAccount]] = [] for user in raw_users: if user.startswith("system:serviceaccount:"): ns, name = user.split(":")[2:] - users.append( - ServiceAccount( - ServiceAccount, - name=name, - namespace=ns, - ), - ) + users.append(ServiceAccount(name=name, namespace=ns)) else: users.append(User(name=user)) - groups = [] - raw_groups = data.get("groups", []) - for group in raw_groups: - groups.append(Group(name=group)) + return users - values = save(values, "users", users) - values = save(values, "groups", groups) + @computed_field + @cached_property + def groups(self) -> List[Group]: + raw_groups = self.data.get("groups", []) - return values + return [Group(name=x) for x in raw_groups] def relationships(self, initial: bool = True) -> List[RELATIONSHIP]: relationships = super().relationships() diff --git a/icekube/models/serviceaccount.py b/icekube/models/serviceaccount.py index 1e8ca95..4530918 100644 --- a/icekube/models/serviceaccount.py +++ b/icekube/models/serviceaccount.py @@ -1,36 +1,29 @@ from __future__ import annotations -import json +from functools import cached_property from typing import List -from icekube.models._helpers import load, save from icekube.models.base import RELATIONSHIP, Resource from icekube.models.secret import Secret from icekube.relationships import Relationship -from pydantic import model_validator -from pydantic.fields import Field +from pydantic import computed_field class ServiceAccount(Resource): - secrets: List[Secret] = Field(default_factory=list) supported_api_groups: List[str] = [""] - @model_validator(mode="before") - def inject_secrets(cls, values): - data = json.loads(load(values, "raw", "{}")) - + @computed_field + @cached_property + def secrets(self) -> List[Secret]: secrets = [] - raw_secrets = data.get("secrets") or [] + raw_secrets = self.data.get("secrets") or [] for secret in raw_secrets: secrets.append( - Secret( # type: ignore - name=secret.get("name", ""), - namespace=data.get("metadata", {}).get("namespace", ""), - ) + Secret(name=secret.get("name", ""), namespace=self.namespace), ) - return save(values, "secrets", secrets) + return secrets def relationships( self,