From c254e50211f3c5d0e2c09d787dc014f4dbeb35ae Mon Sep 17 00:00:00 2001
From: Felix Ableitner
Date: Tue, 10 May 2022 02:52:48 +0200
Subject: [PATCH] Recursive, parallel crawl

---
 Cargo.lock      |  25 +++++++
 Cargo.toml      |   3 +
 src/crawl.rs    | 180 ++++++++++++++++++++----------------------
 src/defaults.rs |   4 ++
 src/lib.rs      |  73 ++++++++++++++++++--
 src/main.rs     |  16 ++---
 6 files changed, 185 insertions(+), 116 deletions(-)
 create mode 100644 src/defaults.rs

diff --git a/Cargo.lock b/Cargo.lock
index 26285dc..82c71a0 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -220,6 +220,17 @@ dependencies = [
  "event-listener",
 ]
 
+[[package]]
+name = "async-recursion"
+version = "1.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2cda8f4bcc10624c4e85bc66b3f452cca98cfa5ca002dc83a16aad2367641bea"
+dependencies = [
+ "proc-macro2 1.0.37",
+ "quote 1.0.18",
+ "syn 1.0.92",
+]
+
 [[package]]
 name = "async-trait"
 version = "0.1.53"
@@ -560,6 +571,17 @@ dependencies = [
  "syn 1.0.92",
 ]
 
+[[package]]
+name = "derive-new"
+version = "0.5.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3418329ca0ad70234b9735dc4ceed10af4df60eff9c8e7b06cb5e520d92c3535"
+dependencies = [
+ "proc-macro2 1.0.37",
+ "quote 1.0.18",
+ "syn 1.0.92",
+]
+
 [[package]]
 name = "derive_more"
 version = "0.99.17"
@@ -1295,9 +1317,12 @@ name = "lemmy-stats-crawler"
 version = "0.1.0"
 dependencies = [
  "anyhow",
+ "async-recursion",
  "clap",
+ "derive-new",
  "futures",
  "lemmy_api_common",
+ "log",
  "once_cell",
  "reqwest",
  "semver",
diff --git a/Cargo.toml b/Cargo.toml
index dd23dee..1e20612 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -15,3 +15,6 @@ clap = "3.1.15"
 semver = "1.0.9"
 once_cell = "1.10.0"
 lemmy_api_common = "0.16.0"
+async-recursion = "1.0.0"
+log = "0.4.17"
+derive-new = "0.5.9"
diff --git a/src/crawl.rs b/src/crawl.rs
index e849cc2..f7f9394 100644
--- a/src/crawl.rs
+++ b/src/crawl.rs
@@ -1,66 +1,16 @@
+use crate::CLIENT;
 use crate::REQUEST_TIMEOUT;
-use anyhow::anyhow;
 use anyhow::Error;
+use async_recursion::async_recursion;
+use futures::future::join_all;
 use lemmy_api_common::site::GetSiteResponse;
-use once_cell::sync::Lazy;
-use reqwest::Client;
+use log::info;
 use semver::Version;
 use serde::Serialize;
-use std::collections::VecDeque;
-
-static CLIENT: Lazy<Client> = Lazy::new(Client::default);
-
-pub async fn crawl(
-    start_instances: Vec<String>,
-    exclude: Vec<String>,
-    max_depth: i32,
-) -> Result<(Vec<InstanceDetails>, i32), Error> {
-    let mut pending_instances: VecDeque<CrawlInstance> = start_instances
-        .iter()
-        .map(|s| CrawlInstance::new(s.to_string(), 0))
-        .collect();
-    let min_lemmy_version = min_lemmy_version().await?;
-    let mut crawled_instances = vec![];
-    let mut instance_details = vec![];
-    let mut failed_instances = 0;
-    while let Some(current_instance) = pending_instances.pop_back() {
-        crawled_instances.push(current_instance.domain.clone());
-        if current_instance.depth > max_depth || exclude.contains(&current_instance.domain) {
-            continue;
-        }
-        match fetch_instance_details(&current_instance.domain, &min_lemmy_version).await {
-            Ok(details) => {
-                if let Some(federated) = &details.site_info.federated_instances.as_ref() {
-                    for i in &federated.linked {
-                        let is_in_crawled = crawled_instances.contains(i);
-                        let is_in_pending = pending_instances.iter().any(|p| &p.domain == i);
-                        if !is_in_crawled && !is_in_pending {
-                            let ci = CrawlInstance::new(i.clone(), current_instance.depth + 1);
-                            pending_instances.push_back(ci);
-                        }
-                    }
-                }
-                instance_details.push(details);
-            }
-            Err(e) => {
-                failed_instances += 1;
-                eprintln!("Failed to crawl {}: {}", current_instance.domain, e)
-            }
-        }
-    }
-
-    // Sort by active monthly users descending
-    instance_details.sort_by_key(|i| {
-        i.site_info
-            .site_view
-            .as_ref()
-            .map(|s| s.counts.users_active_month)
-            .unwrap_or(0)
-    });
-    instance_details.reverse();
-
-    Ok((instance_details, failed_instances))
-}
+use std::collections::HashSet;
+use std::ops::Deref;
+use std::sync::Arc;
+use tokio::sync::Mutex;
 
 #[derive(Serialize, Debug)]
 pub struct InstanceDetails {
@@ -68,54 +18,76 @@ pub struct InstanceDetails {
     pub site_info: GetSiteResponse,
 }
 
-struct CrawlInstance {
+#[derive(new)]
+pub struct CrawlParams {
+    min_lemmy_version: Version,
+    exclude_domains: Vec<String>,
+    max_depth: i32,
+    crawled_instances: Arc<Mutex<HashSet<String>>>,
+}
+
+#[derive(new)]
+pub struct CrawlJob {
     domain: String,
-    depth: i32,
+    current_depth: i32,
+    params: Arc<CrawlParams>,
 }
 
-impl CrawlInstance {
-    pub fn new(domain: String, depth: i32) -> CrawlInstance {
-        CrawlInstance { domain, depth }
+impl CrawlJob {
+    #[async_recursion]
+    pub async fn crawl(self) -> Result<Vec<Result<InstanceDetails, Error>>, Error> {
+        // need to acquire and release mutex before recursing, otherwise it will deadlock
+        {
+            let mut crawled_instances = self.params.crawled_instances.deref().lock().await;
+            if crawled_instances.contains(&self.domain) {
+                return Ok(vec![]);
+            } else {
+                crawled_instances.insert(self.domain.clone());
+            }
+        }
+
+        if self.current_depth > self.params.max_depth
+            || self.params.exclude_domains.contains(&self.domain)
+        {
+            return Ok(vec![]);
+        }
+        info!("Starting crawl for {}", &self.domain);
+
+        let site_info_url = format!("https://{}/api/v3/site", &self.domain);
+        let site_info = CLIENT
+            .get(&site_info_url)
+            .timeout(REQUEST_TIMEOUT)
+            .send()
+            .await?
+            .json::<GetSiteResponse>()
+            .await?;
+
+        let version = Version::parse(&site_info.version)?;
+        if version < self.params.min_lemmy_version {
+            return Ok(vec![]);
+        }
+
+        let mut result = vec![];
+        if let Some(federated) = &site_info.federated_instances {
+            for domain in federated.linked.iter() {
+                let crawl_job =
+                    CrawlJob::new(domain.clone(), self.current_depth + 1, self.params.clone());
+                result.push(crawl_job.crawl());
+            }
+        }
+
+        let mut result2: Vec<Result<InstanceDetails, Error>> = join_all(result)
+            .await
+            .into_iter()
+            .filter_map(|r| r.ok())
+            .flat_map(|r| r.into_iter())
+            .collect();
+        info!("Successfully finished crawl for {}", &self.domain);
+        result2.push(Ok(InstanceDetails {
+            domain: self.domain,
+            site_info,
+        }));
+
+        Ok(result2)
     }
 }
-
-async fn fetch_instance_details(
-    domain: &str,
-    min_lemmy_version: &Version,
-) -> Result<InstanceDetails, Error> {
-    let client = Client::default();
-
-    let site_info_url = format!("https://{}/api/v3/site", domain);
-    let site_info = client
-        .get(&site_info_url)
-        .timeout(REQUEST_TIMEOUT)
-        .send()
-        .await?
-        .json::<GetSiteResponse>()
-        .await?;
-
-    let version = Version::parse(&site_info.version)?;
-    if &version < min_lemmy_version {
-        return Err(anyhow!("lemmy version is too old ({})", version));
-    }
-
-    Ok(InstanceDetails {
-        domain: domain.to_owned(),
-        site_info,
-    })
-}
-
-/// calculate minimum allowed lemmy version based on current version. in case of current version
-/// 0.16.3, the minimum from this function is 0.15.3. this is to avoid rejecting all instances on
-/// the previous version when a major lemmy release is published.
-async fn min_lemmy_version() -> Result<Version, Error> {
-    let lemmy_version_url = "https://raw.githubusercontent.com/LemmyNet/lemmy-ansible/main/VERSION";
-    let req = CLIENT
-        .get(lemmy_version_url)
-        .timeout(REQUEST_TIMEOUT)
-        .send()
-        .await?;
-    let mut version = Version::parse(req.text().await?.trim())?;
-    version.minor -= 1;
-    Ok(version)
-}
diff --git a/src/defaults.rs b/src/defaults.rs
new file mode 100644
index 0000000..f9b09bb
--- /dev/null
+++ b/src/defaults.rs
@@ -0,0 +1,4 @@
+pub const DEFAULT_START_INSTANCES: &str = "lemmy.ml";
+pub const DEFAULT_MAX_CRAWL_DEPTH: &str = "20";
+pub const EXCLUDE_INSTANCES: &str =
+    "ds9.lemmy.ml, enterprise.lemmy.ml, voyager.lemmy.ml, test.lemmy.ml";
diff --git a/src/lib.rs b/src/lib.rs
index b36d9dc..5fb76a0 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,9 +1,74 @@
+#[macro_use]
+extern crate derive_new;
+
+use crate::crawl::{CrawlJob, CrawlParams, InstanceDetails};
+use anyhow::Error;
+use futures::future::join_all;
+use once_cell::sync::Lazy;
+use reqwest::Client;
+use semver::Version;
+use std::collections::HashSet;
+use std::sync::Arc;
 use std::time::Duration;
+use tokio::sync::Mutex;
 
 pub mod crawl;
+pub mod defaults;
 
 pub const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
-pub const DEFAULT_START_INSTANCES: &str = "lemmy.ml";
-pub const DEFAULT_MAX_CRAWL_DEPTH: &str = "20";
-pub const EXCLUDE_INSTANCES: &str =
-    "ds9.lemmy.ml, enterprise.lemmy.ml, voyager.lemmy.ml, test.lemmy.ml";
+
+static CLIENT: Lazy<Client> = Lazy::new(Client::default);
+
+pub async fn start_crawl(
+    start_instances: Vec<String>,
+    exclude_domains: Vec<String>,
+    max_depth: i32,
+) -> Result<Vec<InstanceDetails>, Error> {
+    let params = Arc::new(CrawlParams::new(
+        min_lemmy_version().await?,
+        exclude_domains,
+        max_depth,
+        Arc::new(Mutex::new(HashSet::new())),
+    ));
+    let mut jobs = vec![];
+    for domain in start_instances.into_iter() {
+        let job = CrawlJob::new(domain, 0, params.clone());
+        jobs.push(job.crawl());
+    }
+
+    // TODO: optionally log the errors
+    let mut instance_details: Vec<InstanceDetails> = join_all(jobs)
+        .await
+        .into_iter()
+        .filter_map(|r| r.ok())
+        .flat_map(|r| r.into_iter())
+        .filter_map(|r| r.ok())
+        .collect();
+
+    // Sort by active monthly users descending
+    instance_details.sort_unstable_by_key(|i| {
+        i.site_info
+            .site_view
+            .as_ref()
+            .map(|s| s.counts.users_active_month)
+            .unwrap_or(0)
+    });
+    instance_details.reverse();
+
+    Ok(instance_details)
+}
+
+/// calculate minimum allowed lemmy version based on current version. in case of current version
+/// 0.16.3, the minimum from this function is 0.15.3. this is to avoid rejecting all instances on
+/// the previous version when a major lemmy release is published.
+async fn min_lemmy_version() -> Result<Version, Error> {
+    let lemmy_version_url = "https://raw.githubusercontent.com/LemmyNet/lemmy-ansible/main/VERSION";
+    let req = CLIENT
+        .get(lemmy_version_url)
+        .timeout(REQUEST_TIMEOUT)
+        .send()
+        .await?;
+    let mut version = Version::parse(req.text().await?.trim())?;
+    version.minor -= 1;
+    Ok(version)
+}
diff --git a/src/main.rs b/src/main.rs
index 0d62f1c..20bc41c 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,7 +1,10 @@
 use anyhow::Error;
 use clap::{Arg, Command};
-use lemmy_stats_crawler::crawl::{crawl, InstanceDetails};
-use lemmy_stats_crawler::{DEFAULT_MAX_CRAWL_DEPTH, DEFAULT_START_INSTANCES, EXCLUDE_INSTANCES};
+use lemmy_stats_crawler::crawl::InstanceDetails;
+use lemmy_stats_crawler::defaults::{
+    DEFAULT_MAX_CRAWL_DEPTH, DEFAULT_START_INSTANCES, EXCLUDE_INSTANCES,
+};
+use lemmy_stats_crawler::start_crawl;
 use serde::Serialize;
 
 #[tokio::main]
@@ -37,9 +40,8 @@ pub async fn main() -> Result<(), Error> {
         .parse()?;
 
     eprintln!("Crawling...");
-    let (instance_details, failed_instances) =
-        crawl(start_instances, exclude, max_crawl_depth).await?;
-    let total_stats = aggregate(instance_details, failed_instances);
+    let instance_details = start_crawl(start_instances, exclude, max_crawl_depth).await?;
+    let total_stats = aggregate(instance_details);
 
     println!("{}", serde_json::to_string_pretty(&total_stats)?);
     Ok(())
@@ -48,7 +50,6 @@ pub async fn main() -> Result<(), Error> {
 #[derive(Serialize)]
 struct TotalStats {
     crawled_instances: i32,
-    failed_instances: i32,
     online_users: usize,
     total_users: i64,
     users_active_day: i64,
@@ -58,7 +59,7 @@ struct TotalStats {
     instance_details: Vec<InstanceDetails>,
 }
 
-fn aggregate(instance_details: Vec<InstanceDetails>, failed_instances: i32) -> TotalStats {
+fn aggregate(instance_details: Vec<InstanceDetails>) -> TotalStats {
     let mut online_users = 0;
    let mut total_users = 0;
     let mut users_active_day = 0;
@@ -79,7 +80,6 @@ fn aggregate(instance_details: Vec<InstanceDetails>, failed_instances: i32) -> T
     }
     TotalStats {
         crawled_instances,
-        failed_instances,
         online_users,
         total_users,
         users_active_day,
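
Note on the locking pattern in CrawlJob::crawl: the guard from the tokio
Mutex is taken inside an inner block so that it is dropped before any child
job is awaited. A job that held the guard across join_all would deadlock,
because every child tries to lock the same crawled_instances set. A minimal
sketch of the pattern, with hypothetical names (crawl_one, seen) that are
not part of the patch:

    use std::collections::HashSet;
    use std::sync::Arc;
    use tokio::sync::Mutex;

    async fn crawl_one(domain: &str, seen: Arc<Mutex<HashSet<String>>>) {
        // Take the lock in an inner scope so the guard drops *before*
        // any child job is awaited; a parent holding it while children
        // wait for the same lock would deadlock.
        {
            let mut seen = seen.lock().await;
            if !seen.insert(domain.to_owned()) {
                return; // another job already claimed this domain
            }
        } // MutexGuard dropped here

        // ... fetch site info and await child jobs, each of which
        // locks `seen` again for its own domain ...
    }

    #[tokio::main]
    async fn main() {
        let seen = Arc::new(Mutex::new(HashSet::new()));
        crawl_one("lemmy.ml", seen.clone()).await;
        assert!(seen.lock().await.contains("lemmy.ml"));
    }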
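
Note on the recursion itself: a recursive async fn does not compile as-is,
because the compiler cannot size the self-referential future type, so the
#[async_recursion] attribute rewrites the function to return a boxed
future. join_all then polls all child jobs of one instance concurrently
within the same task (cooperative concurrency; thread-level parallelism
would need tokio::spawn). A toy sketch of the same fan-out/fan-in shape,
with a synthetic link structure, a hypothetical name crawl_tree, and no
error handling:

    use async_recursion::async_recursion;
    use futures::future::join_all;

    #[async_recursion]
    async fn crawl_tree(node: u32, depth: u32, max_depth: u32) -> Vec<u32> {
        if depth > max_depth {
            return vec![];
        }
        // Pretend every node links to exactly two children.
        let jobs = [node * 2, node * 2 + 1].map(|c| crawl_tree(c, depth + 1, max_depth));

        // Fan-in: gather everything found below, then add this node.
        let mut found: Vec<u32> = join_all(jobs).await.into_iter().flatten().collect();
        found.push(node);
        found
    }

    #[tokio::main]
    async fn main() {
        // 1 node at depth 0, 2 at depth 1, 4 at depth 2.
        assert_eq!(crawl_tree(1, 0, 2).await.len(), 7);
    }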
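
Note on min_lemmy_version: decrementing the minor component of the latest
release means instances one minor version behind are still crawled, as the
doc comment above it explains. A worked example, assuming the fetched
VERSION file contains "0.16.3":

    use semver::Version;

    fn main() {
        let mut version = Version::parse("0.16.3").unwrap();
        version.minor -= 1; // 0.16.3 -> 0.15.3
        assert_eq!(version, Version::parse("0.15.3").unwrap());
        // Caveat: for a hypothetical "1.0.0" the subtraction on the u64
        // minor field would underflow and panic in a debug build; the
        // code assumes a 0.x release line.
    }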