Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Emran/wip #220

Draft
wants to merge 14 commits into
base: realtime-ai-experimental
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 18 additions & 30 deletions runner/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,19 +1,14 @@
# Based on https://github.com/huggingface/api-inference-community/blob/main/docker_images/diffusers/Dockerfile

FROM nvidia/cuda:12.1.1-cudnn8-runtime-ubuntu20.04
LABEL maintainer="Yondon Fu <[email protected]>"

# Add any system dependency here
# RUN apt-get update -y && apt-get install libXXX -y
FROM nvidia/cuda:12.2.2-cudnn8-devel-ubuntu22.04

ENV DEBIAN_FRONTEND=noninteractive

# Install prerequisites
RUN apt-get update && \
apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev \
libreadline-dev libsqlite3-dev wget curl llvm libncurses5-dev libncursesw5-dev \
xz-utils tk-dev libffi-dev liblzma-dev python3-openssl git \
ffmpeg
apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev \
libreadline-dev libsqlite3-dev wget curl llvm libncurses5-dev libncursesw5-dev \
xz-utils tk-dev libffi-dev liblzma-dev python3-openssl git runit libzmq3-dev && \
rm -rf /var/lib/apt/lists/*

# Install pyenv
RUN curl https://pyenv.run | bash
Expand All @@ -25,32 +20,25 @@ ENV PATH $PYENV_ROOT/shims:$PYENV_ROOT/bin:$PATH
# Install your desired Python version
ARG PYTHON_VERSION=3.11
RUN pyenv install $PYTHON_VERSION && \
pyenv global $PYTHON_VERSION && \
pyenv rehash
pyenv global $PYTHON_VERSION && \
pyenv rehash

# Upgrade pip and install your desired packages
ARG PIP_VERSION=23.3.2
RUN pip install --no-cache-dir --upgrade pip==${PIP_VERSION} setuptools==69.5.1 wheel==0.43.0 && \
pip install --no-cache-dir torch==2.1.1 torchvision==0.16.1 torchaudio==2.1.1

WORKDIR /app
COPY ./requirements.txt /app
RUN pip install --no-cache-dir -r requirements.txt

RUN pip install https://github.com/chengzeyi/stable-fast/releases/download/v1.0.3/stable_fast-1.0.3+torch211cu121-cp311-cp311-manylinux2014_x86_64.whl
RUN pip install --no-cache-dir --upgrade pip==${PIP_VERSION} setuptools==69.5.1 wheel==0.43.0

# Most DL models are quite large in terms of memory, using workers is a HUGE
# slowdown because of the fork and GIL with python.
# Using multiple pods seems like a better default strategy.
# Feel free to override if it does not make sense for your library.
ARG max_workers=1
ENV MAX_WORKERS=$max_workers
# Set environment variables
ENV MAX_WORKERS=1
ENV HUGGINGFACE_HUB_CACHE=/models
ENV DIFFUSERS_CACHE=/models
ENV MODEL_DIR=/models

COPY app/ /app/app
COPY images/ /app/images
COPY bench.py /app/bench.py
# The following steps have been moved to the app Dockerfile:
# - Copying application files
# - Setting up and compiling Go application
# - Setting up runit service directories
# - Creating log directories
# - Setting the init system to runit

CMD ["uvicorn", "app.main:app", "--log-config", "app/cfg/uvicorn_logging_config.json", "--host", "0.0.0.0", "--port", "8000"]
# We're keeping the WORKDIR /app here as it's a good default
WORKDIR /app
13 changes: 13 additions & 0 deletions runner/app/go/ingress/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// Module definition for the ingress program located at runner/app/go/ingress.
module github.com/livepeer/ai-worker/runner/app/go/ingress

go 1.22

toolchain go1.22.8

// Direct dependencies: these are the packages imported by main.go
// (go-glib for the main loop, go-gst for the pipeline, zmq4 for the PUB socket).
require (
github.com/go-gst/go-glib v1.0.0
github.com/go-gst/go-gst v1.0.0
github.com/pebbe/zmq4 v1.2.11
)

// Marked indirect: required by a dependency rather than imported by this module's own code.
require github.com/mattn/go-pointer v0.0.1 // indirect
8 changes: 8 additions & 0 deletions runner/app/go/ingress/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
github.com/go-gst/go-glib v1.0.0 h1:/Gl3lk3M3MmWoSEtOyH3bpUxMUvO1gUL35A2drbr/K0=
github.com/go-gst/go-glib v1.0.0/go.mod h1:7Ehl6klsMBT94bf+Bic9qRyEkXARhhqpiZnU2PXeO6I=
github.com/go-gst/go-gst v1.0.0 h1:YBzE3JVZvbrnWWb/iGCXuiaOvHQ7HW+xXUBR++EgEtQ=
github.com/go-gst/go-gst v1.0.0/go.mod h1:sQMWMnR98s2B4w52e4IXyGvz75rXV8CZ1bejdPT3KIs=
github.com/mattn/go-pointer v0.0.1 h1:n+XhsuGeVO6MEAp7xyEukFINEa+Quek5psIR/ylA6o0=
github.com/mattn/go-pointer v0.0.1/go.mod h1:2zXcozF6qYGgmsG+SeTZz3oAbFLdD3OWqnUbNvJZAlc=
github.com/pebbe/zmq4 v1.2.11 h1:Ua5mgIaZeabUGnH7tqswkUcjkL7JYGai5e8v4hpEU9Q=
github.com/pebbe/zmq4 v1.2.11/go.mod h1:nqnPueOapVhE2wItZ0uOErngczsJdLOGkebMxaO8r48=
173 changes: 173 additions & 0 deletions runner/app/go/ingress/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
package main

import (
"fmt"
"log"
"time"

"github.com/go-gst/go-gst/gst"
"github.com/go-gst/go-gst/gst/app"
"github.com/go-gst/go-glib/glib"
zmq "github.com/pebbe/zmq4"
)

// mustElement creates a GStreamer element from the named factory and
// terminates the process if the element cannot be created.
func mustElement(factory string) *gst.Element {
	elem, err := gst.NewElement(factory)
	if err != nil {
		log.Fatalf("Failed to create %s: %s", factory, err)
	}
	return elem
}

// main builds the pipeline
//
//	filesrc -> decodebin -> queue -> videoconvert -> videoscale -> capsfilter -> jpegenc -> appsink
//
// and publishes every JPEG buffer that reaches the appsink on a ZeroMQ PUB
// socket bound to tcp://*:5555. It runs a GLib main loop until the pipeline
// reports end-of-stream or an error on its bus.
func main() {
	// Initialize GStreamer
	gst.Init(nil)

	// Create a new pipeline
	pipeline, err := gst.NewPipeline("")
	if err != nil {
		log.Fatalf("Failed to create pipeline: %s", err)
	}

	// Create elements
	src := mustElement("filesrc")
	src.SetProperty("location", "10s.mp4") // Replace with your MP4 file path

	decodebin := mustElement("decodebin")

	// Keep at most one buffer queued; the time and byte limits are set to zero.
	queue := mustElement("queue")
	queue.SetProperty("max-size-buffers", uint(1))
	queue.SetProperty("max-size-time", uint64(0))
	queue.SetProperty("max-size-bytes", uint(0))

	videoconvert := mustElement("videoconvert")
	videoscale := mustElement("videoscale")

	capsfilter := mustElement("capsfilter")
	capsfilter.SetProperty("caps", gst.NewCapsFromString("video/x-raw,width=512,height=512"))

	jpegenc := mustElement("jpegenc")

	appsink, err := app.NewAppSink()
	if err != nil {
		log.Fatalf("Failed to create appsink: %s", err)
	}
	appsink.SetProperty("max-buffers", uint(1))
	appsink.SetProperty("drop", true)
	appsink.SetProperty("sync", false)

	// Add elements to pipeline
	if err := pipeline.AddMany(src, decodebin, queue, videoconvert, videoscale, capsfilter, jpegenc, appsink.Element); err != nil {
		log.Fatalf("Failed to add elements to pipeline: %s", err)
	}

	// Link the statically linkable parts. decodebin's output is linked to the
	// queue later, from the pad-added handler below.
	if err := src.Link(decodebin); err != nil {
		log.Fatalf("Failed to link filesrc to decodebin: %s", err)
	}
	chain := []*gst.Element{queue, videoconvert, videoscale, capsfilter, jpegenc, appsink.Element}
	for i := 0; i < len(chain)-1; i++ {
		if err := chain[i].Link(chain[i+1]); err != nil {
			log.Fatalf("Failed to link pipeline element %d to element %d: %s", i, i+1, err)
		}
	}

	// Connect pad-added signal for decodebin
	decodebin.Connect("pad-added", func(element *gst.Element, pad *gst.Pad) {
		sinkpad := queue.GetStaticPad("sink")
		if sinkpad == nil {
			log.Printf("Queue has no sink pad to link to")
			return
		}
		// The queue has a single sink pad; once it is linked, any further
		// decodebin pads are left unlinked.
		// NOTE(review): the first pad exposed wins regardless of media type —
		// confirm the input only carries video, or filter on the pad caps.
		if sinkpad.IsLinked() {
			return
		}
		if ret := pad.Link(sinkpad); ret != gst.PadLinkOK {
			log.Printf("Failed to link decodebin pad to queue: %v", ret)
		}
	})

	// Set up ZMQ PUB socket
	publisher, err := zmq.NewSocket(zmq.PUB)
	if err != nil {
		log.Fatalf("Failed to create ZMQ socket: %s", err)
	}
	// Note: log.Fatalf exits the process without running deferred calls.
	defer publisher.Close()

	// Set high water mark
	if err := publisher.SetSndhwm(1); err != nil {
		log.Fatalf("Failed to set high water mark: %s", err)
	}

	if err := publisher.Bind("tcp://*:5555"); err != nil {
		log.Fatalf("Failed to bind ZMQ socket: %s", err)
	}

	// Variables for frame rate calculation; only touched inside the callback.
	frameCount := 0
	startTime := time.Now()

	// Callback function for new samples. Installed before the pipeline starts
	// so that no sample arrives while no handler is registered.
	appsink.SetCallbacks(&app.SinkCallbacks{
		NewSampleFunc: func(sink *app.Sink) gst.FlowReturn {
			sample := sink.PullSample()
			if sample == nil {
				// No sample available: report end-of-stream rather than an error.
				return gst.FlowEOS
			}

			buffer := sample.GetBuffer()
			if buffer == nil {
				log.Printf("Sample carried no buffer")
				return gst.FlowError
			}
			data := buffer.Bytes()

			// Send frame via ZMQ. The error is scoped to this callback so it
			// never writes to a variable owned by main.
			if _, err := publisher.SendBytes(data, zmq.DONTWAIT); err != nil {
				log.Printf("Failed to send frame: %s", err)
			}

			// Update frame count and calculate FPS
			frameCount++
			if elapsed := time.Since(startTime); elapsed >= time.Second {
				fps := float64(frameCount) / elapsed.Seconds()
				log.Printf("Producer FPS: %.2f", fps)
				frameCount = 0
				startTime = time.Now()
			}

			return gst.FlowOK
		},
	})

	// Create a main loop
	mainLoop := glib.NewMainLoop(glib.MainContextDefault(), false)

	// Add a message handler to the pipeline bus
	pipeline.GetPipelineBus().AddWatch(func(msg *gst.Message) bool {
		switch msg.Type() {
		case gst.MessageEOS: // When end-of-stream is received flush the pipeline and stop the main loop
			if err := pipeline.BlockSetState(gst.StateNull); err != nil {
				log.Printf("Failed to stop pipeline: %s", err)
			}
			mainLoop.Quit()
		case gst.MessageError: // Error messages are always fatal
			gerr := msg.ParseError()
			fmt.Println("ERROR:", gerr.Error())
			if debug := gerr.DebugString(); debug != "" {
				fmt.Println("DEBUG:", debug)
			}
			// Shut the pipeline down on the error path as well.
			if err := pipeline.BlockSetState(gst.StateNull); err != nil {
				log.Printf("Failed to stop pipeline: %s", err)
			}
			mainLoop.Quit()
		default:
			// All messages implement a Stringer. However, this is
			// typically an expensive thing to do and should be avoided.
			fmt.Println(msg)
		}
		return true
	})

	// Start playing only after the callbacks and bus watch are in place.
	if err := pipeline.SetState(gst.StatePlaying); err != nil {
		log.Fatalf("Failed to set pipeline to playing: %s", err)
	}

	// Run the main loop
	mainLoop.Run()
}
3 changes: 3 additions & 0 deletions runner/app/python/infer/StreamDiffusionWrapper/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
"""Package entry point: re-exports StreamDiffusionWrapper from the sibling ``wrapper`` module."""
from .wrapper import StreamDiffusionWrapper

# Names exported by ``from <package> import *``.
__all__ = ["StreamDiffusionWrapper"]
Loading
Loading