From 0aa0ea19fb0bf626a5d19a76a6fd5aadc2708471 Mon Sep 17 00:00:00 2001 From: Felix Ableitner Date: Tue, 29 Sep 2020 15:10:55 +0200 Subject: [PATCH] Use reqwest to send activities --- Cargo.lock | 1 + lemmy_apub/Cargo.toml | 1 + lemmy_apub/src/activity_queue.rs | 20 ++++---- lemmy_apub/src/extensions/signatures.rs | 68 ++++++++++++++++--------- 4 files changed, 57 insertions(+), 33 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 65f718b3..7c1e173c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1865,6 +1865,7 @@ dependencies = [ "diesel", "futures", "http", + "http-signature-normalization", "http-signature-normalization-actix", "itertools", "lazy_static", diff --git a/lemmy_apub/Cargo.toml b/lemmy_apub/Cargo.toml index 66dbbe4f..59facad4 100644 --- a/lemmy_apub/Cargo.toml +++ b/lemmy_apub/Cargo.toml @@ -33,6 +33,7 @@ url = { version = "2.1", features = ["serde"] } percent-encoding = "2.1" openssl = "0.10" http = "0.2" +http-signature-normalization = "0.5" http-signature-normalization-actix = { version = "0.4", default-features = false, features = ["sha-2"] } base64 = "0.12" tokio = "0.2" diff --git a/lemmy_apub/src/activity_queue.rs b/lemmy_apub/src/activity_queue.rs index 80c92c2f..cb3dbe88 100644 --- a/lemmy_apub/src/activity_queue.rs +++ b/lemmy_apub/src/activity_queue.rs @@ -4,7 +4,6 @@ use activitystreams::{ object::AsObject, }; use anyhow::{anyhow, Context, Error}; -use awc::Client; use background_jobs::{ create_server, memory_storage::Storage, @@ -16,8 +15,9 @@ use background_jobs::{ }; use lemmy_utils::{location_info, settings::Settings, LemmyError}; use log::warn; +use reqwest::Client; use serde::{Deserialize, Serialize}; -use std::{future::Future, pin::Pin}; +use std::{collections::BTreeMap, future::Future, pin::Pin}; use url::Url; pub fn send_activity( @@ -50,6 +50,7 @@ where actor_id: actor.actor_id()?, private_key: actor.private_key().context(location_info!())?, }; + activity_sender.queue::(message)?; Ok(()) @@ -74,18 +75,18 @@ impl ActixJob for SendActivityTask { fn run(self, state: Self::State) -> Self::Future { Box::pin(async move { for to_url in &self.to { - let request = state - .client - .post(to_url.as_str()) - .header("Content-Type", "application/json"); - + let mut headers = BTreeMap::::new(); + headers.insert("Content-Type".into(), "application/json".into()); let signed = sign( - request, + &state.client, + headers, + to_url, self.activity.clone(), &self.actor_id, self.private_key.to_owned(), ) .await; + let signed = match signed { Ok(s) => s, Err(e) => { @@ -94,7 +95,7 @@ impl ActixJob for SendActivityTask { return Ok(()); } }; - if let Err(e) = signed.send().await { + if let Err(e) = state.client.execute(signed).await { warn!("{}", e); return Err(anyhow!( "Failed to send activity {} to {}", @@ -103,7 +104,6 @@ impl ActixJob for SendActivityTask { )); } } - Ok(()) }) } diff --git a/lemmy_apub/src/extensions/signatures.rs b/lemmy_apub/src/extensions/signatures.rs index 5471e19e..3ab42c8d 100644 --- a/lemmy_apub/src/extensions/signatures.rs +++ b/lemmy_apub/src/extensions/signatures.rs @@ -1,12 +1,11 @@ use crate::ActorType; use activitystreams::unparsed::UnparsedMutExt; use activitystreams_ext::UnparsedExtension; -use actix_web::{client::ClientRequest, HttpRequest}; +use actix_web::HttpRequest; use anyhow::{anyhow, Context}; -use http_signature_normalization_actix::{ - digest::{DigestClient, SignExt}, - Config, -}; +use http::{header::HeaderName, HeaderMap, HeaderValue}; +use http_signature_normalization::Config; +use http_signature_normalization_actix::{digest::DigestCreate, Config as ConfigActix}; use lemmy_utils::{location_info, LemmyError}; use log::debug; use openssl::{ @@ -14,45 +13,68 @@ use openssl::{ pkey::PKey, sign::{Signer, Verifier}, }; +use reqwest::{Client, Request}; use serde::{Deserialize, Serialize}; use sha2::{Digest, Sha256}; +use std::{collections::BTreeMap, str::FromStr}; use url::Url; lazy_static! { + static ref CONFIG2: ConfigActix = ConfigActix::new(); static ref HTTP_SIG_CONFIG: Config = Config::new(); } /// Signs request headers with the given keypair. pub async fn sign( - request: ClientRequest, + client: &Client, + headers: BTreeMap, + url: &Url, activity: String, actor_id: &Url, private_key: String, -) -> Result, LemmyError> { +) -> Result { let signing_key_id = format!("{}#main-key", actor_id); - let digest_client = request - .signature_with_digest( - HTTP_SIG_CONFIG.clone(), - signing_key_id, - Sha256::new(), - activity, - move |signing_string| { - let private_key = PKey::private_key_from_pem(private_key.as_bytes())?; - let mut signer = Signer::new(MessageDigest::sha256(), &private_key)?; - signer.update(signing_string.as_bytes())?; + let mut path_and_query = url.path().to_string(); + if let Some(query) = url.query() { + path_and_query = format!("{}?{}", path_and_query, query); + } + let signature_header_value = HTTP_SIG_CONFIG + .begin_sign("POST", &path_and_query, headers.clone())? + .sign(signing_key_id, |signing_string| { + let private_key = PKey::private_key_from_pem(private_key.as_bytes())?; + let mut signer = Signer::new(MessageDigest::sha256(), &private_key)?; + signer.update(signing_string.as_bytes())?; - Ok(base64::encode(signer.sign_to_vec()?)) as Result<_, LemmyError> - }, - ) - .await?; + Ok(base64::encode(signer.sign_to_vec()?)) as Result<_, LemmyError> + })? + .signature_header(); + let digest = format!( + "{}={}", + Sha256::NAME, + Sha256::new().compute(activity.as_bytes()) + ); - Ok(digest_client) + let mut header_map = HeaderMap::new(); + for h in headers { + header_map.insert( + HeaderName::from_str(h.0.as_str())?, + HeaderValue::from_str(h.1.as_str())?, + ); + } + let signed_request = client + .post(&url.to_string()) + .headers(header_map) + .header("Signature", signature_header_value) + .header("Digest", digest) + .body(activity); + + Ok(signed_request.build()?) } pub fn verify(request: &HttpRequest, actor: &dyn ActorType) -> Result<(), LemmyError> { let public_key = actor.public_key().context(location_info!())?; - let verified = HTTP_SIG_CONFIG + let verified = CONFIG2 .begin_verify( request.method(), request.uri().path_and_query(),