Skip to content

Commit df1c0bd

Browse files
authored
Clean up structure
1 parent 153df86 commit df1c0bd

File tree

4 files changed

+123
-119
lines changed

4 files changed

+123
-119
lines changed

.github/CODEOWNERS

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
* @davidmisiak @priner @miksax

src/main.rs

Lines changed: 5 additions & 119 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,10 @@
1-
use anyhow::anyhow;
21
use clap::Parser;
3-
use config::PoolConfig;
4-
use oura::{
5-
filters::selection::{self, Predicate},
6-
mapper,
7-
model::EventData,
8-
pipelining::{FilterProvider, SourceProvider, StageReceiver},
9-
sources::{n2n, AddressArg, BearerKind, IntersectArg, MagicArg, PointArg},
10-
utils::{ChainWellKnownInfo, Utils, WithUtils},
11-
};
12-
use std::{fs, str::FromStr, sync::Arc, thread::JoinHandle};
2+
use std::fs;
133
use tracing_subscriber::prelude::*;
144

155
mod config;
6+
mod setup;
7+
mod sink;
168

179
#[derive(Parser, Debug)]
1810
#[command(version)]
@@ -45,113 +37,7 @@ async fn main() -> anyhow::Result<()> {
4537
.with(filter)
4638
.init();
4739

48-
let (_handles, input) = oura_bootstrap(args.start, args.socket)?;
49-
start(input, &config.pools).await?;
40+
let (_handles, input) = setup::oura_bootstrap(args.start, args.socket)?;
41+
sink::start(input, &config.pools).await?;
5042
Ok(())
5143
}
52-
53-
pub async fn start(input: StageReceiver, pools: &[PoolConfig]) -> anyhow::Result<()> {
54-
tracing::info!("Starting");
55-
loop {
56-
let event = input.recv()?;
57-
58-
match &event.data {
59-
EventData::Transaction(transaction_record) => {
60-
if let Some(outputs) = &transaction_record.outputs {
61-
let mut pool = None;
62-
for output in outputs {
63-
pool = pool.or_else(|| pools.iter().find(|p| p.address == output.address));
64-
}
65-
if let Some(pool) = pool {
66-
tracing::info!("Found transaction for addr: {}", pool.address);
67-
for output in outputs {
68-
tracing::info!(
69-
"Address: {} {}, {:?}",
70-
output.address,
71-
output.amount,
72-
output.assets
73-
);
74-
}
75-
}
76-
}
77-
}
78-
_ => {
79-
tracing::info!("{:?}", event.data);
80-
}
81-
}
82-
}
83-
}
84-
85-
pub fn oura_bootstrap(
86-
start_block: Option<String>,
87-
socket: String,
88-
) -> anyhow::Result<(Vec<JoinHandle<()>>, StageReceiver)> {
89-
let magic = MagicArg::from_str("mainnet").unwrap();
90-
91-
let well_known = ChainWellKnownInfo::try_from_magic(*magic)
92-
.map_err(|_| anyhow!("chain well known info failed"))?;
93-
94-
let utils = Arc::new(Utils::new(well_known));
95-
96-
let mapper = mapper::Config {
97-
include_transaction_details: true,
98-
include_block_cbor: true,
99-
..Default::default()
100-
};
101-
102-
let intersect = match start_block {
103-
Some(s) => {
104-
let (slot, hash) = match s.split_once(':') {
105-
Some((s, h)) => (s.parse::<u64>()?, h),
106-
None => return Err(anyhow!("invalid start")),
107-
};
108-
println!("{} {}", slot, hash);
109-
Some(IntersectArg::Point(PointArg(slot, hash.to_string())))
110-
}
111-
None => None,
112-
};
113-
114-
#[allow(deprecated)]
115-
let source_config = n2n::Config {
116-
address: if socket.contains(':') {
117-
AddressArg(BearerKind::Tcp, socket)
118-
} else {
119-
AddressArg(BearerKind::Unix, socket)
120-
},
121-
magic: Some(magic),
122-
well_known: None,
123-
mapper,
124-
since: None,
125-
min_depth: 0,
126-
intersect,
127-
retry_policy: None,
128-
finalize: None, // TODO: configurable
129-
};
130-
131-
let source_setup = WithUtils::new(source_config, utils);
132-
133-
let check = Predicate::VariantIn(vec![String::from("Transaction")]);
134-
135-
let filter_setup = selection::Config { check };
136-
137-
let mut handles = Vec::new();
138-
139-
tracing::info!("{}", "Attempting to connect to node...");
140-
141-
let (source_handle, source_rx) = source_setup.bootstrap().map_err(|e| {
142-
tracing::error!("{}", e);
143-
anyhow!("failed to bootstrap source. Are you sure cardano-node is running?")
144-
})?;
145-
146-
tracing::info!("{}", "Connection to node established");
147-
148-
handles.push(source_handle);
149-
150-
let (filter_handle, filter_rx) = filter_setup
151-
.bootstrap(source_rx)
152-
.map_err(|_| anyhow!("failed to bootstrap filter"))?;
153-
154-
handles.push(filter_handle);
155-
156-
Ok((handles, filter_rx))
157-
}

