From 222ed7cb250c62a47871b6841d1c686156fcd453 Mon Sep 17 00:00:00 2001
From: Graeme Power
Date: Tue, 15 Oct 2024 15:47:29 +0100
Subject: [PATCH 1/4] fix: make use of asyncio to lock llama_proxy context

---
 llama_cpp/server/app.py | 20 ++++++++--------------
 1 file changed, 8 insertions(+), 12 deletions(-)

diff --git a/llama_cpp/server/app.py b/llama_cpp/server/app.py
index cd3255176..39d632d0b 100644
--- a/llama_cpp/server/app.py
+++ b/llama_cpp/server/app.py
@@ -5,7 +5,7 @@
 import typing
 import contextlib

-from threading import Lock
+from asyncio import Lock
 from functools import partial
 from typing import Iterator, List, Optional, Union, Dict

@@ -70,14 +70,14 @@ def set_llama_proxy(model_settings: List[ModelSettings]):
     _llama_proxy = LlamaProxy(models=model_settings)


-def get_llama_proxy():
+async def get_llama_proxy():
     # NOTE: This double lock allows the currently streaming llama model to
     # check if any other requests are pending in the same thread and cancel
     # the stream if so.
-    llama_outer_lock.acquire()
+    await llama_outer_lock.acquire()
     release_outer_lock = True
     try:
-        llama_inner_lock.acquire()
+        await llama_inner_lock.acquire()
         try:
             llama_outer_lock.release()
             release_outer_lock = False
@@ -267,10 +267,8 @@ async def create_completion(
     request: Request,
     body: CreateCompletionRequest,
 ) -> llama_cpp.Completion:
-    exit_stack = contextlib.ExitStack()
-    llama_proxy = await run_in_threadpool(
-        lambda: exit_stack.enter_context(contextlib.contextmanager(get_llama_proxy)())
-    )
+    exit_stack = contextlib.AsyncExitStack()
+    llama_proxy = await exit_stack.enter_async_context(contextlib.asynccontextmanager(get_llama_proxy)())
     if llama_proxy is None:
         raise HTTPException(
             status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
@@ -472,10 +470,8 @@ async def create_chat_completion(
     # where the dependency is cleaned up before a StreamingResponse
     # is complete.
     # https://github.com/tiangolo/fastapi/issues/11143
-    exit_stack = contextlib.ExitStack()
-    llama_proxy = await run_in_threadpool(
-        lambda: exit_stack.enter_context(contextlib.contextmanager(get_llama_proxy)())
-    )
+    exit_stack = contextlib.AsyncExitStack()
+    llama_proxy = await exit_stack.enter_async_context(contextlib.asynccontextmanager(get_llama_proxy)())
     if llama_proxy is None:
         raise HTTPException(
             status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
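Note on patch 1: the change turns get_llama_proxy into an async generator dependency, so the lock is awaited on the event loop instead of blocking a worker thread, and the handlers enter it through contextlib.asynccontextmanager on an AsyncExitStack so that cleanup can be deferred until a streaming response has finished. A minimal, self-contained sketch of that pattern follows; the names get_resource and handler are illustrative and not part of the project.

```python
import asyncio
import contextlib

lock = asyncio.Lock()


async def get_resource():
    # Async generator dependency: awaiting the lock yields to the event loop
    # rather than blocking a thread the way threading.Lock.acquire() would.
    await lock.acquire()
    try:
        yield "shared resource"  # stand-in for the real LlamaProxy
    finally:
        lock.release()


async def handler() -> str:
    # Enter the dependency on an AsyncExitStack so its cleanup can run later,
    # e.g. once a streaming response has completed.
    exit_stack = contextlib.AsyncExitStack()
    resource = await exit_stack.enter_async_context(
        contextlib.asynccontextmanager(get_resource)()
    )
    try:
        return f"handled request using {resource}"
    finally:
        await exit_stack.aclose()


print(asyncio.run(handler()))
```

The same shape is what the patch installs in create_completion and create_chat_completion: enter_async_context awaits the dependency's setup, and closing the stack later runs its teardown.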
From ab0b7834f51ccde5641e2654ac5bb278479453da Mon Sep 17 00:00:00 2001
From: Graeme Power
Date: Tue, 15 Oct 2024 16:30:07 +0100
Subject: [PATCH 2/4] fix: use aclose instead of close for AsyncExitStack

---
 llama_cpp/server/app.py | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/llama_cpp/server/app.py b/llama_cpp/server/app.py
index 39d632d0b..ffbf71491 100644
--- a/llama_cpp/server/app.py
+++ b/llama_cpp/server/app.py
@@ -326,7 +326,7 @@ async def create_completion(
         def iterator() -> Iterator[llama_cpp.CreateCompletionStreamResponse]:
             yield first_response
             yield from iterator_or_completion
-            exit_stack.close()
+            exit_stack.aclose()

         send_chan, recv_chan = anyio.create_memory_object_stream(10)
         return EventSourceResponse(
@@ -336,12 +336,13 @@ def iterator() -> Iterator[llama_cpp.CreateCompletionStreamResponse]:
                 request=request,
                 inner_send_chan=send_chan,
                 iterator=iterator(),
-                on_complete=exit_stack.close,
+                on_complete=exit_stack.aclose,
             ),
             sep="\n",
             ping_message_factory=_ping_message_factory,
         )
     else:
+        await exit_stack.aclose()
         return iterator_or_completion


@@ -517,7 +517,7 @@ async def create_chat_completion(
         def iterator() -> Iterator[llama_cpp.ChatCompletionChunk]:
             yield first_response
             yield from iterator_or_completion
-            exit_stack.close()
+            exit_stack.aclose()

         send_chan, recv_chan = anyio.create_memory_object_stream(10)
         return EventSourceResponse(
@@ -527,13 +528,13 @@ def iterator() -> Iterator[llama_cpp.ChatCompletionChunk]:
                 request=request,
                 inner_send_chan=send_chan,
                 iterator=iterator(),
-                on_complete=exit_stack.close,
+                on_complete=exit_stack.aclose,
             ),
             sep="\n",
             ping_message_factory=_ping_message_factory,
         )
     else:
-        exit_stack.close()
+        await exit_stack.aclose()
         return iterator_or_completion


From 4da21cedaa5187c767320f1ed7527ddac6c274c5 Mon Sep 17 00:00:00 2001
From: Graeme Power
Date: Tue, 15 Oct 2024 16:49:08 +0100
Subject: [PATCH 3/4] fix: don't call exit stack close in stream iterator as it will be called by finally from on_complete anyway

---
 llama_cpp/server/app.py | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/llama_cpp/server/app.py b/llama_cpp/server/app.py
index ffbf71491..bd2f9d8ed 100644
--- a/llama_cpp/server/app.py
+++ b/llama_cpp/server/app.py
@@ -159,7 +159,7 @@ async def get_event_publisher(
     request: Request,
     inner_send_chan: MemoryObjectSendStream[typing.Any],
     iterator: Iterator[typing.Any],
-    on_complete: typing.Optional[typing.Callable[[], None]] = None,
+    on_complete: typing.Optional[typing.Callable[[], typing.Awaitable[None]]] = None,
 ):
     server_settings = next(get_server_settings())
     interrupt_requests = (
@@ -182,7 +182,7 @@ async def get_event_publisher(
             raise e
     finally:
         if on_complete:
-            on_complete()
+            await on_complete()


 def _logit_bias_tokens_to_input_ids(
@@ -326,7 +326,6 @@ async def create_completion(
         def iterator() -> Iterator[llama_cpp.CreateCompletionStreamResponse]:
             yield first_response
             yield from iterator_or_completion
-            exit_stack.aclose()

         send_chan, recv_chan = anyio.create_memory_object_stream(10)
         return EventSourceResponse(
@@ -518,7 +517,6 @@ async def create_chat_completion(
         def iterator() -> Iterator[llama_cpp.ChatCompletionChunk]:
             yield first_response
             yield from iterator_or_completion
-            exit_stack.aclose()

         send_chan, recv_chan = anyio.create_memory_object_stream(10)
         return EventSourceResponse(
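Note on patches 2 and 3: AsyncExitStack.aclose() is a coroutine, so calling it from the synchronous iterator() generator only creates a never-awaited coroutine object. That is why patch 3 drops the call inside the generator and leaves cleanup to the awaitable on_complete callback, which get_event_publisher now awaits in its finally block. A rough standalone sketch of that arrangement follows; publish, main, and the string chunks are illustrative, not the server's actual code.

```python
import asyncio
import contextlib
import typing


async def publish(
    iterator: typing.Iterator[str],
    on_complete: typing.Optional[typing.Callable[[], typing.Awaitable[None]]] = None,
) -> None:
    # Drain a synchronous iterator from async code; cleanup runs in finally,
    # so it happens whether the stream completes or is cancelled.
    try:
        for chunk in iterator:
            print("sent:", chunk)
    finally:
        if on_complete:
            await on_complete()


async def main() -> None:
    exit_stack = contextlib.AsyncExitStack()
    exit_stack.callback(lambda: print("resources released"))

    def iterator() -> typing.Iterator[str]:
        # Calling exit_stack.aclose() here would only create an unawaited
        # coroutine object -- the caller has to await it instead.
        yield "first"
        yield "second"

    await publish(iterator(), on_complete=exit_stack.aclose)


asyncio.run(main())
```

Passing the bound method exit_stack.aclose as on_complete keeps a single owner for the cleanup, which matches the reasoning in the patch 3 commit message.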
From 9ec5460acd65a66a4fc0066ceb587697a49c12c0 Mon Sep 17 00:00:00 2001
From: Graeme Power
Date: Mon, 21 Oct 2024 12:57:58 +0100
Subject: [PATCH 4/4] fix: use anyio.Lock instead of asyncio.Lock

---
 llama_cpp/server/app.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/llama_cpp/server/app.py b/llama_cpp/server/app.py
index bd2f9d8ed..d66a40418 100644
--- a/llama_cpp/server/app.py
+++ b/llama_cpp/server/app.py
@@ -5,7 +5,7 @@
 import typing
 import contextlib

-from asyncio import Lock
+from anyio import Lock
 from functools import partial
 from typing import Iterator, List, Optional, Union, Dict
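Note on patch 4: the commit message does not spell out the motivation for moving from asyncio.Lock to anyio.Lock, but a plausible reading is that the server already runs on AnyIO (via Starlette), so an AnyIO lock works under any backend AnyIO supports while keeping the same await acquire() / release() shape used by get_llama_proxy. The sketch below is an illustration under that assumption; worker and main are made-up names.

```python
import anyio

lock = anyio.Lock()  # created at import time, like llama_outer_lock in app.py


async def worker(name: str) -> None:
    # Same acquire / try / finally / release shape get_llama_proxy relies on.
    await lock.acquire()
    try:
        print(f"{name} holds the lock")
        await anyio.sleep(0.01)
    finally:
        lock.release()


async def main() -> None:
    async with anyio.create_task_group() as tg:
        for name in ("first request", "second request"):
            tg.start_soon(worker, name)


anyio.run(main)
```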