Skip to content
This repository was archived by the owner on Jun 11, 2025. It is now read-only.

Commit 1fb7d90

Browse files
committed
TRY_PARA
1 parent 42194e3 commit 1fb7d90

File tree

1 file changed

+62
-41
lines changed

1 file changed

+62
-41
lines changed

fhevm-engine/fhevm-listener/tests/integration_test.rs

Lines changed: 62 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use futures_util::future::try_join_all;
12
use std::sync::atomic::AtomicU32;
23
use std::sync::atomic::Ordering;
34

@@ -26,36 +27,42 @@ sol!(
2627

2728
use crate::TFHEExecutorTest::TFHEExecutorTestInstance;
2829

29-
const NB_EVENTS: i64 = 30;
30+
const NB_EVENTS: i64 = 5;
3031

31-
async fn emit_events<P, N>(wallet: &EthereumWallet, url: &String, tfhe_contract: TFHEExecutorTestInstance<(), P, N>)
32+
async fn emit_events<P, N >(signer: &PrivateKeySigner, url: &String, tfhe_contract: TFHEExecutorTestInstance<(), P, N>)
3233
where
3334
P: Clone + alloy_provider::Provider<N> + 'static,
3435
N: Clone + alloy_provider::Network<TransactionRequest = TransactionRequest> + 'static,
3536
{
36-
static UNIQUE_INT: AtomicU32 = AtomicU32::new(1); // to counter avoid idempotency
37-
let url_clone = url.clone();
38-
let wallet_clone = wallet.clone();
39-
tokio::spawn(async move {
40-
let provider = ProviderBuilder::new()
41-
.wallet(wallet_clone)
42-
.on_ws(WsConnect::new(url_clone))
43-
.await
44-
.unwrap();
45-
let to_type = ToType::from_slice(&[1]);
46-
for i in 1..=NB_EVENTS {
47-
let pt = U256::from(UNIQUE_INT.fetch_add(1, Ordering::Relaxed));
48-
let txn_req = tfhe_contract
49-
.trivialEncrypt_1(pt.clone(), to_type.clone())
50-
.into_transaction_request()
51-
.into();
52-
let pending_txn = provider.send_transaction(txn_req).await.unwrap();
53-
let receipt = pending_txn.get_receipt().await.unwrap();
54-
assert!(receipt.status());
55-
}
56-
})
57-
.await
58-
.unwrap();
37+
static UNIQUE_INT: AtomicU32 = AtomicU32::new(1); // to counter idempotency
38+
let to_type = ToType::from_slice(&[4]);
39+
let mut pending_txns = vec![];
40+
for _ in 1..=NB_EVENTS {
41+
let url_clone = url.clone();
42+
let pt = U256::from(UNIQUE_INT.fetch_add(1, Ordering::Relaxed));
43+
let txn_req = tfhe_contract
44+
.trivialEncrypt_1(pt.clone(), to_type.clone())
45+
.into_transaction_request()
46+
.into();
47+
let signer_clone = signer.clone();
48+
pending_txns.push(tokio::spawn(async move {
49+
let wallet = EthereumWallet::new(signer_clone);
50+
let provider = ProviderBuilder::new()
51+
.wallet(wallet)
52+
.on_ws(WsConnect::new(url_clone))
53+
.await
54+
.unwrap();
55+
provider.clone().send_transaction(txn_req).await
56+
}));
57+
tokio::time::sleep(tokio::time::Duration::from_millis(5)).await;
58+
}
59+
let mut receipts = vec![];
60+
for pending_txn in try_join_all(pending_txns).await.unwrap() {
61+
receipts.push(tokio::spawn(pending_txn.unwrap().get_receipt()));
62+
}
63+
for receipt in try_join_all(receipts).await.unwrap() {
64+
assert!(receipt.unwrap().status());
65+
}
5966
}
6067

6168

@@ -110,39 +117,53 @@ async fn test_listener_restart() -> Result<(), anyhow::Error> {
110117
// Start listener in background task
111118
let listener_handle = tokio::spawn(main(args.clone()));
112119

113-
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
120+
tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
114121

115122
// Emit first batch of events
116-
let wallet = EthereumWallet::new(signer.clone());
123+
emit_events(&signer, &url, tfhe_contract.clone()).await;
117124

118-
emit_events(&wallet, &url, tfhe_contract.clone()).await;
125+
tokio::time::sleep(tokio::time::Duration::from_secs(40)).await; // let some time for propagation
119126

120127
// Kill the listener
121128
listener_handle.abort();
129+
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
122130
let mut database = Database::new(&database_url, &coprocessor_api_key.unwrap(), chain_id).await;
123131
let last_block = database.read_last_seen_block().await;
124132
assert!(last_block.is_some());
125133
assert!(last_block.unwrap() > 1);
134+
let db_pool = PgPoolOptions::new().max_connections(1).connect(database_url).await?;
135+
let new_count = sqlx::query!("SELECT COUNT(*) FROM computations")
136+
.fetch_one(&db_pool)
137+
.await?
138+
.count
139+
.unwrap_or(0);
140+
assert_ne!(count, 0);
126141

127142
// Emit second batch
128-
emit_events(&wallet, &url, tfhe_contract.clone()).await;
143+
emit_events(&signer, &url, tfhe_contract.clone()).await;
129144

130145
// Restart listener
131146
let listener_handle = tokio::spawn(main(args.clone()));
132147

133148
// Continue with events
134-
emit_events(&wallet, &url, tfhe_contract.clone()).await;
135-
136-
// Give time for events to be processed
137-
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
138-
139-
let db_pool = PgPoolOptions::new().max_connections(1).connect(database_url).await?;
140-
141-
let count = sqlx::query!("SELECT COUNT(*) FROM computations")
142-
.fetch_one(&db_pool)
143-
.await?
144-
.count
145-
.unwrap_or(0);
149+
emit_events(&signer, &url, tfhe_contract.clone()).await;
150+
151+
let mut old_count = 0;
152+
loop {
153+
// Give time for events to be processed
154+
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
155+
156+
let new_count = sqlx::query!("SELECT COUNT(*) FROM computations")
157+
.fetch_one(&db_pool)
158+
.await?
159+
.count
160+
.unwrap_or(0);
161+
162+
if old_count == new_count {
163+
break;
164+
};
165+
old_count = new_count;
166+
}
146167

147168
assert_eq!(count, 3 * NB_EVENTS);
148169
// Cleanup

0 commit comments

Comments
 (0)