Skip to content

Commit 484d89e

Browse files
authored
check if queue already exists (#28)
* fix .table bug * unused import * add upgrade script
1 parent 67c3132 commit 484d89e

6 files changed

Lines changed: 20 additions & 13 deletions

File tree

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "vectorize"
3-
version = "0.6.0"
3+
version = "0.6.1"
44
edition = "2021"
55
publish = false
66

Trunk.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ description = "The simplest way to orchestrate vector search on Postgres."
66
homepage = "https://github.com/tembo-io/pg_vectorize"
77
documentation = "https://github.com/tembo-io/pg_vectorize"
88
categories = ["orchestration", "machine_learning"]
9-
version = "0.6.0"
9+
version = "0.6.1"
1010

1111
[build]
1212
postgres_version = "15"

sql/vectorize--0.6.0--0.6.1.sql

Whitespace-only changes.

src/api.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -130,9 +130,7 @@ fn table(
130130
// TODO: first batch update
131131
// initialize cron
132132
let _ = init::init_cron(&schedule, &job_name); // handle this error
133-
Ok(format!(
134-
"{schema}.{table}.{columns:?}.{transformer}.{search_alg}"
135-
))
133+
Ok(format!("Successfully created job: {job_name}"))
136134
}
137135

138136
#[pg_extern]

src/init.rs

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use crate::{query::check_input, types, types::TableMethod, types::Transformer};
22
use pgrx::prelude::*;
33
use std::collections::HashMap;
44

5-
use anyhow::Result;
5+
use anyhow::{Context, Result};
66
use lazy_static::lazy_static;
77

88
lazy_static! {
@@ -18,12 +18,21 @@ lazy_static! {
1818

1919
pub fn init_pgmq(transformer: &Transformer) -> Result<()> {
2020
let qname = QUEUE_MAPPING.get(transformer).expect("invalid transformer");
21-
let ran: Result<_, spi::Error> = Spi::connect(|mut c| {
22-
let _r = c.update(&format!("SELECT pgmq.create('{qname}');"), None, None)?;
23-
Ok(())
24-
});
25-
if let Err(e) = ran {
26-
error!("error creating job queue: {}", e);
21+
// check if queue already created:
22+
let queue_exists: bool = Spi::get_one(&format!(
23+
"SELECT EXISTS (SELECT 1 FROM pgmq.meta WHERE queue_name = '{qname}');",
24+
))?
25+
.context("error checking if queue exists")?;
26+
if queue_exists {
27+
return Ok(());
28+
} else {
29+
let ran: Result<_, spi::Error> = Spi::connect(|mut c| {
30+
let _r = c.update(&format!("SELECT pgmq.create('{qname}');"), None, None)?;
31+
Ok(())
32+
});
33+
if let Err(e) = ran {
34+
error!("error creating job queue: {}", e);
35+
}
2736
}
2837
Ok(())
2938
}

tests/util.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ pub mod common {
22
use log::LevelFilter;
33
use sqlx::postgres::{PgConnectOptions, PgPoolOptions};
44
use sqlx::ConnectOptions;
5-
use sqlx::{FromRow, Pool, Postgres, Row};
5+
use sqlx::{Pool, Postgres, Row};
66
use url::{ParseError, Url};
77

88
pub async fn connect(url: &str) -> Pool<Postgres> {

0 commit comments

Comments
 (0)