Skip to content

Commit 97f46f1

Browse files
authored
Worker refactors (#253)
1 parent b5fcecc commit 97f46f1

15 files changed

Lines changed: 208 additions & 158 deletions

File tree

.github/workflows/extension_upgrade.yml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,18 +50,17 @@ jobs:
5050
- name: Install project dependencies
5151
run: |
5252
make setup
53-
cargo install pg-trunk
5453
- name: Test previous version (v0.20.0)
5554
env:
5655
HF_API_KEY: ${{ secrets.HF_API_KEY }}
5756
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
5857
CO_API_KEY: ${{ secrets.CO_API_KEY }}
5958
run: |
60-
trunk install vectorize --version 0.20.0 --pg-config $(cargo pgrx info pg-config pg17)
6159
git fetch --tags
6260
git checkout tags/v0.20.0
6361
# pgrx=0.12.5 required for v0.20.0
6462
cargo install cargo-pgrx --version 0.12.5 --locked
63+
cargo pgrx install --pg-config $(cargo pgrx info pg-config pg17)
6564
# use integration tests for v0.20.0
6665
make test-integration
6766
- name: Test branch's version

Cargo.lock

Lines changed: 14 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

README.md

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,14 @@
22
<b>pg_vectorize: a VectorDB on Postgres</b>
33
</h1>
44

5+
[![PostgreSQL](https://img.shields.io/badge/PostgreSQL-13%20%7C%2014%20%7C%2015%20%7C%2016%20%7C%2017%20%7C%2018-336791?logo=postgresql&logoColor=white)](https://www.postgresql.org/)
6+
57
A Postgres server and extension that automates the transformation and orchestration of text to embeddings and provides hooks into the most popular LLMs. This allows you to get up and running quickly and to automate maintenance for vector search, full text search, and hybrid search, enabling you to rapidly build RAG and search engines on Postgres.
68

79
This project relies heavily on the work by [pgvector](https://github.com/pgvector/pgvector) for vector similarity search, [pgmq](https://github.com/pgmq/pgmq) for orchestration in background workers, and [SentenceTransformers](https://huggingface.co/sentence-transformers).
810

911
---
1012

11-
[![PostgreSQL](https://img.shields.io/badge/PostgreSQL-13%20%7C%2014%20%7C%2015%20%7C%2016%20%7C%2017%20%7C%2018-336791?logo=postgresql&logoColor=white)](https://www.postgresql.org/)
1213

1314
**API Documentation**: https://chuckhend.github.io/pg_vectorize/
1415

@@ -64,7 +65,12 @@ curl -X POST http://localhost:8080/api/v1/table -d '{
6465
Search using the HTTP API:
6566

6667
```bash
67-
curl -X GET "http://localhost:8080/api/v1/search?job_name=my_job&query=camping%20backpack&limit=1" | jq .
68+
curl -G \
69+
"http://localhost:8080/api/v1/search" \
70+
--data-urlencode "job_name=my_job" \
71+
--data-urlencode "query=camping backpack" \
72+
--data-urlencode "limit=1" \
73+
| jq .
6874
```
6975

7076
```json
@@ -79,7 +85,7 @@ curl -X GET "http://localhost:8080/api/v1/search?job_name=my_job&query=camping%2
7985
"rrf_score": 0.03278688524590164,
8086
"semantic_rank": 1,
8187
"similarity_score": 0.6296013593673706,
82-
"updated_at": "2025-10-04T14:45:16.152526+00:00"
88+
"updated_at": "2025-10-05T00:14:39.220893+00:00"
8389
}
8490
]
8591
```

core/src/config.rs

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,35 @@ pub struct Config {
3030
pub max_retries: i32,
3131
pub webserver_port: u16,
3232
pub num_server_workers: usize,
33+
pub database_pool_max: u32,
34+
pub database_cache_pool_max: u32,
3335
}
3436

3537
impl Config {
3638
pub fn from_env() -> Config {
39+
// read server worker count first so we can derive sensible defaults
40+
let num_server_workers: usize =
41+
from_env_default("NUM_SERVER_WORKERS", "8").parse().unwrap();
42+
43+
// derive a default DB pool size from num_server_workers: 2 connections per worker + 2 extra,
44+
// clamped between 4 and 64. This gives headroom for background tasks and short bursts.
45+
let derived_db_pool_default: u32 = ((num_server_workers as u32).saturating_mul(2))
46+
.saturating_add(2)
47+
.clamp(4, 64);
48+
49+
// allow environment override; fall back to derived default when not set or invalid.
50+
let database_pool_max: u32 = env::var("DATABASE_POOL_MAX")
51+
.ok()
52+
.and_then(|v| v.parse().ok())
53+
.unwrap_or(derived_db_pool_default);
54+
55+
// cache pool is typically small; default to max(2, num_server_workers / 4)
56+
let derived_cache_pool_default: u32 = ((num_server_workers as u32) / 4).max(2).clamp(2, 16);
57+
let database_cache_pool_max: u32 = env::var("DATABASE_CACHE_POOL_MAX")
58+
.ok()
59+
.and_then(|v| v.parse().ok())
60+
.unwrap_or(derived_cache_pool_default);
61+
3762
Config {
3863
proxy_enabled: env::var("VECTORIZE_PROXY_ENABLED")
3964
.map(|v| parse_bool_flexible(&v))
@@ -63,7 +88,9 @@ impl Config {
6388
.unwrap(),
6489
max_retries: from_env_default("MAX_RETRIES", "2").parse().unwrap(),
6590
webserver_port: from_env_default("WEBSERVER_PORT", "8080").parse().unwrap(),
66-
num_server_workers: from_env_default("NUM_SERVER_WORKERS", "8").parse().unwrap(),
91+
num_server_workers,
92+
database_pool_max,
93+
database_cache_pool_max,
6794
}
6895
}
6996
}

core/src/query.rs

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -327,13 +327,16 @@ BEGIN
327327
1000 -- default batch size
328328
);
329329
FOR batch_result IN SELECT batch FROM vectorize.batch_texts(record_ids, batch_size) LOOP
330-
job_messages := array_append(
331-
job_messages,
332-
jsonb_build_object(
333-
'job_name', job_name,
334-
'record_ids', batch_result.batch
335-
)
336-
);
330+
-- only append non-null, non-empty batches
331+
IF array_length(batch_result.batch, 1) > 0 THEN
332+
job_messages := array_append(
333+
job_messages,
334+
jsonb_build_object(
335+
'job_name', job_name,
336+
'record_ids', batch_result.batch
337+
)
338+
);
339+
END IF;
337340
END LOOP;
338341
339342
PERFORM pgmq.send_batch(

extension/sql/meta.sql

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -70,13 +70,16 @@ BEGIN
7070
-- create jobs of size batch_size
7171
batch_size := current_setting('vectorize.batch_size')::integer;
7272
FOR batch_result IN SELECT batch FROM vectorize.batch_texts(record_ids, batch_size) LOOP
73-
job_messages := array_append(
74-
job_messages,
75-
jsonb_build_object(
76-
'job_name', job_name,
77-
'record_ids', batch_result.batch
78-
)
79-
);
73+
-- only append non-null, non-empty batches
74+
IF array_length(batch_result.batch, 1) > 0 THEN
75+
job_messages := array_append(
76+
job_messages,
77+
jsonb_build_object(
78+
'job_name', job_name,
79+
'record_ids', batch_result.batch
80+
)
81+
);
82+
END IF;
8083
END LOOP;
8184

8285
PERFORM pgmq.send_batch(

proxy/src/embeddings.rs

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -130,21 +130,20 @@ pub fn resolve_prepared_embed_calls(
130130
parameters: &[String],
131131
) -> Result<Vec<EmbedCall>, VectorizeError> {
132132
for call in &mut embed_calls {
133-
if call.is_prepared {
134-
if let (Some(query_idx), Some(project_idx)) =
133+
if call.is_prepared
134+
&& let (Some(query_idx), Some(project_idx)) =
135135
(call.query_param_index, call.project_param_index)
136-
{
137-
if query_idx >= parameters.len() || project_idx >= parameters.len() {
138-
return Err(VectorizeError::EmbeddingGenerationFailed(format!(
139-
"Parameter index out of bounds: query_idx={}, project_idx={}, params_len={}",
140-
query_idx,
141-
project_idx,
142-
parameters.len()
143-
)));
144-
}
145-
call.query = parameters[query_idx].clone();
146-
call.project_name = parameters[project_idx].clone();
136+
{
137+
if query_idx >= parameters.len() || project_idx >= parameters.len() {
138+
return Err(VectorizeError::EmbeddingGenerationFailed(format!(
139+
"Parameter index out of bounds: query_idx={}, project_idx={}, params_len={}",
140+
query_idx,
141+
project_idx,
142+
parameters.len()
143+
)));
147144
}
145+
call.query = parameters[query_idx].clone();
146+
call.project_name = parameters[project_idx].clone();
148147
}
149148
}
150149
Ok(embed_calls)

0 commit comments

Comments
 (0)