diff --git a/Cargo.lock b/Cargo.lock
index 325b632..2bb7dac 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1380,6 +1380,7 @@ dependencies = [
  "derive-new",
  "futures",
  "lemmy_api_common",
+ "lemmy_db_schema",
  "log",
  "once_cell",
  "reqwest",
diff --git a/Cargo.toml b/Cargo.toml
index 4c446d7..ee3c163 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -14,6 +14,7 @@ serde_json = "1.0.89"
 semver = "1.0.14"
 once_cell = "1.16.0"
 lemmy_api_common = "=0.16.0"
+lemmy_db_schema = "=0.16.0"
 async-recursion = "1.0.0"
 log = "0.4.17"
 derive-new = "0.5.9"
diff --git a/src/crawl.rs b/src/crawl.rs
index 8137594..4e14a4d 100644
--- a/src/crawl.rs
+++ b/src/crawl.rs
@@ -1,11 +1,12 @@
+use crate::node_info::{NodeInfo, NodeInfoWellKnown};
 use crate::CLIENT;
-use anyhow::Error;
+use anyhow::{anyhow, Error};
 use async_recursion::async_recursion;
 use futures::future::join_all;
 use lemmy_api_common::site::GetSiteResponse;
 use log::debug;
+use reqwest::Url;
 use semver::Version;
-use serde::Serialize;
 use std::collections::HashSet;
 use std::ops::Deref;
 use std::sync::Arc;
@@ -26,15 +27,16 @@ pub struct CrawlParams {
     crawled_instances: Arc<Mutex<HashSet<String>>>,
 }
 
-#[derive(Serialize, Debug)]
-pub struct InstanceDetails {
+#[derive(Debug)]
+pub struct CrawlResult {
     pub domain: String,
-    pub site_info: GetSiteResponse,
+    pub node_info: NodeInfo,
+    pub site_info: Option<GetSiteResponse>,
 }
 
 impl CrawlJob {
     #[async_recursion]
-    pub async fn crawl(self) -> Vec<Result<InstanceDetails, Error>> {
+    pub async fn crawl(self) -> Vec<Result<CrawlResult, Error>> {
         // need to acquire and release mutex before recursing, otherwise it will deadlock
         {
             let mut crawled_instances = self.params.crawled_instances.deref().lock().await;
@@ -55,47 +57,79 @@ impl CrawlJob {
             "Starting crawl for {}, distance {}",
             &self.domain, &self.current_distance
         );
-        let site_info = match self.fetch_instance_details().await {
+        let (node_info, site_info) = match self.fetch_instance_details().await {
             Ok(o) => o,
             Err(e) => return vec![Err(e)],
         };
+        let mut crawl_result = CrawlResult {
+            domain: self.domain.clone(),
+            node_info,
+            site_info: None,
+        };
 
-        if site_info.1 < self.params.min_lemmy_version {
-            return vec![];
-        }
-
-        let mut result = vec![];
-        if let Some(federated) = &site_info.0.federated_instances {
-            for domain in federated.linked.iter() {
-                let crawl_job = CrawlJob::new(
-                    domain.clone(),
-                    self.current_distance + 1,
-                    self.params.clone(),
-                );
-                result.push(crawl_job.crawl());
+        if let Some(site_info) = site_info {
+            match Version::parse(&site_info.version) {
+                Ok(version) => {
+                    if version < self.params.min_lemmy_version {
+                        return vec![Ok(crawl_result)];
+                    }
+                }
+                Err(e) => return vec![Err(e.into())],
             }
+
+            let mut result = vec![];
+            if let Some(federated) = &site_info.federated_instances {
+                for domain in federated.linked.iter() {
+                    let crawl_job = CrawlJob::new(
+                        domain.clone(),
+                        self.current_distance + 1,
+                        self.params.clone(),
+                    );
+                    result.push(crawl_job.crawl());
+                }
+            }
+
+            let mut result2: Vec<Result<CrawlResult, Error>> =
+                join_all(result).await.into_iter().flatten().collect();
+            debug!("Successfully finished crawl for {}", &self.domain);
+            crawl_result.site_info = Some(site_info);
+            result2.push(Ok(crawl_result));
+
+            result2
+        } else {
+            vec![Ok(crawl_result)]
         }
-
-        let mut result2: Vec<Result<InstanceDetails, Error>> =
-            join_all(result).await.into_iter().flatten().collect();
-        debug!("Successfully finished crawl for {}", &self.domain);
-        result2.push(Ok(InstanceDetails {
-            domain: self.domain,
-            site_info: site_info.0,
-        }));
-
-        result2
     }
 
-    async fn fetch_instance_details(&self) -> Result<(GetSiteResponse, Version), Error> {
-        let site_info_url = format!("https://{}/api/v3/site", &self.domain);
+    async fn fetch_instance_details(&self) -> Result<(NodeInfo, Option<GetSiteResponse>), Error> {
+        let rel_node_info: Url = Url::parse("http://nodeinfo.diaspora.software/ns/schema/2.0")
+            .expect("parse nodeinfo relation url");
+        let node_info_well_known = CLIENT
+            .get(&format!("https://{}/.well-known/nodeinfo", &self.domain))
+            .send()
+            .await?
+            .json::<NodeInfoWellKnown>()
+            .await?;
+        let node_info_url = node_info_well_known
+            .links
+            .into_iter()
+            .find(|l| l.rel == rel_node_info)
+            .ok_or_else(|| anyhow!("failed to find nodeinfo link for {}", &self.domain))?
+            .href;
+        let node_info = CLIENT
+            .get(node_info_url)
+            .send()
+            .await?
+            .json::<NodeInfo>()
+            .await?;
+
         let site_info = CLIENT
-            .get(&site_info_url)
+            .get(&format!("https://{}/api/v3/site", &self.domain))
             .send()
             .await?
             .json::<GetSiteResponse>()
-            .await?;
-        let version = Version::parse(&site_info.version)?;
-        Ok((site_info, version))
+            .await
+            .ok();
+        Ok((node_info, site_info))
     }
 }
diff --git a/src/lib.rs b/src/lib.rs
index 245a8b3..08210e6 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,19 +1,23 @@
 #[macro_use]
 extern crate derive_new;
 
-use crate::crawl::{CrawlJob, CrawlParams, InstanceDetails};
+use crate::crawl::{CrawlJob, CrawlParams, CrawlResult};
+use crate::node_info::{NodeInfo, NodeInfoUsage, NodeInfoUsers};
 use anyhow::Error;
 use futures::future::join_all;
+use lemmy_api_common::site::GetSiteResponse;
 use log::warn;
 use once_cell::sync::Lazy;
 use reqwest::{Client, ClientBuilder};
 use semver::Version;
+use serde::Serialize;
 use std::collections::HashSet;
 use std::sync::Arc;
 use std::time::Duration;
 use tokio::sync::Mutex;
 
 pub mod crawl;
+mod node_info;
 
 const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
 
@@ -22,14 +26,21 @@ static CLIENT: Lazy<Client> = Lazy::new(|| {
         .timeout(REQUEST_TIMEOUT)
         .user_agent("lemmy-stats-crawler")
         .build()
-        .unwrap()
+        .expect("build reqwest client")
 });
 
+#[derive(Serialize, Debug)]
+pub struct CrawlResult2 {
+    pub domain: String,
+    pub site_info: GetSiteResponse,
+    pub federated_counts: Option<NodeInfoUsage>,
+}
+
 pub async fn start_crawl(
     start_instances: Vec<String>,
     exclude_domains: Vec<String>,
     max_distance: i32,
-) -> Result<Vec<InstanceDetails>, Error> {
+) -> Result<Vec<CrawlResult2>, Error> {
     let params = Arc::new(CrawlParams::new(
         min_lemmy_version().await?,
         exclude_domains,
@@ -42,8 +53,7 @@
         jobs.push(job.crawl());
     }
 
-    // TODO: log the errors
-    let mut instance_details: Vec<InstanceDetails> = join_all(jobs)
+    let crawl_results: Vec<CrawlResult> = join_all(jobs)
         .await
         .into_iter()
         .flatten()
@@ -52,20 +62,20 @@
                 warn!("{}", e)
             }
         })
-        .filter_map(|r| r.ok())
+        .filter_map(Result::ok)
         .collect();
+    let mut crawl_results = calculate_federated_site_aggregates(crawl_results)?;
 
     // Sort by active monthly users descending
-    instance_details.sort_unstable_by_key(|i| {
+    crawl_results.sort_unstable_by_key(|i| {
         i.site_info
             .site_view
             .as_ref()
             .map(|s| s.counts.users_active_month)
             .unwrap_or(0)
     });
-    instance_details.reverse();
-
-    Ok(instance_details)
+    crawl_results.reverse();
+    Ok(crawl_results)
 }
 
 /// calculate minimum allowed lemmy version based on current version. in case of current version
@@ -82,3 +92,47 @@ async fn min_lemmy_version() -> Result<Version, Error> {
     version.minor -= 1;
     Ok(version)
 }
+
+fn calculate_federated_site_aggregates(
+    crawl_results: Vec<CrawlResult>,
+) -> Result<Vec<CrawlResult2>, Error> {
+    let node_info: Vec<(String, NodeInfo)> = crawl_results
+        .iter()
+        .map(|c| (c.domain.clone(), c.node_info.clone()))
+        .collect();
+    let lemmy_instances: Vec<(String, GetSiteResponse)> = crawl_results
+        .into_iter()
+        .filter_map(|c| {
+            let domain = c.domain;
+            c.site_info.map(|c2| (domain, c2))
+        })
+        .collect();
+    let mut ret = vec![];
+    for instance in &lemmy_instances {
+        let federated_counts = if let Some(federated_instances) = &instance.1.federated_instances {
+            node_info
+                .iter()
+                .filter(|i| federated_instances.linked.contains(&i.0) || i.0 == instance.0)
+                .map(|i| i.1.usage.clone())
+                .reduce(|a, b| NodeInfoUsage {
+                    users: NodeInfoUsers {
+                        total: a.users.total + b.users.total,
+                        active_halfyear: a.users.active_halfyear + b.users.active_halfyear,
+                        active_month: a.users.active_month + b.users.active_month,
+                    },
+                    posts: a.posts + b.posts,
+                    comments: a.comments + b.comments,
+                })
+        } else {
+            None
+        };
+        // TODO: workaround because GetSiteResponse doesnt implement clone
+        let site_info = serde_json::from_str(&serde_json::to_string(&instance.1)?)?;
+        ret.push(CrawlResult2 {
+            domain: instance.0.clone(),
+            site_info,
+            federated_counts,
+        });
+    }
+    Ok(ret)
+}
diff --git a/src/main.rs b/src/main.rs
index a8447c8..ec0792d 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,6 +1,5 @@
 use anyhow::Error;
-use lemmy_stats_crawler::crawl::InstanceDetails;
-use lemmy_stats_crawler::start_crawl;
+use lemmy_stats_crawler::{start_crawl, CrawlResult2};
 use serde::Serialize;
 use structopt::StructOpt;
 
@@ -57,10 +56,10 @@ struct TotalStats {
     users_active_week: i64,
     users_active_month: i64,
     users_active_halfyear: i64,
-    instance_details: Vec<InstanceDetails>,
+    instance_details: Vec<CrawlResult2>,
 }
 
-fn aggregate(instance_details: Vec<InstanceDetails>) -> TotalStats {
+fn aggregate(instance_details: Vec<CrawlResult2>) -> TotalStats {
     let mut online_users = 0;
     let mut total_users = 0;
     let mut users_active_day = 0;
diff --git a/src/node_info.rs b/src/node_info.rs
new file mode 100644
index 0000000..21a8f4f
--- /dev/null
+++ b/src/node_info.rs
@@ -0,0 +1,47 @@
+use reqwest::Url;
+use serde::{Deserialize, Serialize};
+
+#[derive(Deserialize, Debug)]
+pub struct NodeInfoWellKnown {
+    pub links: Vec<NodeInfoWellKnownLinks>,
+}
+
+#[derive(Deserialize, Debug)]
+pub struct NodeInfoWellKnownLinks {
+    pub rel: Url,
+    pub href: Url,
+}
+
+#[derive(Deserialize, Debug, Clone)]
+#[serde(rename_all = "camelCase")]
+pub struct NodeInfo {
+    pub version: String,
+    pub software: NodeInfoSoftware,
+    pub protocols: Vec<String>,
+    pub usage: NodeInfoUsage,
+    pub open_registrations: bool,
+}
+
+#[derive(Deserialize, Debug, Clone)]
+pub struct NodeInfoSoftware {
+    pub name: String,
+    pub version: String,
+}
+
+#[derive(Deserialize, Serialize, Debug, Clone, Default)]
+#[serde(rename_all = "camelCase", default)]
+pub struct NodeInfoUsage {
+    pub users: NodeInfoUsers,
+    #[serde(rename(deserialize = "localPosts"))]
+    pub posts: i64,
+    #[serde(rename(deserialize = "localComments"))]
+    pub comments: i64,
+}
+
+#[derive(Deserialize, Serialize, Debug, Clone, Default)]
+#[serde(rename_all = "camelCase", default)]
+pub struct NodeInfoUsers {
+    pub total: i64,
+    pub active_halfyear: i64,
+    pub active_month: i64,
+}