From 76d4b23d9bba266fb3cd34d06c17cc2c8d6755a4 Mon Sep 17 00:00:00 2001 From: rudy Date: Mon, 3 Feb 2025 14:23:16 +0100 Subject: [PATCH 1/2] fix: listener, correct in memory version of last_seen_block --- fhevm-engine/fhevm-listener/src/bin/main.rs | 22 +++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/fhevm-engine/fhevm-listener/src/bin/main.rs b/fhevm-engine/fhevm-listener/src/bin/main.rs index 0382c2c5..4c6a2c71 100644 --- a/fhevm-engine/fhevm-listener/src/bin/main.rs +++ b/fhevm-engine/fhevm-listener/src/bin/main.rs @@ -208,6 +208,15 @@ impl InfiniteLogIter { tokio::time::sleep(Duration::from_secs(1)).await; continue; }; + match (self.last_seen_block, log.block_number) { + (Some(last_seen_block), Some(block_number)) => { + self.last_seen_block = Some(last_seen_block.max(block_number)); + } + (None, Some(block_number)) => { + self.last_seen_block = Some(block_number); + } + _ => (), + } return Some(log); } } @@ -241,7 +250,10 @@ async fn main() { log_iter.new_log_stream(true).await; while let Some(log) = log_iter.next().await { if let Some(block_number) = log.block_number { - eprintln!("Event at block: {}", { block_number }); + if log.block_number != log_iter.last_seen_block { + eprintln!("\n--------------------"); + eprintln!("Block {block_number}"); + } log_iter.last_seen_block = Some(block_number); } if !args.ignore_tfhe_events { @@ -249,7 +261,7 @@ async fn main() { TfheContract::TfheContractEvents::decode_log(&log.inner, true) { // TODO: filter on contract address if known - println!("\nTFHE {event:#?}"); + println!("TFHE {event:#?}"); if let Some(ref mut db) = db { match db.insert_tfhe_event(&event).await { Ok(_) => db.notify_database(EVENT_WORK_AVAILABLE).await, /* we always notify, e.g. for catchup */ @@ -261,10 +273,8 @@ async fn main() { continue; } } - if !args.ignore_tfhe_events { - if let Ok(event) = - AclContract::AclContractEvents::decode_log(&log.inner, true) - { + if !args.ignore_acl_events { + if let Ok(event) = AclContract::AclContractEvents::decode_log(&log.inner, true) { println!("ACL {event:#?}"); if let Some(ref mut db) = db { From ca97bc0de8b7a700d31939d6b522f7587bbf75fc Mon Sep 17 00:00:00 2001 From: rudy Date: Mon, 17 Feb 2025 16:52:23 +0100 Subject: [PATCH 2/2] feat: implement automatic catchup based on db table --- fhevm-engine/Cargo.lock | 3 + .../src/tests/operators_from_events.rs | 3 +- .../20250217133315_add_table_blocks_valid.sql | 8 + ...0330cd688076d02e375486d7ab07fab628280.json | 16 + ...53d059e01468778eea9588aff3f8aee6171a7.json | 22 ++ fhevm-engine/fhevm-listener/Cargo.toml | 7 + .../fhevm-listener/{src => }/build.rs | 14 +- .../contracts/TFHEExecutorTest.sol | 178 +++++++++ fhevm-engine/fhevm-listener/src/bin/main.rs | 284 +-------------- fhevm-engine/fhevm-listener/src/cmd/mod.rs | 344 ++++++++++++++++++ .../src/database/tfhe_event_propagate.rs | 59 +++ fhevm-engine/fhevm-listener/src/lib.rs | 1 + .../fhevm-listener/tests/integration_test.rs | 197 ++++++++++ 13 files changed, 849 insertions(+), 287 deletions(-) create mode 100644 fhevm-engine/fhevm-db/migrations/20250217133315_add_table_blocks_valid.sql create mode 100644 fhevm-engine/fhevm-listener/.sqlx/query-416ef65e70058585ce4cec14ef80330cd688076d02e375486d7ab07fab628280.json create mode 100644 fhevm-engine/fhevm-listener/.sqlx/query-69eea32069c04ab351856d18f4b53d059e01468778eea9588aff3f8aee6171a7.json rename fhevm-engine/fhevm-listener/{src => }/build.rs (50%) create mode 100644 fhevm-engine/fhevm-listener/contracts/TFHEExecutorTest.sol create mode 100644 fhevm-engine/fhevm-listener/src/cmd/mod.rs create mode 100644 fhevm-engine/fhevm-listener/tests/integration_test.rs diff --git a/fhevm-engine/Cargo.lock b/fhevm-engine/Cargo.lock index 2b836b21..5c7b7c0e 100644 --- a/fhevm-engine/Cargo.lock +++ b/fhevm-engine/Cargo.lock @@ -3130,10 +3130,13 @@ dependencies = [ "alloy-provider", "alloy-rpc-types", "alloy-sol-types", + "anyhow", "clap", "fhevm-engine-common", + "foundry-compilers", "futures-util", "serde", + "serial_test", "sqlx", "tokio", ] diff --git a/fhevm-engine/coprocessor/src/tests/operators_from_events.rs b/fhevm-engine/coprocessor/src/tests/operators_from_events.rs index 0b5fbcc0..55cf16a5 100644 --- a/fhevm-engine/coprocessor/src/tests/operators_from_events.rs +++ b/fhevm-engine/coprocessor/src/tests/operators_from_events.rs @@ -248,7 +248,8 @@ fn next_handle() -> Handle { async fn listener_event_to_db(app: &TestInstance) -> ListenerDatabase { let coprocessor_api_key = sqlx::types::Uuid::parse_str(default_api_key()).unwrap(); let url = app.db_url().to_string(); - ListenerDatabase::new(&url, &coprocessor_api_key).await + let chain_id = 0; + ListenerDatabase::new(&url, &coprocessor_api_key, chain_id).await } #[tokio::test] diff --git a/fhevm-engine/fhevm-db/migrations/20250217133315_add_table_blocks_valid.sql b/fhevm-engine/fhevm-db/migrations/20250217133315_add_table_blocks_valid.sql new file mode 100644 index 00000000..f0bd04dc --- /dev/null +++ b/fhevm-engine/fhevm-db/migrations/20250217133315_add_table_blocks_valid.sql @@ -0,0 +1,8 @@ +CREATE TABLE IF NOT EXISTS blocks_valid ( + chain_id INT NOT NULL, + block_hash BYTEA NOT NULL, + block_number BIGINT NOT NULL, + listener_tfhe BOOLEAN NOT NULL DEFAULT FALSE, -- all tfhe events have been propagated for this block + listener_acl BOOLEAN NOT NULL DEFAULT FALSE, -- all acl events have been propagated for this block + PRIMARY KEY (chain_id, block_hash) +); diff --git a/fhevm-engine/fhevm-listener/.sqlx/query-416ef65e70058585ce4cec14ef80330cd688076d02e375486d7ab07fab628280.json b/fhevm-engine/fhevm-listener/.sqlx/query-416ef65e70058585ce4cec14ef80330cd688076d02e375486d7ab07fab628280.json new file mode 100644 index 00000000..23e15bca --- /dev/null +++ b/fhevm-engine/fhevm-listener/.sqlx/query-416ef65e70058585ce4cec14ef80330cd688076d02e375486d7ab07fab628280.json @@ -0,0 +1,16 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO blocks_valid (chain_id, block_hash, block_number, listener_tfhe)\n VALUES ($1, $2, $3, true)\n ON CONFLICT (chain_id, block_hash) DO UPDATE SET listener_tfhe = true;\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int4", + "Bytea", + "Int8" + ] + }, + "nullable": [] + }, + "hash": "416ef65e70058585ce4cec14ef80330cd688076d02e375486d7ab07fab628280" +} diff --git a/fhevm-engine/fhevm-listener/.sqlx/query-69eea32069c04ab351856d18f4b53d059e01468778eea9588aff3f8aee6171a7.json b/fhevm-engine/fhevm-listener/.sqlx/query-69eea32069c04ab351856d18f4b53d059e01468778eea9588aff3f8aee6171a7.json new file mode 100644 index 00000000..c5e79af5 --- /dev/null +++ b/fhevm-engine/fhevm-listener/.sqlx/query-69eea32069c04ab351856d18f4b53d059e01468778eea9588aff3f8aee6171a7.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT block_number FROM blocks_valid WHERE chain_id = $1 ORDER BY block_number DESC LIMIT 1;\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "block_number", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Int4" + ] + }, + "nullable": [ + false + ] + }, + "hash": "69eea32069c04ab351856d18f4b53d059e01468778eea9588aff3f8aee6171a7" +} diff --git a/fhevm-engine/fhevm-listener/Cargo.toml b/fhevm-engine/fhevm-listener/Cargo.toml index aa8d2e16..acfc33e6 100644 --- a/fhevm-engine/fhevm-listener/Cargo.toml +++ b/fhevm-engine/fhevm-listener/Cargo.toml @@ -27,3 +27,10 @@ tokio = { workspace = true } # local dependencies fhevm-engine-common = { path = "../fhevm-engine-common" } + +[dev-dependencies] +anyhow = { workspace = true } +serial_test = "3.2.0" + +[build-dependencies] +foundry-compilers = { version = "0.13.0", features = ["svm-solc"] } diff --git a/fhevm-engine/fhevm-listener/src/build.rs b/fhevm-engine/fhevm-listener/build.rs similarity index 50% rename from fhevm-engine/fhevm-listener/src/build.rs rename to fhevm-engine/fhevm-listener/build.rs index b5a4d55e..92a2700c 100644 --- a/fhevm-engine/fhevm-listener/src/build.rs +++ b/fhevm-engine/fhevm-listener/build.rs @@ -1,12 +1,18 @@ -use std::path::Path; use foundry_compilers::{Project, ProjectPathsConfig}; +use std::path::Path; fn main() { - let paths = ProjectPathsConfig::hardhat(Path::new(env!("CARGO_MANIFEST_DIR"))).unwrap(); + println!("cargo::warning=build.rs run ..."); + let paths = + ProjectPathsConfig::hardhat(Path::new(env!("CARGO_MANIFEST_DIR"))) + .unwrap(); let project = Project::builder() .paths(paths) .build(Default::default()) .unwrap(); let output = project.compile().unwrap(); - assert_eq!(output.has_compiler_errors(), false); + if output.has_compiler_errors() { + eprintln!("{output}"); + } + assert!(!output.has_compiler_errors()); project.rerun_if_sources_changed(); -} \ No newline at end of file +} diff --git a/fhevm-engine/fhevm-listener/contracts/TFHEExecutorTest.sol b/fhevm-engine/fhevm-listener/contracts/TFHEExecutorTest.sol new file mode 100644 index 00000000..29e71191 --- /dev/null +++ b/fhevm-engine/fhevm-listener/contracts/TFHEExecutorTest.sol @@ -0,0 +1,178 @@ +// SPDX-License-Identifier: BSD-3-Clause-Clear +pragma solidity ^0.8.24; + +contract TFHEExecutorTest { + event FheAdd(address indexed caller, uint256 lhs, uint256 rhs, bytes1 scalarByte, uint256 result); + event FheSub(address indexed caller, uint256 lhs, uint256 rhs, bytes1 scalarByte, uint256 result); + event FheMul(address indexed caller, uint256 lhs, uint256 rhs, bytes1 scalarByte, uint256 result); + event FheDiv(address indexed caller, uint256 lhs, uint256 rhs, bytes1 scalarByte, uint256 result); + event FheRem(address indexed caller, uint256 lhs, uint256 rhs, bytes1 scalarByte, uint256 result); + event FheBitAnd(address indexed caller, uint256 lhs, uint256 rhs, bytes1 scalarByte, uint256 result); + event FheBitOr(address indexed caller, uint256 lhs, uint256 rhs, bytes1 scalarByte, uint256 result); + event FheBitXor(address indexed caller, uint256 lhs, uint256 rhs, bytes1 scalarByte, uint256 result); + event FheShl(address indexed caller, uint256 lhs, uint256 rhs, bytes1 scalarByte, uint256 result); + event FheShr(address indexed caller, uint256 lhs, uint256 rhs, bytes1 scalarByte, uint256 result); + event FheRotl(address indexed caller, uint256 lhs, uint256 rhs, bytes1 scalarByte, uint256 result); + event FheRotr(address indexed caller, uint256 lhs, uint256 rhs, bytes1 scalarByte, uint256 result); + event FheEq(address indexed caller, uint256 lhs, uint256 rhs, bytes1 scalarByte, uint256 result); + event FheEqBytes(address indexed caller, uint256 lhs, bytes rhs, bytes1 scalarByte, uint256 result); + event FheNe(address indexed caller, uint256 lhs, uint256 rhs, bytes1 scalarByte, uint256 result); + event FheNeBytes(address indexed caller, uint256 lhs, bytes rhs, bytes1 scalarByte, uint256 result); + event FheGe(address indexed caller, uint256 lhs, uint256 rhs, bytes1 scalarByte, uint256 result); + event FheGt(address indexed caller, uint256 lhs, uint256 rhs, bytes1 scalarByte, uint256 result); + event FheLe(address indexed caller, uint256 lhs, uint256 rhs, bytes1 scalarByte, uint256 result); + event FheLt(address indexed caller, uint256 lhs, uint256 rhs, bytes1 scalarByte, uint256 result); + event FheMin(address indexed caller, uint256 lhs, uint256 rhs, bytes1 scalarByte, uint256 result); + event FheMax(address indexed caller, uint256 lhs, uint256 rhs, bytes1 scalarByte, uint256 result); + event FheNeg(address indexed caller, uint256 ct, uint256 result); + event FheNot(address indexed caller, uint256 ct, uint256 result); + event VerifyCiphertext( + address indexed caller, + bytes32 inputHandle, + address userAddress, + bytes inputProof, + bytes1 inputType, + uint256 result + ); + event Cast(address indexed caller, uint256 ct, bytes1 toType, uint256 result); + event TrivialEncrypt(address indexed caller, uint256 pt, bytes1 toType, uint256 result); + event TrivialEncryptBytes(address indexed caller, bytes pt, bytes1 toType, uint256 result); + event FheIfThenElse(address indexed caller, uint256 control, uint256 ifTrue, uint256 ifFalse, uint256 result); + event FheRand(address indexed caller, bytes1 randType, bytes16 seed, uint256 result); + event FheRandBounded(address indexed caller, uint256 upperBound, bytes1 randType, bytes16 seed, uint256 result); + + function fheAdd(uint256 lhs, uint256 rhs, bytes1 scalarByte) public { + uint256 result = uint256(keccak256(abi.encodePacked("fheAdd", lhs, rhs, scalarByte))); + emit FheAdd(msg.sender, lhs, rhs, scalarByte, result); + } + function fheSub(uint256 lhs, uint256 rhs, bytes1 scalarByte) public { + uint256 result = uint256(keccak256(abi.encodePacked("fheSub", lhs, rhs, scalarByte))); + emit FheSub(msg.sender, lhs, rhs, scalarByte, result); + } + function fheMul(uint256 lhs, uint256 rhs, bytes1 scalarByte) public { + uint256 result = uint256(keccak256(abi.encodePacked("fheMul", lhs, rhs, scalarByte))); + emit FheMul(msg.sender, lhs, rhs, scalarByte, result); + } + function fheDiv(uint256 lhs, uint256 rhs, bytes1 scalarByte) public { + uint256 result = uint256(keccak256(abi.encodePacked("fheDiv", lhs, rhs, scalarByte))); + emit FheDiv(msg.sender, lhs, rhs, scalarByte, result); + } + function fheRem(uint256 lhs, uint256 rhs, bytes1 scalarByte) public { + uint256 result = uint256(keccak256(abi.encodePacked("fheRem", lhs, rhs, scalarByte))); + emit FheRem(msg.sender, lhs, rhs, scalarByte, result); + } + function fheBitAnd(uint256 lhs, uint256 rhs, bytes1 scalarByte) public { + uint256 result = uint256(keccak256(abi.encodePacked("fheBitAnd", lhs, rhs, scalarByte))); + emit FheBitAnd(msg.sender, lhs, rhs, scalarByte, result); + } + function fheBitOr(uint256 lhs, uint256 rhs, bytes1 scalarByte) public { + uint256 result = uint256(keccak256(abi.encodePacked("fheBitOr", lhs, rhs, scalarByte))); + emit FheBitOr(msg.sender, lhs, rhs, scalarByte, result); + } + function fheBitXor(uint256 lhs, uint256 rhs, bytes1 scalarByte) public { + uint256 result = uint256(keccak256(abi.encodePacked("fheBitXor", lhs, rhs, scalarByte))); + emit FheBitXor(msg.sender, lhs, rhs, scalarByte, result); + } + function fheShl(uint256 lhs, uint256 rhs, bytes1 scalarByte) public { + uint256 result = uint256(keccak256(abi.encodePacked("fheShl", lhs, rhs, scalarByte))); + emit FheShl(msg.sender, lhs, rhs, scalarByte, result); + } + function fheShr(uint256 lhs, uint256 rhs, bytes1 scalarByte) public { + uint256 result = uint256(keccak256(abi.encodePacked("fheShr", lhs, rhs, scalarByte))); + emit FheShr(msg.sender, lhs, rhs, scalarByte, result); + } + function fheRotl(uint256 lhs, uint256 rhs, bytes1 scalarByte) public { + uint256 result = uint256(keccak256(abi.encodePacked("fheRotl", lhs, rhs, scalarByte))); + emit FheRotl(msg.sender, lhs, rhs, scalarByte, result); + } + function fheRotr(uint256 lhs, uint256 rhs, bytes1 scalarByte) public { + uint256 result = uint256(keccak256(abi.encodePacked("fheRotr", lhs, rhs, scalarByte))); + emit FheRotr(msg.sender, lhs, rhs, scalarByte, result); + } + function fheEq(uint256 lhs, uint256 rhs, bytes1 scalarByte) public { + uint256 result = uint256(keccak256(abi.encodePacked("fheEq", lhs, rhs, scalarByte))); + emit FheEq(msg.sender, lhs, rhs, scalarByte, result); + } + function fheEq(uint256 lhs, bytes memory rhs, bytes1 scalarByte) public { + uint256 result = uint256(keccak256(abi.encodePacked("fheEqBytes", lhs, rhs, scalarByte))); + emit FheEqBytes(msg.sender, lhs, rhs, scalarByte, result); + } + function fheNe(uint256 lhs, uint256 rhs, bytes1 scalarByte) public { + uint256 result = uint256(keccak256(abi.encodePacked("fheNe", lhs, rhs, scalarByte))); + emit FheNe(msg.sender, lhs, rhs, scalarByte, result); + } + function fheNe(uint256 lhs, bytes memory rhs, bytes1 scalarByte) public { + uint256 result = uint256(keccak256(abi.encodePacked("fheNeBytes", lhs, rhs, scalarByte))); + emit FheNeBytes(msg.sender, lhs, rhs, scalarByte, result); + } + function fheGe(uint256 lhs, uint256 rhs, bytes1 scalarByte) public { + uint256 result = uint256(keccak256(abi.encodePacked("fheGe", lhs, rhs, scalarByte))); + emit FheGe(msg.sender, lhs, rhs, scalarByte, result); + } + function fheGt(uint256 lhs, uint256 rhs, bytes1 scalarByte) public { + uint256 result = uint256(keccak256(abi.encodePacked("fheGt", lhs, rhs, scalarByte))); + emit FheGt(msg.sender, lhs, rhs, scalarByte, result); + } + function fheLe(uint256 lhs, uint256 rhs, bytes1 scalarByte) public { + uint256 result = uint256(keccak256(abi.encodePacked("fheLe", lhs, rhs, scalarByte))); + emit FheLe(msg.sender, lhs, rhs, scalarByte, result); + } + function fheLt(uint256 lhs, uint256 rhs, bytes1 scalarByte) public { + uint256 result = uint256(keccak256(abi.encodePacked("fheLt", lhs, rhs, scalarByte))); + emit FheLt(msg.sender, lhs, rhs, scalarByte, result); + } + function fheMin(uint256 lhs, uint256 rhs, bytes1 scalarByte) public { + uint256 result = uint256(keccak256(abi.encodePacked("fheMin", lhs, rhs, scalarByte))); + emit FheMin(msg.sender, lhs, rhs, scalarByte, result); + } + function fheMax(uint256 lhs, uint256 rhs, bytes1 scalarByte) public { + uint256 result = uint256(keccak256(abi.encodePacked("fheMax", lhs, rhs, scalarByte))); + emit FheMax(msg.sender, lhs, rhs, scalarByte, result); + } + function fheNeg(uint256 ct) public { + uint256 result = uint256(keccak256(abi.encodePacked("fheNeg", ct))); + emit FheNeg(msg.sender, ct, result); + } + function fheNot(uint256 ct) public { + uint256 result = uint256(keccak256(abi.encodePacked("fheNot", ct))); + emit FheNot(msg.sender, ct, result); + } + function fheIfThenElse(uint256 control, uint256 ifTrue, uint256 ifFalse) public { + uint256 result = uint256(keccak256(abi.encodePacked("fheIfThenElse", control, ifTrue, ifFalse))); + emit FheIfThenElse(msg.sender, control, ifTrue, ifFalse, result); + } + function fheRand(bytes1 randType) public { + bytes16 seed = bytes16(keccak256(abi.encodePacked(block.timestamp))); + uint256 result = uint256(keccak256(abi.encodePacked("fheRand", randType, seed))); + emit FheRand(msg.sender, randType, seed, result); + } + function fheRandBounded(uint256 upperBound, bytes1 randType) public { + bytes16 seed = bytes16(keccak256(abi.encodePacked(block.timestamp))); + uint256 result = uint256(keccak256(abi.encodePacked("fheRandBounded", upperBound, randType, seed))); + emit FheRandBounded(msg.sender, upperBound, randType, seed, result); + } + function cast(uint256 ct, bytes1 toType) public { + uint256 result = uint256(keccak256(abi.encodePacked("cast", ct, toType))); + emit Cast(msg.sender, ct, toType, result); + } + + function trivialEncrypt(uint256 pt, bytes1 toType) public { + uint256 result = uint256(keccak256(abi.encodePacked("trivialEncrypt", pt, toType))); + emit TrivialEncrypt(msg.sender, pt, toType, result); + } + + function trivialEncrypt(bytes memory pt, bytes1 toType) public { + uint256 result = uint256(keccak256(abi.encodePacked("trivialEncryptBytes", pt, toType))); + emit TrivialEncryptBytes(msg.sender, pt, toType, result); + } + + function verifyCiphertext( + bytes32 inputHandle, + address userAddress, + bytes memory inputProof, + bytes1 inputType + ) public { + uint256 result = uint256(keccak256(abi.encodePacked("verifyCiphertext", inputHandle, userAddress, inputProof, inputType))); + emit VerifyCiphertext(msg.sender, inputHandle, userAddress, inputProof, inputType, result); + } +} diff --git a/fhevm-engine/fhevm-listener/src/bin/main.rs b/fhevm-engine/fhevm-listener/src/bin/main.rs index 4c6a2c71..76a08549 100644 --- a/fhevm-engine/fhevm-listener/src/bin/main.rs +++ b/fhevm-engine/fhevm-listener/src/bin/main.rs @@ -1,287 +1,7 @@ -use alloy_provider::fillers::{ - BlobGasFiller, ChainIdFiller, FillProvider, GasFiller, JoinFill, - NonceFiller, -}; -use futures_util::stream::StreamExt; -use sqlx::types::Uuid; -use std::str::FromStr; -use std::time::Duration; - -use alloy::primitives::Address; -use alloy::providers::{Provider, ProviderBuilder, RootProvider, WsConnect}; -use alloy::pubsub::SubscriptionStream; -use alloy::rpc::types::{BlockNumberOrTag, Filter, Log}; - -use alloy_sol_types::SolEventInterface; - use clap::Parser; -use fhevm_listener::contracts::{AclContract, TfheContract}; -use fhevm_listener::database::tfhe_event_propagate::{ - Database, EVENT_WORK_AVAILABLE, -}; - -const DEFAULT_CATCHUP: u64 = 5; - -#[derive(Parser, Debug, Clone)] -#[command(version, about, long_about = None)] -pub struct Args { - #[arg(long, default_value = "ws://0.0.0.0:8746")] - pub url: String, - - #[arg(long, default_value = "false")] - pub ignore_tfhe_events: bool, - - #[arg(long, default_value = "false")] - pub ignore_acl_events: bool, - - #[arg(long, default_value = None)] - pub acl_contract_address: Option, - - #[arg(long, default_value = None)] - pub tfhe_contract_address: Option, - - #[arg( - long, - default_value = "postgresql://postgres:testmdp@localhost:5432/postgres" - )] - pub database_url: String, - - #[arg(long, default_value = None, help = "Can be negative from last block", allow_hyphen_values = true)] - pub start_at_block: Option, - - #[arg(long, default_value = None)] - pub end_at_block: Option, - - #[arg(long, default_value = None, help = "A Coprocessor API key is needed for database access")] - pub coprocessor_api_key: Option, -} - -type RProvider = FillProvider< - JoinFill< - alloy::providers::Identity, - JoinFill< - GasFiller, - JoinFill>, - >, - >, - RootProvider, ->; - -// TODO: to merge with Levent works -struct InfiniteLogIter { - url: String, - contract_addresses: Vec
, - stream: Option>, - provider: Option, // required to maintain the stream - last_seen_block: Option, - start_at_block: Option, - end_at_block: Option, -} - -impl InfiniteLogIter { - fn new(args: &Args) -> Self { - let mut contract_addresses = vec![]; - if let Some(acl_contract_address) = &args.acl_contract_address { - contract_addresses - .push(Address::from_str(acl_contract_address).unwrap()); - }; - if let Some(tfhe_contract_address) = &args.tfhe_contract_address { - contract_addresses - .push(Address::from_str(tfhe_contract_address).unwrap()); - }; - Self { - url: args.url.clone(), - contract_addresses, - stream: None, - provider: None, - last_seen_block: None, - start_at_block: args.start_at_block, - end_at_block: args.end_at_block, - } - } - - async fn catchup_block_from( - &self, - provider: &RProvider, - ) -> BlockNumberOrTag { - if let Some(last_seen_block) = self.last_seen_block { - return BlockNumberOrTag::Number(last_seen_block - 1); - } - if let Some(start_at_block) = self.start_at_block { - if start_at_block >= 0 { - return BlockNumberOrTag::Number( - start_at_block.try_into().unwrap(), - ); - } - } - let Ok(last_block) = provider.get_block_number().await else { - return BlockNumberOrTag::Earliest; // should not happend - }; - let catch_size = if let Some(start_at_block) = self.start_at_block { - (-start_at_block).try_into().unwrap() - } else { - DEFAULT_CATCHUP - }; - BlockNumberOrTag::Number(last_block - catch_size.min(last_block)) - } - - async fn new_log_stream(&mut self, not_initialized: bool) { - let mut retry = 20; - loop { - let ws = WsConnect::new(&self.url); - match ProviderBuilder::new().on_ws(ws).await { - Ok(provider) => { - let catch_up_from = - self.catchup_block_from(&provider).await; - if not_initialized { - eprintln!("Catchup from {:?}", catch_up_from); - } - let mut filter = Filter::new().from_block(catch_up_from); - if let Some(end_at_block) = self.end_at_block { - filter = filter - .to_block(BlockNumberOrTag::Number(end_at_block)); - // inclusive - } - if !self.contract_addresses.is_empty() { - filter = filter.address(self.contract_addresses.clone()) - } - eprintln!("Listening on {}", &self.url); - eprintln!("Contracts {:?}", &self.contract_addresses); - self.stream = Some( - provider - .subscribe_logs(&filter) - .await - .expect("BLA2") - .into_stream(), - ); - self.provider = Some(provider); - return; - } - Err(err) => { - let delay = if not_initialized { - if retry == 0 { - panic!( - "Cannot connect to {} due to {err}.", - &self.url - ) - } - 5 - } else { - 1 - }; - if not_initialized { - eprintln!("Cannot connect to {} due to {err}. Will retry in {delay} secs, {retry} times.", &self.url); - } else { - eprintln!("Cannot connect to {} due to {err}. Will retry in {delay} secs, indefinitively.", &self.url); - } - retry -= 1; - tokio::time::sleep(Duration::from_secs(delay)).await; - } - } - } - } - - async fn next(&mut self) -> Option { - let mut not_initialized = true; - loop { - let Some(stream) = &mut self.stream else { - self.new_log_stream(not_initialized).await; - not_initialized = false; - continue; - }; - let Some(log) = stream.next().await else { - // the stream ends, could be a restart of the full node, or just - // a temporary gap - self.stream = None; - if let (Some(end_at_block), Some(last_seen_block)) = - (self.end_at_block, self.last_seen_block) - { - if end_at_block == last_seen_block { - eprintln!( - "Nothing to read, reached end of block range" - ); - return None; - } - } - eprintln!("Nothing to read, retrying"); - tokio::time::sleep(Duration::from_secs(1)).await; - continue; - }; - match (self.last_seen_block, log.block_number) { - (Some(last_seen_block), Some(block_number)) => { - self.last_seen_block = Some(last_seen_block.max(block_number)); - } - (None, Some(block_number)) => { - self.last_seen_block = Some(block_number); - } - _ => (), - } - return Some(log); - } - } -} - #[tokio::main] async fn main() { - let args = Args::parse(); - let mut log_iter = InfiniteLogIter::new(&args); - if let Some(acl_contract_address) = &args.acl_contract_address { - if let Err(err) = Address::from_str(acl_contract_address) { - panic!("Invalid acl contract address: {err}"); - }; - }; - if let Some(tfhe_contract_address) = &args.tfhe_contract_address { - if let Err(err) = Address::from_str(tfhe_contract_address) { - panic!("Invalid tfhe contract address: {err}"); - }; - } - - let mut db: Option = None; - if !args.database_url.is_empty() { - if let Some(coprocessor_api_key) = args.coprocessor_api_key { - db = Some( - Database::new(&args.database_url, &coprocessor_api_key).await, - ); - } else { - panic!("A Coprocessor API key is required to access the database"); - } - } - log_iter.new_log_stream(true).await; - while let Some(log) = log_iter.next().await { - if let Some(block_number) = log.block_number { - if log.block_number != log_iter.last_seen_block { - eprintln!("\n--------------------"); - eprintln!("Block {block_number}"); - } - log_iter.last_seen_block = Some(block_number); - } - if !args.ignore_tfhe_events { - if let Ok(event) = - TfheContract::TfheContractEvents::decode_log(&log.inner, true) - { - // TODO: filter on contract address if known - println!("TFHE {event:#?}"); - if let Some(ref mut db) = db { - match db.insert_tfhe_event(&event).await { - Ok(_) => db.notify_database(EVENT_WORK_AVAILABLE).await, /* we always notify, e.g. for catchup */ - Err(err) => { - eprintln!("Error inserting tfhe event: {err}") - } - } - } - continue; - } - } - if !args.ignore_acl_events { - if let Ok(event) = AclContract::AclContractEvents::decode_log(&log.inner, true) { - println!("ACL {event:#?}"); - - if let Some(ref mut db) = db { - let _ = db.handle_acl_event(&event).await; - } - continue; - } - } - } + let args = fhevm_listener::cmd::Args::parse(); + fhevm_listener::cmd::main(args).await; } diff --git a/fhevm-engine/fhevm-listener/src/cmd/mod.rs b/fhevm-engine/fhevm-listener/src/cmd/mod.rs new file mode 100644 index 00000000..585c2e87 --- /dev/null +++ b/fhevm-engine/fhevm-listener/src/cmd/mod.rs @@ -0,0 +1,344 @@ +use alloy_provider::fillers::{BlobGasFiller, ChainIdFiller, FillProvider, GasFiller, JoinFill, NonceFiller}; +use futures_util::stream::StreamExt; +use sqlx::types::Uuid; +use std::collections::VecDeque; +use std::str::FromStr; +use std::time::Duration; + +use alloy::primitives::Address; +use alloy::providers::{Provider, ProviderBuilder, RootProvider, WsConnect}; +use alloy::pubsub::SubscriptionStream; +use alloy::rpc::types::{BlockNumberOrTag, Filter, Log}; + +use alloy_sol_types::SolEventInterface; + +use clap::Parser; + +use crate::contracts::{AclContract, TfheContract}; +use crate::database::tfhe_event_propagate::{ChainId, Database}; + +#[derive(Parser, Debug, Clone)] +#[command(version, about, long_about = None)] +pub struct Args { + #[arg(long, default_value = "ws://0.0.0.0:8746")] + pub url: String, + + #[arg(long, default_value = "false")] + pub ignore_tfhe_events: bool, + + #[arg(long, default_value = "false")] + pub ignore_acl_events: bool, + + #[arg(long, default_value = None)] + pub acl_contract_address: Option, + + #[arg(long, default_value = None)] + pub tfhe_contract_address: Option, + + #[arg(long, default_value = "postgresql://postgres:testmdp@localhost:5432/postgres")] + pub database_url: String, + + #[arg(long, default_value = None, help = "Can be negative from last block", allow_hyphen_values = true)] + pub start_at_block: Option, + + #[arg(long, default_value = None)] + pub end_at_block: Option, + + #[arg(long, default_value = None, help = "A Coprocessor API key is needed for database access")] + pub coprocessor_api_key: Option, + + #[arg(long, default_value = "5", help = "Catchup margin relative the last seen block")] + pub catchup_margin: u64, +} + +type RProvider = FillProvider< + JoinFill< + alloy::providers::Identity, + JoinFill>>, + >, + RootProvider, +>; + +// TODO: to merge with Levent works +struct InfiniteLogIter { + url: String, + contract_addresses: Vec
, + catchup_logs: VecDeque, + stream: Option>, + provider: Option, // required to maintain the stream + last_valid_block: Option, + start_at_block: Option, + end_at_block: Option, + catchup_margin: u64, + prev_event: Option, + current_event: Option, +} + +impl InfiniteLogIter { + fn new(args: &Args) -> Self { + let mut contract_addresses = vec![]; + if let Some(acl_contract_address) = &args.acl_contract_address { + contract_addresses.push(Address::from_str(acl_contract_address).unwrap()); + }; + if let Some(tfhe_contract_address) = &args.tfhe_contract_address { + contract_addresses.push(Address::from_str(tfhe_contract_address).unwrap()); + }; + Self { + url: args.url.clone(), + contract_addresses, + catchup_logs: VecDeque::new(), + stream: None, + provider: None, + last_valid_block: None, + start_at_block: args.start_at_block, + end_at_block: args.end_at_block, + catchup_margin: args.catchup_margin, + prev_event: None, + current_event: None, + } + } + + async fn get_chain_id_or_panic(&self) -> ChainId { + let ws = WsConnect::new(&self.url); + let provider = ProviderBuilder::new().on_ws(ws).await.unwrap(); + provider.get_chain_id().await.unwrap() + } + + async fn catchup_block_from(&self, provider: &RProvider) -> BlockNumberOrTag { + if let Some(last_seen_block) = self.last_valid_block { + return BlockNumberOrTag::Number(last_seen_block - self.catchup_margin + 1); + } + if let Some(start_at_block) = self.start_at_block { + if start_at_block >= 0 { + return BlockNumberOrTag::Number(start_at_block.try_into().unwrap()); + } + } + let Ok(last_block) = provider.get_block_number().await else { + return BlockNumberOrTag::Earliest; // should not happend + }; + let catch_size = if let Some(start_at_block) = self.start_at_block { + (-start_at_block).try_into().unwrap() + } else { + self.catchup_margin + }; + BlockNumberOrTag::Number(last_block - catch_size.min(last_block)) + } + + async fn fill_catchup_events(&mut self, provider: &RProvider, filter: &Filter) { + let logs = provider.get_logs(&filter).await.expect("BLA2"); + self.catchup_logs.extend(logs); + } + + async fn new_log_stream(&mut self, not_initialized: bool) { + let mut retry = 20; + loop { + let ws = WsConnect::new(&self.url); + match ProviderBuilder::new().on_ws(ws).await { + Ok(provider) => { + let catch_up_from = self.catchup_block_from(&provider).await; + let mut filter = Filter::new().from_block(catch_up_from); + if let Some(end_at_block) = self.end_at_block { + filter = filter + .to_block(BlockNumberOrTag::Number(end_at_block)); + // inclusive + } + if !self.contract_addresses.is_empty() { + filter = filter.address(self.contract_addresses.clone()) + } + eprintln!("Listening on {}", &self.url); + eprintln!("Contracts {:?}", &self.contract_addresses); + // note subcribing to real-time before reading catchup events to have the minimal gap between the two + // TODO: but it does not guarantee no gap for now (implementation dependant) + self.stream = Some( + provider + .subscribe_logs(&filter) + .await + .expect("BLA2") + .into_stream(), + ); + self.fill_catchup_events(&provider, &filter).await; + self.provider = Some(provider); + return; + } + Err(err) => { + let delay = if not_initialized { + if retry == 0 { + panic!( + "Cannot connect to {} due to {err}.", + &self.url + ) + } + 5 + } else { + 1 + }; + if not_initialized { + eprintln!( + "Cannot connect to {} due to {err}. Will retry in {delay} secs, {retry} times.", + &self.url + ); + } else { + eprintln!( + "Cannot connect to {} due to {err}. Will retry in {delay} secs, indefinitively.", + &self.url + ); + } + retry -= 1; + tokio::time::sleep(Duration::from_secs(delay)).await; + } + } + } + } + + async fn next(&mut self) -> Option { + let mut not_initialized = true; + self.prev_event = self.current_event.take(); + while self.current_event.is_none() { + let Some(stream) = &mut self.stream else { + self.new_log_stream(not_initialized).await; + not_initialized = false; + continue; + }; + if let Some(log) = self.catchup_logs.pop_front() { + if self.catchup_logs.is_empty() { + eprintln!("Last catchup event"); + }; + self.current_event = Some(log); + break; + }; + let Some(log) = stream.next().await else { + // the stream ends, could be a restart of the full node, or just a temporary gap + self.stream = None; + if let (Some(end_at_block), Some(last_seen_block)) = + (self.end_at_block, self.last_valid_block) + { + if end_at_block == last_seen_block { + eprintln!( + "Nothing to read, reached end of block range" + ); + return None; + } + } + eprintln!("Nothing to read, retrying"); + tokio::time::sleep(Duration::from_secs(1)).await; + continue; + }; + self.current_event = Some(log); + break; + } + let Some(current_event) = &self.current_event else { + return None; + }; + if let Some(block_number) = current_event.block_number { + // we subtract one because the current block is on going + self.last_valid_block = Some(block_number.max(self.last_valid_block.unwrap_or_default()) - 1); + } + return self.current_event.clone(); + } + + fn is_first_of_block(&self) -> bool { + match (&self.current_event, &self.prev_event) { + (Some(current_event), Some(prev_event)) => { + current_event.block_number != prev_event.block_number + } + _ => false, + } + } +} + +pub async fn main(args: Args) { + if let Some(acl_contract_address) = &args.acl_contract_address { + if let Err(err) = Address::from_str(acl_contract_address) { + panic!("Invalid acl contract address: {err}"); + }; + }; + if let Some(tfhe_contract_address) = &args.tfhe_contract_address { + if let Err(err) = Address::from_str(tfhe_contract_address) { + panic!("Invalid tfhe contract address: {err}"); + }; + } + + let mut log_iter = InfiniteLogIter::new(&args); + let chain_id = log_iter.get_chain_id_or_panic().await; + eprintln!("Chain ID: {chain_id}"); + + let mut db = if !args.database_url.is_empty() { + if let Some(coprocessor_api_key) = args.coprocessor_api_key { + let mut db = Database::new( + &args.database_url, + &coprocessor_api_key, + chain_id, + ) + .await; + if log_iter.start_at_block.is_none() { + log_iter.start_at_block = db + .read_last_valid_block() + .await + .map(|n| n - args.catchup_margin as i64); + } + Some(db) + } else { + panic!("A Coprocessor API key is required to access the database"); + } + } else { + None + }; + + log_iter.new_log_stream(true).await; + + let mut block_error_event_fthe = 0; + while let Some(log) = log_iter.next().await { + if log_iter.is_first_of_block() { + if let Some(block_number) = log.block_number { + if block_error_event_fthe == 0 { + if let Some(ref mut db) = db { + db.mark_prev_block_as_valid( + &log_iter.current_event, + &log_iter.prev_event, + ) + .await; + } + } else { + eprintln!( + "Errors in tfhe events: {block_error_event_fthe}" + ); + block_error_event_fthe = 0; + } + eprintln!("\n--------------------"); + eprintln!("Block {block_number}"); + } + }; + if block_error_event_fthe > 0 { + eprintln!("Errors in block {block_error_event_fthe}"); + } + if !args.ignore_tfhe_events { + if let Ok(event) = + TfheContract::TfheContractEvents::decode_log(&log.inner, true) + { + // TODO: filter on contract address if known + println!("TFHE {event:#?}"); + if let Some(ref mut db) = db { + match db.insert_tfhe_event(&event).await { + Ok(_) => db.notify_scheduler().await, + Err(err) => { + block_error_event_fthe += 1; + eprintln!("Error inserting tfhe event: {err}") + } + } + } + continue; + } + } + if !args.ignore_acl_events { + if let Ok(event) = + AclContract::AclContractEvents::decode_log(&log.inner, true) + { + println!("ACL {event:#?}"); + if let Some(ref mut db) = db { + let _ = db.handle_acl_event(&event).await; + } + continue; + } + } + } +} diff --git a/fhevm-engine/fhevm-listener/src/database/tfhe_event_propagate.rs b/fhevm-engine/fhevm-listener/src/database/tfhe_event_propagate.rs index fb8eaa18..fa41c231 100644 --- a/fhevm-engine/fhevm-listener/src/database/tfhe_event_propagate.rs +++ b/fhevm-engine/fhevm-listener/src/database/tfhe_event_propagate.rs @@ -18,6 +18,7 @@ type CoprocessorApiKey = Uuid; type FheOperation = i32; pub type Handle = Uint<256, 4>; pub type TenantId = i32; +pub type ChainId = u64; pub type ToType = FixedBytes<1>; pub type ScalarByte = FixedBytes<1>; @@ -43,12 +44,14 @@ pub struct Database { url: String, pool: sqlx::Pool, tenant_id: TenantId, + chain_id: ChainId, } impl Database { pub async fn new( url: &str, coprocessor_api_key: &CoprocessorApiKey, + chain_id: ChainId, ) -> Self { let pool = Self::new_pool(url).await; let tenant_id = @@ -56,6 +59,7 @@ impl Database { Database { url: url.into(), tenant_id, + chain_id, pool, } } @@ -288,6 +292,61 @@ impl Database { } } + pub async fn mark_prev_block_as_valid( + &mut self, + opt_event: &Option, + opt_prev_event: &Option, + ) { + let Some(prev_event) = opt_prev_event else { + return; + }; + let Some(event) = opt_event else { + return; + }; + if prev_event.block_number == event.block_number { + return; + } + let prev_event = if prev_event.block_number < event.block_number { + event + } else { + prev_event + }; + let Some(block_number) = prev_event.block_number else { + return; + }; + let Some(block_hash) = prev_event.block_hash else { + return; + }; + let _ = sqlx::query!( + r#" + INSERT INTO blocks_valid (chain_id, block_hash, block_number, listener_tfhe) + VALUES ($1, $2, $3, true) + ON CONFLICT (chain_id, block_hash) DO UPDATE SET listener_tfhe = true; + "#, + self.chain_id as i64, + block_hash.to_vec(), + block_number as i64, + ) + .execute(&self.pool) + .await; + } + + pub async fn read_last_valid_block(&mut self) -> Option { + let query = || { + sqlx::query!( + r#" + SELECT block_number FROM blocks_valid WHERE chain_id = $1 ORDER BY block_number DESC LIMIT 1; + "#, + self.chain_id as i64, + ) + .fetch_one(&self.pool) + }; + match query().await { + Ok(record) => Some(record.block_number), + Err(_err) => None, // table could be empty + } + } + /// Makes attempts to notify a specified DB channel pub async fn notify_database(&mut self, channel: &str) { let query = || sqlx::query!("SELECT pg_notify($1, '')", channel); diff --git a/fhevm-engine/fhevm-listener/src/lib.rs b/fhevm-engine/fhevm-listener/src/lib.rs index de934b9b..7c837161 100644 --- a/fhevm-engine/fhevm-listener/src/lib.rs +++ b/fhevm-engine/fhevm-listener/src/lib.rs @@ -1,2 +1,3 @@ +pub mod cmd; pub mod contracts; pub mod database; diff --git a/fhevm-engine/fhevm-listener/tests/integration_test.rs b/fhevm-engine/fhevm-listener/tests/integration_test.rs new file mode 100644 index 00000000..32dcdde4 --- /dev/null +++ b/fhevm-engine/fhevm-listener/tests/integration_test.rs @@ -0,0 +1,197 @@ +use futures_util::future::try_join_all; +use std::sync::atomic::AtomicU32; +use std::sync::atomic::Ordering; + +use alloy::network::EthereumWallet; +use alloy::node_bindings::Anvil; +use alloy::signers::local::PrivateKeySigner; +use alloy::sol; + +use alloy_primitives::U256; +use alloy_provider::{Provider, ProviderBuilder, WsConnect}; + +use alloy_rpc_types::TransactionRequest; +use serial_test::serial; +use sqlx::postgres::PgPoolOptions; + +use fhevm_listener::cmd::main; +use fhevm_listener::cmd::Args; +use fhevm_listener::database::tfhe_event_propagate::{Database, ToType}; + +sol!( + #[sol(rpc)] + #[derive(Debug, serde::Serialize, serde::Deserialize)] + TFHEExecutorTest, + "artifacts/TFHEExecutorTest.sol/TFHEExecutorTest.json" +); + +use crate::TFHEExecutorTest::TFHEExecutorTestInstance; + +const NB_EVENTS_PER_WALLET: i64 = 400; + +async fn emit_events( + wallets: &[EthereumWallet], + url: &String, + tfhe_contract: TFHEExecutorTestInstance<(), P, N>, +) where + P: Clone + alloy_provider::Provider + 'static, + N: Clone + + alloy_provider::Network + + 'static, +{ + let url_clone = url.clone(); + let mut providers = vec![]; + for wallet in wallets { + let provider = ProviderBuilder::new() + .wallet(wallet.clone()) + .on_ws(WsConnect::new(url_clone.clone())) + .await + .unwrap(); + providers.push(provider); + } + static UNIQUE_INT: AtomicU32 = AtomicU32::new(1); // to counter avoid idempotency + let mut threads = vec![]; + for provider in providers.iter() { + let tfhe_contract = tfhe_contract.clone(); + let provider = provider.clone(); + let thread = tokio::spawn(async move { + for _ in 1..=NB_EVENTS_PER_WALLET { + let to_type = ToType::from_slice(&[4]); + let pt = U256::from(UNIQUE_INT.fetch_add(1, Ordering::SeqCst)); + let txn_req = tfhe_contract + .trivialEncrypt_1(pt.clone(), to_type.clone()) + .into_transaction_request() + .into(); + let pending_txn = + provider.send_transaction(txn_req).await.unwrap(); + let receipt = pending_txn.get_receipt().await.unwrap(); + } + }); + threads.push(thread); + } + if let Err(err) = try_join_all(threads).await { + eprintln!("{err}"); + assert!(false); + } +} + +#[tokio::test] +#[serial(db)] +async fn test_listener_restart() -> Result<(), anyhow::Error> { + let anvil = Anvil::new() + .block_time_f64(1.0) + .args(["--accounts", "15"]) + .spawn(); + let chain_id = anvil.chain_id(); + let nb_wallet = anvil.keys().len() as i64; + eprintln!("Nb wallet {}", nb_wallet); + let mut wallets = vec![]; + for key in anvil.keys().iter() { + let signer: PrivateKeySigner = key.clone().into(); + let wallet = EthereumWallet::new(signer); + wallets.push(wallet); + } + let url = anvil.ws_endpoint(); + + let database_url = + "postgresql://postgres:postgres@localhost:5432/coprocessor"; + + let db_pool = PgPoolOptions::new() + .max_connections(1) + .connect(database_url) + .await?; + + sqlx::query!("TRUNCATE computations") + .execute(&db_pool) + .await?; + sqlx::query!("TRUNCATE blocks_valid") + .execute(&db_pool) + .await?; + let count = sqlx::query!("SELECT COUNT(*) FROM computations") + .fetch_one(&db_pool) + .await? + .count + .unwrap_or(0); + assert_eq!(count, 0); + + let coprocessor_api_key = Some( + sqlx::query!("SELECT tenant_api_key FROM tenants LIMIT 1") + .fetch_one(&db_pool) + .await? + .tenant_api_key, + ); + + let provider = ProviderBuilder::new() + .wallet(wallets[0].clone()) + .on_ws(WsConnect::new(url.clone())) + .await?; + let tfhe_contract = TFHEExecutorTest::deploy(provider.clone()).await?; + let args = Args { + url: url.clone(), + ignore_tfhe_events: false, + ignore_acl_events: false, + acl_contract_address: None, + tfhe_contract_address: None, + database_url: database_url.into(), + coprocessor_api_key, + start_at_block: None, + end_at_block: None, + catchup_margin: 5, + }; + + // Start listener in background task + let listener_handle = tokio::spawn(main(args.clone())); + + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; + + // Emit first batch of events + let wallets_clone = wallets.clone(); + let url_clone = url.clone(); + let tfhe_contract_clone = tfhe_contract.clone(); + let event_source = tokio::spawn(async move { + emit_events(&wallets_clone, &url_clone, tfhe_contract_clone).await; + }); + + tokio::time::sleep(tokio::time::Duration::from_secs(10)).await; + + // Kill the listener + eprintln!("First kill, check database valid block has been updated"); + listener_handle.abort(); + let mut database = + Database::new(&database_url, &coprocessor_api_key.unwrap(), chain_id) + .await; + let last_block = database.read_last_valid_block().await; + assert!(last_block.is_some()); + assert!(last_block.unwrap() > 1); + + let db_pool = PgPoolOptions::new() + .max_connections(1) + .connect(database_url) + .await?; + let mut events_count = 0; + let mut nb_kill = 1; + // Restart/kill many time until no more events are consumned. + loop { + let listener_handle = tokio::spawn(main(args.clone())); + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; + let new_count = sqlx::query!("SELECT COUNT(*) FROM computations") + .fetch_one(&db_pool) + .await? + .count + .unwrap_or(0); + if event_source.is_finished() && events_count == new_count { + listener_handle.abort(); + break; + }; + events_count = new_count; + listener_handle.abort(); + nb_kill += 1; + eprintln!("Kill {nb_kill} ongoing"); + tokio::time::sleep(tokio::time::Duration::from_secs_f64(1.5)).await; + } + + assert_eq!(events_count, nb_wallet * NB_EVENTS_PER_WALLET); + eprintln!("Total kills: {nb_kill}"); + assert!(3 < nb_kill); + Ok(()) +}