got it working

This commit is contained in:
Felix Ableitner 2020-08-21 17:44:28 +02:00
parent 1b8eb9b970
commit 74af8b8014

View file

@ -6,20 +6,22 @@ use activitystreams::{
base::{Extends, ExtendsExt},
object::AsObject,
};
use actix::prelude::*;
use anyhow::Context;
use anyhow::{anyhow, Context, Error};
use awc::Client;
use background_jobs::{
create_server,
memory_storage::Storage,
ActixJob,
Backoff,
MaxRetries,
QueueHandle,
WorkerConfig,
};
use lemmy_utils::{location_info, settings::Settings};
use log::{debug, warn};
use serde::Serialize;
use log::warn;
use serde::{Deserialize, Serialize};
use std::{future::Future, pin::Pin};
use url::Url;
use background_jobs::{Backoff, MaxRetries, WorkerConfig, QueueHandle, Job, create_server};
use background_jobs::memory_storage::Storage;
use serde::Deserialize;
use anyhow::Error;
use futures::future::{Ready, ok};
use std::pin::Pin;
use std::future::Future;
pub fn send_activity<T, Kind>(
activity_sender: &QueueHandle,
@ -64,7 +66,7 @@ struct SendActivityTask {
private_key: String,
}
impl Job for SendActivityTask {
impl ActixJob for SendActivityTask {
type State = ();
type Future = Pin<Box<dyn Future<Output = Result<(), Error>>>>;
const NAME: &'static str = "SendActivityTask";
@ -75,7 +77,6 @@ impl Job for SendActivityTask {
fn run(self, _: Self::State) -> Self::Future {
Box::pin(async move {
for to_url in &self.to {
// TODO: should pass this in somehow instead of creating a new client every time
// i suppose this can be done through a state
let client = Client::default();
@ -91,8 +92,23 @@ impl Job for SendActivityTask {
&self.actor_id,
self.private_key.to_owned(),
)
.await?;
signed.send().await?;
.await;
let signed = match signed {
Ok(s) => s,
Err(e) => {
warn!("{}", e);
// dont return an error because retrying would probably not fix the signing
return Ok(());
}
};
if let Err(e) = signed.send().await {
warn!("{}", e);
return Err(anyhow!(
"Failed to send activity {} to {}",
&self.activity,
to_url
));
}
}
Ok(())
@ -101,12 +117,11 @@ impl Job for SendActivityTask {
}
pub fn create_activity_queue() -> QueueHandle {
// Start the application server. This guards access to to the jobs store
let queue_handle = create_server(Storage::new());
// Configure and start our workers
WorkerConfig::new(||{})
WorkerConfig::new(|| {})
.register::<SendActivityTask>()
.start(queue_handle.clone());