Skip to content

Commit

Permalink
starts migrating to computed_fields
Browse files Browse the repository at this point in the history
  • Loading branch information
Skybound1 committed Dec 6, 2023
1 parent aa48783 commit 43733b3
Show file tree
Hide file tree
Showing 9 changed files with 130 additions and 191 deletions.
7 changes: 6 additions & 1 deletion icekube/models/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
import json
import logging
import traceback
from typing import Any, Dict, List, Optional, Tuple, Type, Union
from functools import cached_property
from typing import Any, Dict, List, Optional, Tuple, Type, Union, cast

from icekube.models._helpers import load, save
from icekube.relationships import Relationship
Expand Down Expand Up @@ -51,6 +52,10 @@ def __eq__(self, other) -> bool:

return all(getattr(self, x) == getattr(other, x) for x in comparison_points)

@cached_property
def data(self) -> Dict[str, Any]:
return cast(Dict[str, Any], json.loads(self.raw or "{}"))

@model_validator(mode="before")
def inject_missing_required_fields(cls, values):
if not all(load(values, x) for x in ["apiVersion", "kind", "plural"]):
Expand Down
18 changes: 7 additions & 11 deletions icekube/models/clusterrole.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,26 @@
from __future__ import annotations

import json
from functools import cached_property
from typing import List

from icekube.models._helpers import load, save
from icekube.models.base import Resource
from icekube.models.policyrule import PolicyRule
from pydantic import model_validator
from pydantic.fields import Field
from pydantic import computed_field


class ClusterRole(Resource):
rules: List[PolicyRule] = Field(default_factory=list)
supported_api_groups: List[str] = [
"rbac.authorization.k8s.io",
"authorization.openshift.io",
]

@model_validator(mode="before")
def inject_rules(cls, values):
data = json.loads(load(values, "raw", "{}"))

raw_rules = data.get("rules") or []
@computed_field
@cached_property
def rules(self) -> List[PolicyRule]:
rules = []
raw_rules = self.data.get("rules") or []

for rule in raw_rules:
rules.append(PolicyRule(**rule))

return save(values, "rules", rules)
return rules
31 changes: 12 additions & 19 deletions icekube/models/clusterrolebinding.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
from __future__ import annotations

import json
from functools import cached_property
from typing import Any, Dict, List, Optional, Union

from icekube.models._helpers import load, save
from icekube.models.base import RELATIONSHIP, Resource
from icekube.models.clusterrole import ClusterRole
from icekube.models.group import Group
from icekube.models.role import Role
from icekube.models.serviceaccount import ServiceAccount
from icekube.models.user import User
from icekube.relationships import Relationship
from pydantic import model_validator
from pydantic.fields import Field
from pydantic import computed_field


