From 8224b70405d91063944c594675f00f06f4318fa4 Mon Sep 17 00:00:00 2001
From: Felix Ableitner <me@nutomic.com>
Date: Tue, 5 Apr 2022 14:31:12 +0200
Subject: [PATCH] some progress

---
 src/crawl.rs | 72 +++++++++++++++++++++++++++++++++++---------------------
 1 file changed, 39 insertions(+), 33 deletions(-)

diff --git a/src/crawl.rs b/src/crawl.rs
index f811776..a178c1a 100644
--- a/src/crawl.rs
+++ b/src/crawl.rs
@@ -3,53 +3,58 @@ use crate::node_info::NodeInfo;
 use crate::REQUEST_TIMEOUT;
 use anyhow::anyhow;
 use anyhow::Error;
+use futures::executor::block_on_stream;
 use futures::future::join_all;
 use futures::stream::FuturesUnordered;
-use futures::{stream, try_join, StreamExt, TryStreamExt};
+use futures::{future, stream, try_join, StreamExt, TryStreamExt};
 use reqwest::Client;
 use serde::Serialize;
+use std::cmp::max;
 use std::collections::VecDeque;
 use std::future::Future;
+use std::sync::{Arc, Mutex};
 
 pub async fn crawl(
     start_instances: Vec<String>,
     exclude: Vec<String>,
     max_depth: i32,
 ) -> Result<(Vec<InstanceDetails>, i32), Error> {
+    let exclude = Arc::new(exclude);
     let mut pending_instances: VecDeque<CrawlInstanceTask> = start_instances
         .iter()
-        .map(|s| CrawlInstanceTask::new(s.to_string(), 0))
+        .map(|s| CrawlInstanceTask::new(s.to_string(), 0, exclude.clone()))
         .collect();
-    let mut crawled_instances = vec![];
-    let mut instance_details = vec![];
+    let mut crawled_instances = Mutex::new(vec![]);
+    //let mut instance_details = vec![];
     //let mut failed_instances = 0;
-    let mut futures = stream::iter(pending_instances)
-        .then(|instance: CrawlInstanceTask| async {
-            crawled_instances.push(instance.domain.clone());
-            crawl_instance(instance, exclude.clone(), max_depth).await?
-        })
-        .flat_map(|(instance_details, depth)| {
-            let futures = instance_details
-                .linked_instances
-                .iter()
-                .map(|i| {
-                    crawled_instances.push(i.clone());
+
+    let stream = Box::pin(
+        stream::iter(pending_instances)
+            .then(|task: CrawlInstanceTask| async {
+                crawled_instances.lock().unwrap().push(task.domain.clone());
+                crawl_instance(task, max_depth).await.unwrap()
+            })
+            .flat_map(|(instance_details, task)| {
+                let futures = instance_details.linked_instances.iter().map(|i| {
+                    crawled_instances.lock().unwrap().push(i.clone());
                     crawl_instance(
-                        CrawlInstanceTask::new(i.clone(), depth),
-                        exclude.clone(),
+                        CrawlInstanceTask::new(i.clone(), task.depth + 1, task.exclude.clone()),
                         max_depth,
                     )
-                })
-                .collect();
+                });
 
-            stream::iter(futures)
-        })
-        .collect::<FuturesUnordered<BoxFuture<'static, Result<(InstanceDetails, i32), Error>>>>()
+                stream::iter(futures)
+            }),
+    );
+
+    let crawl_result: Vec<Result<InstanceDetails, Error>> = stream
+        .buffer_unordered(10)
+        .map_ok(|(details, _)| details)
+        .collect()
         .await;
 
     todo!()
     /*
-    let mut crawl_result: Vec<InstanceDetails> = todo!();
     // Sort by active monthly users descending
     crawl_result.sort_by_key(|i| i.users_active_month);
 
@@ -60,17 +65,13 @@ pub async fn crawl(
 }
 
 async fn crawl_instance(
-    current_instance: CrawlInstanceTask,
-    exclude: Vec<String>,
+    task: CrawlInstanceTask,
     max_depth: i32,
-) -> Result<(InstanceDetails, i32), Error> {
-    if current_instance.depth > max_depth || exclude.contains(&current_instance.domain) {
+) -> Result<(InstanceDetails, CrawlInstanceTask), Error> {
+    if task.depth > max_depth || task.exclude.contains(&task.domain) {
         return Err(anyhow!("max depth reached"));
     }
-    Ok((
-        fetch_instance_details(&current_instance.domain).await?,
-        current_instance.depth + 1,
-    ))
+    Ok((fetch_instance_details(&task.domain).await?, task))
 }
 
 #[derive(Serialize, Clone)]
@@ -95,11 +96,16 @@ pub struct InstanceDetails {
 
 struct CrawlInstanceTask {
     domain: String,
     depth: i32,
+    exclude: Arc<Vec<String>>,
 }
 
 impl CrawlInstanceTask {
-    pub fn new(domain: String, depth: i32) -> CrawlInstanceTask {
-        CrawlInstanceTask { domain, depth }
+    pub fn new(domain: String, depth: i32, exclude: Arc<Vec<String>>) -> CrawlInstanceTask {
+        CrawlInstanceTask {
+            domain,
+            depth,
+            exclude,
+        }
     }
 }
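
Note on the state of this WIP: the new pipeline still ends in todo!(), the
then/flat_map chain drops the start instances' own details and only follows
links one level deep (the futures produced by flat_map are awaited, but their
linked_instances are never fed back into the stream), and the unwrap() aborts
the whole crawl on the first unreachable instance. Below is a minimal sketch
of one way the loop could be finished, assuming fetch_instance_details,
linked_instances and users_active_month as they appear in this file; the name
crawl_sketch, the MAX_IN_FLIGHT constant and the HashSet-based visited check
are illustrative, not part of the patch. Driving a FuturesUnordered from a
single task keeps ownership of the visited list and the results in one place,
which avoids both the Mutex around crawled_instances and the Arc on exclude:

use std::collections::{HashSet, VecDeque};

use anyhow::Error;
use futures::stream::{FuturesUnordered, StreamExt};

pub async fn crawl_sketch(
    start_instances: Vec<String>,
    exclude: Vec<String>,
    max_depth: i32,
) -> Result<(Vec<InstanceDetails>, i32), Error> {
    // Mirrors the buffer_unordered(10) bound above: at most ten fetches in flight.
    const MAX_IN_FLIGHT: usize = 10;

    // Work queue of (domain, depth) pairs, seeded with the start instances.
    let mut pending: VecDeque<(String, i32)> = start_instances
        .into_iter()
        .map(|domain| (domain, 0))
        .collect();
    let mut visited: HashSet<String> = HashSet::new();
    let mut in_flight = FuturesUnordered::new();
    let mut instance_details = vec![];
    let mut failed_instances = 0;

    while !pending.is_empty() || !in_flight.is_empty() {
        // Top up the in-flight set, skipping anything excluded,
        // already seen, or past the depth limit.
        while in_flight.len() < MAX_IN_FLIGHT {
            match pending.pop_front() {
                Some((domain, depth)) => {
                    if depth <= max_depth
                        && !exclude.contains(&domain)
                        && visited.insert(domain.clone())
                    {
                        in_flight.push(async move {
                            (depth, fetch_instance_details(&domain).await)
                        });
                    }
                }
                None => break,
            }
        }

        // Await whichever fetch finishes first and queue its neighbours.
        match in_flight.next().await {
            Some((depth, Ok(details))) => {
                for linked in &details.linked_instances {
                    pending.push_back((linked.clone(), depth + 1));
                }
                instance_details.push(details);
            }
            Some((_, Err(_))) => failed_instances += 1,
            None => {} // in_flight drained; the loop re-checks pending
        }
    }

    // Sort by active monthly users descending, as the commented-out block intends.
    instance_details.sort_by_key(|i| i.users_active_month);
    instance_details.reverse();
    Ok((instance_details, failed_instances))
}

Counting failures instead of unwrapping matches the commented-out
failed_instances counter and keeps one dead instance from killing the whole
crawl; checking visited before pushing also prevents the duplicate fetches
that the crawled_instances Vec in the patch is never actually checked against.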