Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/5779.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add Privileged Storage Worker
1 change: 1 addition & 0 deletions changes/5787.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Produce vfolder deletion event rather than direct request to Storage Proxy
19 changes: 19 additions & 0 deletions src/ai/backend/common/events/event_types/vfolder/anycast.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,3 +107,22 @@ def deserialize(cls, value: tuple) -> Self:
@classmethod
def event_name(cls) -> str:
return "vfolder_clone_failure"


@dataclass
class VFolderDeleteRequestEvent(VFolderEvent):
volume: str

def serialize(self) -> tuple:
return (str(self.vfid), self.volume)

@classmethod
def deserialize(cls, value: tuple) -> Self:
return cls(
VFolderID.from_str(value[0]),
value[1],
)

@classmethod
def event_name(cls) -> str:
return "vfolder_delete_request"
1 change: 1 addition & 0 deletions src/ai/backend/manager/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,7 @@ async def processors_ctx(root_ctx: RootContext) -> AsyncIterator[None]:
error_monitor=root_ctx.error_monitor,
idle_checker_host=root_ctx.idle_checker_host,
event_dispatcher=root_ctx.event_dispatcher,
event_producer=root_ctx.event_producer,
hook_plugin_ctx=root_ctx.hook_plugin_ctx,
scheduling_controller=root_ctx.scheduling_controller,
deployment_controller=root_ctx.deployment_controller,
Expand Down
4 changes: 3 additions & 1 deletion src/ai/backend/manager/services/processors.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from ai.backend.common.clients.valkey_client.valkey_live.client import ValkeyLiveClient
from ai.backend.common.clients.valkey_client.valkey_stat.client import ValkeyStatClient
from ai.backend.common.etcd import AsyncEtcd
from ai.backend.common.events.dispatcher import EventDispatcher
from ai.backend.common.events.dispatcher import EventDispatcher, EventProducer
from ai.backend.common.events.fetcher import EventFetcher
from ai.backend.common.events.hub.hub import EventHub
from ai.backend.common.plugin.hook import HookPluginContext
Expand Down Expand Up @@ -102,6 +102,7 @@ class ServiceArgs:
error_monitor: ErrorPluginContext
idle_checker_host: IdleCheckerHost
event_dispatcher: EventDispatcher
event_producer: EventProducer
hook_plugin_ctx: HookPluginContext
scheduling_controller: "SchedulingController"
deployment_controller: Optional["DeploymentController"]
Expand Down Expand Up @@ -172,6 +173,7 @@ def create(cls, args: ServiceArgs) -> Self:
args.background_task_manager,
repositories.vfolder.repository,
repositories.user.repository,
args.event_producer,
)
vfolder_file_service = VFolderFileService(
args.config_provider,
Expand Down
17 changes: 15 additions & 2 deletions src/ai/backend/manager/services/vfolder/services/vfolder.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

from ai.backend.common.bgtask.bgtask import BackgroundTaskManager
from ai.backend.common.defs import VFOLDER_GROUP_PERMISSION_MODE
from ai.backend.common.events.dispatcher import EventProducer
from ai.backend.common.events.event_types.vfolder.anycast import VFolderDeleteRequestEvent
from ai.backend.common.types import (
QuotaScopeID,
QuotaScopeType,
Expand Down Expand Up @@ -101,6 +103,7 @@ class VFolderService:
_background_task_manager: BackgroundTaskManager
_vfolder_repository: VfolderRepository
_user_repository: UserRepository
_event_producer: EventProducer

def __init__(
self,
Expand All @@ -109,12 +112,14 @@ def __init__(
background_task_manager: BackgroundTaskManager,
vfolder_repository: VfolderRepository,
user_repository: UserRepository,
event_producer: EventProducer,
) -> None:
self._config_provider = config_provider
self._storage_manager = storage_manager
self._vfolder_repository = vfolder_repository
self._user_repository = user_repository
self._background_task_manager = background_task_manager
self._event_producer = event_producer

async def create(self, action: CreateVFolderAction) -> CreateVFolderActionResult:
user_role = action.user_role
Expand Down Expand Up @@ -494,6 +499,7 @@ async def restore(
return RestoreVFolderFromTrashActionResult(vfolder_uuid=action.vfolder_uuid)

async def _remove_vfolder_from_storage(self, vfolder_data: VFolderData) -> None:
# TODO: Deprecate this function
proxy_name, volume_name = self._storage_manager.get_proxy_and_volume(
vfolder_data.host, is_unmanaged(vfolder_data.unmanaged_path)
)
Expand All @@ -510,6 +516,13 @@ async def _remove_vfolder_from_storage(self, vfolder_data: VFolderData) -> None:
# If the vfolder is not found, just delete it from the repository
log.warning("VFolder {} not found: {}", vfolder_data.id, e)

async def _request_vfolder_deletion(self, vfolder_data: VFolderData) -> None:
_, volume_name = self._storage_manager.get_proxy_and_volume(
vfolder_data.host, is_unmanaged(vfolder_data.unmanaged_path)
)
vfid = VFolderID(vfolder_data.quota_scope_id, vfolder_data.id)
await self._event_producer.anycast_event(VFolderDeleteRequestEvent(vfid, volume_name))

async def delete_forever(
self, action: DeleteForeverVFolderAction
) -> DeleteForeverVFolderActionResult:
Expand All @@ -519,7 +532,7 @@ async def delete_forever(
vfolder_data = await self._vfolder_repository.get_by_id_validated(
action.vfolder_uuid, user.id, user.domain_name
)
await self._remove_vfolder_from_storage(vfolder_data)
await self._request_vfolder_deletion(vfolder_data)
await self._vfolder_repository.delete_vfolders_forever([action.vfolder_uuid])
return DeleteForeverVFolderActionResult(vfolder_uuid=action.vfolder_uuid)

Expand All @@ -532,7 +545,7 @@ async def force_delete(
vfolder_data = await self._vfolder_repository.get_by_id_validated(
action.vfolder_uuid, user.id, user.domain_name
)
await self._remove_vfolder_from_storage(vfolder_data)
await self._request_vfolder_deletion(vfolder_data)
await self._vfolder_repository.delete_vfolders_forever([action.vfolder_uuid])
return ForceDeleteVFolderActionResult(vfolder_uuid=action.vfolder_uuid)

Expand Down
1 change: 1 addition & 0 deletions src/ai/backend/storage/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ python_distribution(
"backendai_cli_v10": {
"storage": "ai.backend.storage.cli.__main__:main",
"storage.start-server": "ai.backend.storage.server:main",
"storage.start-privileged-worker": "ai.backend.storage.privileged.server:main",
},
},
generate_setup=True,
Expand Down
8 changes: 5 additions & 3 deletions src/ai/backend/storage/api/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
from ai.backend.logging import BraceStyleAdapter

from .. import __version__
from ..bgtask.tags import ROOT_PRIVILEGED_TAG
from ..bgtask.tags import PRIVILEGED_WORKER_TAG
from ..bgtask.tasks.clone import VFolderCloneTaskArgs
from ..bgtask.tasks.delete import VFolderDeleteTaskArgs
from ..exception import (
Expand Down Expand Up @@ -398,6 +398,8 @@ class Params(TypedDict):


async def delete_vfolder(request: web.Request) -> web.Response:
# TODO: Deprecate this function

class Params(TypedDict):
volume: str
vfid: VFolderID
Expand All @@ -424,7 +426,7 @@ class Params(TypedDict):
task_id = await ctx.background_task_manager.start_retriable(
TaskName.DELETE_VFOLDER,
delete_args,
tags=[ROOT_PRIVILEGED_TAG],
tags=[PRIVILEGED_WORKER_TAG],
)
data = VFolderDeleteResponse(bgtask_id=task_id).model_dump(mode="json")
return web.json_response(data, status=HTTPStatus.ACCEPTED)
Expand Down Expand Up @@ -1190,7 +1192,7 @@ async def init_manager_app(ctx: RootContext) -> web.Application:
app.router.add_route("PATCH", "/quota-scope", update_quota_scope)
app.router.add_route("DELETE", "/quota-scope/quota", unset_quota)
app.router.add_route("POST", "/folder/create", create_vfolder)
app.router.add_route("POST", "/folder/delete", delete_vfolder)
app.router.add_route("POST", "/folder/delete", delete_vfolder) # TODO: Deprecate this API
app.router.add_route("POST", "/folder/clone", clone_vfolder)
app.router.add_route("GET", "/folder/mount", get_vfolder_mount)
app.router.add_route("GET", "/volume/performance-metric", get_performance_metric)
Expand Down
2 changes: 1 addition & 1 deletion src/ai/backend/storage/bgtask/tags.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from typing import Final, Literal

ROOT_PRIVILEGED_TAG: Final[Literal["privileged"]] = "privileged"
PRIVILEGED_WORKER_TAG: Final[Literal["privileged"]] = "privileged"
Empty file.
Empty file.
17 changes: 17 additions & 0 deletions src/ai/backend/storage/privileged/bgtask/registry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from ai.backend.common.bgtask.task.registry import BackgroundTaskHandlerRegistry
from ai.backend.common.events.dispatcher import EventProducer

from ...bgtask.tasks.delete import VFolderDeleteTaskHandler
from ...volumes.pool import VolumePool


class BgtaskHandlerRegistryCreator:
def __init__(self, volume_pool: VolumePool, event_producer: EventProducer) -> None:
self._volume_pool = volume_pool
self._event_producer = event_producer

def create(self) -> BackgroundTaskHandlerRegistry:
registry = BackgroundTaskHandlerRegistry()
registry.register(VFolderDeleteTaskHandler(self._volume_pool, self._event_producer))

return registry
Empty file.
Loading
Loading