mirror of
https://git.asonix.dog/asonix/pict-rs
synced 2024-12-25 12:01:24 +00:00
Add internal endpoint to prune records for missing media
This commit is contained in:
parent
9971d3caeb
commit
13f4ec6b18
3 changed files with 104 additions and 5 deletions
33
src/lib.rs
33
src/lib.rs
|
@ -1063,6 +1063,38 @@ async fn prepare_upgrade<R: FullRepo>(
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tracing::instrument(name = "Prune missing identifiers", skip(repo))]
|
||||||
|
async fn prune_missing<R: FullRepo>(
|
||||||
|
repo: web::Data<R>,
|
||||||
|
query: Option<web::Query<UpgradeQuery>>,
|
||||||
|
) -> Result<HttpResponse, Error> {
|
||||||
|
let total = repo.size().await?;
|
||||||
|
|
||||||
|
let progress = if let Some(progress) = repo.get("prune-missing-queued").await? {
|
||||||
|
progress
|
||||||
|
.as_ref()
|
||||||
|
.try_into()
|
||||||
|
.map(u64::from_be_bytes)
|
||||||
|
.unwrap_or(0)
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
};
|
||||||
|
|
||||||
|
let complete = repo.get("prune-missing-complete").await?.is_some();
|
||||||
|
|
||||||
|
let started = repo.get("prune-missing-started").await?.is_some();
|
||||||
|
|
||||||
|
if !started || query.is_some_and(|q| q.force) {
|
||||||
|
queue::prune_missing(&repo).await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(HttpResponse::Ok().json(UpgradeResponse {
|
||||||
|
complete,
|
||||||
|
progress,
|
||||||
|
total,
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
#[tracing::instrument(name = "Spawning variant cleanup", skip(repo))]
|
#[tracing::instrument(name = "Spawning variant cleanup", skip(repo))]
|
||||||
async fn clean_variants<R: FullRepo>(repo: web::Data<R>) -> Result<HttpResponse, Error> {
|
async fn clean_variants<R: FullRepo>(repo: web::Data<R>) -> Result<HttpResponse, Error> {
|
||||||
queue::cleanup_all_variants(&repo).await?;
|
queue::cleanup_all_variants(&repo).await?;
|
||||||
|
@ -1265,6 +1297,7 @@ fn configure_endpoints<
|
||||||
.service(
|
.service(
|
||||||
web::resource("/prepare_upgrade").route(web::post().to(prepare_upgrade::<R>)),
|
web::resource("/prepare_upgrade").route(web::post().to(prepare_upgrade::<R>)),
|
||||||
)
|
)
|
||||||
|
.service(web::resource("/prune_missing").route(web::post().to(prune_missing::<R>)))
|
||||||
.configure(extra_config),
|
.configure(extra_config),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
|
@ -60,6 +60,7 @@ enum Cleanup {
|
||||||
hash: Base64Bytes,
|
hash: Base64Bytes,
|
||||||
},
|
},
|
||||||
AllVariants,
|
AllVariants,
|
||||||
|
Prune,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, serde::Deserialize, serde::Serialize)]
|
#[derive(Debug, serde::Deserialize, serde::Serialize)]
|
||||||
|
@ -125,6 +126,12 @@ pub(crate) async fn cleanup_all_variants<R: QueueRepo>(repo: &R) -> Result<(), E
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) async fn prune_missing<R: QueueRepo>(repo: &R) -> Result<(), Error> {
|
||||||
|
let job = serde_json::to_vec(&Cleanup::Prune)?;
|
||||||
|
repo.push(CLEANUP_QUEUE, job.into()).await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
pub(crate) async fn queue_prepare_upgrade<R: QueueRepo>(repo: &R) -> Result<(), Error> {
|
pub(crate) async fn queue_prepare_upgrade<R: QueueRepo>(repo: &R) -> Result<(), Error> {
|
||||||
let job = serde_json::to_vec(&Process::PrepareUpgrade)?;
|
let job = serde_json::to_vec(&Process::PrepareUpgrade)?;
|
||||||
repo.push(PROCESS_QUEUE, job.into()).await?;
|
repo.push(PROCESS_QUEUE, job.into()).await?;
|
||||||
|
|
|
@ -5,7 +5,7 @@ use crate::{
|
||||||
serde_str::Serde,
|
serde_str::Serde,
|
||||||
store::{Identifier, Store},
|
store::{Identifier, Store},
|
||||||
};
|
};
|
||||||
use futures_util::StreamExt;
|
use futures_util::TryStreamExt;
|
||||||
use reqwest_middleware::ClientWithMiddleware;
|
use reqwest_middleware::ClientWithMiddleware;
|
||||||
|
|
||||||
pub(super) fn perform<'a, R, S>(
|
pub(super) fn perform<'a, R, S>(
|
||||||
|
@ -15,8 +15,8 @@ pub(super) fn perform<'a, R, S>(
|
||||||
job: &'a [u8],
|
job: &'a [u8],
|
||||||
) -> LocalBoxFuture<'a, Result<(), Error>>
|
) -> LocalBoxFuture<'a, Result<(), Error>>
|
||||||
where
|
where
|
||||||
R: FullRepo,
|
R: FullRepo + 'static,
|
||||||
S: Store,
|
S: Store + 'static,
|
||||||
{
|
{
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
match serde_json::from_slice(job) {
|
match serde_json::from_slice(job) {
|
||||||
|
@ -42,6 +42,7 @@ where
|
||||||
hash: Base64Bytes(hash),
|
hash: Base64Bytes(hash),
|
||||||
} => variant::<R, S>(repo, hash).await?,
|
} => variant::<R, S>(repo, hash).await?,
|
||||||
Cleanup::AllVariants => all_variants::<R, S>(repo).await?,
|
Cleanup::AllVariants => all_variants::<R, S>(repo).await?,
|
||||||
|
Cleanup::Prune => prune::<R, S>(repo, store).await?,
|
||||||
},
|
},
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
tracing::warn!("Invalid job: {}", format!("{e}"));
|
tracing::warn!("Invalid job: {}", format!("{e}"));
|
||||||
|
@ -159,8 +160,7 @@ where
|
||||||
{
|
{
|
||||||
let mut hash_stream = Box::pin(repo.hashes().await);
|
let mut hash_stream = Box::pin(repo.hashes().await);
|
||||||
|
|
||||||
while let Some(res) = hash_stream.next().await {
|
while let Some(hash) = hash_stream.try_next().await? {
|
||||||
let hash = res?;
|
|
||||||
super::cleanup_variants(repo, hash).await?;
|
super::cleanup_variants(repo, hash).await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -181,3 +181,62 @@ where
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn prune<R, S>(repo: &R, store: &S) -> Result<(), Error>
|
||||||
|
where
|
||||||
|
R: FullRepo + 'static,
|
||||||
|
S: Store + 'static,
|
||||||
|
{
|
||||||
|
repo.set("prune-missing-started", b"1".to_vec().into())
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let mut hash_stream = Box::pin(repo.hashes().await);
|
||||||
|
|
||||||
|
let mut count: u64 = 0;
|
||||||
|
|
||||||
|
while let Some(hash) = hash_stream.try_next().await? {
|
||||||
|
let repo = repo.clone();
|
||||||
|
let store = store.clone();
|
||||||
|
|
||||||
|
let res = actix_rt::spawn(async move {
|
||||||
|
let mut count = count;
|
||||||
|
|
||||||
|
if let Some(ident) = repo.identifier(hash.clone()).await? {
|
||||||
|
match store.len(&ident).await {
|
||||||
|
Err(e) if e.is_not_found() => {
|
||||||
|
super::cleanup_hash(&repo, hash).await?;
|
||||||
|
|
||||||
|
count += 1;
|
||||||
|
|
||||||
|
repo.set(
|
||||||
|
"prune-missing-queued",
|
||||||
|
Vec::from(count.to_be_bytes()).into(),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
_ => (),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(count) as Result<u64, Error>
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
|
||||||
|
match res {
|
||||||
|
Ok(Ok(updated)) => count = updated,
|
||||||
|
Ok(Err(e)) => {
|
||||||
|
tracing::warn!("Prune missing identifier failed - {e:?}");
|
||||||
|
}
|
||||||
|
Err(_) => {
|
||||||
|
tracing::warn!("Prune missing identifier panicked.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
count += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
repo.set("prune-missing-complete", b"1".to_vec().into())
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue