2022-03-29 17:51:16 +00:00
|
|
|
use crate::{
|
2023-07-22 16:15:30 +00:00
|
|
|
concurrent_processor::ProcessMap,
|
2023-07-22 17:31:01 +00:00
|
|
|
config::Configuration,
|
2022-03-29 17:51:16 +00:00
|
|
|
error::Error,
|
2023-07-13 18:48:59 +00:00
|
|
|
formats::InputProcessableFormat,
|
2022-04-03 01:56:29 +00:00
|
|
|
repo::{
|
|
|
|
Alias, AliasRepo, DeleteToken, FullRepo, HashRepo, IdentifierRepo, QueueRepo, UploadId,
|
|
|
|
},
|
2022-04-01 21:51:12 +00:00
|
|
|
serde_str::Serde,
|
2022-04-02 21:44:03 +00:00
|
|
|
store::{Identifier, Store},
|
2022-03-29 17:51:16 +00:00
|
|
|
};
|
2023-01-29 17:47:28 +00:00
|
|
|
use base64::{prelude::BASE64_STANDARD, Engine};
|
2022-04-01 21:51:12 +00:00
|
|
|
use std::{future::Future, path::PathBuf, pin::Pin};
|
2022-04-02 23:53:03 +00:00
|
|
|
use tracing::Instrument;
|
2022-04-01 16:51:46 +00:00
|
|
|
|
|
|
|
mod cleanup;
|
2022-04-01 21:51:12 +00:00
|
|
|
mod process;
|
2022-04-01 16:51:46 +00:00
|
|
|
|
2022-10-02 03:47:52 +00:00
|
|
|
/// Wrapper around raw bytes that (de)serializes as a standard-base64 string,
/// so binary payloads (hashes, store identifiers) can travel inside the
/// JSON-encoded job bodies pushed onto the queues below.
#[derive(Debug)]
struct Base64Bytes(Vec<u8>);
|
|
|
|
|
|
|
|
impl serde::Serialize for Base64Bytes {
|
|
|
|
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
|
|
|
where
|
|
|
|
S: serde::Serializer,
|
|
|
|
{
|
2023-01-29 17:47:28 +00:00
|
|
|
let s = BASE64_STANDARD.encode(&self.0);
|
2022-10-02 03:47:52 +00:00
|
|
|
s.serialize(serializer)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<'de> serde::Deserialize<'de> for Base64Bytes {
|
|
|
|
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
|
|
|
|
where
|
|
|
|
D: serde::Deserializer<'de>,
|
|
|
|
{
|
|
|
|
let s: String = serde::Deserialize::deserialize(deserializer)?;
|
2023-01-29 17:47:28 +00:00
|
|
|
BASE64_STANDARD
|
|
|
|
.decode(s)
|
2022-10-02 03:47:52 +00:00
|
|
|
.map(Base64Bytes)
|
|
|
|
.map_err(|e| serde::de::Error::custom(e.to_string()))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-04-01 16:51:46 +00:00
|
|
|
/// Name of the queue that carries `Cleanup` jobs.
const CLEANUP_QUEUE: &str = "cleanup";
/// Name of the queue that carries `Process` jobs.
const PROCESS_QUEUE: &str = "process";
|
2022-03-29 17:51:16 +00:00
|
|
|
|
|
|
|
/// Job payloads pushed onto the cleanup queue (JSON-encoded via serde_json).
/// Handled by `cleanup::perform` — see that module for the actual semantics
/// of each variant.
#[derive(Debug, serde::Deserialize, serde::Serialize)]
enum Cleanup {
    /// Cleanup job keyed by a content hash.
    Hash {
        hash: Base64Bytes,
    },
    /// Cleanup job keyed by a store identifier.
    Identifier {
        identifier: Base64Bytes,
    },
    /// Cleanup job for an alias, authorized by its delete token.
    Alias {
        alias: Serde<Alias>,
        token: Serde<DeleteToken>,
    },
    /// Cleanup job for the processed variants of a single hash.
    Variant {
        hash: Base64Bytes,
    },
    /// Cleanup job covering variants across all hashes.
    AllVariants,
}
|
|
|
|
|
2022-04-01 21:51:12 +00:00
|
|
|
/// Job payloads pushed onto the process queue (JSON-encoded via serde_json).
/// Handled by `process::perform` — see that module for the actual semantics
/// of each variant.
#[derive(Debug, serde::Deserialize, serde::Serialize)]
enum Process {
    /// Ingest an upload already written to the store under `identifier`.
    Ingest {
        identifier: Base64Bytes,
        upload_id: Serde<UploadId>,
        // Alias the uploader asked for, if any; otherwise one is generated
        // downstream (presumably — confirm in process::perform).
        declared_alias: Option<Serde<Alias>>,
    },
    /// Generate a processed variant of `source` in `target_format`.
    Generate {
        target_format: InputProcessableFormat,
        source: Serde<Alias>,
        process_path: PathBuf,
        process_args: Vec<String>,
    },
}
|
|
|
|
|
2022-04-02 21:44:03 +00:00
|
|
|
pub(crate) async fn cleanup_alias<R: QueueRepo>(
|
|
|
|
repo: &R,
|
|
|
|
alias: Alias,
|
|
|
|
token: DeleteToken,
|
|
|
|
) -> Result<(), Error> {
|
|
|
|
let job = serde_json::to_vec(&Cleanup::Alias {
|
|
|
|
alias: Serde::new(alias),
|
|
|
|
token: Serde::new(token),
|
|
|
|
})?;
|
|
|
|
repo.push(CLEANUP_QUEUE, job.into()).await?;
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
pub(crate) async fn cleanup_hash<R: QueueRepo>(repo: &R, hash: R::Bytes) -> Result<(), Error> {
|
|
|
|
let job = serde_json::to_vec(&Cleanup::Hash {
|
2022-10-02 03:47:52 +00:00
|
|
|
hash: Base64Bytes(hash.as_ref().to_vec()),
|
2022-03-29 17:51:16 +00:00
|
|
|
})?;
|
2022-04-01 16:51:46 +00:00
|
|
|
repo.push(CLEANUP_QUEUE, job.into()).await?;
|
2022-03-29 17:51:16 +00:00
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2022-04-02 21:44:03 +00:00
|
|
|
pub(crate) async fn cleanup_identifier<R: QueueRepo, I: Identifier>(
|
|
|
|
repo: &R,
|
|
|
|
identifier: I,
|
|
|
|
) -> Result<(), Error> {
|
|
|
|
let job = serde_json::to_vec(&Cleanup::Identifier {
|
2022-10-02 03:47:52 +00:00
|
|
|
identifier: Base64Bytes(identifier.to_bytes()?),
|
2022-04-02 21:44:03 +00:00
|
|
|
})?;
|
|
|
|
repo.push(CLEANUP_QUEUE, job.into()).await?;
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2022-04-11 21:56:39 +00:00
|
|
|
async fn cleanup_variants<R: QueueRepo>(repo: &R, hash: R::Bytes) -> Result<(), Error> {
|
|
|
|
let job = serde_json::to_vec(&Cleanup::Variant {
|
2022-10-02 03:47:52 +00:00
|
|
|
hash: Base64Bytes(hash.as_ref().to_vec()),
|
2022-04-11 21:56:39 +00:00
|
|
|
})?;
|
|
|
|
repo.push(CLEANUP_QUEUE, job.into()).await?;
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
pub(crate) async fn cleanup_all_variants<R: QueueRepo>(repo: &R) -> Result<(), Error> {
|
|
|
|
let job = serde_json::to_vec(&Cleanup::AllVariants)?;
|
|
|
|
repo.push(CLEANUP_QUEUE, job.into()).await?;
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2022-04-01 21:51:12 +00:00
|
|
|
pub(crate) async fn queue_ingest<R: QueueRepo>(
|
|
|
|
repo: &R,
|
|
|
|
identifier: Vec<u8>,
|
2022-04-03 01:56:29 +00:00
|
|
|
upload_id: UploadId,
|
2022-04-01 21:51:12 +00:00
|
|
|
declared_alias: Option<Alias>,
|
|
|
|
) -> Result<(), Error> {
|
|
|
|
let job = serde_json::to_vec(&Process::Ingest {
|
2022-10-02 03:47:52 +00:00
|
|
|
identifier: Base64Bytes(identifier),
|
2022-04-01 21:51:12 +00:00
|
|
|
declared_alias: declared_alias.map(Serde::new),
|
2022-04-03 01:56:29 +00:00
|
|
|
upload_id: Serde::new(upload_id),
|
2022-04-01 21:51:12 +00:00
|
|
|
})?;
|
|
|
|
repo.push(PROCESS_QUEUE, job.into()).await?;
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
pub(crate) async fn queue_generate<R: QueueRepo>(
|
|
|
|
repo: &R,
|
2023-07-13 18:48:59 +00:00
|
|
|
target_format: InputProcessableFormat,
|
2022-04-01 21:51:12 +00:00
|
|
|
source: Alias,
|
|
|
|
process_path: PathBuf,
|
|
|
|
process_args: Vec<String>,
|
|
|
|
) -> Result<(), Error> {
|
|
|
|
let job = serde_json::to_vec(&Process::Generate {
|
|
|
|
target_format,
|
|
|
|
source: Serde::new(source),
|
|
|
|
process_path,
|
|
|
|
process_args,
|
|
|
|
})?;
|
|
|
|
repo.push(PROCESS_QUEUE, job.into()).await?;
|
|
|
|
Ok(())
|
|
|
|
}
|
2022-03-29 18:18:47 +00:00
|
|
|
|
2022-04-02 21:44:03 +00:00
|
|
|
/// Entry point for a cleanup worker: drives `cleanup::perform` against the
/// cleanup queue for the life of the returned future.
pub(crate) async fn process_cleanup<R: FullRepo, S: Store>(repo: R, store: S, worker_id: String) {
    process_jobs(&repo, &store, worker_id, CLEANUP_QUEUE, cleanup::perform).await
}
|
2022-03-29 17:51:16 +00:00
|
|
|
|
2022-04-02 22:40:04 +00:00
|
|
|
/// Entry point for an image-processing worker: drives `process::perform`
/// against the process queue for the life of the returned future.
pub(crate) async fn process_images<R: FullRepo + 'static, S: Store + 'static>(
    repo: R,
    store: S,
    process_map: ProcessMap,
    config: Configuration,
    worker_id: String,
) {
    process_image_jobs(
        &repo,
        &store,
        &process_map,
        &config,
        worker_id,
        PROCESS_QUEUE,
        process::perform,
    )
    .await
}
|
|
|
|
|
2022-04-01 21:51:12 +00:00
|
|
|
/// Boxed, non-`Send` future — the return type of the job callbacks below.
type LocalBoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>;
|
|
|
|
|
2022-04-02 23:53:03 +00:00
|
|
|
async fn process_jobs<R, S, F>(
|
|
|
|
repo: &R,
|
|
|
|
store: &S,
|
|
|
|
worker_id: String,
|
|
|
|
queue: &'static str,
|
|
|
|
callback: F,
|
|
|
|
) where
|
2022-03-29 17:51:16 +00:00
|
|
|
R: QueueRepo + HashRepo + IdentifierRepo + AliasRepo,
|
|
|
|
R::Bytes: Clone,
|
|
|
|
S: Store,
|
2022-04-01 21:51:12 +00:00
|
|
|
for<'a> F: Fn(&'a R, &'a S, &'a [u8]) -> LocalBoxFuture<'a, Result<(), Error>> + Copy,
|
2022-03-29 17:51:16 +00:00
|
|
|
{
|
|
|
|
loop {
|
2022-04-02 23:53:03 +00:00
|
|
|
let res = job_loop(repo, store, worker_id.clone(), queue, callback).await;
|
2022-04-01 21:51:12 +00:00
|
|
|
|
|
|
|
if let Err(e) = res {
|
2023-01-29 17:57:59 +00:00
|
|
|
tracing::warn!("Error processing jobs: {}", format!("{e}"));
|
|
|
|
tracing::warn!("{}", format!("{e:?}"));
|
2022-04-01 21:51:12 +00:00
|
|
|
continue;
|
|
|
|
}
|
2022-03-29 18:18:47 +00:00
|
|
|
|
2022-04-01 21:51:12 +00:00
|
|
|
break;
|
2022-03-29 18:18:47 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-04-02 23:53:03 +00:00
|
|
|
/// Inner worker loop: pop jobs for `worker_id` off `queue` and run them via
/// `callback` until one of them (or the pop itself) fails.
///
/// The `Ok(())` arm of the return type is never produced — the loop only
/// exits by propagating an error through `?`; `process_jobs` restarts it.
async fn job_loop<R, S, F>(
    repo: &R,
    store: &S,
    worker_id: String,
    queue: &'static str,
    callback: F,
) -> Result<(), Error>
where
    R: QueueRepo + HashRepo + IdentifierRepo + AliasRepo,
    R::Bytes: Clone,
    S: Store,
    for<'a> F: Fn(&'a R, &'a S, &'a [u8]) -> LocalBoxFuture<'a, Result<(), Error>> + Copy,
{
    loop {
        // Fetch the next serialized job for this worker from the repo.
        let bytes = repo.pop(queue, worker_id.as_bytes().to_vec()).await?;

        let span = tracing::info_span!("Running Job", worker_id = ?worker_id);

        // Construct the job future inside the span (so setup code is traced),
        // then instrument the await with the same span so polling is too.
        span.in_scope(|| (callback)(repo, store, bytes.as_ref()))
            .instrument(span)
            .await?;
    }
}
|
2023-07-22 16:15:30 +00:00
|
|
|
|
|
|
|
async fn process_image_jobs<R, S, F>(
|
|
|
|
repo: &R,
|
|
|
|
store: &S,
|
|
|
|
process_map: &ProcessMap,
|
2023-07-22 17:31:01 +00:00
|
|
|
config: &Configuration,
|
2023-07-22 16:15:30 +00:00
|
|
|
worker_id: String,
|
|
|
|
queue: &'static str,
|
|
|
|
callback: F,
|
|
|
|
) where
|
|
|
|
R: QueueRepo + HashRepo + IdentifierRepo + AliasRepo,
|
|
|
|
R::Bytes: Clone,
|
|
|
|
S: Store,
|
2023-07-22 17:31:01 +00:00
|
|
|
for<'a> F: Fn(
|
|
|
|
&'a R,
|
|
|
|
&'a S,
|
|
|
|
&'a ProcessMap,
|
|
|
|
&'a Configuration,
|
|
|
|
&'a [u8],
|
|
|
|
) -> LocalBoxFuture<'a, Result<(), Error>>
|
|
|
|
+ Copy,
|
2023-07-22 16:15:30 +00:00
|
|
|
{
|
|
|
|
loop {
|
2023-07-22 17:31:01 +00:00
|
|
|
let res = image_job_loop(
|
|
|
|
repo,
|
|
|
|
store,
|
|
|
|
process_map,
|
|
|
|
config,
|
|
|
|
worker_id.clone(),
|
|
|
|
queue,
|
|
|
|
callback,
|
|
|
|
)
|
|
|
|
.await;
|
2023-07-22 16:15:30 +00:00
|
|
|
|
|
|
|
if let Err(e) = res {
|
|
|
|
tracing::warn!("Error processing jobs: {}", format!("{e}"));
|
|
|
|
tracing::warn!("{}", format!("{e:?}"));
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Inner image-worker loop: pop jobs for `worker_id` off `queue` and run
/// them via `callback` (which also receives the shared `ProcessMap` and
/// `Configuration`) until one of them (or the pop itself) fails.
///
/// The `Ok(())` arm of the return type is never produced — the loop only
/// exits by propagating an error through `?`; `process_image_jobs`
/// restarts it.
async fn image_job_loop<R, S, F>(
    repo: &R,
    store: &S,
    process_map: &ProcessMap,
    config: &Configuration,
    worker_id: String,
    queue: &'static str,
    callback: F,
) -> Result<(), Error>
where
    R: QueueRepo + HashRepo + IdentifierRepo + AliasRepo,
    R::Bytes: Clone,
    S: Store,
    for<'a> F: Fn(
            &'a R,
            &'a S,
            &'a ProcessMap,
            &'a Configuration,
            &'a [u8],
        ) -> LocalBoxFuture<'a, Result<(), Error>>
        + Copy,
{
    loop {
        // Fetch the next serialized job for this worker from the repo.
        let bytes = repo.pop(queue, worker_id.as_bytes().to_vec()).await?;

        let span = tracing::info_span!("Running Job", worker_id = ?worker_id);

        // Construct the job future inside the span (so setup code is traced),
        // then instrument the await with the same span so polling is too.
        span.in_scope(|| (callback)(repo, store, process_map, config, bytes.as_ref()))
            .instrument(span)
            .await?;
    }
}
|