Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Emran/wip 3 #224

Draft
wants to merge 35 commits into
base: realtime-ai-experimental
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
25a1181
add streamdiffusion
emranemran Oct 1, 2024
14f6aec
runner/docker: add ffmpeg + streamdiffusion and update base image
emranemran Oct 3, 2024
3412f1e
runner/docker: add runit and turn processes to use runit to launch
emranemran Oct 4, 2024
008539b
wip: i think it works so far
emranemran Oct 4, 2024
9a0406d
wip: works but no gst
emranemran Oct 4, 2024
4d28610
works now with gst but test ffmpeg with the cp fix
emranemran Oct 5, 2024
ce9428e
works with new apps dockerfile
emranemran Oct 6, 2024
aec260a
wip
emranemran Oct 7, 2024
fcaf2b8
add gstreamer pipeline
emranemran Oct 7, 2024
ed0c243
refactor a bunch + add python version of ingress pipeline
emranemran Oct 7, 2024
9c31aea
fix /app nesting
emranemran Oct 7, 2024
c9520e2
infer: Add infer app
victorges Oct 9, 2024
f693332
infer: Add streamdiffusion to requirements
victorges Oct 9, 2024
88f5f7b
infer: Fix stream-diffusion error
victorges Oct 9, 2024
f3a1a26
ingress: read from fd instead of filesrc
emranemran Oct 9, 2024
d2a9e44
update ingress
emranemran Oct 9, 2024
6884443
ingress: make it resilient to restarts on sender side
emranemran Oct 9, 2024
30a92fb
uvicorn: do not start this service
emranemran Oct 9, 2024
0f4c175
Revert "infer: Fix stream-diffusion error"
victorges Oct 9, 2024
eee88a7
Revert "infer: Add streamdiffusion to requirements"
victorges Oct 9, 2024
c4cc37e
Fix streamdiffusion installation
victorges Oct 9, 2024
50c08fb
ingress: load via runit and fix logging to stdout
emranemran Oct 10, 2024
c859f12
infer: start via runit
emranemran Oct 10, 2024
c491adf
try to fix nvidia driver/library mismatch issue
emranemran Oct 10, 2024
acbc0fc
Revert "try to fix nvidia driver/library mismatch issue"
victorges Oct 10, 2024
c4adb9e
Stop cloning streamdiffusion repo
victorges Oct 10, 2024
dd0cea7
egress: add script + fix cuda
emranemran Oct 11, 2024
009015e
egress: fixes
emranemran Oct 11, 2024
acdd8bb
egress: fix output
emranemran Oct 11, 2024
aecdc4f
egress: launch via runit
emranemran Oct 11, 2024
efdc1f7
README: add quick build instructions
emranemran Oct 11, 2024
b62514a
infer: Add /api path prefix
victorges Oct 11, 2024
79f3da6
go.mod
emranemran Oct 11, 2024
bcd8eba
egress: make egress output fps a bit smoother
emranemran Oct 15, 2024
aa86598
infer: Turn off similar images, improve liveportrait
victorges Oct 16, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
module github.com/livepeer/ai-worker

go 1.21.5
go 1.22

toolchain go1.22.0

require (
github.com/deepmap/oapi-codegen/v2 v2.2.0
Expand All @@ -9,7 +11,9 @@ require (
github.com/docker/go-connections v0.4.0
github.com/getkin/kin-openapi v0.124.0
github.com/go-chi/chi/v5 v5.0.12
github.com/go-gst/go-gst v1.3.0
github.com/oapi-codegen/runtime v1.1.1
github.com/pebbe/zmq4 v1.2.11
github.com/vincent-petithory/dataurl v1.0.0
)

Expand All @@ -19,13 +23,15 @@ require (
github.com/distribution/reference v0.5.0 // indirect
github.com/docker/distribution v2.8.3+incompatible // indirect
github.com/docker/go-units v0.4.0 // indirect
github.com/go-gst/go-glib v1.3.0 // indirect
github.com/go-openapi/jsonpointer v0.20.2 // indirect
github.com/go-openapi/swag v0.22.8 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/google/uuid v1.5.0 // indirect
github.com/invopop/yaml v0.2.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-pointer v0.0.1 // indirect
github.com/moby/term v0.5.0 // indirect
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 // indirect
github.com/morikuni/aec v1.0.0 // indirect
Expand All @@ -34,6 +40,7 @@ require (
github.com/perimeterx/marshmallow v1.1.5 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f // indirect
golang.org/x/mod v0.20.0 // indirect
golang.org/x/net v0.28.0 // indirect
golang.org/x/sync v0.8.0 // indirect
Expand Down
10 changes: 10 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ github.com/getkin/kin-openapi v0.124.0 h1:VSFNMB9C9rTKBnQ/fpyDU8ytMTr4dWI9QovSKj
github.com/getkin/kin-openapi v0.124.0/go.mod h1:wb1aSZA/iWmorQP9KTAS/phLj/t17B5jT7+fS8ed9NM=
github.com/go-chi/chi/v5 v5.0.12 h1:9euLV5sTrTNTRUU9POmDUvfxyj6LAABLUcEWO+JJb4s=
github.com/go-chi/chi/v5 v5.0.12/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8=
github.com/go-gst/go-glib v1.3.0 h1:u+mPUdLmrDFA/MskIxInJY+M0O1RSkHeZYggnJGWlPk=
github.com/go-gst/go-glib v1.3.0/go.mod h1:JybIYeoHNwCkHGaBf1fHNIaM4sQTrJPkPLsi7dmPNOU=
github.com/go-gst/go-gst v1.3.0 h1:z4mQ7CNJXd6ZfkibzIT9kZKwtgEFJo7jJGlX9cXFzz0=
github.com/go-gst/go-gst v1.3.0/go.mod h1:2li6ghiCBz7/R6DA7itVto3gsYh0QKicwSxEefNVYqE=
github.com/go-openapi/jsonpointer v0.20.2 h1:mQc3nmndL8ZBzStEo3JYF8wzmeWffDH4VbXz58sAx6Q=
github.com/go-openapi/jsonpointer v0.20.2/go.mod h1:bHen+N0u1KEO3YlmqOjTT9Adn1RfD91Ar825/PuiRVs=
github.com/go-openapi/swag v0.22.8 h1:/9RjDSQ0vbFR+NyjGMkFTsA1IA0fmhKSThmfGZjicbw=
Expand All @@ -52,6 +56,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/mattn/go-pointer v0.0.1 h1:n+XhsuGeVO6MEAp7xyEukFINEa+Quek5psIR/ylA6o0=
github.com/mattn/go-pointer v0.0.1/go.mod h1:2zXcozF6qYGgmsG+SeTZz3oAbFLdD3OWqnUbNvJZAlc=
github.com/moby/term v0.5.0 h1:xt8Q1nalod/v7BqbG21f8mQPqH+xAaC9C3N3wfWbVP0=
github.com/moby/term v0.5.0/go.mod h1:8FzsFHVUBGZdbDsJw/ot+X+d5HLUbvklYLJ9uGfcI3Y=
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 h1:RWengNIwukTxcDr9M+97sNutRR1RKhG96O6jWumTTnw=
Expand All @@ -64,6 +70,8 @@ github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8
github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
github.com/opencontainers/image-spec v1.0.2 h1:9yCKha/T5XdGtO0q9Q9a6T5NUCsTn/DrBg0D7ufOcFM=
github.com/opencontainers/image-spec v1.0.2/go.mod h1:BtxoFyWECRxE4U/7sNtV5W15zMzWCbyJoFRP3s7yZA0=
github.com/pebbe/zmq4 v1.2.11 h1:Ua5mgIaZeabUGnH7tqswkUcjkL7JYGai5e8v4hpEU9Q=
github.com/pebbe/zmq4 v1.2.11/go.mod h1:nqnPueOapVhE2wItZ0uOErngczsJdLOGkebMxaO8r48=
github.com/perimeterx/marshmallow v1.1.5 h1:a2LALqQ1BlHM8PZblsDdidgv1mWi1DgC2UmX50IvK2s=
github.com/perimeterx/marshmallow v1.1.5/go.mod h1:dsXbUu8CRzfYP5a87xpp0xq9S3u0Vchtcl8we9tYaXw=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
Expand All @@ -89,6 +97,8 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f h1:99ci1mjWVBWwJiEKYY6jWa4d2nTQVIEhZIptnrVb1XY=
golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f/go.mod h1:/lliqkxwWAhPjf5oSOIJup2XcqJaw8RGS6k3TGEc7GI=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.20.0 h1:utOm6MM3R3dnawAiJgn0y+xvuYRsm1RKM/4giyfDgV0=
Expand Down
50 changes: 19 additions & 31 deletions runner/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,19 +1,14 @@
# Based on https://github.com/huggingface/api-inference-community/blob/main/docker_images/diffusers/Dockerfile

FROM nvidia/cuda:12.1.1-cudnn8-runtime-ubuntu20.04
LABEL maintainer="Yondon Fu <[email protected]>"

# Add any system dependency here
# RUN apt-get update -y && apt-get install libXXX -y
FROM nvidia/cuda:12.2.2-cudnn8-devel-ubuntu22.04

ENV DEBIAN_FRONTEND=noninteractive

# Install prerequisites
RUN apt-get update && \
apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev \
libreadline-dev libsqlite3-dev wget curl llvm libncurses5-dev libncursesw5-dev \
xz-utils tk-dev libffi-dev liblzma-dev python3-openssl git \
ffmpeg
apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev \
libreadline-dev libsqlite3-dev wget curl llvm libncurses5-dev libncursesw5-dev \
xz-utils tk-dev libffi-dev liblzma-dev python3-openssl git runit libzmq3-dev && \
rm -rf /var/lib/apt/lists/*

# Install pyenv
RUN curl https://pyenv.run | bash
Expand All @@ -23,34 +18,27 @@ ENV PYENV_ROOT /root/.pyenv
ENV PATH $PYENV_ROOT/shims:$PYENV_ROOT/bin:$PATH

# Install your desired Python version
ARG PYTHON_VERSION=3.11
ARG PYTHON_VERSION=3.10
RUN pyenv install $PYTHON_VERSION && \
pyenv global $PYTHON_VERSION && \
pyenv rehash
pyenv global $PYTHON_VERSION && \
pyenv rehash

# Upgrade pip and install your desired packages
ARG PIP_VERSION=23.3.2
RUN pip install --no-cache-dir --upgrade pip==${PIP_VERSION} setuptools==69.5.1 wheel==0.43.0 && \
pip install --no-cache-dir torch==2.1.1 torchvision==0.16.1 torchaudio==2.1.1

WORKDIR /app
COPY ./requirements.txt /app
RUN pip install --no-cache-dir -r requirements.txt

RUN pip install https://github.com/chengzeyi/stable-fast/releases/download/v1.0.3/stable_fast-1.0.3+torch211cu121-cp311-cp311-manylinux2014_x86_64.whl
RUN pip install --no-cache-dir --upgrade pip==${PIP_VERSION} setuptools==69.5.1 wheel==0.43.0

# Most DL models are quite large in terms of memory, using workers is a HUGE
# slowdown because of the fork and GIL with python.
# Using multiple pods seems like a better default strategy.
# Feel free to override if it does not make sense for your library.
ARG max_workers=1
ENV MAX_WORKERS=$max_workers
# Set environment variables
ENV MAX_WORKERS=1
ENV HUGGINGFACE_HUB_CACHE=/models
ENV DIFFUSERS_CACHE=/models
ENV MODEL_DIR=/models

COPY app/ /app/app
COPY images/ /app/images
COPY bench.py /app/bench.py
# The following steps have been moved to the app Dockerfile:
# - Copying application files
# - Setting up and compiling Go application
# - Setting up runit service directories
# - Creating log directories
# - Setting the init system to runit

CMD ["uvicorn", "app.main:app", "--log-config", "app/cfg/uvicorn_logging_config.json", "--host", "0.0.0.0", "--port", "8000"]
# We're keeping the WORKDIR /app here as it's a good default
WORKDIR /app
9 changes: 9 additions & 0 deletions runner/README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
# How to run
```bash
docker build --no-cache -t livepeer/ai-runner:base .
docker build --no-cache -t livepeer/ai-runner:multimedia -f docker/Dockerfile.multimedia .
docker build --no-cache -t livepeer/ai-runner:stream-diffusion -f docker/Dockerfile.stream-diffusion .
docker build --no-cache -t livepeer/ai-runner:apps -f docker/Dockerfile.apps .
docker run -d --add-host=host.docker.internal:host-gateway --gpus all -p 10420:8080 --name realtime -v ~/ai-worker/runner/app:/app -v /tmp/video:/tmp/video livepeer/ai-runner:apps
docker exec -it realtime /bin/bash
```


# Runner

The AI runner is a containerized Python application responsible for processing AI inference requests on the [Livepeer AI subnet](https://explorer.livepeer.org/treasury/82843445347363563575858115586375001878287509193479217286690041153234635982713). It loads models into GPU memory and exposes a REST API that other programs, like the [AI worker](../README.md), can use to submit AI inference requests.
Expand Down
13 changes: 13 additions & 0 deletions runner/app/go/ingress/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
module github.com/livepeer/ai-worker/runner/app/go/ingress

go 1.22

toolchain go1.22.8

require (
github.com/go-gst/go-glib v1.0.0
github.com/go-gst/go-gst v1.0.0
github.com/pebbe/zmq4 v1.2.11
)

require github.com/mattn/go-pointer v0.0.1 // indirect
8 changes: 8 additions & 0 deletions runner/app/go/ingress/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
github.com/go-gst/go-glib v1.0.0 h1:/Gl3lk3M3MmWoSEtOyH3bpUxMUvO1gUL35A2drbr/K0=
github.com/go-gst/go-glib v1.0.0/go.mod h1:7Ehl6klsMBT94bf+Bic9qRyEkXARhhqpiZnU2PXeO6I=
github.com/go-gst/go-gst v1.0.0 h1:YBzE3JVZvbrnWWb/iGCXuiaOvHQ7HW+xXUBR++EgEtQ=
github.com/go-gst/go-gst v1.0.0/go.mod h1:sQMWMnR98s2B4w52e4IXyGvz75rXV8CZ1bejdPT3KIs=
github.com/mattn/go-pointer v0.0.1 h1:n+XhsuGeVO6MEAp7xyEukFINEa+Quek5psIR/ylA6o0=
github.com/mattn/go-pointer v0.0.1/go.mod h1:2zXcozF6qYGgmsG+SeTZz3oAbFLdD3OWqnUbNvJZAlc=
github.com/pebbe/zmq4 v1.2.11 h1:Ua5mgIaZeabUGnH7tqswkUcjkL7JYGai5e8v4hpEU9Q=
github.com/pebbe/zmq4 v1.2.11/go.mod h1:nqnPueOapVhE2wItZ0uOErngczsJdLOGkebMxaO8r48=
173 changes: 173 additions & 0 deletions runner/app/go/ingress/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
package main

import (
"fmt"
"log"
"time"

"github.com/go-gst/go-gst/gst"
"github.com/go-gst/go-gst/gst/app"
"github.com/go-gst/go-glib/glib"
zmq "github.com/pebbe/zmq4"
)

// main builds and runs the ingress pipeline:
//
//	filesrc -> decodebin -> queue -> videoconvert -> videoscale ->
//	capsfilter (512x512 raw video) -> jpegenc -> appsink
//
// Every JPEG frame that reaches the appsink is published on a ZeroMQ PUB
// socket bound to tcp://*:5555. The process runs until the pipeline reports
// end-of-stream or an error on its bus, then tears the pipeline down.
//
// Any failure while assembling the pipeline or the socket is fatal, because a
// partially linked pipeline would otherwise start and silently produce no
// frames.
func main() {
	// Initialize GStreamer
	gst.Init(nil)

	// Create a new pipeline
	pipeline, err := gst.NewPipeline("")
	if err != nil {
		log.Fatalf("Failed to create pipeline: %s", err)
	}

	// Create elements
	src := mustElement("filesrc")
	mustSetProperty(src, "location", "10s.mp4") // Replace with your MP4 file path

	decodebin := mustElement("decodebin")

	// Keep at most one decoded frame queued so stale frames are never
	// buffered; the time and byte limits are disabled.
	queue := mustElement("queue")
	mustSetProperty(queue, "max-size-buffers", uint(1))
	mustSetProperty(queue, "max-size-time", uint64(0))
	mustSetProperty(queue, "max-size-bytes", uint(0))

	videoconvert := mustElement("videoconvert")
	videoscale := mustElement("videoscale")

	capsfilter := mustElement("capsfilter")
	mustSetProperty(capsfilter, "caps", gst.NewCapsFromString("video/x-raw,width=512,height=512"))

	jpegenc := mustElement("jpegenc")

	appsink, err := app.NewAppSink()
	if err != nil {
		log.Fatalf("Failed to create appsink: %s", err)
	}
	// Hold a single sample, drop older ones, and do not sync to the clock.
	mustSetProperty(appsink, "max-buffers", uint(1))
	mustSetProperty(appsink, "drop", true)
	mustSetProperty(appsink, "sync", false)

	// Add elements to pipeline
	if err := pipeline.AddMany(src, decodebin, queue, videoconvert, videoscale, capsfilter, jpegenc, appsink.Element); err != nil {
		log.Fatalf("Failed to add elements to pipeline: %s", err)
	}

	// Link the static parts of the pipeline. decodebin's source pads only
	// appear once the stream type is known, so decodebin -> queue is linked
	// from the pad-added handler below.
	if err := src.Link(decodebin); err != nil {
		log.Fatalf("Failed to link filesrc to decodebin: %s", err)
	}
	chain := []*gst.Element{queue, videoconvert, videoscale, capsfilter, jpegenc, appsink.Element}
	names := []string{"queue", "videoconvert", "videoscale", "capsfilter", "jpegenc", "appsink"}
	for i := 0; i+1 < len(chain); i++ {
		if err := chain[i].Link(chain[i+1]); err != nil {
			log.Fatalf("Failed to link %s to %s: %s", names[i], names[i+1], err)
		}
	}

	// Connect pad-added signal for decodebin
	_, err = decodebin.Connect("pad-added", func(_ *gst.Element, pad *gst.Pad) {
		// Only a video stream may feed the conversion chain; audio and other
		// pads are left unlinked.
		if !isVideoPad(pad) {
			return
		}
		sinkpad := queue.GetStaticPad("sink")
		if sinkpad == nil {
			log.Printf("Failed to get queue sink pad")
			return
		}
		// A second video stream must not try to re-link the same sink pad.
		if sinkpad.IsLinked() {
			return
		}
		if ret := pad.Link(sinkpad); ret != gst.PadLinkOK {
			log.Printf("Failed to link decodebin pad to queue: %v", ret)
		}
	})
	if err != nil {
		log.Fatalf("Failed to connect pad-added handler: %s", err)
	}

	// Set up ZMQ PUB socket
	publisher, err := zmq.NewSocket(zmq.PUB)
	if err != nil {
		log.Fatalf("Failed to create ZMQ socket: %s", err)
	}
	defer publisher.Close()

	// Set high water mark to a single outstanding message.
	if err := publisher.SetSndhwm(1); err != nil {
		log.Fatalf("Failed to set high water mark: %s", err)
	}

	if err := publisher.Bind("tcp://*:5555"); err != nil {
		log.Fatalf("Failed to bind ZMQ socket: %s", err)
	}

	// Variables for frame rate calculation. They are only touched from the
	// new-sample callback.
	frameCount := 0
	startTime := time.Now()

	// Callback function for new samples. It is installed before the pipeline
	// starts so the first frames are not missed.
	appsink.SetCallbacks(&app.SinkCallbacks{
		NewSampleFunc: func(sink *app.Sink) gst.FlowReturn {
			sample := sink.PullSample()
			if sample == nil {
				log.Printf("Failed to pull sample")
				return gst.FlowError
			}

			buffer := sample.GetBuffer()
			if buffer == nil {
				log.Printf("Sample has no buffer")
				return gst.FlowError
			}

			// Send frame via ZMQ. The error stays local to the callback so
			// main's variables are never written from the streaming thread.
			if _, err := publisher.SendBytes(buffer.Bytes(), zmq.DONTWAIT); err != nil {
				log.Printf("Failed to send frame: %s", err)
			}

			// Update frame count and report FPS roughly once per second.
			frameCount++
			if elapsed := time.Since(startTime); elapsed >= time.Second {
				log.Printf("Producer FPS: %.2f", float64(frameCount)/elapsed.Seconds())
				frameCount = 0
				startTime = time.Now()
			}

			return gst.FlowOK
		},
	})

	// Create a main loop
	mainLoop := glib.NewMainLoop(glib.MainContextDefault(), false)

	// Add a message handler to the pipeline bus. Both terminal messages only
	// stop the main loop; the pipeline is torn down once, after Run returns.
	pipeline.GetPipelineBus().AddWatch(func(msg *gst.Message) bool {
		switch msg.Type() {
		case gst.MessageEOS: // End-of-stream: stop the main loop
			mainLoop.Quit()
		case gst.MessageError: // Error messages are always fatal
			gerr := msg.ParseError()
			fmt.Println("ERROR:", gerr.Error())
			if debug := gerr.DebugString(); debug != "" {
				fmt.Println("DEBUG:", debug)
			}
			mainLoop.Quit()
		default:
			// All messages implement a Stringer. However, this is
			// typically an expensive thing to do and should be avoided.
			fmt.Println(msg)
		}
		return true
	})

	// Start playing
	if err := pipeline.SetState(gst.StatePlaying); err != nil {
		log.Fatalf("Failed to start pipeline: %s", err)
	}

	// Run the main loop
	mainLoop.Run()

	// Release pipeline resources on both the EOS and the error path.
	if err := pipeline.BlockSetState(gst.StateNull); err != nil {
		log.Printf("Failed to stop pipeline: %s", err)
	}
}

// propertySetter is the subset of the GObject API needed by mustSetProperty.
type propertySetter interface {
	SetProperty(name string, value interface{}) error
}

// mustElement creates an element from the named factory and exits the process
// if the factory is unavailable.
func mustElement(factory string) *gst.Element {
	elem, err := gst.NewElement(factory)
	if err != nil {
		log.Fatalf("Failed to create %s: %s", factory, err)
	}
	return elem
}

// mustSetProperty sets a property on obj and exits the process if the
// property is rejected.
func mustSetProperty(obj propertySetter, name string, value interface{}) {
	if err := obj.SetProperty(name, value); err != nil {
		log.Fatalf("Failed to set property %q: %s", name, err)
	}
}

// isVideoPad reports whether any structure in the pad's current caps has a
// "video/" media type. A pad with no caps yet is treated as video so the
// caller still attempts the link.
func isVideoPad(pad *gst.Pad) bool {
	const prefix = "video/"
	caps := pad.GetCurrentCaps()
	if caps == nil {
		return true
	}
	for i := 0; i < caps.GetSize(); i++ {
		st := caps.GetStructureAt(i)
		if st == nil {
			continue
		}
		if name := st.Name(); len(name) >= len(prefix) && name[:len(prefix)] == prefix {
			return true
		}
	}
	return false
}
Loading
Loading