From cb5f20f397e01966005cce95ddc40f78b461f0a2 Mon Sep 17 00:00:00 2001
From: Felix Ableitner
Date: Tue, 10 May 2022 02:52:48 +0200
Subject: [PATCH] Recursive, parallel crawl

---
 Cargo.lock   |  13 ++++
 Cargo.toml   |   2 +
 src/crawl.rs | 170 ++++++++++++++++++++++++++++++---------------------
 3 files changed, 117 insertions(+), 68 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 26285dc..1fa82b8 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -220,6 +220,17 @@ dependencies = [
  "event-listener",
 ]
 
+[[package]]
+name = "async-recursion"
+version = "1.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2cda8f4bcc10624c4e85bc66b3f452cca98cfa5ca002dc83a16aad2367641bea"
+dependencies = [
+ "proc-macro2 1.0.37",
+ "quote 1.0.18",
+ "syn 1.0.92",
+]
+
 [[package]]
 name = "async-trait"
 version = "0.1.53"
@@ -1295,9 +1306,11 @@ name = "lemmy-stats-crawler"
 version = "0.1.0"
 dependencies = [
  "anyhow",
+ "async-recursion",
  "clap",
  "futures",
  "lemmy_api_common",
+ "log",
  "once_cell",
  "reqwest",
  "semver",
diff --git a/Cargo.toml b/Cargo.toml
index dd23dee..4e9ecb7 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -15,3 +15,5 @@ clap = "3.1.15"
 semver = "1.0.9"
 once_cell = "1.10.0"
 lemmy_api_common = "0.16.0"
+async-recursion = "1.0.0"
+log = "0.4.17"
diff --git a/src/crawl.rs b/src/crawl.rs
index e849cc2..f136356 100644
--- a/src/crawl.rs
+++ b/src/crawl.rs
@@ -1,56 +1,46 @@
 use crate::REQUEST_TIMEOUT;
-use anyhow::anyhow;
 use anyhow::Error;
+use async_recursion::async_recursion;
+use futures::future::try_join_all;
 use lemmy_api_common::site::GetSiteResponse;
+use log::info;
 use once_cell::sync::Lazy;
 use reqwest::Client;
 use semver::Version;
 use serde::Serialize;
-use std::collections::VecDeque;
+use std::ops::Deref;
+use std::sync::Arc;
+use tokio::sync::Mutex;
 
 static CLIENT: Lazy<Client> = Lazy::new(Client::default);
 
 pub async fn crawl(
     start_instances: Vec<String>,
-    exclude: Vec<String>,
+    exclude_domains: Vec<String>,
     max_depth: i32,
 ) -> Result<(Vec<InstanceDetails>, i32), Error> {
-    let mut pending_instances: VecDeque<CrawlInstance> = start_instances
-        .iter()
-        .map(|s| CrawlInstance::new(s.to_string(), 0))
-        .collect();
-    let min_lemmy_version = min_lemmy_version().await?;
-    let mut crawled_instances = vec![];
-    let mut instance_details = vec![];
-    let mut failed_instances = 0;
-    while let Some(current_instance) = pending_instances.pop_back() {
-        crawled_instances.push(current_instance.domain.clone());
-        if current_instance.depth > max_depth || exclude.contains(&current_instance.domain) {
-            continue;
-        }
-        match fetch_instance_details(&current_instance.domain, &min_lemmy_version).await {
-            Ok(details) => {
-                if let Some(federated) = &details.site_info.federated_instances.as_ref() {
-                    for i in &federated.linked {
-                        let is_in_crawled = crawled_instances.contains(i);
-                        let is_in_pending = pending_instances.iter().any(|p| &p.domain == i);
-                        if !is_in_crawled && !is_in_pending {
-                            let ci = CrawlInstance::new(i.clone(), current_instance.depth + 1);
-                            pending_instances.push_back(ci);
-                        }
-                    }
-                }
-                instance_details.push(details);
-            }
-            Err(e) => {
-                failed_instances += 1;
-                eprintln!("Failed to crawl {}: {}", current_instance.domain, e)
-            }
-        }
+    let params = Arc::new(CrawlParams {
+        min_lemmy_version: min_lemmy_version().await?,
+        exclude_domains,
+        max_depth,
+    });
+    let crawled_instances = Arc::new(Mutex::new(vec![]));
+    let mut jobs = vec![];
+    for domain in start_instances.into_iter() {
+        let job = CrawlJob {
+            domain,
+            current_depth: 0,
+            params: params.clone(),
+            crawled_instances: crawled_instances.clone(),
+        };
+        jobs.push(job.crawl());
     }
+    let mut instance_details: Vec<InstanceDetails> =
+        try_join_all(jobs).await?.into_iter().flatten().collect();
+
     // Sort by active monthly users descending
-    instance_details.sort_by_key(|i| {
+    instance_details.sort_unstable_by_key(|i| {
         i.site_info
             .site_view
             .as_ref()
@@ -59,7 +49,7 @@ pub async fn crawl(
     });
     instance_details.reverse();
 
-    Ok((instance_details, failed_instances))
+    Ok((instance_details, 0))
 }
 
 #[derive(Serialize, Debug)]
@@ -68,43 +58,87 @@ pub struct InstanceDetails {
     pub domain: String,
     pub site_info: GetSiteResponse,
 }
 
-struct CrawlInstance {
+struct CrawlParams {
+    min_lemmy_version: Version,
+    exclude_domains: Vec<String>,
+    max_depth: i32,
+}
+
+struct CrawlJob {
     domain: String,
-    depth: i32,
+    current_depth: i32,
+    params: Arc<CrawlParams>,
+    crawled_instances: Arc<Mutex<Vec<String>>>,
 }
 
-impl CrawlInstance {
-    pub fn new(domain: String, depth: i32) -> CrawlInstance {
-        CrawlInstance { domain, depth }
+impl CrawlJob {
+    #[async_recursion]
+    pub async fn crawl(self) -> Result<Vec<InstanceDetails>, Error> {
+        // need to acquire and release the mutex before recursing, otherwise it will deadlock
+        {
+            let mut crawled_instances = self.crawled_instances.deref().lock().await;
+            if crawled_instances.contains(&self.domain) {
+                return Ok(vec![]);
+            } else {
+                crawled_instances.push(self.domain.clone());
+            }
+        }
+
+        if self.current_depth > self.params.max_depth
+            || self.params.exclude_domains.contains(&self.domain)
+        {
+            return Ok(vec![]);
+        }
+        info!("Starting crawl for {}", &self.domain);
+
+        let site_info_url = format!("https://{}/api/v3/site", &self.domain);
+        let site_info = CLIENT
+            .get(&site_info_url)
+            .timeout(REQUEST_TIMEOUT)
+            .send()
+            .await
+            .ok();
+
+        if let Some(site_info2) = site_info {
+            let site_info3 = site_info2.json::<GetSiteResponse>().await.ok();
+            if let Some(site_info4) = site_info3 {
+                let version = Version::parse(&site_info4.version).ok();
+
+                if let Some(version) = version {
+                    if version < self.params.min_lemmy_version {
+                        return Ok(vec![]);
+                    }
+                }
+
+                let mut result = vec![];
+                if let Some(federated) = &site_info4.federated_instances {
+                    for domain in federated.linked.iter() {
+                        let crawl_job = CrawlJob {
+                            domain: domain.clone(),
+                            current_depth: self.current_depth + 1,
+                            params: self.params.clone(),
+                            crawled_instances: self.crawled_instances.clone(),
+                        };
+                        result.push(crawl_job.crawl());
+                    }
+                }
+
+                let mut result2: Vec<InstanceDetails> =
+                    try_join_all(result).await?.into_iter().flatten().collect();
+                info!("Successfully finished crawl for {}", &self.domain);
+                result2.push(InstanceDetails {
+                    domain: self.domain,
+                    site_info: site_info4,
+                });
+
+                return Ok(result2);
+            }
+        }
+
+        Ok(vec![])
     }
 }
 
-async fn fetch_instance_details(
-    domain: &str,
-    min_lemmy_version: &Version,
-) -> Result<InstanceDetails, Error> {
-    let client = Client::default();
-
-    let site_info_url = format!("https://{}/api/v3/site", domain);
-    let site_info = client
-        .get(&site_info_url)
-        .timeout(REQUEST_TIMEOUT)
-        .send()
-        .await?
-        .json::<GetSiteResponse>()
-        .await?;
-
-    let version = Version::parse(&site_info.version)?;
-    if &version < min_lemmy_version {
-        return Err(anyhow!("lemmy version is too old ({})", version));
-    }
-
-    Ok(InstanceDetails {
-        domain: domain.to_owned(),
-        site_info,
-    })
-}
-
 /// calculate minimum allowed lemmy version based on current version. in case of current version
 /// 0.16.3, the minimum from this function is 0.15.3. this is to avoid rejecting all instances on
 /// the previous version when a major lemmy release is published.
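
Reviewer note (not part of the patch): the "acquire and release the mutex before recursing" comment is the crux of the concurrency change. Holding a tokio::sync::Mutex guard across the await on the child jobs would block every child that tries to lock the shared list while the parent is waiting on those same children. Below is a minimal, self-contained sketch of the pattern in isolation: #[async_recursion] for the recursive async fn, try_join_all to run child jobs concurrently, and a scoped lock that is dropped before anything is awaited. fetch_children() and its toy graph are hypothetical stand-ins for the real site-info request; none of these names appear in the patch.

    use async_recursion::async_recursion;
    use futures::future::try_join_all;
    use std::sync::Arc;
    use tokio::sync::Mutex;

    // Hypothetical stand-in for the HTTP request to /api/v3/site.
    async fn fetch_children(node: &str) -> Result<Vec<String>, anyhow::Error> {
        Ok(match node {
            "root" => vec!["a".to_string(), "b".to_string()],
            "a" => vec!["b".to_string()],
            _ => vec![],
        })
    }

    #[async_recursion]
    async fn crawl_node(
        node: String,
        seen: Arc<Mutex<Vec<String>>>,
    ) -> Result<Vec<String>, anyhow::Error> {
        // Scope the guard so it is dropped before any child job is awaited;
        // a child blocked on this non-reentrant lock while the parent waits
        // on the child would be a deadlock.
        {
            let mut seen = seen.lock().await;
            if seen.contains(&node) {
                return Ok(vec![]);
            }
            seen.push(node.clone());
        }

        let children = fetch_children(&node).await?;
        let jobs: Vec<_> = children
            .into_iter()
            .map(|child| crawl_node(child, seen.clone()))
            .collect();

        // Run all child crawls concurrently; the first Err cancels the rest.
        let mut found: Vec<String> =
            try_join_all(jobs).await?.into_iter().flatten().collect();
        found.push(node);
        Ok(found)
    }

    #[tokio::main]
    async fn main() -> Result<(), anyhow::Error> {
        let seen = Arc::new(Mutex::new(vec![]));
        println!("crawled: {:?}", crawl_node("root".into(), seen).await?);
        Ok(())
    }

The fail-fast behavior of try_join_all also explains why the patch swallows per-instance errors with .ok() and returns Ok(vec![]) instead of propagating them: one unreachable instance must not abort the whole crawl. A side effect is that failures are no longer counted, hence the hardcoded 0 in the return value of crawl().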
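A second note: the last hunk ends at the doc comment, so the body of min_lemmy_version() is not shown here. The following is a hedged sketch of just the arithmetic the comment describes, assuming the current version is already known (the real function also has to obtain it, which is outside this excerpt); min_version_floor is a hypothetical name.

    use semver::Version;

    // One minor version below the current release, per the doc comment:
    // current 0.16.3 -> minimum 0.15.3.
    fn min_version_floor(current: &Version) -> Version {
        Version::new(current.major, current.minor.saturating_sub(1), current.patch)
    }

    fn main() {
        assert_eq!(
            min_version_floor(&Version::new(0, 16, 3)),
            Version::new(0, 15, 3)
        );
    }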