Skip to content

Commit 08e2d96

Browse files
authored
use same timeout on all calls, refactor bgw wait latch interaction (#63)
* use same timeout on all calls * only pause in wait latch
1 parent 0103b2a commit 08e2d96

7 files changed

Lines changed: 32 additions & 31 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "vectorize"
3-
version = "0.11.0"
3+
version = "0.11.1"
44
edition = "2021"
55
publish = false
66

Trunk.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ description = "The simplest way to orchestrate vector search on Postgres."
66
homepage = "https://github.com/tembo-io/pg_vectorize"
77
documentation = "https://github.com/tembo-io/pg_vectorize"
88
categories = ["orchestration", "machine_learning"]
9-
version = "0.11.0"
9+
version = "0.11.1"
1010

1111
[build]
1212
postgres_version = "15"

sql/vectorize--0.11.0--0.11.1.sql

Whitespace-only changes.

src/transformers/http_handler.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,11 +94,12 @@ pub async fn get_model_info(
9494
) -> Result<TransformerMetadata> {
9595
let svc_url = get_generic_svc_url()?;
9696
let info_url = svc_url.replace("/embeddings", "/info");
97+
let timeout = EMBEDDING_REQ_TIMEOUT_SEC.get();
9798
let client = reqwest::Client::new();
9899
let mut req = client
99100
.get(info_url)
100101
.query(&[("model_name", model_name)])
101-
.timeout(std::time::Duration::from_secs(5)); // model info must always be fast
102+
.timeout(std::time::Duration::from_secs(timeout as u64));
102103
if let Some(key) = api_key {
103104
req = req.header("Authorization", format!("Bearer {}", key));
104105
}

src/transformers/openai.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use anyhow::Result;
44

55
use crate::{
66
executor::VectorizeMeta,
7-
guc::OPENAI_KEY,
7+
guc::{EMBEDDING_REQ_TIMEOUT_SEC, OPENAI_KEY},
88
transformers::{
99
http_handler::handle_response,
1010
types::{EmbeddingPayload, EmbeddingRequest, Inputs},
@@ -72,6 +72,7 @@ pub fn trim_inputs(inputs: &[Inputs]) -> Vec<String> {
7272

7373
pub fn validate_api_key(key: &str) -> Result<()> {
7474
let client = reqwest::Client::new();
75+
let timeout = EMBEDDING_REQ_TIMEOUT_SEC.get();
7576
let runtime = tokio::runtime::Builder::new_current_thread()
7677
.enable_io()
7778
.enable_time()
@@ -82,7 +83,7 @@ pub fn validate_api_key(key: &str) -> Result<()> {
8283
.get("https://api.openai.com/v1/models")
8384
.header("Content-Type", "application/json")
8485
.header("Authorization", format!("Bearer {}", key))
85-
.timeout(std::time::Duration::from_secs(5)) // api validation should be fast
86+
.timeout(std::time::Duration::from_secs(timeout as u64))
8687
.send()
8788
.await
8889
.unwrap_or_else(|e| error!("failed to make Open AI key validation call: {}", e));

src/workers/pg_bgw.rs

Lines changed: 24 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
use crate::guc::{init_guc, NUM_BGW_PROC};
22
use crate::init::VECTORIZE_QUEUE;
33
use crate::util::{get_pg_conn, ready};
4-
use anyhow::Result;
54
use pgrx::bgworkers::*;
65
use pgrx::*;
7-
use std::time::{Duration, Instant};
6+
use std::time::Duration;
87

98
use crate::workers::run_worker;
109

@@ -45,30 +44,30 @@ pub extern "C" fn background_worker_main(_arg: pg_sys::Datum) {
4544

4645
log!("Starting BG Workers {}", BackgroundWorker::get_name(),);
4746

48-
while BackgroundWorker::wait_latch(Some(Duration::from_millis(10))) {
49-
runtime.block_on(async {
50-
while !ready(&conn).await {
51-
log!("pg-vectorize: waiting for CREATE EXTENSION vectorize");
52-
tokio::time::sleep(Duration::from_secs(5)).await;
53-
}
54-
});
47+
let mut ext_ready: bool = false;
48+
let mut wait_duration: Duration = Duration::from_secs(6);
49+
while BackgroundWorker::wait_latch(Some(wait_duration)) {
50+
if !ext_ready {
51+
warning!("pg-vectorize: waiting for CREATE EXTENSION vectorize CASCADE;");
52+
runtime.block_on(async {
53+
ext_ready = ready(&conn).await;
54+
});
55+
// return to wait_latch if extension is not ready
56+
continue;
57+
}
5558

56-
let _worker_ran: Result<()> = runtime.block_on(async {
57-
// continue to poll without pauses
58-
let start = Instant::now();
59-
let duration = Duration::from_secs(6);
60-
// return control to wait_latch() after `duration` has elapsed
61-
while start.elapsed() < duration {
62-
match run_worker(queue.clone(), &conn, VECTORIZE_QUEUE).await {
63-
// sleep for 2 seconds when no messages in the queue
64-
Ok(None) => tokio::time::sleep(Duration::from_secs(2)).await,
65-
// sleep for 6 seconds when there is an error reading messages
66-
Err(_) => tokio::time::sleep(Duration::from_secs(6)).await,
67-
// continue to poll where there was a message consumed
68-
Ok(Some(_)) => continue,
69-
}
70-
}
71-
Ok(())
59+
wait_duration = runtime.block_on(async {
60+
let wait_dur = match run_worker(queue.clone(), &conn, VECTORIZE_QUEUE).await {
61+
// no messages in queue, so wait 2 seconds
62+
Ok(None) => 2000,
63+
// wait 10 seconds between polls when there is a failure
64+
Err(_) => 10000,
65+
// when there was a successfully processed message from queue,
66+
// only wait 10ms before checking for more messages
67+
// this allows postgres to kill or restart the bgw in between messages
68+
Ok(Some(_)) => 10,
69+
};
70+
Duration::from_millis(wait_dur)
7271
});
7372
}
7473
log!("pg-vectorize: shutting down");

0 commit comments

Comments
 (0)