Skip to content

Commit

Permalink
support definitions
Browse files Browse the repository at this point in the history
  • Loading branch information
jhnnsrs committed Jan 22, 2024
1 parent 4eda355 commit 6edb27c
Show file tree
Hide file tree
Showing 39 changed files with 1,850 additions and 147 deletions.
44 changes: 43 additions & 1 deletion bridge/backends/base.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from bridge.models import Flavour, Setup, Pod
from bridge.models import Flavour, Setup, Pod, Release
from bridge.backends.errors import NotImplementedByBackendError
from typing import AsyncGenerator
from channels.layers import get_channel_layer, InMemoryChannelLayer
Expand Down Expand Up @@ -46,6 +46,27 @@ async def apull_flavour(self, flavour: Flavour) -> None:


raise NotImplementedByBackendError("Subclasses should implement this!")

async def aget_fitting_flavour(self, release: Release) -> Flavour:
""" A function to get the flavour that best fits a release
This function is called by the API when a user requests a release to be run. This should
cause the flavour that best fits the release to be returned. This function
should return immediately
Args:
release (Release): The release to get the flavour for
Raises:
NotImplementedError: This function should be implemented by subclasses
"""
raise NotImplementedByBackendError("Subclasses should implement this!")


async def ais_image_pulled(self,image: str) -> bool:
""" A function to check if a flavour is pulled"""



