From 668b68a23fbfe9b1b4bb985dca0b14326347d830 Mon Sep 17 00:00:00 2001 From: asonix Date: Tue, 5 Dec 2023 16:58:52 -0600 Subject: [PATCH] Add internal endpoint to trigger upgrade preparation job --- README.md | 9 ++++ src/lib.rs | 101 ++++++++++++++++++++++++++----------------- src/queue.rs | 7 +++ src/queue/process.rs | 60 ++++++++++++++++++++++++- 4 files changed, 137 insertions(+), 40 deletions(-) diff --git a/README.md b/README.md index 0a09980..4cd4aeb 100644 --- a/README.md +++ b/README.md @@ -510,6 +510,15 @@ A secure API key can be generated by any password generator. $ cp -r exports/2023-07-08T22:26:21.194126713Z sled-repo ``` 4. Starting pict-rs +- `POST /internal/prepare_upgrade?force={force}` Spawn a background task that will attempt to prepare the database + for the 0.5 upgrade. This process will attempt to generate metadata for all original uploads if + needed. + + This endpoint can be hit repeatedly to check the progress of the preparations. + + Optionally, the `force` query parameter can be passed with a value of `true` in order to make + pict-rs spawn another task if the current one seems stuck. + Additionally, all endpoints support setting deadlines, after which the request will cease processing. 
To enable deadlines for your requests, you can set the `X-Request-Deadline` header to an diff --git a/src/lib.rs b/src/lib.rs index f2e9ab9..410df01 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -114,14 +114,22 @@ async fn ensure_details( return Err(UploadError::MissingAlias.into()); }; - let details = repo.details(&identifier).await?; + ensure_details_identifier(repo, store, &identifier, details_hint(alias)).await +} + +async fn ensure_details_identifier<R: FullRepo, S: Store>( + repo: &R, + store: &S, + identifier: &S::Identifier, + hint: Option<ValidInputType>, +) -> Result<Details, Error> { + let details = repo.details(identifier).await?; if let Some(details) = details { tracing::debug!("details exist"); Ok(details) } else { tracing::debug!("generating new details from {:?}", identifier); - let hint = details_hint(alias); let new_details = Details::from_store( store.clone(), identifier.clone(), @@ -130,7 +138,7 @@ async fn ensure_details( ) .await?; tracing::debug!("storing details for {:?}", identifier); - repo.relate_details(&identifier, &new_details).await?; + repo.relate_details(identifier, &new_details).await?; tracing::debug!("stored"); Ok(new_details) } @@ -693,25 +701,9 @@ async fn process( .await?; if let Some(identifier) = identifier_opt { - let details = repo.details(&identifier).await?; + let hint = Some(ValidInputType::from_format(format)); - let details = if let Some(details) = details { - tracing::debug!("details exist"); - details - } else { - tracing::debug!("generating new details from {:?}", identifier); - let new_details = Details::from_store( - (**store).clone(), - identifier.clone(), - Some(ValidInputType::from_format(format)), - CONFIG.media.process_timeout, - ) - .await?; - tracing::debug!("storing details for {:?}", identifier); - repo.relate_details(&identifier, &new_details).await?; - tracing::debug!("stored"); - new_details - }; + let details = ensure_details_identifier(&repo, &store, &identifier, hint).await?; return ranged_file_resp(&store, identifier, range, details, not_found).await; }
@@ -790,25 +782,9 @@ async fn process_head( .await?; if let Some(identifier) = identifier_opt { - let details = repo.details(&identifier).await?; + let hint = Some(ValidInputType::from_format(format)); - let details = if let Some(details) = details { - tracing::debug!("details exist"); - details - } else { - tracing::debug!("generating new details from {:?}", identifier); - let new_details = Details::from_store( - (**store).clone(), - identifier.clone(), - Some(ValidInputType::from_format(format)), - CONFIG.media.process_timeout, - ) - .await?; - tracing::debug!("storing details for {:?}", identifier); - repo.relate_details(&identifier, &new_details).await?; - tracing::debug!("stored"); - new_details - }; + let details = ensure_details_identifier(&repo, &store, &identifier, hint).await?; return ranged_file_head_resp(&store, identifier, range, details).await; } @@ -1043,6 +1019,50 @@ fn srv_head( builder } +#[derive(serde::Serialize)] +struct UpgradeResponse { + complete: bool, + progress: u64, + total: u64, +} + +#[derive(Debug, serde::Deserialize)] +struct UpgradeQuery { + force: bool, +} + +#[tracing::instrument(name = "Prepare for 0.5 upgrade", skip(repo))] +async fn prepare_upgrade<R: FullRepo>( + repo: web::Data<R>, + query: Option<web::Query<UpgradeQuery>>, +) -> Result<HttpResponse, Error> { + let total = repo.size().await?; + + let progress = if let Some(progress) = repo.get("upgrade-preparations-progress").await?
{ + progress + .as_ref() + .try_into() + .map(u64::from_be_bytes) + .unwrap_or(0) + } else { + 0 + }; + + let complete = repo.get("upgrade-preparations-complete").await?.is_some(); + + let started = repo.get("upgrade-preparations-started").await?.is_some(); + + if !started || query.is_some_and(|q| q.force) { + queue::queue_prepare_upgrade(&repo).await?; + } + + Ok(HttpResponse::Ok().json(UpgradeResponse { + complete, + progress, + total, + })) +} + #[tracing::instrument(name = "Spawning variant cleanup", skip(repo))] async fn clean_variants<R: FullRepo>(repo: web::Data<R>) -> Result<HttpResponse, Error> { queue::cleanup_all_variants(&repo).await?; @@ -1242,6 +1262,9 @@ fn configure_endpoints< .service(web::resource("/aliases").route(web::get().to(aliases::<R>))) .service(web::resource("/identifier").route(web::get().to(identifier::<R, S>))) .service(web::resource("/set_not_found").route(web::post().to(set_not_found::<R>))) + .service( + web::resource("/prepare_upgrade").route(web::post().to(prepare_upgrade::<R>)), + ) .configure(extra_config), ); } diff --git a/src/queue.rs b/src/queue.rs index aa4fdcf..5f32ce6 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -76,6 +76,7 @@ enum Process { process_path: PathBuf, process_args: Vec<String>, }, + PrepareUpgrade, } pub(crate) async fn cleanup_alias<R: QueueRepo>( @@ -124,6 +125,12 @@ pub(crate) async fn cleanup_all_variants(repo: &R) -> Result<(), E Ok(()) } +pub(crate) async fn queue_prepare_upgrade<R: QueueRepo>(repo: &R) -> Result<(), Error> { + let job = serde_json::to_vec(&Process::PrepareUpgrade)?; + repo.push(PROCESS_QUEUE, job.into()).await?; + Ok(()) +} + pub(crate) async fn queue_ingest<R: QueueRepo>( repo: &R, identifier: Vec<u8>, diff --git a/src/queue/process.rs b/src/queue/process.rs index 66ec37b..a0f9083 100644 --- a/src/queue/process.rs +++ b/src/queue/process.rs @@ -7,7 +7,7 @@ use crate::{ serde_str::Serde, store::{Identifier, Store}, }; -use futures_util::TryStreamExt; +use futures_util::{StreamExt, TryStreamExt}; use reqwest_middleware::ClientWithMiddleware; use std::path::PathBuf; use url::Url; @@ -62,6
+62,7 @@ where ) .await? } + Process::PrepareUpgrade => prepare_upgrade(repo, store).await?, }, Err(e) => { tracing::warn!("Invalid job: {}", format!("{e}")); @@ -225,3 +226,60 @@ async fn generate( Ok(()) } + +#[tracing::instrument(skip_all)] +async fn prepare_upgrade<R: FullRepo + 'static, S: Store + 'static>( + repo: &R, + store: &S, +) -> Result<(), Error> { + repo.set("upgrade-preparations-started", b"1".to_vec().into()) + .await?; + + let mut hashes = std::pin::pin!(repo.hashes().await); + + let mut count: u64 = 0; + + while let Some(res) = hashes.next().await { + match res { + Ok(hash) => { + let repo = repo.clone(); + let store = store.clone(); + + let res = actix_rt::spawn(async move { + if let Some(identifier) = repo.identifier(hash).await? { + crate::ensure_details_identifier(&repo, &store, &identifier, None).await?; + } + + repo.set( + "upgrade-preparations-progress", + Vec::from(count.to_be_bytes()).into(), + ) + .await?; + + Ok(()) as Result<(), Error> + }) + .await; + + match res { + Ok(Ok(_)) => {} + Ok(Err(e)) => { + tracing::warn!("Failed to ensure details for a hash: {e}"); + } + Err(_) => { + tracing::warn!("Panic while ensuring details for a hash"); + } + } + } + Err(e) => { + tracing::warn!("Skipping upgrade check for a hash: {e}"); + } + } + + count += 1; + } + + repo.set("upgrade-preparations-complete", b"1".to_vec().into()) + .await?; + + Ok(()) +}