feat(config): make table name of pg backend configurable (#5244)
* feat: configurable table name

* fix: election sql

* feat: configurable lock_id

* chore: update config file

* perf: useless allocation

* perf: useless allocation

* chore: remove unused type hint

* Apply suggestions from code review

Co-authored-by: Weny Xu <wenymedia@gmail.com>

* chore: update config file

---------

Co-authored-by: Weny Xu <wenymedia@gmail.com>
CookiePieWw and WenyXu authored Jan 8, 2025
1 parent 03a2e6d commit 751fa4e
Showing 5 changed files with 188 additions and 80 deletions.
2 changes: 2 additions & 0 deletions config/config.md
@@ -296,6 +296,8 @@
| `store_addrs` | Array | -- | Store server address default to etcd store.<br/>For postgres store, the format is:<br/>"password=password dbname=postgres user=postgres host=localhost port=5432"<br/>For etcd store, the format is:<br/>"127.0.0.1:2379" |
| `store_key_prefix` | String | `""` | If it's not empty, the metasrv will store all data with this key prefix. |
| `backend` | String | `etcd_store` | The datastore for meta server.<br/>Available values:<br/>- `etcd_store` (default value)<br/>- `memory_store`<br/>- `postgres_store` |
| `meta_table_name` | String | `greptime_metakv` | Table name in RDS to store metadata. Takes effect when using an RDS kvbackend.<br/>**Only used when backend is `postgres_store`.** |
| `meta_election_lock_id` | Integer | `1` | Advisory lock id in PostgreSQL for election. Takes effect when using PostgreSQL as the kvbackend.<br/>Only used when backend is `postgres_store`. |
| `selector` | String | `round_robin` | Datanode selector type.<br/>- `round_robin` (default value)<br/>- `lease_based`<br/>- `load_based`<br/>For details, please see "https://docs.greptime.com/developer-guide/metasrv/selector". |
| `use_memory_store` | Bool | `false` | Store data in memory. |
| `enable_region_failover` | Bool | `false` | Whether to enable region failover.<br/>This feature is only available on GreptimeDB running on cluster mode and<br/>- Using Remote WAL<br/>- Using shared storage (e.g., s3). |
8 changes: 8 additions & 0 deletions config/metasrv.example.toml
@@ -24,6 +24,14 @@ store_key_prefix = ""
## - `postgres_store`
backend = "etcd_store"

## Table name in RDS to store metadata. Takes effect when using an RDS kvbackend.
## **Only used when backend is `postgres_store`.**
meta_table_name = "greptime_metakv"

## Advisory lock id in PostgreSQL for election. Takes effect when using PostgreSQL as the kvbackend.
## Only used when backend is `postgres_store`.
meta_election_lock_id = 1

## Datanode selector type.
## - `round_robin` (default value)
## - `lease_based`
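
Taken together, the options documented above configure a Postgres-backed metadata store. A minimal sketch of such a configuration, assuming a local PostgreSQL instance; the connection string, table name, and lock id below are illustrative values, not part of this commit:

```toml
## Use PostgreSQL as the metadata kvbackend.
backend = "postgres_store"
## Illustrative connection string; see config.md for the expected format.
store_addrs = ["password=password dbname=postgres user=postgres host=localhost port=5432"]

## Store metadata in a custom table instead of the default `greptime_metakv`.
meta_table_name = "my_cluster_metakv"
## Advisory lock id used for leader election; choose a value that does not
## collide with other advisory locks in the same database (default is 1).
meta_election_lock_id = 28319
```
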
4 changes: 3 additions & 1 deletion src/meta-srv/src/bootstrap.rs
@@ -230,7 +230,7 @@ pub async fn metasrv_builder(
(None, BackendImpl::PostgresStore) => {
let pool = create_postgres_pool(opts).await?;
// TODO(CookiePie): use table name from config.
let kv_backend = PgStore::with_pg_pool(pool, "greptime_metakv", opts.max_txn_ops)
let kv_backend = PgStore::with_pg_pool(pool, &opts.meta_table_name, opts.max_txn_ops)
.await
.context(error::KvBackendSnafu)?;
// Client for election should be created separately since we need a different session keep-alive idle time.
@@ -240,6 +240,8 @@ pub async fn metasrv_builder(
election_client,
opts.store_key_prefix.clone(),
CANDIDATE_LEASE_SECS,
&opts.meta_table_name,
opts.meta_election_lock_id,
)
.await?;
(kv_backend, Some(election))
239 changes: 160 additions & 79 deletions src/meta-srv/src/election/postgres.rs
@@ -35,50 +35,138 @@ use crate::error::{
};
use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo};

// TODO(CookiePie): The lock id should be configurable.
const CAMPAIGN: &str = "SELECT pg_try_advisory_lock({})";
const STEP_DOWN: &str = "SELECT pg_advisory_unlock({})";
// Currently the session timeout is longer than the leader lease time, so the leader lease may expire while the session is still alive.
// Either the leader reconnects and step down or the session expires and the lock is released.
const SET_IDLE_SESSION_TIMEOUT: &str = "SET idle_session_timeout = '10s';";

// Separator between value and expire time.
const LEASE_SEP: &str = r#"||__metadata_lease_sep||"#;

// SQL to put a value with expire time. Parameters: key, value, LEASE_SEP, expire_time
const PUT_IF_NOT_EXISTS_WITH_EXPIRE_TIME: &str = r#"
WITH prev AS (
SELECT k, v FROM greptime_metakv WHERE k = $1
), insert AS (
INSERT INTO greptime_metakv
VALUES($1, convert_to($2 || $3 || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $4, 'YYYY-MM-DD HH24:MI:SS.MS'), 'UTF8'))
ON CONFLICT (k) DO NOTHING
)
struct ElectionSqlFactory<'a> {
lock_id: u64,
table_name: &'a str,
}

SELECT k, v FROM prev;
"#;
struct ElectionSqlSet {
campaign: String,
step_down: String,
// SQL to put a value with expire time.
//
// Parameters for the query:
// `$1`: key,
// `$2`: value,
// `$3`: lease time in seconds
//
// Returns:
// If the key already exists, return the previous value.
put_value_with_lease: String,
// SQL to update a value with expire time.
//
// Parameters for the query:
// `$1`: key,
// `$2`: previous value,
// `$3`: updated value,
// `$4`: lease time in seconds
update_value_with_lease: String,
// SQL to get a value with expire time.
//
// Parameters:
// `$1`: key
get_value_with_lease: String,
// SQL to get all values with expire time with the given key prefix.
//
// Parameters:
// `$1`: key prefix like 'prefix%'
//
// Returns:
// column 0: value,
// column 1: current timestamp
get_value_with_lease_by_prefix: String,
// SQL to delete a value.
//
// Parameters:
// `$1`: key
//
// Returns:
// column 0: key deleted,
// column 1: value deleted
delete_value: String,
}

// SQL to update a value with expire time. Parameters: key, prev_value_with_lease, updated_value, LEASE_SEP, expire_time
const CAS_WITH_EXPIRE_TIME: &str = r#"
UPDATE greptime_metakv
SET k=$1,
v=convert_to($3 || $4 || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $5, 'YYYY-MM-DD HH24:MI:SS.MS'), 'UTF8')
WHERE
k=$1 AND v=$2
"#;
impl<'a> ElectionSqlFactory<'a> {
fn new(lock_id: u64, table_name: &'a str) -> Self {
Self {
lock_id,
table_name,
}
}

const GET_WITH_CURRENT_TIMESTAMP: &str = r#"SELECT v, TO_CHAR(CURRENT_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS.MS') FROM greptime_metakv WHERE k = $1"#;
fn build(self) -> ElectionSqlSet {
ElectionSqlSet {
campaign: self.campaign_sql(),
step_down: self.step_down_sql(),
put_value_with_lease: self.put_value_with_lease_sql(),
update_value_with_lease: self.update_value_with_lease_sql(),
get_value_with_lease: self.get_value_with_lease_sql(),
get_value_with_lease_by_prefix: self.get_value_with_lease_by_prefix_sql(),
delete_value: self.delete_value_sql(),
}
}

const PREFIX_GET_WITH_CURRENT_TIMESTAMP: &str = r#"SELECT v, TO_CHAR(CURRENT_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS.MS') FROM greptime_metakv WHERE k LIKE $1"#;
// Currently the session timeout is longer than the leader lease time, so the leader lease may expire while the session is still alive.
// Either the leader reconnects and steps down, or the session expires and the lock is released.
fn set_idle_session_timeout_sql(&self) -> &str {
"SET idle_session_timeout = '10s';"
}

const POINT_DELETE: &str = "DELETE FROM greptime_metakv WHERE k = $1 RETURNING k,v;";
fn campaign_sql(&self) -> String {
format!("SELECT pg_try_advisory_lock({})", self.lock_id)
}

fn campaign_sql(lock_id: u64) -> String {
CAMPAIGN.replace("{}", &lock_id.to_string())
}
fn step_down_sql(&self) -> String {
format!("SELECT pg_advisory_unlock({})", self.lock_id)
}

fn step_down_sql(lock_id: u64) -> String {
STEP_DOWN.replace("{}", &lock_id.to_string())
fn put_value_with_lease_sql(&self) -> String {
format!(
r#"WITH prev AS (
SELECT k, v FROM {} WHERE k = $1
), insert AS (
INSERT INTO {}
VALUES($1, convert_to($2 || '{}' || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $3, 'YYYY-MM-DD HH24:MI:SS.MS'), 'UTF8'))
ON CONFLICT (k) DO NOTHING
)
SELECT k, v FROM prev;
"#,
self.table_name, self.table_name, LEASE_SEP
)
}

fn update_value_with_lease_sql(&self) -> String {
format!(
r#"UPDATE {}
SET v = convert_to($3 || '{}' || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $4, 'YYYY-MM-DD HH24:MI:SS.MS'), 'UTF8')
WHERE k = $1 AND v = $2"#,
self.table_name, LEASE_SEP
)
}

fn get_value_with_lease_sql(&self) -> String {
format!(
r#"SELECT v, TO_CHAR(CURRENT_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS.MS') FROM {} WHERE k = $1"#,
self.table_name
)
}

fn get_value_with_lease_by_prefix_sql(&self) -> String {
format!(
r#"SELECT v, TO_CHAR(CURRENT_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS.MS') FROM {} WHERE k LIKE $1"#,
self.table_name
)
}

fn delete_value_sql(&self) -> String {
format!(
"DELETE FROM {} WHERE k = $1 RETURNING k,v;",
self.table_name
)
}
}

/// Parse the value and expire time from the given string. The value should be in the format "value || LEASE_SEP || expire_time".
@@ -138,7 +226,7 @@ pub struct PgElection {
leader_watcher: broadcast::Sender<LeaderChangeMessage>,
store_key_prefix: String,
candidate_lease_ttl_secs: u64,
lock_id: u64,
sql_set: ElectionSqlSet,
}

impl PgElection {
@@ -147,10 +235,13 @@ impl PgElection {
client: Client,
store_key_prefix: String,
candidate_lease_ttl_secs: u64,
table_name: &str,
lock_id: u64,
) -> Result<ElectionRef> {
let sql_factory = ElectionSqlFactory::new(lock_id, table_name);
// Set idle session timeout to IDLE_SESSION_TIMEOUT to avoid dead advisory lock.
client
.execute(SET_IDLE_SESSION_TIMEOUT, &[])
.execute(sql_factory.set_idle_session_timeout_sql(), &[])
.await
.context(PostgresExecutionSnafu)?;

@@ -163,8 +254,7 @@ impl PgElection {
leader_watcher: tx,
store_key_prefix,
candidate_lease_ttl_secs,
// TODO(CookiePie): The lock id should be configurable.
lock_id: 28319,
sql_set: sql_factory.build(),
}))
}

@@ -276,7 +366,7 @@ impl Election for PgElection {
loop {
let res = self
.client
.query(&campaign_sql(self.lock_id), &[])
.query(&self.sql_set.campaign, &[])
.await
.context(PostgresExecutionSnafu)?;
if let Some(row) = res.first() {
@@ -333,10 +423,10 @@ impl PgElection {
key: &str,
with_origin: bool,
) -> Result<Option<(String, Timestamp, Timestamp, Option<String>)>> {
let key = key.as_bytes().to_vec();
let key = key.as_bytes();
let res = self
.client
.query(GET_WITH_CURRENT_TIMESTAMP, &[&key as &(dyn ToSql + Sync)])
.query(&self.sql_set.get_value_with_lease, &[&key])
.await
.context(PostgresExecutionSnafu)?;

@@ -378,10 +468,7 @@ impl PgElection {
let key_prefix = format!("{}%", key_prefix).as_bytes().to_vec();
let res = self
.client
.query(
PREFIX_GET_WITH_CURRENT_TIMESTAMP,
&[(&key_prefix as &(dyn ToSql + Sync))],
)
.query(&self.sql_set.get_value_with_lease_by_prefix, &[&key_prefix])
.await
.context(PostgresExecutionSnafu)?;

@@ -406,17 +493,16 @@ impl PgElection {
}

async fn update_value_with_lease(&self, key: &str, prev: &str, updated: &str) -> Result<()> {
let key = key.as_bytes().to_vec();
let prev = prev.as_bytes().to_vec();
let key = key.as_bytes();
let prev = prev.as_bytes();
let res = self
.client
.execute(
CAS_WITH_EXPIRE_TIME,
&self.sql_set.update_value_with_lease,
&[
&key as &(dyn ToSql + Sync),
&prev as &(dyn ToSql + Sync),
&key,
&prev,
&updated,
&LEASE_SEP,
&(self.candidate_lease_ttl_secs as f64),
],
)
@@ -426,7 +512,7 @@ impl PgElection {
ensure!(
res == 1,
UnexpectedSnafu {
violated: format!("Failed to update key: {}", String::from_utf8_lossy(&key)),
violated: format!("Failed to update key: {}", String::from_utf8_lossy(key)),
}
);

@@ -440,17 +526,12 @@ impl PgElection {
value: &str,
lease_ttl_secs: u64,
) -> Result<bool> {
let key = key.as_bytes().to_vec();
let key = key.as_bytes();
let lease_ttl_secs = lease_ttl_secs as f64;
let params: Vec<&(dyn ToSql + Sync)> = vec![
&key as &(dyn ToSql + Sync),
&value as &(dyn ToSql + Sync),
&LEASE_SEP,
&lease_ttl_secs,
];
let params: Vec<&(dyn ToSql + Sync)> = vec![&key, &value, &lease_ttl_secs];
let res = self
.client
.query(PUT_IF_NOT_EXISTS_WITH_EXPIRE_TIME, &params)
.query(&self.sql_set.put_value_with_lease, &params)
.await
.context(PostgresExecutionSnafu)?;
Ok(res.is_empty())
@@ -459,10 +540,10 @@ impl PgElection {
/// Returns `true` if the deletion is successful.
/// Caution: Should only delete the key if the lease is expired.
async fn delete_value(&self, key: &str) -> Result<bool> {
let key = key.as_bytes().to_vec();
let key = key.as_bytes();
let res = self
.client
.query(POINT_DELETE, &[&key as &(dyn ToSql + Sync)])
.query(&self.sql_set.delete_value, &[&key])
.await
.context(PostgresExecutionSnafu)?;

@@ -574,7 +655,7 @@ impl PgElection {
{
self.delete_value(&key).await?;
self.client
.query(&step_down_sql(self.lock_id), &[])
.query(&self.sql_set.step_down, &[])
.await
.context(PostgresExecutionSnafu)?;
if let Err(e) = self
@@ -686,7 +767,7 @@ mod tests {
leader_watcher: tx,
store_key_prefix: uuid::Uuid::new_v4().to_string(),
candidate_lease_ttl_secs: 10,
lock_id: 28319,
sql_set: ElectionSqlFactory::new(28319, "greptime_metakv").build(),
};

let res = pg_election
@@ -760,7 +841,7 @@ mod tests {
leader_watcher: tx,
store_key_prefix,
candidate_lease_ttl_secs,
lock_id: 28319,
sql_set: ElectionSqlFactory::new(28319, "greptime_metakv").build(),
};

let node_info = MetasrvNodeInfo {
@@ -802,7 +883,7 @@ mod tests {
leader_watcher: tx,
store_key_prefix: store_key_prefix.clone(),
candidate_lease_ttl_secs,
lock_id: 28319,
sql_set: ElectionSqlFactory::new(28319, "greptime_metakv").build(),
};

let candidates = pg_election.all_candidates().await.unwrap();
@@ -843,7 +924,7 @@ mod tests {
leader_watcher: tx,
store_key_prefix: uuid::Uuid::new_v4().to_string(),
candidate_lease_ttl_secs,
lock_id: 28320,
sql_set: ElectionSqlFactory::new(28320, "greptime_metakv").build(),
};

leader_pg_election.elected().await.unwrap();
@@ -952,13 +1033,13 @@ mod tests {
leader_watcher: tx,
store_key_prefix,
candidate_lease_ttl_secs,
lock_id: 28321,
sql_set: ElectionSqlFactory::new(28321, "greptime_metakv").build(),
};

// Step 1: No leader exists, campaign and elected.
let res = leader_pg_election
.client
.query(&campaign_sql(leader_pg_election.lock_id), &[])
.query(&leader_pg_election.sql_set.campaign, &[])
.await
.unwrap();
let res: bool = res[0].get(0);
@@ -989,7 +1070,7 @@ mod tests {
// Step 2: As a leader, renew the lease.
let res = leader_pg_election
.client
.query(&campaign_sql(leader_pg_election.lock_id), &[])
.query(&leader_pg_election.sql_set.campaign, &[])
.await
.unwrap();
let res: bool = res[0].get(0);
@@ -1009,7 +1090,7 @@ mod tests {

let res = leader_pg_election
.client
.query(&campaign_sql(leader_pg_election.lock_id), &[])
.query(&leader_pg_election.sql_set.campaign, &[])
.await
.unwrap();
let res: bool = res[0].get(0);
@@ -1037,7 +1118,7 @@ mod tests {
// Step 4: Re-campaign and elected.
let res = leader_pg_election
.client
.query(&campaign_sql(leader_pg_election.lock_id), &[])
.query(&leader_pg_election.sql_set.campaign, &[])
.await
.unwrap();
let res: bool = res[0].get(0);
@@ -1094,7 +1175,7 @@ mod tests {
// Step 6: Re-campaign and elected.
let res = leader_pg_election
.client
.query(&campaign_sql(leader_pg_election.lock_id), &[])
.query(&leader_pg_election.sql_set.campaign, &[])
.await
.unwrap();
let res: bool = res[0].get(0);
@@ -1125,7 +1206,7 @@ mod tests {
// Step 7: Something wrong, the leader key changed by others.
let res = leader_pg_election
.client
.query(&campaign_sql(leader_pg_election.lock_id), &[])
.query(&leader_pg_election.sql_set.campaign, &[])
.await
.unwrap();
let res: bool = res[0].get(0);
@@ -1162,7 +1243,7 @@ mod tests {
// Clean up
leader_pg_election
.client
.query(&step_down_sql(leader_pg_election.lock_id), &[])
.query(&leader_pg_election.sql_set.step_down, &[])
.await
.unwrap();
}
@@ -1183,7 +1264,7 @@ mod tests {
leader_watcher: tx,
store_key_prefix: store_key_prefix.clone(),
candidate_lease_ttl_secs,
lock_id: 28322,
sql_set: ElectionSqlFactory::new(28322, "greptime_metakv").build(),
};

let leader_client = create_postgres_client().await.unwrap();
@@ -1196,12 +1277,12 @@ mod tests {
leader_watcher: tx,
store_key_prefix,
candidate_lease_ttl_secs,
lock_id: 28322,
sql_set: ElectionSqlFactory::new(28322, "greptime_metakv").build(),
};

leader_pg_election
.client
.query(&campaign_sql(leader_pg_election.lock_id), &[])
.query(&leader_pg_election.sql_set.campaign, &[])
.await
.unwrap();
leader_pg_election.elected().await.unwrap();
@@ -1242,7 +1323,7 @@ mod tests {
// Clean up
leader_pg_election
.client
.query(&step_down_sql(leader_pg_election.lock_id), &[])
.query(&leader_pg_election.sql_set.step_down, &[])
.await
.unwrap();
}
15 changes: 15 additions & 0 deletions src/meta-srv/src/metasrv.rs
@@ -71,6 +71,11 @@ pub const TABLE_ID_SEQ: &str = "table_id";
pub const FLOW_ID_SEQ: &str = "flow_id";
pub const METASRV_HOME: &str = "/tmp/metasrv";

#[cfg(feature = "pg_kvbackend")]
pub const DEFAULT_META_TABLE_NAME: &str = "greptime_metakv";
#[cfg(feature = "pg_kvbackend")]
pub const DEFAULT_META_ELECTION_LOCK_ID: u64 = 1;

// The datastores that implements metadata kvbackend.
#[derive(Clone, Debug, PartialEq, Serialize, Default, Deserialize, ValueEnum)]
#[serde(rename_all = "snake_case")]
@@ -140,6 +145,12 @@ pub struct MetasrvOptions {
pub tracing: TracingOptions,
/// The datastore for kv metadata.
pub backend: BackendImpl,
#[cfg(feature = "pg_kvbackend")]
/// Table name of the RDS kv backend.
pub meta_table_name: String,
#[cfg(feature = "pg_kvbackend")]
/// Lock id for meta kv election. Only takes effect when using pg_kvbackend.
pub meta_election_lock_id: u64,
}

const DEFAULT_METASRV_ADDR_PORT: &str = "3002";
@@ -177,6 +188,10 @@ impl Default for MetasrvOptions {
flush_stats_factor: 3,
tracing: TracingOptions::default(),
backend: BackendImpl::EtcdStore,
#[cfg(feature = "pg_kvbackend")]
meta_table_name: DEFAULT_META_TABLE_NAME.to_string(),
#[cfg(feature = "pg_kvbackend")]
meta_election_lock_id: DEFAULT_META_ELECTION_LOCK_ID,
}
}
}