async def aup_setup(self, setup: Setup) -> Pod:
Expand Down Expand Up @@ -123,6 +144,27 @@ def awatch_flavour(self, info: Info, flavour: Flavour) -> AsyncGenerator[message
raise NotImplementedByBackendError("Subclasses should implement this!")
pass

def awatch_flavours(self, info: Info) -> AsyncGenerator[messages.FlavourUpdate, None]:
""" A async generator that yields updates to all flavours
This function is called by the API when a user requests to watch all flavours within a graphql
subscription. This function should then yield updates to all flavours, which will be sent to
the user.
Most like this should delegate to the alisten function, which is implemented by the backend
base class and allows for easy listening to channels.
Args:
info (Info): The info object from the graphql query
Raises:
NotImplementedError: This function should be implemented by subclasses
"""
raise NotImplementedByBackendError("Subclasses should implement this!")
pass


async def asend_background(self, function: str, *args: typing.Any, **kwargs: typing.Any) -> None:
Expand Down
157 changes: 92 additions & 65 deletions bridge/backends/docker/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
from bridge.backends.base import ContainerBackend
from channels.layers import get_channel_layer
from bridge.backends.messages import PullUpdate, UpUpdate, FlavourUpdate
from bridge.models import Flavour, Setup, Pod
from bridge.models import Flavour, Release, Setup, Pod
from bridge.repo import selectors
from bridge.enums import PodStatus
import docker
import os
from kante.types import Info
Expand Down Expand Up @@ -84,6 +85,45 @@ async def awatch_flavour(self, info: Info, flavour: Flavour) -> AsyncGenerator[F
""" Watches a flavour for updates and sends them to the client """
async for i in self.alisten(info, "whale_pull", FlavourUpdate, groups=[flavour.id]):
yield i

async def awatch_flavours(self, info: Info):
""" Watches a flavour for updates and sends them to the client """
async for i in self.alisten(info, "whale_pull", FlavourUpdate, groups=["all"]):
yield i


async def aget_fitting_flavour(self, release: Release) -> Flavour:

flavour_dict = {}

error_dict = {}

async for flavour in Flavour.objects.filter(release=release).all():
try:
rating = await self.arate_flavour(flavour)
flavour_dict[flavour] = rating
except RateError as e:
logger.warning(f"Flavour {flavour} cannot be deployed")
error_dict[flavour] = e



# Sort flavours by rating
sorted_flavours = sorted(flavour_dict.items(), key=lambda x: x[1])

# Pull the best flavour

try:
best_flavour = sorted_flavours[0][0]
return best_flavour
except IndexError:
logger.error("No flavours available for setup")

error_string = "\n".join([f"{k.name}: {v}" for k, v in error_dict.items()])


raise Exception("No flavours available for setup: " +error_string)




Expand Down Expand Up @@ -111,7 +151,7 @@ async def abackground_pull_flavour(self, flavour_id: str) -> None:
finished = []


await self.abroadcast("whale_pull", FlavourUpdate(status="Pulling", progress=0.5, flavour=flavour.id), groups=[flavour.id])
await self.abroadcast("whale_pull", FlavourUpdate(status="Pulling", progress=0.1, id=flavour.id), groups=[flavour.id, "all"])


for f in s:
Expand All @@ -132,16 +172,26 @@ async def abackground_pull_flavour(self, flavour_id: str) -> None:
"Progress: " + flavour.image + " " + str(progress)
)

await self.abroadcast("whale_pull", FlavourUpdate(status="Pulling", progress=progress, flavour=flavour.id), groups=[flavour.id])
await self.abroadcast("whale_pull", FlavourUpdate(status="Pulling", progress=progress, id=flavour.id), groups=[flavour.id, "all"])


await self.abroadcast("whale_pull", FlavourUpdate(status="Pulling", progress=1, flavour=flavour.id), groups=[flavour.id])
await self.abroadcast("whale_pull", FlavourUpdate(status="Pulled", progress=1, id=flavour.id), groups=[flavour.id, "all"])


async def apull_flavour(self, flavour: Flavour) -> None:
""" A function to pull a flavour"""
await self.asend_background("abackground_pull_flavour", flavour.id)
return None

async def ais_image_pulled(self, image: str) -> bool:
""" A function to check if a flavour is pulled"""


try:
image = self.api.images.get(image)
return True
except docker.errors.ImageNotFound:
return False



Expand All @@ -156,18 +206,12 @@ async def arate_flavour(self, flavour: Flavour) -> int:
Returns:
int: The rating of the flavour (-1 if it cannot be deployed)
Raises:
Raises:str
RateError: If the flavour cannot be deployed
"""

count = 0


try:
self.api.images.get(flavour.image)
except docker.errors.ImageNotFound:
raise RateError(f"Image {flavour.image} not found")

for selector in flavour.get_selectors():
if isinstance(selector, selectors.CudaSelector):
if not self.gpu_available:
Expand All @@ -187,15 +231,51 @@ async def arate_flavour(self, flavour: Flavour) -> int:
return count


async def aup_setup_with_flavour(self, setup: Setup, flavour: Flavour) -> Pod:


async def aget_status(self, pod: Pod) -> PodStatus:
try:
container = self.api.containers.get(pod.pod_id)
return container.status
except docker.errors.NotFound:
return "Not Found"


async def aget_logs(self, pod: Pod) -> str:
try:
container = self.api.containers.get(pod.pod_id)
return container.logs().decode("utf-8")
except docker.errors.NotFound:
return "Not Found"



async def aup_setup(self, setup: Setup) -> Pod:


# Iterate over flavours and pull them



# Create a pod

flavour = await Flavour.objects.aget(
id=setup.flavour.id
)


container_id = f"{setup.id}-{flavour.id}"


try:
# Lets see if a pod already exists
image = self.api.images.get(flavour.image)
except docker.errors.ImageNotFound:
raise Exception("Flavour not pulled. Please pull first")




device_requests = []


Expand Down Expand Up @@ -251,59 +331,6 @@ async def aup_setup_with_flavour(self, setup: Setup, flavour: Flavour) -> Pod:

return pod

async def aget_status(self, pod: Pod) -> str:
try:
container = self.api.containers.get(pod.pod_id)
return container.status
except docker.errors.NotFound:
return "Not Found"


async def aget_logs(self, pod: Pod) -> str:
try:
container = self.api.containers.get(pod.pod_id)
return container.logs().decode("utf-8")
except docker.errors.NotFound:
return "Not Found"



async def aup_setup(self, setup: Setup) -> Pod:


# Iterate over flavours and pull them

flavour_dict = {}

error_dict = {}

async for flavour in Flavour.objects.filter(release=setup.release).all():
try:
rating = await self.arate_flavour(flavour)
flavour_dict[flavour] = rating
except RateError as e:
logger.warning(f"Flavour {flavour} cannot be deployed")
error_dict[flavour] = e



# Sort flavours by rating
sorted_flavours = sorted(flavour_dict.items(), key=lambda x: x[1])

# Pull the best flavour

try:
best_flavour = sorted_flavours[0][0]
except IndexError:
logger.error("No flavours available for setup")

error_string = "\n".join([f"{k.name}: {v}" for k, v in error_dict.items()])


raise Exception("No flavours available for setup: " +error_string)

return await self.aup_setup_with_flavour(setup, best_flavour)




Expand Down
2 changes: 1 addition & 1 deletion bridge/backends/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@ class UpUpdate(PullUpdate):


class FlavourUpdate(BaseModel):
flavour: str
id: str
status: str
progress: float
14 changes: 10 additions & 4 deletions bridge/inputs.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from pydantic import BaseModel
from strawberry.experimental import pydantic

import strawberry

class ScanRepoInputModel(BaseModel):
"""Create a dask cluster input model"""
Expand All @@ -20,6 +20,7 @@ class CreateGithupRepoInputModel(BaseModel):
user: str
branch: str
repo: str
auto_scan: bool = True

@pydantic.input(CreateGithupRepoInputModel, description="Create a new Github repository input")
class CreateGithupRepoInput:
Expand All @@ -28,6 +29,7 @@ class CreateGithupRepoInput:
user: str
branch: str
repo: str
auto_scan: bool | None = True



Expand All @@ -39,24 +41,28 @@ class PullFlavourInputModel(BaseModel):
@pydantic.input(PullFlavourInputModel, description="Create a new Github repository input")
class PullFlavourInput:
""" Create a new Github repository input"""
id: str
id: strawberry.ID



class CreateSetupInputModel(BaseModel):
""" Create a new Github repository input model"""
release: str
flavour: str | None = None
fakts_url: str | None = "lok:80"
fakts_token: str | None = None
command: str | None = "arkitekt prod run"
auto_pull: bool = True

@pydantic.input(CreateSetupInputModel, description="Create a new Github repository input")
class CreateSetupInput:
""" Create a new Github repository input"""
release: str
release: strawberry.ID
flavour: strawberry.ID | None = None
fakts_url: str | None = None
fakts_token: str | None = None
command: str | None = "arkitekt prod run"
auto_pull: bool | None = True

class DeploySetupInputModel(BaseModel):
""" Create a new Github repository input model"""
Expand All @@ -65,4 +71,4 @@ class DeploySetupInputModel(BaseModel):
@pydantic.input(DeploySetupInputModel, description="Create a new Github repository input")
class DeploySetupInput:
""" Create a new Github repository input"""
setup: str
setup: strawberry.ID
Loading

0 comments on commit 6edb27c

Please sign in to comment.