2023-09-06 01:45:07 +00:00
|
|
|
use reqwest_middleware::ClientWithMiddleware;
|
|
|
|
|
2022-04-01 21:51:12 +00:00
|
|
|
use crate::{
|
2023-07-22 16:15:30 +00:00
|
|
|
concurrent_processor::ProcessMap,
|
2023-07-22 17:31:01 +00:00
|
|
|
config::Configuration,
|
2023-07-18 03:30:10 +00:00
|
|
|
error::{Error, UploadError},
|
2023-07-13 03:12:21 +00:00
|
|
|
formats::InputProcessableFormat,
|
2023-09-05 02:51:27 +00:00
|
|
|
future::LocalBoxFuture,
|
2022-04-02 21:44:03 +00:00
|
|
|
ingest::Session,
|
2023-09-05 02:51:27 +00:00
|
|
|
queue::Process,
|
2023-08-16 00:19:03 +00:00
|
|
|
repo::{Alias, ArcRepo, UploadId, UploadResult},
|
2022-04-01 21:51:12 +00:00
|
|
|
serde_str::Serde,
|
2023-09-02 23:30:45 +00:00
|
|
|
store::Store,
|
2023-08-24 00:10:10 +00:00
|
|
|
stream::StreamMap,
|
2022-04-01 21:51:12 +00:00
|
|
|
};
|
2023-09-02 23:30:45 +00:00
|
|
|
use std::{path::PathBuf, sync::Arc};
|
2022-04-01 21:51:12 +00:00
|
|
|
|
2023-08-16 00:19:03 +00:00
|
|
|
pub(super) fn perform<'a, S>(
|
|
|
|
repo: &'a ArcRepo,
|
2022-04-01 21:51:12 +00:00
|
|
|
store: &'a S,
|
2023-09-06 01:45:07 +00:00
|
|
|
client: &'a ClientWithMiddleware,
|
2023-07-22 16:15:30 +00:00
|
|
|
process_map: &'a ProcessMap,
|
2023-07-22 17:31:01 +00:00
|
|
|
config: &'a Configuration,
|
2023-09-03 17:47:06 +00:00
|
|
|
job: serde_json::Value,
|
2022-04-01 21:51:12 +00:00
|
|
|
) -> LocalBoxFuture<'a, Result<(), Error>>
|
|
|
|
where
|
2022-04-02 22:40:04 +00:00
|
|
|
S: Store + 'static,
|
2022-04-01 21:51:12 +00:00
|
|
|
{
|
|
|
|
Box::pin(async move {
|
2023-09-03 17:47:06 +00:00
|
|
|
match serde_json::from_value(job) {
|
2022-04-01 21:51:12 +00:00
|
|
|
Ok(job) => match job {
|
|
|
|
Process::Ingest {
|
2023-09-02 23:30:45 +00:00
|
|
|
identifier,
|
2022-04-01 21:51:12 +00:00
|
|
|
upload_id,
|
|
|
|
declared_alias,
|
|
|
|
} => {
|
2022-04-02 21:44:03 +00:00
|
|
|
process_ingest(
|
2022-04-01 21:51:12 +00:00
|
|
|
repo,
|
|
|
|
store,
|
2023-09-06 01:45:07 +00:00
|
|
|
client,
|
2023-09-02 23:30:45 +00:00
|
|
|
Arc::from(identifier),
|
2022-04-03 01:56:29 +00:00
|
|
|
Serde::into_inner(upload_id),
|
2022-04-01 21:51:12 +00:00
|
|
|
declared_alias.map(Serde::into_inner),
|
2023-07-22 17:31:01 +00:00
|
|
|
&config.media,
|
2022-04-01 21:51:12 +00:00
|
|
|
)
|
|
|
|
.await?
|
|
|
|
}
|
|
|
|
Process::Generate {
|
|
|
|
target_format,
|
|
|
|
source,
|
|
|
|
process_path,
|
|
|
|
process_args,
|
|
|
|
} => {
|
|
|
|
generate(
|
|
|
|
repo,
|
|
|
|
store,
|
2023-07-22 16:15:30 +00:00
|
|
|
process_map,
|
2022-04-01 21:51:12 +00:00
|
|
|
target_format,
|
|
|
|
Serde::into_inner(source),
|
|
|
|
process_path,
|
|
|
|
process_args,
|
2023-07-22 17:31:01 +00:00
|
|
|
config,
|
2022-04-01 21:51:12 +00:00
|
|
|
)
|
|
|
|
.await?
|
|
|
|
}
|
|
|
|
},
|
|
|
|
Err(e) => {
|
2023-01-29 17:57:59 +00:00
|
|
|
tracing::warn!("Invalid job: {}", format!("{e}"));
|
2022-04-01 21:51:12 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
2023-09-06 01:45:07 +00:00
|
|
|
#[tracing::instrument(skip(repo, store, client, media))]
|
2023-08-16 00:19:03 +00:00
|
|
|
async fn process_ingest<S>(
|
|
|
|
repo: &ArcRepo,
|
2022-04-01 21:51:12 +00:00
|
|
|
store: &S,
|
2023-09-06 01:45:07 +00:00
|
|
|
client: &ClientWithMiddleware,
|
2023-09-02 23:30:45 +00:00
|
|
|
unprocessed_identifier: Arc<str>,
|
2022-04-02 21:44:03 +00:00
|
|
|
upload_id: UploadId,
|
2022-04-01 21:51:12 +00:00
|
|
|
declared_alias: Option<Alias>,
|
2023-07-22 17:31:01 +00:00
|
|
|
media: &crate::config::Media,
|
2022-04-02 21:44:03 +00:00
|
|
|
) -> Result<(), Error>
|
|
|
|
where
|
2023-07-18 03:30:10 +00:00
|
|
|
S: Store + 'static,
|
2022-04-02 21:44:03 +00:00
|
|
|
{
|
|
|
|
let fut = async {
|
2023-07-18 03:30:10 +00:00
|
|
|
let ident = unprocessed_identifier.clone();
|
|
|
|
let store2 = store.clone();
|
|
|
|
let repo = repo.clone();
|
2023-09-06 01:45:07 +00:00
|
|
|
let client = client.clone();
|
2023-07-18 03:30:10 +00:00
|
|
|
|
2023-07-22 17:31:01 +00:00
|
|
|
let media = media.clone();
|
2023-09-04 02:30:47 +00:00
|
|
|
let error_boundary = crate::sync::spawn(async move {
|
2023-07-18 03:30:10 +00:00
|
|
|
let stream = store2
|
|
|
|
.to_stream(&ident, None, None)
|
|
|
|
.await?
|
2023-08-24 00:10:10 +00:00
|
|
|
.map(|res| res.map_err(Error::from));
|
2023-07-18 03:30:10 +00:00
|
|
|
|
|
|
|
let session =
|
2023-09-06 01:45:07 +00:00
|
|
|
crate::ingest::ingest(&repo, &store2, &client, stream, declared_alias, &media)
|
|
|
|
.await?;
|
2022-04-02 21:44:03 +00:00
|
|
|
|
2023-09-02 23:30:45 +00:00
|
|
|
Ok(session) as Result<Session, Error>
|
2023-07-18 03:30:10 +00:00
|
|
|
})
|
|
|
|
.await;
|
2022-04-02 21:44:03 +00:00
|
|
|
|
2022-04-03 02:15:39 +00:00
|
|
|
store.remove(&unprocessed_identifier).await?;
|
|
|
|
|
2023-07-18 03:30:10 +00:00
|
|
|
error_boundary.map_err(|_| UploadError::Canceled)?
|
2022-04-02 21:44:03 +00:00
|
|
|
};
|
|
|
|
|
|
|
|
let result = match fut.await {
|
2023-07-26 01:08:18 +00:00
|
|
|
Ok(session) => {
|
2022-04-02 21:44:03 +00:00
|
|
|
let alias = session.alias().take().expect("Alias should exist").clone();
|
2023-07-26 01:08:18 +00:00
|
|
|
let token = session.disarm();
|
2023-07-27 03:53:41 +00:00
|
|
|
UploadResult::Success { alias, token }
|
2022-04-02 21:44:03 +00:00
|
|
|
}
|
|
|
|
Err(e) => {
|
2023-07-10 22:15:43 +00:00
|
|
|
tracing::warn!("Failed to ingest\n{}\n{}", format!("{e}"), format!("{e:?}"));
|
2022-04-02 21:44:03 +00:00
|
|
|
|
|
|
|
UploadResult::Failure {
|
2023-07-17 02:51:14 +00:00
|
|
|
message: e.root_cause().to_string(),
|
2023-09-02 01:50:10 +00:00
|
|
|
code: e.error_code().into_owned(),
|
2022-04-02 21:44:03 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
2023-09-01 23:41:04 +00:00
|
|
|
repo.complete_upload(upload_id, result).await?;
|
2022-04-02 21:44:03 +00:00
|
|
|
|
|
|
|
Ok(())
|
2022-04-01 21:51:12 +00:00
|
|
|
}
|
|
|
|
|
2023-07-22 16:15:30 +00:00
|
|
|
#[allow(clippy::too_many_arguments)]
|
2023-09-03 23:21:46 +00:00
|
|
|
#[tracing::instrument(skip(repo, store, process_map, process_path, process_args, config))]
|
2023-08-16 00:19:03 +00:00
|
|
|
async fn generate<S: Store + 'static>(
|
|
|
|
repo: &ArcRepo,
|
2022-04-01 21:51:12 +00:00
|
|
|
store: &S,
|
2023-07-22 16:15:30 +00:00
|
|
|
process_map: &ProcessMap,
|
2023-07-13 03:12:21 +00:00
|
|
|
target_format: InputProcessableFormat,
|
2022-04-01 21:51:12 +00:00
|
|
|
source: Alias,
|
|
|
|
process_path: PathBuf,
|
|
|
|
process_args: Vec<String>,
|
2023-07-22 17:31:01 +00:00
|
|
|
config: &Configuration,
|
2022-04-01 21:51:12 +00:00
|
|
|
) -> Result<(), Error> {
|
2023-07-05 21:46:44 +00:00
|
|
|
let Some(hash) = repo.hash(&source).await? else {
|
|
|
|
// Nothing to do
|
|
|
|
return Ok(());
|
|
|
|
};
|
2022-04-02 22:40:04 +00:00
|
|
|
|
|
|
|
let path_string = process_path.to_string_lossy().to_string();
|
2023-08-16 00:19:03 +00:00
|
|
|
let identifier_opt = repo.variant_identifier(hash.clone(), path_string).await?;
|
2022-04-02 22:40:04 +00:00
|
|
|
|
|
|
|
if identifier_opt.is_some() {
|
|
|
|
return Ok(());
|
|
|
|
}
|
|
|
|
|
2023-07-22 17:31:01 +00:00
|
|
|
let original_details = crate::ensure_details(repo, store, config, &source).await?;
|
2022-10-01 00:38:11 +00:00
|
|
|
|
2022-04-02 22:40:04 +00:00
|
|
|
crate::generate::generate(
|
|
|
|
repo,
|
|
|
|
store,
|
2023-07-22 16:15:30 +00:00
|
|
|
process_map,
|
2022-04-02 22:40:04 +00:00
|
|
|
target_format,
|
|
|
|
source,
|
|
|
|
process_path,
|
|
|
|
process_args,
|
2023-07-13 18:48:59 +00:00
|
|
|
original_details.video_format(),
|
2022-10-01 00:38:11 +00:00
|
|
|
None,
|
2023-07-22 17:31:01 +00:00
|
|
|
&config.media,
|
2022-04-02 22:40:04 +00:00
|
|
|
hash,
|
|
|
|
)
|
|
|
|
.await?;
|
|
|
|
|
|
|
|
Ok(())
|
2022-04-01 21:51:12 +00:00
|
|
|
}
|