From b43c6c15705356e8634ca612c76dd1c35db157b7 Mon Sep 17 00:00:00 2001
From: Felix Ableitner
Date: Fri, 1 Apr 2022 00:31:52 +0200
Subject: [PATCH] wip: try to implement parallel crawl

---
 src/crawl.rs               | 96 ++++++++++++++++++++++++--------
 src/federated_instances.rs |  2 +-
 2 files changed, 62 insertions(+), 36 deletions(-)

diff --git a/src/crawl.rs b/src/crawl.rs
index f522c71..bbfb273 100644
--- a/src/crawl.rs
+++ b/src/crawl.rs
@@ -3,52 +3,74 @@ use crate::node_info::NodeInfo;
 use crate::REQUEST_TIMEOUT;
 use anyhow::anyhow;
 use anyhow::Error;
-use futures::try_join;
+use futures::future::join_all;
+use futures::stream::FuturesUnordered;
+use futures::{stream, try_join, StreamExt, TryStreamExt};
 use reqwest::Client;
 use serde::Serialize;
 use std::collections::VecDeque;
+use std::future::Future;
 
 pub async fn crawl(
   start_instances: Vec<String>,
   exclude: Vec<String>,
   max_depth: i32,
 ) -> Result<(Vec<InstanceDetails>, i32), Error> {
-  let mut pending_instances: VecDeque<CrawlInstance> = start_instances
+  let mut pending_instances: VecDeque<CrawlInstanceTask> = start_instances
     .iter()
-    .map(|s| CrawlInstance::new(s.to_string(), 0))
+    .map(|s| CrawlInstanceTask::new(s.to_string(), 0))
     .collect();
   let mut crawled_instances = vec![];
   let mut instance_details = vec![];
-  let mut failed_instances = 0;
-  while let Some(current_instance) = pending_instances.pop_back() {
-    crawled_instances.push(current_instance.domain.clone());
-    if current_instance.depth > max_depth || exclude.contains(&current_instance.domain) {
-      continue;
-    }
-    match fetch_instance_details(&current_instance.domain).await {
-      Ok(details) => {
-        instance_details.push(details.to_owned());
-        for i in details.linked_instances {
-          let is_in_crawled = crawled_instances.contains(&i);
-          let is_in_pending = pending_instances.iter().any(|p| p.domain == i);
-          if !is_in_crawled && !is_in_pending {
-            let ci = CrawlInstance::new(i, current_instance.depth + 1);
-            pending_instances.push_back(ci);
-          }
-        }
-      }
-      Err(e) => {
-        failed_instances += 1;
-        eprintln!("Failed to crawl {}: {}", current_instance.domain, e)
-      }
-    }
-  }
+  //let mut failed_instances = 0;
+  let mut futures = stream::iter(pending_instances)
+    .then(|instance: CrawlInstanceTask| async {
+      crawled_instances.push(instance.domain.clone());
+      crawl_instance(instance, exclude.clone(), max_depth).await?
+    })
+    .flat_map(|(instance_details, depth)| {
+      let futures = instance_details
+        .linked_instances
+        .iter()
+        .map(|i| {
+          crawled_instances.push(i.clone());
+          crawl_instance(
+            CrawlInstanceTask::new(i.clone(), depth),
+            exclude.clone(),
+            max_depth,
+          )
+        })
+        .collect();
+
+      stream::iter(futures)
+    })
+    .collect::<Vec<impl Future<Output = Result<(InstanceDetails, i32), Error>>>>()
+    .await;
+
+  todo!()
+  /*
+  let mut crawl_result: Vec<InstanceDetails> = todo!();
 
   // Sort by active monthly users descending
-  instance_details.sort_by_key(|i| i.users_active_month);
-  instance_details.reverse();
+  crawl_result.sort_by_key(|i| i.users_active_month);
+  crawl_result.reverse();
 
-  Ok((instance_details, failed_instances))
+  Ok((crawl_result, failed_instances))
+  */
+}
+
+async fn crawl_instance(
+  current_instance: CrawlInstanceTask,
+  exclude: Vec<String>,
+  max_depth: i32,
+) -> Result<(InstanceDetails, i32), Error> {
+  if current_instance.depth > max_depth || exclude.contains(&current_instance.domain) {
+    return Err(anyhow!("max depth reached"));
+  }
+  Ok((
+    fetch_instance_details(&current_instance.domain).await?,
+    current_instance.depth + 1,
+  ))
 }
 
 #[derive(Serialize, Clone)]
@@ -70,14 +92,14 @@ pub struct InstanceDetails {
   pub linked_instances: Vec<String>,
 }
 
-struct CrawlInstance {
+struct CrawlInstanceTask {
   domain: String,
   depth: i32,
 }
 
-impl CrawlInstance {
-  pub fn new(domain: String, depth: i32) -> CrawlInstance {
-    CrawlInstance { domain, depth }
+impl CrawlInstanceTask {
+  pub fn new(domain: String, depth: i32) -> CrawlInstanceTask {
+    CrawlInstanceTask { domain, depth }
   }
 }
 
@@ -136,7 +158,11 @@ async fn fetch_instance_details(domain: &str) -> Result<InstanceDetails, Error>
     users_active_month: node_info.usage.users.active_month,
     open_registrations: node_info.open_registrations,
     linked_instances_count: linked_instances.len() as i32,
-    require_application: site_info.site_view.site.require_application.unwrap_or(false),
+    require_application: site_info
+      .site_view
+      .site
+      .require_application
+      .unwrap_or(false),
     linked_instances,
   })
 }
diff --git a/src/federated_instances.rs b/src/federated_instances.rs
index f1102a4..7ed1c12 100644
--- a/src/federated_instances.rs
+++ b/src/federated_instances.rs
@@ -24,5 +24,5 @@ pub struct Site {
   pub name: String,
   pub icon: Option<String>,
   pub description: Option<String>,
-  pub require_application: Option<bool>
+  pub require_application: Option<bool>,
 }
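
Note, not part of the patch: the WIP chain above captures crawled_instances mutably in both the then and flat_map closures and leaves the final collection as todo!(). Below is a minimal sketch of one way the parallel crawl could be completed, assuming the existing fetch_instance_details and InstanceDetails from crawl.rs are in scope. It crawls one depth level at a time and limits concurrency with StreamExt::buffer_unordered; the function name crawl_parallel and the MAX_CONCURRENT_REQUESTS constant are illustrative, not defined anywhere in this repository.

use std::collections::HashSet;

use anyhow::Error;
use futures::stream::{self, StreamExt};

// Illustrative concurrency limit, not taken from the repository.
const MAX_CONCURRENT_REQUESTS: usize = 10;

pub async fn crawl_parallel(
  start_instances: Vec<String>,
  exclude: Vec<String>,
  max_depth: i32,
) -> Result<(Vec<InstanceDetails>, i32), Error> {
  // Drop excluded start instances, then seed the dedup set with the remainder.
  let mut current_level: Vec<String> = start_instances
    .into_iter()
    .filter(|i| !exclude.contains(i))
    .collect();
  let mut crawled: HashSet<String> = current_level.iter().cloned().collect();
  let mut instance_details = vec![];
  let mut failed_instances = 0;

  for _depth in 0..=max_depth {
    if current_level.is_empty() {
      break;
    }
    // Fetch the whole level concurrently, at most MAX_CONCURRENT_REQUESTS at a time.
    let results: Vec<(String, Result<InstanceDetails, Error>)> = stream::iter(current_level)
      .map(|domain| async move {
        let res = fetch_instance_details(&domain).await;
        (domain, res)
      })
      .buffer_unordered(MAX_CONCURRENT_REQUESTS)
      .collect()
      .await;

    // Process results on this task only, so no closure needs mutable access to shared state.
    let mut next_level = vec![];
    for (domain, res) in results {
      match res {
        Ok(details) => {
          for linked in &details.linked_instances {
            if !crawled.contains(linked) && !exclude.contains(linked) {
              crawled.insert(linked.clone());
              next_level.push(linked.clone());
            }
          }
          instance_details.push(details);
        }
        Err(e) => {
          failed_instances += 1;
          eprintln!("Failed to crawl {}: {}", domain, e);
        }
      }
    }
    current_level = next_level;
  }

  // Sort by active monthly users descending
  instance_details.sort_by_key(|i| i.users_active_month);
  instance_details.reverse();

  Ok((instance_details, failed_instances))
}

Working level by level keeps the dedup set and the failure counter on a single task, which sidesteps the borrow problems of sharing them across stream combinators while still issuing the HTTP requests in parallel.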