mirror of
https://git.asonix.dog/asonix/pict-rs
synced 2024-12-22 19:31:35 +00:00
Extract ProcessMap out of static
This commit is contained in:
parent
8d35c2449d
commit
ac48003f45
5 changed files with 175 additions and 62 deletions
|
@ -5,11 +5,11 @@ use crate::{
|
||||||
use actix_web::web;
|
use actix_web::web;
|
||||||
use dashmap::{mapref::entry::Entry, DashMap};
|
use dashmap::{mapref::entry::Entry, DashMap};
|
||||||
use flume::{r#async::RecvFut, Receiver, Sender};
|
use flume::{r#async::RecvFut, Receiver, Sender};
|
||||||
use once_cell::sync::Lazy;
|
|
||||||
use std::{
|
use std::{
|
||||||
future::Future,
|
future::Future,
|
||||||
path::PathBuf,
|
path::PathBuf,
|
||||||
pin::Pin,
|
pin::Pin,
|
||||||
|
sync::Arc,
|
||||||
task::{Context, Poll},
|
task::{Context, Poll},
|
||||||
};
|
};
|
||||||
use tracing::Span;
|
use tracing::Span;
|
||||||
|
@ -18,14 +18,81 @@ type OutcomeReceiver = Receiver<(Details, web::Bytes)>;
|
||||||
|
|
||||||
type ProcessMapKey = (Vec<u8>, PathBuf);
|
type ProcessMapKey = (Vec<u8>, PathBuf);
|
||||||
|
|
||||||
type ProcessMap = DashMap<ProcessMapKey, OutcomeReceiver>;
|
type ProcessMapInner = DashMap<ProcessMapKey, OutcomeReceiver>;
|
||||||
|
|
||||||
static PROCESS_MAP: Lazy<ProcessMap> = Lazy::new(DashMap::new);
|
#[derive(Debug, Default, Clone)]
|
||||||
|
pub(crate) struct ProcessMap {
|
||||||
|
process_map: Arc<ProcessMapInner>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ProcessMap {
|
||||||
|
pub(super) fn new() -> Self {
|
||||||
|
Self::default()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(super) async fn process<Fut>(
|
||||||
|
&self,
|
||||||
|
hash: &[u8],
|
||||||
|
path: PathBuf,
|
||||||
|
fut: Fut,
|
||||||
|
) -> Result<(Details, web::Bytes), Error>
|
||||||
|
where
|
||||||
|
Fut: Future<Output = Result<(Details, web::Bytes), Error>>,
|
||||||
|
{
|
||||||
|
let key = (hash.to_vec(), path.clone());
|
||||||
|
|
||||||
|
let (sender, receiver) = flume::bounded(1);
|
||||||
|
|
||||||
|
let entry = self.process_map.entry(key.clone());
|
||||||
|
|
||||||
|
let (state, span) = match entry {
|
||||||
|
Entry::Vacant(vacant) => {
|
||||||
|
vacant.insert(receiver);
|
||||||
|
|
||||||
|
let span = tracing::info_span!(
|
||||||
|
"Processing image",
|
||||||
|
hash = &tracing::field::debug(&hex::encode(hash)),
|
||||||
|
path = &tracing::field::debug(&path),
|
||||||
|
completed = &tracing::field::Empty,
|
||||||
|
);
|
||||||
|
|
||||||
|
(CancelState::Sender { sender }, span)
|
||||||
|
}
|
||||||
|
Entry::Occupied(receiver) => {
|
||||||
|
let span = tracing::info_span!(
|
||||||
|
"Waiting for processed image",
|
||||||
|
hash = &tracing::field::debug(&hex::encode(hash)),
|
||||||
|
path = &tracing::field::debug(&path),
|
||||||
|
);
|
||||||
|
|
||||||
|
let receiver = receiver.get().clone().into_recv_async();
|
||||||
|
|
||||||
|
(CancelState::Receiver { receiver }, span)
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
CancelSafeProcessor {
|
||||||
|
cancel_token: CancelToken {
|
||||||
|
span,
|
||||||
|
key,
|
||||||
|
state,
|
||||||
|
process_map: self.clone(),
|
||||||
|
},
|
||||||
|
fut,
|
||||||
|
}
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
|
||||||
|
fn remove(&self, key: &ProcessMapKey) -> Option<OutcomeReceiver> {
|
||||||
|
self.process_map.remove(key).map(|(_, v)| v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
struct CancelToken {
|
struct CancelToken {
|
||||||
span: Span,
|
span: Span,
|
||||||
key: ProcessMapKey,
|
key: ProcessMapKey,
|
||||||
state: CancelState,
|
state: CancelState,
|
||||||
|
process_map: ProcessMap,
|
||||||
}
|
}
|
||||||
|
|
||||||
enum CancelState {
|
enum CancelState {
|
||||||
|
@ -44,7 +111,7 @@ impl CancelState {
|
||||||
}
|
}
|
||||||
|
|
||||||
pin_project_lite::pin_project! {
|
pin_project_lite::pin_project! {
|
||||||
pub(super) struct CancelSafeProcessor<F> {
|
struct CancelSafeProcessor<F> {
|
||||||
cancel_token: CancelToken,
|
cancel_token: CancelToken,
|
||||||
|
|
||||||
#[pin]
|
#[pin]
|
||||||
|
@ -52,50 +119,6 @@ pin_project_lite::pin_project! {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<F> CancelSafeProcessor<F>
|
|
||||||
where
|
|
||||||
F: Future<Output = Result<(Details, web::Bytes), Error>>,
|
|
||||||
{
|
|
||||||
pub(super) fn new(hash: &[u8], path: PathBuf, fut: F) -> Self {
|
|
||||||
let key = (hash.to_vec(), path.clone());
|
|
||||||
|
|
||||||
let (sender, receiver) = flume::bounded(1);
|
|
||||||
|
|
||||||
let entry = PROCESS_MAP.entry(key.clone());
|
|
||||||
|
|
||||||
let (state, span) = match entry {
|
|
||||||
Entry::Vacant(vacant) => {
|
|
||||||
vacant.insert(receiver);
|
|
||||||
let span = tracing::info_span!(
|
|
||||||
"Processing image",
|
|
||||||
hash = &tracing::field::debug(&hex::encode(hash)),
|
|
||||||
path = &tracing::field::debug(&path),
|
|
||||||
completed = &tracing::field::Empty,
|
|
||||||
);
|
|
||||||
(CancelState::Sender { sender }, span)
|
|
||||||
}
|
|
||||||
Entry::Occupied(receiver) => {
|
|
||||||
let span = tracing::info_span!(
|
|
||||||
"Waiting for processed image",
|
|
||||||
hash = &tracing::field::debug(&hex::encode(hash)),
|
|
||||||
path = &tracing::field::debug(&path),
|
|
||||||
);
|
|
||||||
(
|
|
||||||
CancelState::Receiver {
|
|
||||||
receiver: receiver.get().clone().into_recv_async(),
|
|
||||||
},
|
|
||||||
span,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
CancelSafeProcessor {
|
|
||||||
cancel_token: CancelToken { span, key, state },
|
|
||||||
fut,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<F> Future for CancelSafeProcessor<F>
|
impl<F> Future for CancelSafeProcessor<F>
|
||||||
where
|
where
|
||||||
F: Future<Output = Result<(Details, web::Bytes), Error>>,
|
F: Future<Output = Result<(Details, web::Bytes), Error>>,
|
||||||
|
@ -106,20 +129,23 @@ where
|
||||||
let this = self.as_mut().project();
|
let this = self.as_mut().project();
|
||||||
|
|
||||||
let span = &this.cancel_token.span;
|
let span = &this.cancel_token.span;
|
||||||
|
let process_map = &this.cancel_token.process_map;
|
||||||
let state = &mut this.cancel_token.state;
|
let state = &mut this.cancel_token.state;
|
||||||
let key = &this.cancel_token.key;
|
let key = &this.cancel_token.key;
|
||||||
let fut = this.fut;
|
let fut = this.fut;
|
||||||
|
|
||||||
span.in_scope(|| match state {
|
span.in_scope(|| match state {
|
||||||
CancelState::Sender { sender } => fut.poll(cx).map(|res| {
|
CancelState::Sender { sender } => {
|
||||||
PROCESS_MAP.remove(key);
|
let res = std::task::ready!(fut.poll(cx));
|
||||||
|
|
||||||
|
process_map.remove(key);
|
||||||
|
|
||||||
if let Ok(tup) = &res {
|
if let Ok(tup) = &res {
|
||||||
let _ = sender.try_send(tup.clone());
|
let _ = sender.try_send(tup.clone());
|
||||||
}
|
}
|
||||||
|
|
||||||
res
|
Poll::Ready(res)
|
||||||
}),
|
}
|
||||||
CancelState::Receiver { ref mut receiver } => Pin::new(receiver)
|
CancelState::Receiver { ref mut receiver } => Pin::new(receiver)
|
||||||
.poll(cx)
|
.poll(cx)
|
||||||
.map(|res| res.map_err(|_| UploadError::Canceled.into())),
|
.map(|res| res.map_err(|_| UploadError::Canceled.into())),
|
||||||
|
@ -130,7 +156,7 @@ where
|
||||||
impl Drop for CancelToken {
|
impl Drop for CancelToken {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
if self.state.is_sender() {
|
if self.state.is_sender() {
|
||||||
let completed = PROCESS_MAP.remove(&self.key).is_none();
|
let completed = self.process_map.remove(&self.key).is_none();
|
||||||
self.span.record("completed", completed);
|
self.span.record("completed", completed);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
use crate::{
|
use crate::{
|
||||||
concurrent_processor::CancelSafeProcessor,
|
concurrent_processor::ProcessMap,
|
||||||
details::Details,
|
details::Details,
|
||||||
error::{Error, UploadError},
|
error::{Error, UploadError},
|
||||||
ffmpeg::ThumbnailFormat,
|
ffmpeg::ThumbnailFormat,
|
||||||
|
@ -17,6 +17,7 @@ use tracing::Instrument;
|
||||||
pub(crate) async fn generate<R: FullRepo, S: Store + 'static>(
|
pub(crate) async fn generate<R: FullRepo, S: Store + 'static>(
|
||||||
repo: &R,
|
repo: &R,
|
||||||
store: &S,
|
store: &S,
|
||||||
|
process_map: &ProcessMap,
|
||||||
format: InputProcessableFormat,
|
format: InputProcessableFormat,
|
||||||
alias: Alias,
|
alias: Alias,
|
||||||
thumbnail_path: PathBuf,
|
thumbnail_path: PathBuf,
|
||||||
|
@ -39,8 +40,9 @@ pub(crate) async fn generate<R: FullRepo, S: Store + 'static>(
|
||||||
hash.clone(),
|
hash.clone(),
|
||||||
);
|
);
|
||||||
|
|
||||||
let (details, bytes) =
|
let (details, bytes) = process_map
|
||||||
CancelSafeProcessor::new(hash.as_ref(), thumbnail_path, process_fut).await?;
|
.process(hash.as_ref(), thumbnail_path, process_fut)
|
||||||
|
.await?;
|
||||||
|
|
||||||
Ok((details, bytes))
|
Ok((details, bytes))
|
||||||
}
|
}
|
||||||
|
|
25
src/lib.rs
25
src/lib.rs
|
@ -33,6 +33,7 @@ use actix_web::{
|
||||||
http::header::{CacheControl, CacheDirective, LastModified, Range, ACCEPT_RANGES},
|
http::header::{CacheControl, CacheDirective, LastModified, Range, ACCEPT_RANGES},
|
||||||
web, App, HttpRequest, HttpResponse, HttpResponseBuilder, HttpServer,
|
web, App, HttpRequest, HttpResponse, HttpResponseBuilder, HttpServer,
|
||||||
};
|
};
|
||||||
|
use concurrent_processor::ProcessMap;
|
||||||
use formats::InputProcessableFormat;
|
use formats::InputProcessableFormat;
|
||||||
use futures_util::{
|
use futures_util::{
|
||||||
stream::{empty, once},
|
stream::{empty, once},
|
||||||
|
@ -661,6 +662,7 @@ async fn process<R: FullRepo, S: Store + 'static>(
|
||||||
ext: web::Path<String>,
|
ext: web::Path<String>,
|
||||||
repo: web::Data<R>,
|
repo: web::Data<R>,
|
||||||
store: web::Data<S>,
|
store: web::Data<S>,
|
||||||
|
process_map: web::Data<ProcessMap>,
|
||||||
) -> Result<HttpResponse, Error> {
|
) -> Result<HttpResponse, Error> {
|
||||||
let (format, alias, thumbnail_path, thumbnail_args) = prepare_process(query, ext.as_str())?;
|
let (format, alias, thumbnail_path, thumbnail_args) = prepare_process(query, ext.as_str())?;
|
||||||
|
|
||||||
|
@ -723,6 +725,7 @@ async fn process<R: FullRepo, S: Store + 'static>(
|
||||||
let (details, bytes) = generate::generate(
|
let (details, bytes) = generate::generate(
|
||||||
&repo,
|
&repo,
|
||||||
&store,
|
&store,
|
||||||
|
&process_map,
|
||||||
format,
|
format,
|
||||||
alias,
|
alias,
|
||||||
thumbnail_path,
|
thumbnail_path,
|
||||||
|
@ -1285,7 +1288,7 @@ fn configure_endpoints<
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn spawn_workers<R, S>(repo: R, store: S)
|
fn spawn_workers<R, S>(repo: R, store: S, process_map: ProcessMap)
|
||||||
where
|
where
|
||||||
R: FullRepo + 'static,
|
R: FullRepo + 'static,
|
||||||
S: Store + 'static,
|
S: Store + 'static,
|
||||||
|
@ -1297,8 +1300,14 @@ where
|
||||||
next_worker_id(),
|
next_worker_id(),
|
||||||
))
|
))
|
||||||
});
|
});
|
||||||
tracing::trace_span!(parent: None, "Spawn task")
|
tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
|
||||||
.in_scope(|| actix_rt::spawn(queue::process_images(repo, store, next_worker_id())));
|
actix_rt::spawn(queue::process_images(
|
||||||
|
repo,
|
||||||
|
store,
|
||||||
|
process_map,
|
||||||
|
next_worker_id(),
|
||||||
|
))
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn launch_file_store<R: FullRepo + 'static, F: Fn(&mut web::ServiceConfig) + Send + Clone>(
|
async fn launch_file_store<R: FullRepo + 'static, F: Fn(&mut web::ServiceConfig) + Send + Clone>(
|
||||||
|
@ -1307,6 +1316,8 @@ async fn launch_file_store<R: FullRepo + 'static, F: Fn(&mut web::ServiceConfig)
|
||||||
client: ClientWithMiddleware,
|
client: ClientWithMiddleware,
|
||||||
extra_config: F,
|
extra_config: F,
|
||||||
) -> std::io::Result<()> {
|
) -> std::io::Result<()> {
|
||||||
|
let process_map = ProcessMap::new();
|
||||||
|
|
||||||
HttpServer::new(move || {
|
HttpServer::new(move || {
|
||||||
let client = client.clone();
|
let client = client.clone();
|
||||||
|
|
||||||
|
@ -1314,11 +1325,12 @@ async fn launch_file_store<R: FullRepo + 'static, F: Fn(&mut web::ServiceConfig)
|
||||||
let repo = repo.clone();
|
let repo = repo.clone();
|
||||||
let extra_config = extra_config.clone();
|
let extra_config = extra_config.clone();
|
||||||
|
|
||||||
spawn_workers(repo.clone(), store.clone());
|
spawn_workers(repo.clone(), store.clone(), process_map.clone());
|
||||||
|
|
||||||
App::new()
|
App::new()
|
||||||
.wrap(TracingLogger::default())
|
.wrap(TracingLogger::default())
|
||||||
.wrap(Deadline)
|
.wrap(Deadline)
|
||||||
|
.app_data(web::Data::new(process_map.clone()))
|
||||||
.configure(move |sc| configure_endpoints(sc, repo, store, client, extra_config))
|
.configure(move |sc| configure_endpoints(sc, repo, store, client, extra_config))
|
||||||
})
|
})
|
||||||
.bind(CONFIG.server.address)?
|
.bind(CONFIG.server.address)?
|
||||||
|
@ -1335,6 +1347,8 @@ async fn launch_object_store<
|
||||||
client: ClientWithMiddleware,
|
client: ClientWithMiddleware,
|
||||||
extra_config: F,
|
extra_config: F,
|
||||||
) -> std::io::Result<()> {
|
) -> std::io::Result<()> {
|
||||||
|
let process_map = ProcessMap::new();
|
||||||
|
|
||||||
HttpServer::new(move || {
|
HttpServer::new(move || {
|
||||||
let client = client.clone();
|
let client = client.clone();
|
||||||
|
|
||||||
|
@ -1342,11 +1356,12 @@ async fn launch_object_store<
|
||||||
let repo = repo.clone();
|
let repo = repo.clone();
|
||||||
let extra_config = extra_config.clone();
|
let extra_config = extra_config.clone();
|
||||||
|
|
||||||
spawn_workers(repo.clone(), store.clone());
|
spawn_workers(repo.clone(), store.clone(), process_map.clone());
|
||||||
|
|
||||||
App::new()
|
App::new()
|
||||||
.wrap(TracingLogger::default())
|
.wrap(TracingLogger::default())
|
||||||
.wrap(Deadline)
|
.wrap(Deadline)
|
||||||
|
.app_data(web::Data::new(process_map.clone()))
|
||||||
.configure(move |sc| configure_endpoints(sc, repo, store, client, extra_config))
|
.configure(move |sc| configure_endpoints(sc, repo, store, client, extra_config))
|
||||||
})
|
})
|
||||||
.bind(CONFIG.server.address)?
|
.bind(CONFIG.server.address)?
|
||||||
|
|
66
src/queue.rs
66
src/queue.rs
|
@ -1,4 +1,5 @@
|
||||||
use crate::{
|
use crate::{
|
||||||
|
concurrent_processor::ProcessMap,
|
||||||
error::Error,
|
error::Error,
|
||||||
formats::InputProcessableFormat,
|
formats::InputProcessableFormat,
|
||||||
repo::{
|
repo::{
|
||||||
|
@ -161,9 +162,18 @@ pub(crate) async fn process_cleanup<R: FullRepo, S: Store>(repo: R, store: S, wo
|
||||||
pub(crate) async fn process_images<R: FullRepo + 'static, S: Store + 'static>(
|
pub(crate) async fn process_images<R: FullRepo + 'static, S: Store + 'static>(
|
||||||
repo: R,
|
repo: R,
|
||||||
store: S,
|
store: S,
|
||||||
|
process_map: ProcessMap,
|
||||||
worker_id: String,
|
worker_id: String,
|
||||||
) {
|
) {
|
||||||
process_jobs(&repo, &store, worker_id, PROCESS_QUEUE, process::perform).await
|
process_image_jobs(
|
||||||
|
&repo,
|
||||||
|
&store,
|
||||||
|
&process_map,
|
||||||
|
worker_id,
|
||||||
|
PROCESS_QUEUE,
|
||||||
|
process::perform,
|
||||||
|
)
|
||||||
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
type LocalBoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>;
|
type LocalBoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>;
|
||||||
|
@ -216,3 +226,57 @@ where
|
||||||
.await?;
|
.await?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn process_image_jobs<R, S, F>(
|
||||||
|
repo: &R,
|
||||||
|
store: &S,
|
||||||
|
process_map: &ProcessMap,
|
||||||
|
worker_id: String,
|
||||||
|
queue: &'static str,
|
||||||
|
callback: F,
|
||||||
|
) where
|
||||||
|
R: QueueRepo + HashRepo + IdentifierRepo + AliasRepo,
|
||||||
|
R::Bytes: Clone,
|
||||||
|
S: Store,
|
||||||
|
for<'a> F:
|
||||||
|
Fn(&'a R, &'a S, &'a ProcessMap, &'a [u8]) -> LocalBoxFuture<'a, Result<(), Error>> + Copy,
|
||||||
|
{
|
||||||
|
loop {
|
||||||
|
let res =
|
||||||
|
image_job_loop(repo, store, process_map, worker_id.clone(), queue, callback).await;
|
||||||
|
|
||||||
|
if let Err(e) = res {
|
||||||
|
tracing::warn!("Error processing jobs: {}", format!("{e}"));
|
||||||
|
tracing::warn!("{}", format!("{e:?}"));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn image_job_loop<R, S, F>(
|
||||||
|
repo: &R,
|
||||||
|
store: &S,
|
||||||
|
process_map: &ProcessMap,
|
||||||
|
worker_id: String,
|
||||||
|
queue: &'static str,
|
||||||
|
callback: F,
|
||||||
|
) -> Result<(), Error>
|
||||||
|
where
|
||||||
|
R: QueueRepo + HashRepo + IdentifierRepo + AliasRepo,
|
||||||
|
R::Bytes: Clone,
|
||||||
|
S: Store,
|
||||||
|
for<'a> F:
|
||||||
|
Fn(&'a R, &'a S, &'a ProcessMap, &'a [u8]) -> LocalBoxFuture<'a, Result<(), Error>> + Copy,
|
||||||
|
{
|
||||||
|
loop {
|
||||||
|
let bytes = repo.pop(queue, worker_id.as_bytes().to_vec()).await?;
|
||||||
|
|
||||||
|
let span = tracing::info_span!("Running Job", worker_id = ?worker_id);
|
||||||
|
|
||||||
|
span.in_scope(|| (callback)(repo, store, process_map, bytes.as_ref()))
|
||||||
|
.instrument(span)
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
use crate::{
|
use crate::{
|
||||||
|
concurrent_processor::ProcessMap,
|
||||||
error::{Error, UploadError},
|
error::{Error, UploadError},
|
||||||
formats::InputProcessableFormat,
|
formats::InputProcessableFormat,
|
||||||
ingest::Session,
|
ingest::Session,
|
||||||
|
@ -14,6 +15,7 @@ use std::path::PathBuf;
|
||||||
pub(super) fn perform<'a, R, S>(
|
pub(super) fn perform<'a, R, S>(
|
||||||
repo: &'a R,
|
repo: &'a R,
|
||||||
store: &'a S,
|
store: &'a S,
|
||||||
|
process_map: &'a ProcessMap,
|
||||||
job: &'a [u8],
|
job: &'a [u8],
|
||||||
) -> LocalBoxFuture<'a, Result<(), Error>>
|
) -> LocalBoxFuture<'a, Result<(), Error>>
|
||||||
where
|
where
|
||||||
|
@ -47,6 +49,7 @@ where
|
||||||
generate(
|
generate(
|
||||||
repo,
|
repo,
|
||||||
store,
|
store,
|
||||||
|
process_map,
|
||||||
target_format,
|
target_format,
|
||||||
Serde::into_inner(source),
|
Serde::into_inner(source),
|
||||||
process_path,
|
process_path,
|
||||||
|
@ -126,10 +129,12 @@ where
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[allow(clippy::too_many_arguments)]
|
||||||
#[tracing::instrument(skip_all)]
|
#[tracing::instrument(skip_all)]
|
||||||
async fn generate<R: FullRepo, S: Store + 'static>(
|
async fn generate<R: FullRepo, S: Store + 'static>(
|
||||||
repo: &R,
|
repo: &R,
|
||||||
store: &S,
|
store: &S,
|
||||||
|
process_map: &ProcessMap,
|
||||||
target_format: InputProcessableFormat,
|
target_format: InputProcessableFormat,
|
||||||
source: Alias,
|
source: Alias,
|
||||||
process_path: PathBuf,
|
process_path: PathBuf,
|
||||||
|
@ -155,6 +160,7 @@ async fn generate<R: FullRepo, S: Store + 'static>(
|
||||||
crate::generate::generate(
|
crate::generate::generate(
|
||||||
repo,
|
repo,
|
||||||
store,
|
store,
|
||||||
|
process_map,
|
||||||
target_format,
|
target_format,
|
||||||
source,
|
source,
|
||||||
process_path,
|
process_path,
|
||||||
|
|
Loading…
Reference in a new issue