Commit

Add status field
Mubelotix committed Jun 1, 2024
1 parent d7319ca commit 0dd3907
Showing 4 changed files with 46 additions and 12 deletions.
44 changes: 32 additions & 12 deletions daemon/src/index/index.rs
@@ -3,6 +3,7 @@ use super::*;
#[derive(Clone)]
pub struct DocumentIndex {
config: Arc<Args>,
status: Arc<RwLock<IndexingStatus>>,
inner: Arc<RwLock<DocumentIndexInner>>,
}

@@ -11,6 +12,7 @@ impl DocumentIndex {
pub async fn new(config: Arc<Args>) -> DocumentIndex {
DocumentIndex {
inner: Arc::new(RwLock::new(DocumentIndexInner::new(Arc::clone(&config)).await)),
status: Arc::new(RwLock::new(IndexingStatus::default())),
config,
}
}
@@ -48,7 +50,7 @@ impl DocumentIndex {
let mut to_list = Vec::new();
let mut to_load = HashMap::new();
let mut to_load_unprioritized = HashMap::new();

// List pinned elements
let pinned = match list_pinned(&self.config.ipfs_rpc).await {
Ok(pinned) => pinned,
@@ -64,13 +66,15 @@
};
last_printed_error = None;
to_list.extend(pinned.iter().filter_map(normalize_cid).filter(|cid| !listed.contains(cid)));
self.set_status(listed.len(), to_list.len(), loaded.len(), to_load.len(), to_load_unprioritized.len()).await;

// Explore directories
let start = Instant::now();
let mut i = 0;
if !to_list.is_empty() {debug!("{} elements to list", to_list.len())}
while let Some(cid) = to_list.pop() {
if !listed.insert(cid.clone()) {continue}
self.set_status(listed.len(), to_list.len()+1, loaded.len(), to_load.len(), to_load_unprioritized.len()).await;

let new_links = match ls(ipfs_rpc, cid.clone()).await {
Ok(new_links) => new_links,
Err(e) => {
@@ -94,25 +98,21 @@
}
to_list.sort();
to_list.dedup();
i += 1;
if i % 500 == 0 {
debug!("Still listing pinned files ({i} in {:.02})", start.elapsed().as_secs_f32());
}
}

// Load documents
i = 0;
if !to_load.is_empty() {debug!("{} documents to load ({:.02?}s)", to_load.len(), start.elapsed().as_secs_f32())}
for (cid, (name, parent_cid)) in to_load.drain().chain(to_load_unprioritized.drain()) {
let (to_load_len, to_load_unprioritized_len) = (to_load.len(), to_load_unprioritized.len());
for (i, (cid, (name, parent_cid))) in to_load.drain().chain(to_load_unprioritized.drain()).enumerate() {
let remaining_to_load = to_load_len.saturating_sub(i);
let remaining_unprioritized = std::cmp::min(to_load_unprioritized_len, to_load_len + to_load_unprioritized_len - i);
self.set_status(listed.len(), to_list.len(), loaded.len(), remaining_to_load, remaining_unprioritized).await;

if !loaded.insert(cid.clone()) {continue}
let Ok(document) = fetch_document(ipfs_rpc, &cid).await else {continue};
let Some(inspected) = inspect_document(document) else {continue};
self.add_document(&cid, inspected).await;
self.add_ancestor(&cid, name, false, &parent_cid).await;
i += 1;
if i % 500 == 0 {
debug!("Still loading files ({i} in {:.02})", start.elapsed().as_secs_f32());
}
}

// Update filter
@@ -126,6 +126,24 @@
}
}

async fn set_status(&self, listed: usize, to_list: usize, loaded: usize, to_load: usize, to_load_unprioritized: usize) {
let mut status = self.status.write().await;
status.listed = listed;
status.to_list = to_list;
status.loaded = loaded;
status.to_load = to_load;
status.to_load_unprioritized = to_load_unprioritized;
}

async fn set_status_updating_filter(&self, updating_filter: bool) {
let mut status = self.status.write().await;
status.updating_filter = updating_filter;
}

pub async fn status(&self) -> IndexingStatus {
self.status.read().await.clone()
}

pub async fn documents(&self) -> HashSet<String> {
self.inner.read().await.documents()
}
@@ -147,7 +165,9 @@ impl DocumentIndex {
}

pub async fn update_filter(&self) {
self.set_status_updating_filter(true).await;
self.inner.write().await.update_filter().await;
self.set_status_updating_filter(false).await;
}
}
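The remaining-count arithmetic introduced in the load loop is easy to misread, so here is a standalone illustration (not part of the commit) of how remaining_to_load and remaining_unprioritized evolve as the chained iterator is drained, using made-up sizes of 2 prioritized and 3 unprioritized documents:

fn main() {
    let (to_load_len, to_load_unprioritized_len) = (2usize, 3usize);
    for i in 0..to_load_len + to_load_unprioritized_len {
        // Same expressions as in the diff above: prioritized items are drained
        // first, so the prioritized counter hits zero before the unprioritized
        // one starts shrinking.
        let remaining_to_load = to_load_len.saturating_sub(i);
        let remaining_unprioritized = std::cmp::min(
            to_load_unprioritized_len,
            to_load_len + to_load_unprioritized_len - i,
        );
        println!("i={i}: to_load={remaining_to_load}, to_load_unprioritized={remaining_unprioritized}");
    }
    // Prints (2, 3), (1, 3), (0, 3), (0, 2), (0, 1): the two queue counters
    // passed to set_status at each iteration.
}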

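As a usage sketch (the monitoring task below is hypothetical and not part of this commit; it assumes a Tokio runtime, which matches the async RwLock usage above), the new status() accessor can be polled from a background task without blocking indexing, since DocumentIndex derives Clone and status() only holds the read lock long enough to clone the snapshot:

use std::time::Duration;

// Hypothetical helper, not included in the commit: periodically report the
// indexing progress exposed by DocumentIndex::status().
async fn report_progress(index: DocumentIndex) {
    loop {
        let s = index.status().await;
        println!(
            "listed {} ({} queued), loaded {} ({} pending, {} unprioritized), updating filter: {}",
            s.listed, s.to_list, s.loaded, s.to_load, s.to_load_unprioritized, s.updating_filter
        );
        tokio::time::sleep(Duration::from_secs(5)).await;
    }
}

A caller would typically spawn this with tokio::spawn(report_progress(index.clone())) alongside the indexing task.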
1 change: 1 addition & 0 deletions daemon/src/index/inner_im.rs
@@ -33,6 +33,7 @@ impl DocumentIndexInner {
}
}

#[allow(dead_code)]
pub(super) async fn sweep(&mut self) {}

pub fn documents(&self) -> HashSet<String> {
2 changes: 2 additions & 0 deletions daemon/src/index/mod.rs
@@ -8,8 +8,10 @@ const REFRESH_INTERVAL: u64 = 100;
const SWEEP_INTERVAL: u64 = 30;

mod index;
mod status;
mod inner_common;
pub use index::*;
pub use status::*;

#[cfg(any(feature = "database-lmdb", feature = "database-mdbx"))]
mod inner_db;
11 changes: 11 additions & 0 deletions daemon/src/index/status.rs
@@ -0,0 +1,11 @@
use serde::Serialize;

#[derive(Default, Debug, Clone, Serialize)]
pub struct IndexingStatus {
pub listed: usize,
pub to_list: usize,
pub loaded: usize,
pub to_load: usize,
pub to_load_unprioritized: usize,
pub updating_filter: bool,
}
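Since IndexingStatus derives Serialize, the snapshot returned by DocumentIndex::status() can be rendered as JSON for whatever endpoint ends up exposing it (not shown in this commit). A minimal sketch, assuming serde_json is available as a dependency:

// Sketch only: turn a status snapshot into a JSON string.
fn status_json(status: &IndexingStatus) -> serde_json::Result<String> {
    serde_json::to_string_pretty(status)
}

A freshly constructed IndexingStatus::default() serializes to all-zero counters with updating_filter set to false, which is what a reader of status() sees until the indexing loop first calls set_status.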
