2 changes: 1 addition & 1 deletion docs/manager/rest-reference/openapi.json
@@ -3,7 +3,7 @@
"info": {
"title": "Backend.AI Manager API",
"description": "Backend.AI Manager REST API specification",
"version": "25.2.0",
"version": "25.3.1",
"contact": {
"name": "Lablup Inc.",
"url": "https://docs.backend.ai",
15 changes: 15 additions & 0 deletions src/ai/backend/agent/agent.py
@@ -714,6 +714,21 @@ async def __ainit__(self) -> None:
self.event_producer,
bgtask_observer=self._metric_registry.bgtask,
)
for redis_client_info in (
self.event_producer.redis_client,
self.event_dispatcher.redis_client,
self.redis_stream_pool,
self.redis_stat_pool,
):
try:
await redis_helper.ping_redis_connection(redis_client_info.client)
except (Exception, asyncio.CancelledError) as e:
log.error(
"Failed to connect to redis (redis-name: {0}, error: {1})",
redis_client_info.name,
repr(e),
)
raise

alloc_map_mod.log_alloc_map = self.local_config["debug"]["log-alloc-map"]
computers = await self.load_resources()
16 changes: 15 additions & 1 deletion src/ai/backend/agent/server.py
@@ -1044,6 +1044,13 @@ async def server_main(
scope_prefix_map,
credentials=etcd_credentials,
)
try:
await etcd.get_prefix("config")
except (Exception, asyncio.CancelledError) as e:
log.error("Failed to connect to etcd (error: {0})", repr(e))
if aiomon_started:
monitor.close()
raise

rpc_addr = local_config["agent"]["rpc-listen-addr"]
if not rpc_addr.host:
@@ -1116,7 +1123,14 @@
reuse_port=True,
ssl_context=ssl_ctx,
)
await site.start()
try:
await site.start()
except (Exception, asyncio.CancelledError) as e:
log.error("Failed to start server (error: {0})", repr(e))
if aiomon_started:
monitor.close()
await runner.cleanup()
raise
log.info("started serving HTTP at {}", service_addr)

# Run!
6 changes: 6 additions & 0 deletions src/ai/backend/common/events.py
@@ -1260,6 +1260,9 @@ async def _subscribe_loop(self) -> None:
)
raise

async def ping(self) -> None:
await redis_helper.ping_redis_connection(self.redis_client.client)


class EventProducer(aobject):
redis_client: RedisConnectionInfo
@@ -1311,6 +1314,9 @@ async def produce_event(
lambda r: r.xadd(self._stream_key, raw_event), # type: ignore # aio-libs/aioredis-py#1182
)

async def ping(self) -> None:
await redis_helper.ping_redis_connection(self.redis_client.client)


def _generate_consumer_id(node_id: str | None = None) -> str:
h = hashlib.sha1()
7 changes: 6 additions & 1 deletion src/ai/backend/common/events_experimental.py
@@ -3,7 +3,7 @@
import time
from collections import defaultdict
from collections.abc import AsyncIterable
from typing import Any, Protocol
from typing import Any, Protocol, override

import hiredis
from aiomonitor.task import preserve_termination_log
@@ -351,3 +351,8 @@ async def _consume_loop(self) -> None:

async def close(self) -> None:
self._closed = True

@override
async def ping(self) -> None:
async with RedisConnection(self.redis_config, db=self.db) as client:
await client.execute(["PING"])
2 changes: 1 addition & 1 deletion src/ai/backend/common/redis_helper.py
@@ -547,5 +547,5 @@ async def ping_redis_connection(redis_client: Redis) -> bool:
try:
return await redis_client.ping()
except (redis.exceptions.ConnectionError, redis.exceptions.TimeoutError) as e:
log.exception(f"ping_redis_connection(): Connecting to redis failed: {e}")
log.exception("ping_redis_connection(): Failed to connect to Redis: {0}", repr(e))
raise e
18 changes: 11 additions & 7 deletions src/ai/backend/manager/models/utils.py
@@ -333,13 +333,17 @@ async def connect_database(
url = f"postgresql+asyncpg://{urlquote(username)}:{urlquote(password)}@{address}/{urlquote(dbname)}"

version_check_db = create_async_engine(url)
async with version_check_db.begin() as conn:
result = await conn.execute(sa.text("show server_version"))
version_str = result.scalar()
major, minor, *_ = map(int, version_str.partition(" ")[0].split("."))
if (major, minor) < (11, 0):
pgsql_connect_opts["server_settings"].pop("jit")
await version_check_db.dispose()
try:
async with version_check_db.begin() as conn:
result = await conn.execute(sa.text("show server_version"))
version_str = result.scalar()
major, minor, *_ = map(int, version_str.partition(" ")[0].split("."))
if (major, minor) < (11, 0):
pgsql_connect_opts["server_settings"].pop("jit")
await version_check_db.dispose()
except (Exception, asyncio.CancelledError) as e:
log.error("Failed to connect and check db version (error: {0})", repr(e))
raise

db = create_async_engine(
url,
37 changes: 31 additions & 6 deletions src/ai/backend/manager/server.py
@@ -326,7 +326,11 @@ async def shared_config_ctx(root_ctx: RootContext) -> AsyncIterator[None]:
root_ctx.local_config["etcd"]["password"],
root_ctx.local_config["etcd"]["namespace"],
)
await root_ctx.shared_config.reload()
try:
await root_ctx.shared_config.reload()
except (Exception, asyncio.CancelledError) as e:
log.error("Failed to connect to etcd (error: {0})", repr(e))
raise
yield
await root_ctx.shared_config.close()

@@ -402,7 +406,13 @@ async def redis_ctx(root_ctx: RootContext) -> AsyncIterator[None]:
root_ctx.redis_stream,
root_ctx.redis_lock,
):
await redis_helper.ping_redis_connection(redis_info.client)
try:
await redis_helper.ping_redis_connection(redis_info.client)
except (Exception, asyncio.CancelledError) as e:
log.error(
"Failed to connect to redis (redis-name: {0}, error: {1})", redis_info.name, repr(e)
)
raise
yield
await root_ctx.redis_stream.close()
await root_ctx.redis_image.close()
@@ -438,6 +448,11 @@ async def event_dispatcher_ctx(root_ctx: RootContext) -> AsyncIterator[None]:
root_ctx.shared_config.data["redis"],
db=REDIS_STREAM_DB,
)
try:
await root_ctx.event_producer.ping()
except (Exception, asyncio.CancelledError) as e:
log.error("Failed to initiate event producer (error: {0})", repr(e))
raise
Comment on lines +467 to +471

Collaborator:
Umm… Rather than inserting log.error in every place where an Exception might occur, it seems better to convey exceptions meaningfully and handle them in a common place.

Member (Author):
Where should that common error handler go? I want the logger to record the exceptions raised during the bootstrap steps.
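
One possible shape for such a common handler, as a minimal sketch only: it assumes each bootstrap step can be wrapped at its call site, and the bootstrap_step helper name and log wording below are hypothetical, not part of this PR.

    import asyncio
    import logging
    from contextlib import asynccontextmanager

    log = logging.getLogger(__name__)

    @asynccontextmanager
    async def bootstrap_step(name: str):
        # Log any failure of a single bootstrap step once, tagged with the
        # step name, then re-raise so startup is still aborted.
        try:
            yield
        except (Exception, asyncio.CancelledError) as e:
            log.error("Bootstrap step %r failed: %r", name, e)
            raise

    # Call sites would then drop their individual try/except blocks, e.g.:
    # async with bootstrap_step("event-producer ping"):
    #     await root_ctx.event_producer.ping()

Whether such a helper should live in ai.backend.common or in each service's startup code is the open question in this thread.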

root_ctx.event_dispatcher = await event_dispatcher_cls.new(
root_ctx.shared_config.data["redis"],
db=REDIS_STREAM_DB,
@@ -446,6 +461,11 @@ async def event_dispatcher_ctx(root_ctx: RootContext) -> AsyncIterator[None]:
node_id=root_ctx.local_config["manager"]["id"],
event_observer=root_ctx.metrics.event,
)
try:
await root_ctx.event_dispatcher.ping()
except (Exception, asyncio.CancelledError) as e:
log.error("Failed to initiate event dispatcher (error: {0})", repr(e))
raise
yield
await root_ctx.event_producer.close()
await asyncio.sleep(0.2)
@@ -910,9 +930,9 @@ async def _cleanup_context_wrapper(cctx, app: web.Application) -> AsyncIterator[
try:
async with cctx_instance:
yield
except Exception as e:
exc_info = (type(e), e, e.__traceback__)
log.error("Error initializing cleanup_contexts: {0}", cctx.__name__, exc_info=exc_info)
except Exception:
# Let each context instance handle its own errors to reduce misleading error logs
pass

async def _call_cleanup_context_shutdown_handlers(app: web.Application) -> None:
for cctx in app["_cctx_instances"]:
@@ -1021,7 +1041,12 @@ async def server_main(
reuse_port=True,
ssl_context=ssl_ctx,
)
await site.start()
try:
await site.start()
except BaseException as e:
log.error("Failed to start server (error: {0})", repr(e))
await runner.cleanup()
raise
public_metrics_port = cast(
Optional[int], root_ctx.local_config["manager"]["public-metrics-port"]
)
54 changes: 41 additions & 13 deletions src/ai/backend/storage/server.py
@@ -112,18 +112,17 @@ async def server_main(
try:
etcd = load_shared_config(local_config)
try:
redis_config = redis_config_iv.check(
await etcd.get_prefix("config/redis"),
)
log.info(
"PID: {0} - configured redis_config: {1}",
pidx,
safe_print_redis_config(redis_config),
)
except Exception as e:
log.exception("Unable to read config from etcd")
raise e
raw_redis_config = await etcd.get_prefix("config/redis")
except (Exception, asyncio.CancelledError) as e:
log.error("Failed to connect to etcd (error: {0})", repr(e))
raise

redis_config = redis_config_iv.check(raw_redis_config)
log.info(
"PID: {0} - configured redis_config: {1}",
pidx,
safe_print_redis_config(redis_config),
)
event_dispatcher_cls: type[EventDispatcher] | type[ExperimentalEventDispatcher]
if local_config["storage-proxy"].get("use-experimental-redis-event-dispatcher"):
event_dispatcher_cls = ExperimentalEventDispatcher
@@ -135,6 +134,11 @@
db=REDIS_STREAM_DB,
log_events=local_config["debug"]["log-events"],
)
try:
await event_producer.ping()
except (Exception, asyncio.CancelledError) as e:
log.error("Failed to initiate event producer (error: {0})", repr(e))
raise
log.info(
"PID: {0} - Event producer created. (redis_config: {1})",
pidx,
@@ -148,6 +152,11 @@
consumer_group=EVENT_DISPATCHER_CONSUMER_GROUP,
event_observer=metric_registry,
)
try:
await event_dispatcher.ping()
except (Exception, asyncio.CancelledError) as e:
log.error("Failed to initiate event dispatcher (error: {0})", repr(e))
raise
log.info(
"PID: {0} - Event dispatcher created. (redis_config: {1})",
pidx,
@@ -226,8 +235,27 @@
reuse_port=True,
ssl_context=manager_ssl_ctx,
)
await client_api_site.start()
await manager_api_site.start()

async def _cleanup():
await manager_api_runner.cleanup()
await client_api_runner.cleanup()
await event_producer.close()
await event_dispatcher.close()
if watcher_client is not None:
await watcher_client.close()

try:
await client_api_site.start()
except (Exception, asyncio.CancelledError) as e:
log.error("Failed to start client-facing API server (error: {0})", repr(e))
await _cleanup()
raise
try:
await manager_api_site.start()
except (Exception, asyncio.CancelledError) as e:
log.error("Failed to start manager-facing API server (error: {0})", repr(e))
await _cleanup()
raise
if _is_root():
uid = local_config["storage-proxy"]["user"]
gid = local_config["storage-proxy"]["group"]
7 changes: 6 additions & 1 deletion src/ai/backend/web/server.py
@@ -754,7 +754,12 @@ async def on_prepare(request, response):
reuse_port=True,
ssl_context=ssl_ctx,
)
await site.start()
try:
await site.start()
except (BaseException, asyncio.CancelledError) as e:
log.error(f"Failed to start server (error: {repr(e)})")
await runner.cleanup()
raise
log.info("started.")

try: