21 changes: 19 additions & 2 deletions README.md
@@ -196,6 +196,20 @@ python -m olmocr.pipeline ./localworkspace --markdown --pdfs tests/gnarly_pdfs/*

With the addition of the `--markdown` flag, results will be stored as markdown files inside of `./localworkspace/markdown/`.

### Using External vLLM Server

If you have a vLLM server already running elsewhere (or any inference platform implementing the relevant subset of the OpenAI API), you can point olmOCR at it instead of spawning a local instance:

```bash
# Use external vLLM server instead of local one
python -m olmocr.pipeline ./localworkspace --server http://remote-server:8000 --markdown --pdfs tests/gnarly_pdfs/*.pdf
```

The served model name should be `olmocr`. An example vLLM launch command would be:
```bash
vllm serve allenai/olmOCR-7B-0825-FP8 --served-model-name olmocr --max-model-len 16384
```
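
Before running a full pipeline job against an external endpoint, it can be worth confirming that the server is reachable and actually lists a model named `olmocr`. A minimal sketch, assuming the hypothetical `http://remote-server:8000` address from the example above, queries the OpenAI-compatible `/v1/models` route:

```python
# Minimal check: does the external server list a model named "olmocr"?
# The server URL below is the hypothetical one from the example above.
import json
import urllib.request

SERVER = "http://remote-server:8000"

with urllib.request.urlopen(f"{SERVER.rstrip('/')}/v1/models", timeout=10) as resp:
    served = [m["id"] for m in json.load(resp)["data"]]

print("Served models:", served)
assert "olmocr" in served, "Expected a served model named 'olmocr'"
```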

#### Viewing Results

The `./localworkspace/` workspace folder will then have both [Dolma](https://github.com/allenai/dolma) and markdown files (if using `--markdown`).
@@ -271,7 +285,7 @@ python -m olmocr.pipeline ./localworkspace --markdown --pdfs olmocr-sample.pdf
python -m olmocr.pipeline --help
usage: pipeline.py [-h] [--pdfs [PDFS ...]] [--model MODEL] [--workspace_profile WORKSPACE_PROFILE] [--pdf_profile PDF_PROFILE] [--pages_per_group PAGES_PER_GROUP] [--max_page_retries MAX_PAGE_RETRIES] [--max_page_error_rate MAX_PAGE_ERROR_RATE] [--workers WORKERS]
[--apply_filter] [--stats] [--markdown] [--target_longest_image_dim TARGET_LONGEST_IMAGE_DIM] [--target_anchor_text_len TARGET_ANCHOR_TEXT_LEN] [--guided_decoding] [--gpu-memory-utilization GPU_MEMORY_UTILIZATION] [--max_model_len MAX_MODEL_LEN]
[--tensor-parallel-size TENSOR_PARALLEL_SIZE] [--data-parallel-size DATA_PARALLEL_SIZE] [--port PORT] [--beaker] [--beaker_workspace BEAKER_WORKSPACE] [--beaker_cluster BEAKER_CLUSTER] [--beaker_gpus BEAKER_GPUS] [--beaker_priority BEAKER_PRIORITY]
[--tensor-parallel-size TENSOR_PARALLEL_SIZE] [--data-parallel-size DATA_PARALLEL_SIZE] [--port PORT] [--server SERVER] [--beaker] [--beaker_workspace BEAKER_WORKSPACE] [--beaker_cluster BEAKER_CLUSTER] [--beaker_gpus BEAKER_GPUS] [--beaker_priority BEAKER_PRIORITY]
workspace

Manager for running millions of PDFs through a batch inference pipeline
Expand Down Expand Up @@ -303,7 +317,7 @@ options:
Maximum amount of anchor text to use (characters), not used for new models
--guided_decoding Enable guided decoding for model YAML type outputs

VLLM Forwarded arguments:
VLLM arguments:
--gpu-memory-utilization GPU_MEMORY_UTILIZATION
Fraction of VRAM vLLM may pre-allocate for KV-cache (passed through to vllm serve).
--max_model_len MAX_MODEL_LEN
@@ -313,6 +327,9 @@ VLLM Forwarded arguments:
--data-parallel-size DATA_PARALLEL_SIZE, -dp DATA_PARALLEL_SIZE
Data parallel size for vLLM
--port PORT Port to use for the VLLM server
--server SERVER URL of an external vLLM (or other compatible provider)
server (e.g., http://hostname:port). If provided,
skips spawning a local vLLM instance

beaker/cluster execution:
--beaker Submit this job to beaker instead of running locally
40 changes: 31 additions & 9 deletions olmocr/pipeline.py
@@ -213,7 +213,10 @@ async def apost(url, json_data):


async def process_page(args, worker_id: int, pdf_orig_path: str, pdf_local_path: str, page_num: int) -> PageResult:
COMPLETION_URL = f"http://localhost:{BASE_SERVER_PORT}/v1/chat/completions"
if args.server:
COMPLETION_URL = f"{args.server.rstrip('/')}/v1/chat/completions"
else:
COMPLETION_URL = f"http://localhost:{BASE_SERVER_PORT}/v1/chat/completions"
MAX_RETRIES = args.max_page_retries
MODEL_MAX_CONTEXT = 16384
TEMPERATURE_BY_ATTEMPT = [0.1, 0.1, 0.2, 0.3, 0.5, 0.8, 0.9, 1.0]
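
The endpoint selected here speaks the standard OpenAI chat-completions interface, so the change can be exercised directly against the external server. A rough, self-contained sketch (assuming the hypothetical `http://remote-server:8000` server and the served model name `olmocr` from the README; this is not the pipeline's actual page payload):

```python
# Illustrative only: send a plain chat completion to the same endpoint that
# process_page targets when --server is set. Host and prompt are made up.
import json
import urllib.request

server = "http://remote-server:8000"
completion_url = f"{server.rstrip('/')}/v1/chat/completions"

payload = {
    "model": "olmocr",
    "messages": [{"role": "user", "content": "Say hello."}],
    "max_tokens": 16,
    "temperature": 0.1,
}
request = urllib.request.Request(
    completion_url,
    data=json.dumps(payload).encode("utf-8"),
    headers={"Content-Type": "application/json"},
)
with urllib.request.urlopen(request, timeout=30) as resp:
    print(json.load(resp)["choices"][0]["message"]["content"])
```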
@@ -730,10 +733,13 @@ async def vllm_server_host(model_name_or_path, args, semaphore, unknown_args=Non
sys.exit(1)


async def vllm_server_ready():
async def vllm_server_ready(args):
max_attempts = 300
delay_sec = 1
url = f"http://localhost:{BASE_SERVER_PORT}/v1/models"
if args.server:
url = f"{args.server.rstrip('/')}/v1/models"
else:
url = f"http://localhost:{BASE_SERVER_PORT}/v1/models"

for attempt in range(1, max_attempts + 1):
try:
@@ -1069,6 +1075,9 @@ async def main():
vllm_group.add_argument("--tensor-parallel-size", "-tp", type=int, default=1, help="Tensor parallel size for vLLM")
vllm_group.add_argument("--data-parallel-size", "-dp", type=int, default=1, help="Data parallel size for vLLM")
vllm_group.add_argument("--port", type=int, default=30024, help="Port to use for the VLLM server")
vllm_group.add_argument(
"--server", type=str, help="URL of external vLLM (or other compatible provider) server (e.g., http://hostname:port). If provided, skips spawning local vLLM instance"
)

# Beaker/job running stuff
beaker_group = parser.add_argument_group("beaker/cluster execution")
@@ -1207,12 +1216,17 @@ async def main():

# If you get this far, then you are doing inference and need a GPU
# check_sglang_version()
check_torch_gpu_available()
if not args.server:
check_torch_gpu_available()

logger.info(f"Starting pipeline with PID {os.getpid()}")

# Download the model before you do anything else
model_name_or_path = await download_model(args.model)
if args.server:
logger.info(f"Using external server at {args.server}")
model_name_or_path = None
else:
model_name_or_path = await download_model(args.model)

# Initialize the work queue
qsize = await work_queue.initialize_queue()
@@ -1226,9 +1240,12 @@
# As soon as one worker is no longer saturating the gpu, the next one can start sending requests
semaphore = asyncio.Semaphore(1)

vllm_server = asyncio.create_task(vllm_server_host(model_name_or_path, args, semaphore, unknown_args))
# Start local vLLM instance if not using external one
vllm_server = None
if not args.server:
vllm_server = asyncio.create_task(vllm_server_host(model_name_or_path, args, semaphore, unknown_args))

await vllm_server_ready()
await vllm_server_ready(args)

metrics_task = asyncio.create_task(metrics_reporter(work_queue))

@@ -1241,11 +1258,16 @@
# Wait for all worker tasks to finish
await asyncio.gather(*worker_tasks)

vllm_server.cancel()
# Cancel vLLM server if it was started
if vllm_server is not None:
vllm_server.cancel()
metrics_task.cancel()

# Wait for cancelled tasks to complete
await asyncio.gather(vllm_server, metrics_task, return_exceptions=True)
tasks_to_wait = [metrics_task]
if vllm_server is not None:
tasks_to_wait.append(vllm_server)
await asyncio.gather(*tasks_to_wait, return_exceptions=True)

# Output final metrics summary
metrics_summary = metrics.get_metrics_summary()
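
The shutdown handling above boils down to a common asyncio pattern: a background task that may or may not have been created, cancelled and then awaited defensively so its `CancelledError` is absorbed. A standalone sketch of just that pattern, with illustrative names rather than the pipeline's:

```python
# Standalone sketch of the optional-background-task pattern: create the task
# only when needed, then cancel and await it without letting the cancellation
# propagate out of the caller.
import asyncio


async def background(name: str):
    try:
        while True:
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        print(f"{name} cancelled")
        raise


async def run(use_local_server: bool):
    server_task = asyncio.create_task(background("vllm server")) if use_local_server else None
    metrics_task = asyncio.create_task(background("metrics"))

    await asyncio.sleep(0.1)  # stand-in for the real work (the worker tasks)

    if server_task is not None:
        server_task.cancel()
    metrics_task.cancel()

    tasks_to_wait = [metrics_task]
    if server_task is not None:
        tasks_to_wait.append(server_task)
    # return_exceptions=True keeps the CancelledError from propagating here
    await asyncio.gather(*tasks_to_wait, return_exceptions=True)


asyncio.run(run(use_local_server=False))
```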