src/setup.rs

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
use anyhow::anyhow;
2+
use oura::{
3+
filters::selection::{self, Predicate},
4+
mapper,
5+
pipelining::{FilterProvider, SourceProvider, StageReceiver},
6+
sources::{n2n, AddressArg, BearerKind, IntersectArg, MagicArg, PointArg},
7+
utils::{ChainWellKnownInfo, Utils, WithUtils},
8+
};
9+
use std::{str::FromStr, sync::Arc, thread::JoinHandle};
10+
11+
pub fn oura_bootstrap(
12+
start_block: Option<String>,
13+
socket: String,
14+
) -> anyhow::Result<(Vec<JoinHandle<()>>, StageReceiver)> {
15+
let magic = MagicArg::from_str("mainnet").unwrap();
16+
17+
let well_known = ChainWellKnownInfo::try_from_magic(*magic)
18+
.map_err(|_| anyhow!("chain well known info failed"))?;
19+
20+
let utils = Arc::new(Utils::new(well_known));
21+
22+
let mapper = mapper::Config {
23+
include_transaction_details: true,
24+
include_block_cbor: true,
25+
..Default::default()
26+
};
27+
28+
let intersect = match start_block {
29+
Some(s) => {
30+
let (slot, hash) = match s.split_once(':') {
31+
Some((s, h)) => (s.parse::<u64>()?, h),
32+
None => return Err(anyhow!("invalid start")),
33+
};
34+
println!("{} {}", slot, hash);
35+
Some(IntersectArg::Point(PointArg(slot, hash.to_string())))
36+
}
37+
None => None,
38+
};
39+
40+
#[allow(deprecated)]
41+
let source_config = n2n::Config {
42+
address: if socket.contains(':') {
43+
AddressArg(BearerKind::Tcp, socket)
44+
} else {
45+
AddressArg(BearerKind::Unix, socket)
46+
},
47+
magic: Some(magic),
48+
well_known: None,
49+
mapper,
50+
since: None,
51+
min_depth: 0,
52+
intersect,
53+
retry_policy: None,
54+
finalize: None, // TODO: configurable
55+
};
56+
57+
let source_setup = WithUtils::new(source_config, utils);
58+
59+
let check = Predicate::VariantIn(vec![String::from("Transaction")]);
60+
61+
let filter_setup = selection::Config { check };
62+
63+
let mut handles = Vec::new();
64+
65+
tracing::info!("{}", "Attempting to connect to node...");
66+
67+
let (source_handle, source_rx) = source_setup.bootstrap().map_err(|e| {
68+
tracing::error!("{}", e);
69+
anyhow!("failed to bootstrap source. Are you sure cardano-node is running?")
70+
})?;
71+
72+
tracing::info!("{}", "Connection to node established");
73+
74+
handles.push(source_handle);
75+
76+
let (filter_handle, filter_rx) = filter_setup
77+
.bootstrap(source_rx)
78+
.map_err(|_| anyhow!("failed to bootstrap filter"))?;
79+
80+
handles.push(filter_handle);
81+
82+
Ok((handles, filter_rx))
83+
}

src/sink.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
use crate::config;
2+
use oura::{model::EventData, pipelining::StageReceiver};
3+
4+
pub async fn start(input: StageReceiver, pools: &[config::PoolConfig]) -> anyhow::Result<()> {
5+
tracing::info!("Starting");
6+
loop {
7+
let event = input.recv()?;
8+
9+
match &event.data {
10+
EventData::Transaction(transaction_record) => {
11+
if let Some(outputs) = &transaction_record.outputs {
12+
let mut pool = None;
13+
for output in outputs {
14+
pool = pool.or_else(|| pools.iter().find(|p| p.address == output.address));
15+
}
16+
if let Some(pool) = pool {
17+
tracing::info!("Found transaction for addr: {}", pool.address);
18+
for output in outputs {
19+
tracing::info!(
20+
"Address: {} {}, {:?}",
21+
output.address,
22+
output.amount,
23+
output.assets
24+
);
25+
}
26+
}
27+
}
28+
}
29+
_ => {
30+
tracing::info!("{:?}", event.data);
31+
}
32+
}
33+
}
34+
}

0 commit comments

Comments
 (0)