def get_role(
Expand Down Expand Up @@ -62,29 +60,24 @@ def get_subjects(


class ClusterRoleBinding(Resource):
role: Union[ClusterRole, Role]
subjects: List[Union[ServiceAccount, User, Group]] = Field(default_factory=list)
supported_api_groups: List[str] = [
"rbac.authorization.k8s.io",
"authorization.openshift.io",
]

@model_validator(mode="before")
def inject_role_and_subjects(cls, values):
data = json.loads(load(values, "raw", "{}"))

role_ref = data.get("roleRef")
@computed_field
@cached_property
def role(self) -> Union[ClusterRole, Role]:
role_ref = self.data.get("roleRef")
if role_ref:
role = get_role(role_ref)
return get_role(role_ref)
else:
role = ClusterRole(name="")

subjects = get_subjects(data.get("subjects", []))

values = save(values, "role", role)
values = save(values, "subjects", subjects)
return ClusterRole(name="")

return values
@computed_field
@cached_property
def subjects(self) -> List[Union[ServiceAccount, User, Group]]:
return get_subjects(self.data.get("subjects", []))

def relationships(
self,
Expand Down
113 changes: 43 additions & 70 deletions icekube/models/pod.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
from __future__ import annotations

import json
from functools import cached_property
from itertools import product
from pathlib import Path
from typing import Any, Dict, List, Optional, cast

from icekube.models._helpers import load, save
from icekube.models.base import RELATIONSHIP, Resource
from icekube.models.node import Node
from icekube.models.secret import Secret
from icekube.models.serviceaccount import ServiceAccount
from icekube.relationships import Relationship
from pydantic import model_validator
from pydantic import computed_field

CAPABILITIES = [
"AUDIT_CONTROL",
Expand Down Expand Up @@ -59,60 +59,40 @@


class Pod(Resource):
service_account: Optional[ServiceAccount]
node: Optional[Node]
containers: List[Dict[str, Any]]
capabilities: List[str]
host_path_volumes: List[str]
privileged: bool
hostPID: bool
hostNetwork: bool
supported_api_groups: List[str] = [""]

@model_validator(mode="before")
def inject_service_account(cls, values):
data = json.loads(load(values, "raw", "{}"))
sa = data.get("spec", {}).get("serviceAccountName")
@computed_field
@cached_property
def service_account(self) -> Optional[ServiceAccount]:
sa = self.data.get("spec", {}).get("serviceAccountName")

if sa:
values = save(
values,
"service_account",
ServiceAccount(
name=sa,
namespace=values.get("namespace"),
),
)
return ServiceAccount(name=sa, namespace=self.namespace)
else:
values = save(values, "service_account", None)
return None

return values
@computed_field
@cached_property
def node(self) -> Optional[Node]:
node = self.data.get("spec", {}).get("nodeName")

@model_validator(mode="before")
def inject_node(cls, values):
data = json.loads(load(values, "raw", "{}"))
node = data.get("spec", {}).get("nodeName")
if node:
values = save(values, "node", Node(name=node))
return Node(name=node)
else:
values = save(values, "node", None)

return values

@model_validator(mode="before")
def inject_containers(cls, values):
data = json.loads(load(values, "raw", "{}"))

return save(values, "containers", data.get("spec", {}).get("containers", []))
return None

@model_validator(mode="before")
def inject_capabilities(cls, values):
data = json.loads(load(values, "raw", "{}"))
@computed_field
@cached_property
def containers(self) -> List[Dict[str, Any]]:
return self.data.get("spec", {}).get("containers", [])

containers = data.get("spec", {}).get("containers", [])
@computed_field
@cached_property
def capabilities(self) -> List[str]:
capabilities = set()

for container in containers:
security_context = container.get("securityContext") or {}
for container in self.containers:
security_context = container.get("security_context") or {}
caps = security_context.get("capabilities") or {}
addl = caps.get("add") or []
addl = [x.upper() for x in addl]
Expand All @@ -124,45 +104,38 @@ def inject_capabilities(cls, values):

capabilities.update(add)

return save(values, "capabilities", list(capabilities))

@model_validator(mode="before")
def inject_privileged(cls, values):
data = json.loads(load(values, "raw", "{}"))
return list(capabilities)

containers = data.get("spec", {}).get("containers", [])
@computed_field
@cached_property
def privileged(self) -> bool:
containers = self.data.get("spec", {}).get("containers", [])
privileged = False
for container in containers:
context = container.get("securityContext") or {}

if context.get("privileged", False):
privileged = True

return save(values, "privileged", privileged)
return privileged

@model_validator(mode="before")
def inject_host_path_volumes(cls, values):
data = json.loads(load(values, "raw", "{}"))
volumes = data.get("spec", {}).get("volumes") or []
@computed_field
@cached_property
def host_path_volumes(self) -> List[str]:
volumes = self.data.get("spec", {}).get("volumes") or []
host_volumes = [x for x in volumes if "hostPath" in x and x["hostPath"]]

return save(
values, "host_path_volumes", [x["hostPath"]["path"] for x in host_volumes]
)

@model_validator(mode="before")
def inject_host_pid(cls, values):
data = json.loads(load(values, "raw", "{}"))

return save(values, "hostPID", data.get("spec", {}).get("hostPID") or False)
return [x["hostPath"]["path"] for x in host_volumes]

@model_validator(mode="before")
def inject_host_network(cls, values):
data = json.loads(load(values, "raw", "{}"))
@computed_field
@cached_property
def host_pid(self) -> bool:
return self.data.get("spec", {}).get("hostPID") or False

return save(
values, "hostNetwork", data.get("spec", {}).get("hostNetwork") or False
)
@computed_field
@cached_property
def host_network(self) -> bool:
return self.data.get("spec", {}).get("hostNetwork") or False

@property
def dangerous_host_path(self) -> bool:
Expand Down
18 changes: 7 additions & 11 deletions icekube/models/role.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,26 @@
from __future__ import annotations

import json
from functools import cached_property
from typing import List

from icekube.models._helpers import load, save
from icekube.models.base import Resource
from icekube.models.policyrule import PolicyRule
from pydantic import model_validator
from pydantic.fields import Field
from pydantic import computed_field


class Role(Resource):
rules: List[PolicyRule] = Field(default_factory=list)
supported_api_groups: List[str] = [
"rbac.authorization.k8s.io",
"authorization.openshift.io",
]

@model_validator(mode="before")
def inject_role(cls, values):
data = json.loads(load(values, "raw", "{}"))

@computed_field
@cached_property
def rules(self) -> List[PolicyRule]:
rules = []
raw_rules = data.get("rules") or []
raw_rules = self.data.get("rules") or []

for rule in raw_rules:
rules.append(PolicyRule(**rule))

return save(values, "rules", rules)
return rules
32 changes: 13 additions & 19 deletions icekube/models/rolebinding.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
from __future__ import annotations

import json
from functools import cached_property
from typing import List, Union

from icekube.models._helpers import load, save
from icekube.models.base import RELATIONSHIP, Resource
from icekube.models.clusterrole import ClusterRole
from icekube.models.clusterrolebinding import get_role, get_subjects
Expand All @@ -12,35 +11,30 @@
from icekube.models.serviceaccount import ServiceAccount
from icekube.models.user import User
from icekube.relationships import Relationship
from pydantic import model_validator
from pydantic.fields import Field
from pydantic import computed_field


class RoleBinding(Resource):
role: Union[ClusterRole, Role]
subjects: List[Union[ServiceAccount, User, Group]] = Field(default_factory=list)
subjects: List[Union[ServiceAccount, User, Group]]
supported_api_groups: List[str] = [
"rbac.authorization.k8s.io",
"authorization.openshift.io",
]

@model_validator(mode="before")
def inject_role_and_subjects(cls, values):
data = json.loads(load(values, "raw", "{}"))

ns = values.get("namespace")

role_ref = data.get("roleRef")
@computed_field
@cached_property
def role(self) -> Union[ClusterRole, Role]:
role_ref = self.data.get("roleRef")

if role_ref:
role = get_role(role_ref, ns)
return get_role(role_ref, self.namespace)
else:
role = ClusterRole(name="")

save(values, "subjects", get_subjects(data.get("subjects", []), ns))
save(values, "role", role)
return ClusterRole(name="")

return values
@computed_field
@cached_property
def subjects(self) -> List[Union[ServiceAccount, User, Group]]:
return get_subjects(self.data.get("subjects", []), self.namespace)

def relationships(
self,
Expand Down
Loading

0 comments on commit 43733b3

Please sign in to comment.