Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(worker): cleanup worker error handling logic #190

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion runner/app/pipelines/audio_to_text.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from app.pipelines.base import Pipeline
from app.pipelines.utils import get_model_dir, get_torch_device
from app.pipelines.utils.audio import AudioConverter
from app.utils.errors import InferenceError
from fastapi import File, UploadFile
from huggingface_hub import file_download
from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor, pipeline
Expand Down Expand Up @@ -76,7 +77,12 @@ def __call__(self, audio: UploadFile, **kwargs) -> List[File]:
converted_bytes = audio_converter.convert(audio, "mp3")
audio_converter.write_bytes_to_file(converted_bytes, audio)

return self.tm(audio.file.read(), **kwargs)
try:
outputs = self.tm(audio.file.read(), **kwargs)
except Exception as e:
raise InferenceError(original_exception=e)

return outputs

def __str__(self) -> str:
return f"AudioToTextPipeline model_id={self.model_id}"
12 changes: 8 additions & 4 deletions runner/app/pipelines/image_to_image.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
is_lightning_model,
is_turbo_model,
)
from app.utils.errors import InferenceError
from diffusers import (
AutoPipelineForImage2Image,
EulerAncestralDiscreteScheduler,
Expand Down Expand Up @@ -222,14 +223,17 @@ def __call__(
# Default to 8step
kwargs["num_inference_steps"] = 8

output = self.ldm(prompt, image=image, **kwargs)
try:
outputs = self.ldm(prompt, image=image, **kwargs)
except Exception as e:
raise InferenceError(original_exception=e)

if safety_check:
_, has_nsfw_concept = self._safety_checker.check_nsfw_images(output.images)
_, has_nsfw_concept = self._safety_checker.check_nsfw_images(outputs.images)
else:
has_nsfw_concept = [None] * len(output.images)
has_nsfw_concept = [None] * len(outputs.images)

return output.images, has_nsfw_concept
return outputs.images, has_nsfw_concept

def __str__(self) -> str:
return f"ImageToImagePipeline model_id={self.model_id}"
8 changes: 7 additions & 1 deletion runner/app/pipelines/image_to_video.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import torch
from app.pipelines.base import Pipeline
from app.pipelines.utils import SafetyChecker, get_model_dir, get_torch_device
from app.utils.errors import InferenceError
from diffusers import StableVideoDiffusionPipeline
from huggingface_hub import file_download
from PIL import ImageFile
Expand Down Expand Up @@ -134,7 +135,12 @@ def __call__(
else:
has_nsfw_concept = [None]

return self.ldm(image, **kwargs).frames, has_nsfw_concept
try:
outputs = self.ldm(image, **kwargs)
except Exception as e:
raise InferenceError(original_exception=e)

return outputs.frames, has_nsfw_concept

def __str__(self) -> str:
return f"ImageToVideoPipeline model_id={self.model_id}"
2 changes: 1 addition & 1 deletion runner/app/pipelines/optim/sfast.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def compile_model(pipe):
except ImportError:
logger.info("xformers not installed, skip")
try:
import triton # noqa: F401
import triton # noqa: F401

config.enable_triton = True
except ImportError:
Expand Down
4 changes: 2 additions & 2 deletions runner/app/pipelines/segment_anything_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@

import PIL
from app.pipelines.base import Pipeline
from app.pipelines.utils import get_torch_device, get_model_dir
from app.routes.util import InferenceError
from app.pipelines.utils import get_model_dir, get_torch_device
from app.utils.errors import InferenceError
from PIL import ImageFile
from sam2.sam2_image_predictor import SAM2ImagePredictor

Expand Down
12 changes: 8 additions & 4 deletions runner/app/pipelines/text_to_image.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
is_turbo_model,
split_prompt,
)
from app.utils.errors import InferenceError
from diffusers import (
AutoPipelineForText2Image,
EulerDiscreteScheduler,
Expand Down Expand Up @@ -263,14 +264,17 @@ def __call__(
)
kwargs.update(neg_prompts)

output = self.ldm(prompt=prompt, **kwargs)
try:
outputs = self.ldm(prompt=prompt, **kwargs)
except Exception as e:
raise InferenceError(original_exception=e)

if safety_check:
_, has_nsfw_concept = self._safety_checker.check_nsfw_images(output.images)
_, has_nsfw_concept = self._safety_checker.check_nsfw_images(outputs.images)
else:
has_nsfw_concept = [None] * len(output.images)
has_nsfw_concept = [None] * len(outputs.images)

return output.images, has_nsfw_concept
return outputs.images, has_nsfw_concept

def __str__(self) -> str:
return f"TextToImagePipeline model_id={self.model_id}"
12 changes: 8 additions & 4 deletions runner/app/pipelines/upscale.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import PIL
import torch
from app.utils.errors import InferenceError
from app.pipelines.base import Pipeline
from app.pipelines.utils import (
SafetyChecker,
Expand Down Expand Up @@ -113,14 +114,17 @@ def __call__(
if num_inference_steps is None or num_inference_steps < 1:
del kwargs["num_inference_steps"]

output = self.ldm(prompt, image=image, **kwargs)
try:
outputs = self.ldm(prompt, image=image, **kwargs)
except Exception as e:
raise InferenceError(original_exception=e)

if safety_check:
_, has_nsfw_concept = self._safety_checker.check_nsfw_images(output.images)
_, has_nsfw_concept = self._safety_checker.check_nsfw_images(outputs.images)
else:
has_nsfw_concept = [None] * len(output.images)
has_nsfw_concept = [None] * len(outputs.images)

return output.images, has_nsfw_concept
return outputs.images, has_nsfw_concept

def __str__(self) -> str:
return f"UpscalePipeline model_id={self.model_id}"
2 changes: 1 addition & 1 deletion runner/app/pipelines/utils/audio.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
class AudioConversionError(Exception):
"""Raised when an audio file cannot be converted."""

def __init__(self, message="Audio conversion failed."):
def __init__(self, message="Audio conversion failed"):
self.message = message
super().__init__(self.message)

Expand Down
37 changes: 23 additions & 14 deletions runner/app/routes/audio_to_text.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
from app.dependencies import get_pipeline
from app.pipelines.base import Pipeline
from app.pipelines.utils.audio import AudioConversionError
from app.routes.util import HTTPError, TextResponse, file_exceeds_max_size, http_error
from app.routes.utils import HTTPError, TextResponse, file_exceeds_max_size, http_error
from app.utils.errors import InferenceError, OutOfMemoryError
from fastapi import APIRouter, Depends, File, Form, UploadFile, status
from fastapi.responses import JSONResponse
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
Expand All @@ -14,39 +15,47 @@

logger = logging.getLogger(__name__)

RESPONSES = {
status.HTTP_400_BAD_REQUEST: {"model": HTTPError},
status.HTTP_401_UNAUTHORIZED: {"model": HTTPError},
status.HTTP_413_REQUEST_ENTITY_TOO_LARGE: {"model": HTTPError},
status.HTTP_500_INTERNAL_SERVER_ERROR: {"model": HTTPError},
}


def handle_pipeline_error(e: Exception) -> JSONResponse:
"""Handles exceptions raised during audio processing.
"""Handles exceptions raised during audio pipeline processing.

Args:
e: The exception raised during audio processing.

Returns:
A JSONResponse with the appropriate error message and status code.
"""
logger.error(f"Audio processing error: {str(e)}") # Log the detailed error
logger.error(f"AudioToText pipeline error: {str(e)}") # Log the detailed error
if "Soundfile is either not in the correct format or is malformed" in str(
e
) or isinstance(e, AudioConversionError):
status_code = status.HTTP_415_UNSUPPORTED_MEDIA_TYPE
error_message = "Unsupported audio format or malformed file."
elif "CUDA out of memory" in str(e) or isinstance(e, OutOfMemoryError):
status_code = status.HTTP_400_BAD_REQUEST
error_message = "Out of memory error."
elif isinstance(e, InferenceError):
status_code = status.HTTP_400_BAD_REQUEST
error_message = str(e)
else:
status_code = status.HTTP_500_INTERNAL_SERVER_ERROR
error_message = "Internal server error during audio processing."
error_message = "Audio-to-text pipeline error."

return JSONResponse(
status_code=status_code,
content=http_error(error_message),
)


RESPONSES = {
status.HTTP_400_BAD_REQUEST: {"model": HTTPError},
status.HTTP_401_UNAUTHORIZED: {"model": HTTPError},
status.HTTP_413_REQUEST_ENTITY_TOO_LARGE: {"model": HTTPError},
status.HTTP_415_UNSUPPORTED_MEDIA_TYPE: {"model": HTTPError},
status.HTTP_500_INTERNAL_SERVER_ERROR: {"model": HTTPError},
}


@router.post(
"/audio-to-text",
response_model=TextResponse,
Expand Down Expand Up @@ -76,22 +85,22 @@ async def audio_to_text(
return JSONResponse(
status_code=status.HTTP_401_UNAUTHORIZED,
headers={"WWW-Authenticate": "Bearer"},
content=http_error("Invalid bearer token"),
content=http_error("Invalid bearer token."),
)

if model_id != "" and model_id != pipeline.model_id:
return JSONResponse(
status_code=status.HTTP_400_BAD_REQUEST,
content=http_error(
f"pipeline configured with {pipeline.model_id} but called with "
f"{model_id}"
f"{model_id}."
),
)

if file_exceeds_max_size(audio, 50 * 1024 * 1024):
return JSONResponse(
status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
content=http_error("File size exceeds limit"),
content=http_error("File size exceeds limit."),
)

try:
Expand Down
46 changes: 35 additions & 11 deletions runner/app/routes/image_to_image.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@

from app.dependencies import get_pipeline
from app.pipelines.base import Pipeline
from app.routes.util import HTTPError, ImageResponse, http_error, image_to_data_url
from app.routes.utils import HTTPError, ImageResponse, http_error, image_to_data_url
from app.utils.errors import InferenceError, OutOfMemoryError
from fastapi import APIRouter, Depends, File, Form, UploadFile, status
from fastapi.responses import JSONResponse
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
Expand All @@ -18,6 +19,34 @@
logger = logging.getLogger(__name__)


def handle_pipeline_error(e: Exception) -> JSONResponse:
"""Handles exceptions raised during image-to-image pipeline processing.

Args:
e: The exception raised during image-to-image processing.

Returns:
A JSONResponse with the appropriate error message and status code.
"""
logger.error(
f"ImageToImagePipeline pipeline error: {str(e)}"
) # Log the detailed error
if "CUDA out of memory" in str(e) or isinstance(e, OutOfMemoryError):
status_code = status.HTTP_400_BAD_REQUEST
error_message = "Out of memory error. Try reducing input image resolution."
elif isinstance(e, InferenceError):
status_code = status.HTTP_400_BAD_REQUEST
error_message = str(e)
else:
status_code = status.HTTP_500_INTERNAL_SERVER_ERROR
error_message = "Image-to-image pipeline error."

return JSONResponse(
status_code=status_code,
content=http_error(error_message),
)


RESPONSES = {
status.HTTP_400_BAD_REQUEST: {"model": HTTPError},
status.HTTP_401_UNAUTHORIZED: {"model": HTTPError},
Expand Down Expand Up @@ -119,15 +148,15 @@ async def image_to_image(
return JSONResponse(
status_code=status.HTTP_401_UNAUTHORIZED,
headers={"WWW-Authenticate": "Bearer"},
content=http_error("Invalid bearer token"),
content=http_error("Invalid bearer token."),
)

if model_id != "" and model_id != pipeline.model_id:
return JSONResponse(
status_code=status.HTTP_400_BAD_REQUEST,
content=http_error(
f"pipeline configured with {pipeline.model_id} but called with "
f"{model_id}"
f"{model_id}."
),
)

Expand All @@ -154,15 +183,10 @@ async def image_to_image(
num_images_per_prompt=1,
num_inference_steps=num_inference_steps,
)
images.extend(imgs)
has_nsfw_concept.extend(nsfw_checks)
except Exception as e:
logger.error(f"ImageToImagePipeline error: {e}")
logger.exception(e)
return JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content=http_error("ImageToImagePipeline error"),
)
return handle_pipeline_error(e)
images.extend(imgs)
has_nsfw_concept.extend(nsfw_checks)

# TODO: Return None once Go codegen tool supports optional properties
# OAPI 3.1 https://github.com/deepmap/oapi-codegen/issues/373
Expand Down
Loading