Skip to content

Fix/redis connection wait #705

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 7 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions components/chainhook-cli/src/scan/bitcoin.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::config::{Config, PredicatesApi};
use crate::scan::common::get_block_heights_to_scan;
use crate::service::{
open_readwrite_predicates_db_conn_or_panic, set_confirmed_expiration_status,
connect_to_redis_with_retry, set_confirmed_expiration_status,
set_predicate_scanning_status, set_unconfirmed_expiration_status, ScanningData,
};
use chainhook_sdk::bitcoincore_rpc::RpcApi;
Expand Down Expand Up @@ -71,7 +71,7 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(

let mut predicates_db_conn = match config.http_api {
PredicatesApi::On(ref api_config) => {
Some(open_readwrite_predicates_db_conn_or_panic(api_config, ctx))
Some(connect_to_redis_with_retry(&api_config.database_uri))
}
PredicatesApi::Off => None,
};
Expand Down
5 changes: 2 additions & 3 deletions components/chainhook-cli/src/scan/stacks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ use crate::{
config::{Config, PredicatesApi},
scan::common::get_block_heights_to_scan,
service::{
open_readwrite_predicates_db_conn_or_panic, set_confirmed_expiration_status,
set_predicate_scanning_status, set_unconfirmed_expiration_status, ScanningData,
connect_to_redis_with_retry, set_confirmed_expiration_status, set_predicate_scanning_status, set_unconfirmed_expiration_status, ScanningData
},
storage::{
get_last_block_height_inserted, get_last_unconfirmed_block_height_inserted,
Expand Down Expand Up @@ -223,7 +222,7 @@ pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate(

let mut predicates_db_conn = match config.http_api {
PredicatesApi::On(ref api_config) => {
Some(open_readwrite_predicates_db_conn_or_panic(api_config, ctx))
Some(connect_to_redis_with_retry(&api_config.database_uri))
}
PredicatesApi::Off => None,
};
Expand Down
89 changes: 34 additions & 55 deletions components/chainhook-cli/src/service/http_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::error::Error;

use crate::config::PredicatesApiConfig;

use super::{open_readwrite_predicates_db_conn, PredicateStatus};
use super::{connect_to_redis_with_retry, PredicateStatus};

pub async fn start_predicate_api_server(
api_config: PredicatesApiConfig,
Expand Down Expand Up @@ -87,10 +87,9 @@ fn handle_get_predicates(
ctx: &State<Context>,
) -> Json<JsonValue> {
ctx.try_log(|logger| slog::info!(logger, "Handling HTTP GET /v1/chainhooks"));
match open_readwrite_predicates_db_conn(api_config) {
Ok(mut predicates_db_conn) => {
let predicates = match get_entries_from_predicates_db(&mut predicates_db_conn, ctx) {
Ok(predicates) => predicates,
let mut predicates_db_conn = connect_to_redis_with_retry(&api_config.database_uri);
let predicates = match get_entries_from_predicates_db(&mut predicates_db_conn, ctx) {
Ok(predicates) => predicates,
Err(e) => {
ctx.try_log(|logger| slog::warn!(logger, "unable to retrieve predicates: {e}"));
return Json(json!({
Expand All @@ -107,14 +106,8 @@ fn handle_get_predicates(

Json(json!({
"status": 200,
"result": serialized_predicates
}))
}
Err(e) => Json(json!({
"status": 500,
"message": e,
})),
}
"result": serialized_predicates
}))
}

#[openapi(tag = "Managing Predicates")]
Expand Down Expand Up @@ -147,17 +140,16 @@ fn handle_create_predicate(

let predicate_uuid = predicate.get_uuid().to_string();

if let Ok(mut predicates_db_conn) = open_readwrite_predicates_db_conn(api_config) {
if let Ok(Some(_)) = get_entry_from_predicates_db(
&ChainhookInstance::either_stx_or_btc_key(&predicate_uuid),
&mut predicates_db_conn,
ctx,
) {
return Json(json!({
"status": 409,
"error": "Predicate uuid already in use",
}))
}
let mut predicates_db_conn = connect_to_redis_with_retry(&api_config.database_uri);
if let Ok(Some(_)) = get_entry_from_predicates_db(
&ChainhookInstance::either_stx_or_btc_key(&predicate_uuid),
&mut predicates_db_conn,
ctx,
) {
return Json(json!({
"status": 409,
"error": "Predicate uuid already in use",
}))
}

let background_job_tx = background_job_tx.inner();
Expand Down Expand Up @@ -186,31 +178,24 @@ fn handle_get_predicate(
)
});

match open_readwrite_predicates_db_conn(api_config) {
Ok(mut predicates_db_conn) => {
let (predicate, status) = match get_entry_from_predicates_db(
&ChainhookInstance::either_stx_or_btc_key(&predicate_uuid),
&mut predicates_db_conn,
ctx,
) {
Ok(Some(predicate_with_status)) => predicate_with_status,
_ => {
return Json(json!({
"status": 404,
}))
}
};
let result = serialized_predicate_with_status(&predicate, &status);
Json(json!({
"status": 200,
"result": result
let mut predicates_db_conn = connect_to_redis_with_retry(&api_config.database_uri);
let (predicate, status) = match get_entry_from_predicates_db(
&ChainhookInstance::either_stx_or_btc_key(&predicate_uuid),
&mut predicates_db_conn,
ctx,
) {
Ok(Some(predicate_with_status)) => predicate_with_status,
_ => {
return Json(json!({
"status": 404,
}))
}
Err(e) => Json(json!({
"status": 500,
"message": e,
})),
}
};
let result = serialized_predicate_with_status(&predicate, &status);
Json(json!({
"status": 200,
"result": result
}))
}

#[openapi(tag = "Managing Predicates")]
Expand Down Expand Up @@ -334,16 +319,10 @@ pub fn get_entries_from_predicates_db(
}

pub fn load_predicates_from_redis(
config: &crate::config::Config,
predicate_db_conn: &mut Connection,
ctx: &Context,
) -> Result<Vec<(ChainhookInstance, PredicateStatus)>, String> {
let redis_uri: &str = config.expected_api_database_uri();
let client = redis::Client::open(redis_uri)
.map_err(|e| format!("unable to connect to redis: {}", e))?;
let mut predicate_db_conn = client
.get_connection()
.map_err(|e| format!("unable to connect to redis: {}", e))?;
get_entries_from_predicates_db(&mut predicate_db_conn, ctx)
get_entries_from_predicates_db(predicate_db_conn, ctx)
}

pub fn document_predicate_api_server() -> Result<String, String> {
Expand Down
Loading
Loading