use std::sync::Arc;

use streem::IntoStreamer;
use tracing::{Instrument, Span};

use crate::{
    config::Configuration,
    error::{Error, UploadError},
    future::LocalBoxFuture,
    queue::Cleanup,
    repo::{Alias, ArcRepo, DeleteToken, Hash},
    serde_str::Serde,
    store::Store,
};
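
/// Entry point for cleanup jobs: deserializes the JSON payload into a [`Cleanup`] job and
/// dispatches it to the matching handler. Jobs that fail to deserialize are logged and dropped.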
pub(super) fn perform<'a, S>(
    repo: &'a ArcRepo,
    store: &'a S,
    configuration: &'a Configuration,
    job: serde_json::Value,
) -> LocalBoxFuture<'a, Result<(), Error>>
where
    S: Store + 'static,
{
    Box::pin(async move {
        match serde_json::from_value(job) {
            Ok(job) => match job {
                Cleanup::Hash { hash: in_hash } => hash(repo, in_hash).await?,
                Cleanup::Identifier {
                    identifier: in_identifier,
                } => identifier(repo, store, Arc::from(in_identifier)).await?,
                Cleanup::Alias {
                    alias: stored_alias,
                    token,
                } => {
                    alias(
                        repo,
                        Serde::into_inner(stored_alias),
                        Serde::into_inner(token),
                    )
                    .await?
                }
                Cleanup::Variant { hash, variant } => hash_variant(repo, hash, variant).await?,
                Cleanup::AllVariants => all_variants(repo).await?,
                Cleanup::OutdatedVariants => outdated_variants(repo, configuration).await?,
                Cleanup::OutdatedProxies => outdated_proxies(repo, configuration).await?,
                Cleanup::Prune => prune(repo, store).await?,
            },
            Err(e) => {
                tracing::warn!("Invalid job: {e}");
            }
        }

        Ok(())
    })
}
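
/// Removes a single stored object and its cached details, collecting errors so both removals
/// are attempted before reporting.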
#[tracing::instrument(skip_all)]
async fn identifier<S>(repo: &ArcRepo, store: &S, identifier: Arc<str>) -> Result<(), Error>
where
    S: Store,
{
    let mut errors = Vec::new();

    if let Err(e) = store.remove(&identifier).await {
        errors.push(UploadError::from(e));
    }

    if let Err(e) = repo.cleanup_details(&identifier).await {
        errors.push(UploadError::from(e));
    }

    for error in errors {
        tracing::error!("{error:?}");
    }

    Ok(())
}
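
/// Cleans up a hash: while aliases remain, queues their cleanup and defers; otherwise removes
/// every variant, original, and motion identifier before deleting the hash record itself.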
#[tracing::instrument(skip_all)]
async fn hash(repo: &ArcRepo, hash: Hash) -> Result<(), Error> {
    let aliases = repo.aliases_for_hash(hash.clone()).await?;

    if !aliases.is_empty() {
        for alias in aliases {
            // TODO: decide if it is okay to skip aliases without tokens
            if let Some(token) = repo.delete_token(&alias).await? {
                super::cleanup_alias(repo, alias, token).await?;
            } else {
                tracing::warn!("Not cleaning alias - no delete token");
            }
        }
        // Return after queueing the alias cleanups, since we will be requeued when the last
        // alias is cleaned
        return Ok(());
    }

    let mut idents = repo
        .variants(hash.clone())
        .await?
        .into_iter()
        .map(|(_, v)| v)
        .collect::<Vec<_>>();
    idents.extend(repo.identifier(hash.clone()).await?);
    idents.extend(repo.motion_identifier(hash.clone()).await?);

    for identifier in idents {
        let _ = super::cleanup_identifier(repo, &identifier).await;
    }

    repo.cleanup_hash(hash).await?;

    Ok(())
}
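
/// Deletes an alias after verifying the provided delete token, then queues cleanup of the
/// backing hash if no other aliases reference it.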
#[tracing::instrument(skip_all)]
pub(crate) async fn alias(repo: &ArcRepo, alias: Alias, token: DeleteToken) -> Result<(), Error> {
    let saved_delete_token = repo.delete_token(&alias).await?;

    if !saved_delete_token.is_some_and(|t| t.ct_eq(&token)) {
        return Err(UploadError::InvalidToken.into());
    }

    let hash = repo.hash(&alias).await?;

    repo.cleanup_alias(&alias).await?;
    repo.remove_relation(alias.clone()).await?;
    repo.remove_alias_access(alias).await?;

    let Some(hash) = hash else {
        // hash doesn't exist, nothing to do
        return Ok(());
    };

    if repo.aliases_for_hash(hash.clone()).await?.is_empty() {
        super::cleanup_hash(repo, hash).await?;
    }

    Ok(())
}
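
/// Queues variant cleanup for every known hash.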
#[tracing::instrument(skip_all)]
async fn all_variants(repo: &ArcRepo) -> Result<(), Error> {
    let hash_stream = std::pin::pin!(repo.hashes());
    let mut hash_stream = hash_stream.into_streamer();

    while let Some(res) = hash_stream.next().await {
        let hash = res?;
        super::cleanup_variants(repo, hash, None).await?;
    }

    Ok(())
}
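
/// Queues cleanup for variants that haven't been accessed within the configured retention
/// window.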
#[tracing::instrument(skip_all)]
async fn outdated_variants(repo: &ArcRepo, config: &Configuration) -> Result<(), Error> {
    let now = time::OffsetDateTime::now_utc();
    let since = now.saturating_sub(config.media.retention.variants.to_duration());

    let mut variant_stream = repo.older_variants(since).await?.into_streamer();

    while let Some(res) = variant_stream.next().await {
        let (hash, variant) = res?;
        super::cleanup_variants(repo, hash, Some(variant)).await?;
    }

    Ok(())
}
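
/// Deletes proxied aliases that haven't been accessed within the configured retention window,
/// falling back to removing their repo records when no delete token exists.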
#[tracing::instrument(skip_all)]
async fn outdated_proxies(repo: &ArcRepo, config: &Configuration) -> Result<(), Error> {
    let now = time::OffsetDateTime::now_utc();
    let since = now.saturating_sub(config.media.retention.proxy.to_duration());

    let mut alias_stream = repo.older_aliases(since).await?.into_streamer();

    while let Some(res) = alias_stream.next().await {
        let alias = res?;
        if let Some(token) = repo.delete_token(&alias).await? {
            super::cleanup_alias(repo, alias, token).await?;
        } else {
            tracing::warn!("Skipping alias cleanup - no delete token");
            repo.remove_relation(alias.clone()).await?;
            repo.remove_alias_access(alias).await?;
        }
    }

    Ok(())
}
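
/// Removes a single named variant of a hash, or all of its variants when no target is given,
/// cleaning up the backing identifiers along the way.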
#[tracing::instrument(skip_all)]
async fn hash_variant(
    repo: &ArcRepo,
    hash: Hash,
    target_variant: Option<String>,
) -> Result<(), Error> {
    if let Some(target_variant) = target_variant {
        if let Some(identifier) = repo
            .variant_identifier(hash.clone(), target_variant.clone())
            .await?
        {
            super::cleanup_identifier(repo, &identifier).await?;
        }

        repo.remove_variant(hash.clone(), target_variant.clone())
            .await?;
        repo.remove_variant_access(hash, target_variant).await?;
    } else {
        for (variant, identifier) in repo.variants(hash.clone()).await? {
            repo.remove_variant(hash.clone(), variant.clone()).await?;
            repo.remove_variant_access(hash.clone(), variant).await?;
            super::cleanup_identifier(repo, &identifier).await?;
        }
    }

    Ok(())
}
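
/// Walks every hash and queues cleanup for those whose backing file is missing from the store,
/// tracking progress under the `prune-missing-*` repo keys.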
#[tracing::instrument(skip_all)]
async fn prune<S>(repo: &ArcRepo, store: &S) -> Result<(), Error>
where
    S: Store + 'static,
{
    repo.set("prune-missing-started", b"1".to_vec().into())
        .await?;

    let hash_stream = std::pin::pin!(repo.hashes());
    let mut hash_stream = hash_stream.into_streamer();

    let mut count: u64 = 0;

    while let Some(hash) = hash_stream.try_next().await? {
        let repo = repo.clone();
        let store = store.clone();

        let current_span = Span::current();

        // Run each check in its own task so a panic while probing one hash doesn't abort the
        // whole prune pass.
        let span = tracing::info_span!(parent: current_span, "error-boundary");

        let res = crate::sync::spawn(
            "prune-missing",
            async move {
                let mut count = count;

                if let Some(ident) = repo.identifier(hash.clone()).await? {
                    match store.len(&ident).await {
                        Err(e) if e.is_not_found() => {
                            // The backing file is gone from the store; queue the hash for
                            // cleanup and record progress.
                            super::cleanup_hash(&repo, hash).await?;

                            count += 1;

                            repo.set(
                                "prune-missing-queued",
                                Vec::from(count.to_be_bytes()).into(),
                            )
                            .await?;
                        }
                        _ => (),
                    }
                }

                Ok(count) as Result<u64, Error>
            }
            .instrument(span),
        )
        .await;

        match res {
            Ok(Ok(updated)) => count = updated,
            Ok(Err(e)) => {
                tracing::warn!("Prune missing identifier failed - {e:?}");
            }
            Err(_) => {
                tracing::warn!("Prune missing identifier panicked.");
            }
        }
    }

    repo.set("prune-missing-complete", b"1".to_vec().into())
        .await?;

    Ok(())
}