Skip to content

Commit b35b5b3

Browse files
committed
feat: Add Privileged Storage Worker
1 parent 117f39a commit b35b5b3

24 files changed

+1106
-3
lines changed

changes/5779.feature.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Add Privileged Storage Worker

src/ai/backend/storage/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ python_distribution(
3636
"backendai_cli_v10": {
3737
"storage": "ai.backend.storage.cli.__main__:main",
3838
"storage.start-server": "ai.backend.storage.server:main",
39+
"storage.start-privileged-worker": "ai.backend.storage.privileged.server:main",
3940
},
4041
},
4142
generate_setup=True,

src/ai/backend/storage/api/manager.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@
5757
from ai.backend.logging import BraceStyleAdapter
5858

5959
from .. import __version__
60-
from ..bgtask.tags import ROOT_PRIVILEGED_TAG
60+
from ..bgtask.tags import PRIVILEGED_WORKER_TAG
6161
from ..bgtask.tasks.clone import VFolderCloneTaskArgs
6262
from ..bgtask.tasks.delete import VFolderDeleteTaskArgs
6363
from ..exception import (
@@ -424,7 +424,7 @@ class Params(TypedDict):
424424
task_id = await ctx.background_task_manager.start_retriable(
425425
TaskName.DELETE_VFOLDER,
426426
delete_args,
427-
tags=[ROOT_PRIVILEGED_TAG],
427+
tags=[PRIVILEGED_WORKER_TAG],
428428
)
429429
data = VFolderDeleteResponse(bgtask_id=task_id).model_dump(mode="json")
430430
return web.json_response(data, status=HTTPStatus.ACCEPTED)
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
from typing import Final, Literal
22

3-
ROOT_PRIVILEGED_TAG: Final[Literal["privileged"]] = "privileged"
3+
PRIVILEGED_WORKER_TAG: Final[Literal["privileged"]] = "privileged"

src/ai/backend/storage/privileged/__init__.py

Whitespace-only changes.

src/ai/backend/storage/privileged/bgtask/__init__.py

Whitespace-only changes.
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
from ai.backend.common.bgtask.task.registry import BackgroundTaskHandlerRegistry
2+
from ai.backend.common.events.dispatcher import EventProducer
3+
4+
from ...bgtask.tasks.delete import VFolderDeleteTaskHandler
5+
from ...volumes.pool import VolumePool
6+
7+
8+
class BgtaskHandlerRegistryCreator:
    """Builds a background-task handler registry.

    The volume pool and event producer given at construction time are passed
    on to every handler this creator registers.
    """

    def __init__(self, volume_pool: VolumePool, event_producer: EventProducer) -> None:
        self._event_producer = event_producer
        self._volume_pool = volume_pool

    def create(self) -> BackgroundTaskHandlerRegistry:
        """Return a fresh registry with the vfolder-delete handler registered."""
        handler_registry = BackgroundTaskHandlerRegistry()
        delete_handler = VFolderDeleteTaskHandler(self._volume_pool, self._event_producer)
        handler_registry.register(delete_handler)
        return handler_registry

src/ai/backend/storage/privileged/bootstrap/__init__.py

Whitespace-only changes.
Lines changed: 197 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
import asyncio
2+
from dataclasses import dataclass
3+
from typing import override
4+
5+
from ai.backend.common.stage.types import (
6+
ArgsSpecGenerator,
7+
Provisioner,
8+
ProvisionStage,
9+
)
10+
11+
from ...bgtask.tags import PRIVILEGED_WORKER_TAG
12+
from ..config import StorageProxyPrivilegedWorkerConfig
13+
from .stage.bgtask import (
14+
BgtaskManagerProvisioner,
15+
BgtaskManagerResult,
16+
BgtaskManagerSpec,
17+
BgtaskManagerSpecGenerator,
18+
BgtaskManagerStage,
19+
)
20+
from .stage.etcd import (
21+
EtcdProvisioner,
22+
EtcdResult,
23+
EtcdSpec,
24+
EtcdSpecGenerator,
25+
EtcdStage,
26+
)
27+
from .stage.event_dispatcher import (
28+
EventDispatcherProvisioner,
29+
EventDispatcherResult,
30+
EventDispatcherSpec,
31+
EventDispatcherSpecGenerator,
32+
EventDispatcherStage,
33+
)
34+
from .stage.logger import (
35+
LoggerProvisioner,
36+
LoggerResult,
37+
LoggerSpec,
38+
LoggerSpecGenerator,
39+
LoggerStage,
40+
)
41+
from .stage.message_queue import (
42+
MessageQueueProvisioner,
43+
MessageQueueResult,
44+
MessageQueueSpec,
45+
MessageQueueSpecGenerator,
46+
MessageQueueStage,
47+
)
48+
from .stage.monitor import (
49+
MonitorProvisioner,
50+
MonitorResult,
51+
MonitorSpec,
52+
MonitorSpecGenerator,
53+
MonitorStage,
54+
)
55+
from .stage.redis_config import (
56+
RedisConfigProvisioner,
57+
RedisConfigResult,
58+
RedisConfigSpec,
59+
RedisConfigSpecGenerator,
60+
RedisConfigStage,
61+
)
62+
from .stage.valkey_client import (
63+
ValkeyClientProvisioner,
64+
ValkeyClientResult,
65+
ValkeyClientSpec,
66+
ValkeyClientSpecGenerator,
67+
ValkeyClientStage,
68+
)
69+
from .stage.volume_pool import (
70+
VolumePoolProvisioner,
71+
VolumePoolResult,
72+
VolumePoolSpec,
73+
VolumePoolSpecGenerator,
74+
VolumePoolStage,
75+
)
76+
77+
78+
@dataclass
class BootstrapSpec:
    """Inputs consumed by ``BootstrapProvisioner.setup``."""

    # Event loop forwarded to the monitor stage.
    loop: asyncio.AbstractEventLoop
    # Local configuration shared by the sub-stages.
    local_config: StorageProxyPrivilegedWorkerConfig
    # Index forwarded to the monitor stage (presumably the process index -- confirm).
    pidx: int
83+
84+
85+
class BootstrapSpecGenerator(ArgsSpecGenerator[BootstrapSpec]):
    """``ArgsSpecGenerator`` specialised for ``BootstrapSpec``; adds no behaviour."""
87+
88+
89+
@dataclass
class BootstrapResult:
    """Results of every sub-stage run by ``BootstrapProvisioner.setup``.

    Fields are listed in the order in which the stages are provisioned.
    """

    logger: LoggerResult
    monitor: MonitorResult
    etcd: EtcdResult
    redis_config: RedisConfigResult
    message_queue: MessageQueueResult
    event_dispatcher: EventDispatcherResult
    valkey_client: ValkeyClientResult
    volume_pool: VolumePoolResult
    bgtask_manager: BgtaskManagerResult
100+
101+
102+
class BootstrapProvisioner(Provisioner[BootstrapSpec, BootstrapResult]):
    """Provisions the sub-stages of the storage worker one after another.

    Each sub-stage is set up and awaited before the next one starts, because
    later stages consume resources produced by earlier ones.
    """

    @property
    @override
    def name(self) -> str:
        return "storage-worker-bootstrap"

    @override
    async def setup(self, spec: BootstrapSpec) -> BootstrapResult:
        """Run all sub-stages sequentially and collect their results.

        Order: logger, monitor, etcd, redis config, message queue,
        event dispatcher, valkey client, volume pool, bgtask manager.
        """
        config = spec.local_config

        # Logger (configured as a non-master logger).
        logger_stage = LoggerStage(LoggerProvisioner())
        await logger_stage.setup(
            LoggerSpecGenerator(
                LoggerSpec(
                    is_master=False,
                    ipc_base_path=config.storage_proxy.ipc_base_path,
                    config=config.logging,
                )
            )
        )
        logger_res = await logger_stage.wait_for_resource()

        # Monitor.
        monitor_stage = MonitorStage(MonitorProvisioner())
        await monitor_stage.setup(
            MonitorSpecGenerator(
                MonitorSpec(loop=spec.loop, pidx=spec.pidx, local_config=config)
            )
        )
        monitor_res = await monitor_stage.wait_for_resource()

        # Etcd.
        etcd_stage = EtcdStage(EtcdProvisioner())
        await etcd_stage.setup(EtcdSpecGenerator(EtcdSpec(local_config=config)))
        etcd_res = await etcd_stage.wait_for_resource()

        # Redis configuration, loaded through the etcd handle.
        redis_config_stage = RedisConfigStage(RedisConfigProvisioner())
        await redis_config_stage.setup(
            RedisConfigSpecGenerator(RedisConfigSpec(etcd=etcd_res.etcd))
        )
        redis_config_res = await redis_config_stage.wait_for_resource()
        # The same profile target feeds both the message queue and the valkey client.
        profile_target = redis_config_res.redis_config.to_redis_profile_target()

        # Message queue.
        message_queue_stage = MessageQueueStage(MessageQueueProvisioner())
        await message_queue_stage.setup(
            MessageQueueSpecGenerator(
                MessageQueueSpec(local_config=config, redis_profile_target=profile_target)
            )
        )
        message_queue_res = await message_queue_stage.wait_for_resource()

        # Event dispatcher / producer on top of the message queue.
        dispatcher_stage = EventDispatcherStage(EventDispatcherProvisioner())
        await dispatcher_stage.setup(
            EventDispatcherSpecGenerator(
                EventDispatcherSpec(
                    message_queue=message_queue_res.message_queue,
                    log_events=config.debug.log_events,
                    event_observer=monitor_res.metric_registry.event,
                    source_id=None,
                )
            )
        )
        dispatcher_res = await dispatcher_stage.wait_for_resource()

        # Valkey client.
        valkey_stage = ValkeyClientStage(ValkeyClientProvisioner())
        await valkey_stage.setup(ValkeyClientSpecGenerator(ValkeyClientSpec(profile_target)))
        valkey_res = await valkey_stage.wait_for_resource()

        # Volume pool.
        pool_stage = VolumePoolStage(VolumePoolProvisioner())
        await pool_stage.setup(
            VolumePoolSpecGenerator(
                VolumePoolSpec(
                    local_config=config,
                    etcd=etcd_res.etcd,
                    event_dispatcher=dispatcher_res.event_dispatcher,
                    event_producer=dispatcher_res.event_producer,
                )
            )
        )
        pool_res = await pool_stage.wait_for_resource()

        # Background-task manager, tagged with the privileged-worker tag.
        bgtask_stage = BgtaskManagerStage(BgtaskManagerProvisioner())
        await bgtask_stage.setup(
            BgtaskManagerSpecGenerator(
                BgtaskManagerSpec(
                    volume_pool=pool_res.volume_pool,
                    valkey_client=valkey_res.bgtask_client,
                    event_producer=dispatcher_res.event_producer,
                    node_id=config.storage_proxy.node_id,
                    tags=[PRIVILEGED_WORKER_TAG],
                )
            )
        )
        bgtask_res = await bgtask_stage.wait_for_resource()

        return BootstrapResult(
            logger=logger_res,
            monitor=monitor_res,
            etcd=etcd_res,
            redis_config=redis_config_res,
            message_queue=message_queue_res,
            event_dispatcher=dispatcher_res,
            valkey_client=valkey_res,
            volume_pool=pool_res,
            bgtask_manager=bgtask_res,
        )

    @override
    async def teardown(self, resource: BootstrapResult) -> None:
        """Do nothing.

        NOTE(review): the sub-stages created in ``setup`` are local to that
        method and are not torn down here -- confirm their resources are
        released elsewhere.
        """
194+
195+
196+
class BootstrapStage(ProvisionStage[BootstrapSpec, BootstrapResult]):
    """``ProvisionStage`` specialised for the bootstrap spec/result pair; adds no behaviour."""

src/ai/backend/storage/privileged/bootstrap/stage/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)