2
0
Fork 0
mirror of https://git.asonix.dog/asonix/pict-rs synced 2024-12-31 23:11:26 +00:00

Add ability to claim uploads

This commit is contained in:
Aode (lion) 2022-04-02 21:15:39 -05:00
parent 8734dfbdc7
commit 29f0774331
3 changed files with 58 additions and 8 deletions

View file

@ -61,7 +61,7 @@ where
let identifier = store.save_async_read(&mut reader).await?; let identifier = store.save_async_read(&mut reader).await?;
self.identifier = Some(identifier.clone()); self.identifier = Some(identifier);
Ok(()) Ok(())
} }

View file

@ -48,6 +48,8 @@ mod stream;
mod tmp_file; mod tmp_file;
mod validate; mod validate;
use crate::repo::UploadResult;
use self::{ use self::{
backgrounded::Backgrounded, backgrounded::Backgrounded,
config::{Configuration, ImageFormat, Operation}, config::{Configuration, ImageFormat, Operation},
@ -60,7 +62,7 @@ use self::{
middleware::{Deadline, Internal}, middleware::{Deadline, Internal},
migrate::LatestDb, migrate::LatestDb,
queue::queue_generate, queue::queue_generate,
repo::{Alias, DeleteToken, FullRepo, HashRepo, IdentifierRepo, Repo, SettingsRepo}, repo::{Alias, DeleteToken, FullRepo, HashRepo, IdentifierRepo, Repo, SettingsRepo, UploadId},
serde_str::Serde, serde_str::Serde,
store::{file_store::FileStore, object_store::ObjectStore, Identifier, Store}, store::{file_store::FileStore, object_store::ObjectStore, Identifier, Store},
stream::{StreamLimit, StreamTimeout}, stream::{StreamLimit, StreamTimeout},
@ -165,7 +167,7 @@ async fn upload_backgrounded<R: FullRepo, S: Store>(
queue::queue_ingest(&**repo, identifier, upload_id, None, true).await?; queue::queue_ingest(&**repo, identifier, upload_id, None, true).await?;
files.push(serde_json::json!({ files.push(serde_json::json!({
"file": upload_id.to_string(), "upload_id": upload_id.to_string(),
})); }));
} }
@ -173,12 +175,51 @@ async fn upload_backgrounded<R: FullRepo, S: Store>(
image.result.disarm(); image.result.disarm();
} }
Ok(HttpResponse::Created().json(&serde_json::json!({ Ok(HttpResponse::Accepted().json(&serde_json::json!({
"msg": "ok", "msg": "ok",
"files": files "uploads": files
}))) })))
} }
#[derive(Debug, serde::Deserialize)]
struct ClaimQuery {
upload_id: Serde<UploadId>,
}
/// Claim a backgrounded upload
#[instrument(name = "Waiting on upload", skip(repo))]
async fn claim_upload<R: FullRepo>(
repo: web::Data<R>,
query: web::Query<ClaimQuery>,
) -> Result<HttpResponse, Error> {
let upload_id = Serde::into_inner(query.into_inner().upload_id);
match actix_rt::time::timeout(Duration::from_secs(10), repo.wait(upload_id)).await {
Ok(wait_res) => {
let upload_result = wait_res?;
repo.claim(upload_id).await?;
match upload_result {
UploadResult::Success { alias, token } => {
Ok(HttpResponse::Ok().json(&serde_json::json!({
"msg": "ok",
"files": [{
"file": alias.to_string(),
"delete_token": token.to_string(),
}]
})))
}
UploadResult::Failure { message } => Ok(HttpResponse::UnprocessableEntity().json(
&serde_json::json!({
"msg": message,
}),
)),
}
}
Err(_) => Ok(HttpResponse::NoContent().finish()),
}
}
#[derive(Debug, serde::Deserialize)] #[derive(Debug, serde::Deserialize)]
struct UrlQuery { struct UrlQuery {
url: String, url: String,
@ -727,10 +768,17 @@ async fn launch<R: FullRepo + Clone + 'static, S: Store + Clone + 'static>(
.route(web::post().to(upload::<R, S>)), .route(web::post().to(upload::<R, S>)),
) )
.service( .service(
web::resource("/backgrounded") web::scope("/backgrounded")
.service(
web::resource("")
.guard(guard::Post())
.wrap(backgrounded_form.clone()) .wrap(backgrounded_form.clone())
.route(web::post().to(upload_backgrounded::<R, S>)), .route(web::post().to(upload_backgrounded::<R, S>)),
) )
.service(
web::resource("/claim").route(web::get().to(claim_upload::<R>)),
),
)
.service(web::resource("/download").route(web::get().to(download::<R, S>))) .service(web::resource("/download").route(web::get().to(download::<R, S>)))
.service( .service(
web::resource("/delete/{delete_token}/{filename}") web::resource("/delete/{delete_token}/{filename}")

View file

@ -90,6 +90,8 @@ where
let token = session.delete_token().await?; let token = session.delete_token().await?;
store.remove(&unprocessed_identifier).await?;
Ok((session, token)) as Result<(Session<R, S>, DeleteToken), Error> Ok((session, token)) as Result<(Session<R, S>, DeleteToken), Error>
}; };