From fd74161c61a2371546fbbe5c8f27f002efad1369 Mon Sep 17 00:00:00 2001 From: asonix Date: Fri, 1 Sep 2023 18:41:04 -0500 Subject: [PATCH] Rename some repo methods, generate UploadId in repo --- src/backgrounded.rs | 6 ++---- src/lib.rs | 2 +- src/queue/cleanup.rs | 4 ++-- src/queue/process.rs | 2 +- src/repo.rs | 28 ++++++++++++++++++---------- src/repo/migrate.rs | 4 ++-- src/repo/sled.rs | 15 +++++++++++---- src/repo_04.rs | 2 +- src/repo_04/sled.rs | 2 +- 9 files changed, 39 insertions(+), 26 deletions(-) diff --git a/src/backgrounded.rs b/src/backgrounded.rs index 87ffdc6..0210dac 100644 --- a/src/backgrounded.rs +++ b/src/backgrounded.rs @@ -42,7 +42,7 @@ where let mut this = Self { repo, identifier: None, - upload_id: Some(UploadId::generate()), + upload_id: None, }; this.do_proxy(store, stream).await?; @@ -54,9 +54,7 @@ where where P: Stream> + Unpin + 'static, { - self.repo - .create_upload(self.upload_id.expect("Upload id exists")) - .await?; + self.upload_id = Some(self.repo.create_upload().await?); let stream = stream.map(|res| res.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))); diff --git a/src/lib.rs b/src/lib.rs index 7748481..74a3f09 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -629,7 +629,7 @@ async fn page( for hash in &page.hashes { let hex = hash.to_hex(); let aliases = repo - .for_hash(hash.clone()) + .aliases_for_hash(hash.clone()) .await? .into_iter() .map(|a| a.to_string()) diff --git a/src/queue/cleanup.rs b/src/queue/cleanup.rs index eaf6e69..34b2424 100644 --- a/src/queue/cleanup.rs +++ b/src/queue/cleanup.rs @@ -75,7 +75,7 @@ where #[tracing::instrument(skip_all)] async fn hash(repo: &ArcRepo, hash: Hash) -> Result<(), Error> { - let aliases = repo.for_hash(hash.clone()).await?; + let aliases = repo.aliases_for_hash(hash.clone()).await?; if !aliases.is_empty() { for alias in aliases { @@ -127,7 +127,7 @@ async fn alias(repo: &ArcRepo, alias: Alias, token: DeleteToken) -> Result<(), E return Ok(()); }; - if repo.for_hash(hash.clone()).await?.is_empty() { + if repo.aliases_for_hash(hash.clone()).await?.is_empty() { super::cleanup_hash(repo, hash).await?; } diff --git a/src/queue/process.rs b/src/queue/process.rs index 399d817..03e4641 100644 --- a/src/queue/process.rs +++ b/src/queue/process.rs @@ -121,7 +121,7 @@ where } }; - repo.complete(upload_id, result).await?; + repo.complete_upload(upload_id, result).await?; Ok(()) } diff --git a/src/repo.rs b/src/repo.rs index a025c04..2f1bcf1 100644 --- a/src/repo.rs +++ b/src/repo.rs @@ -102,7 +102,7 @@ pub(crate) trait FullRepo: return Ok(vec![]); }; - self.for_hash(hash).await + self.aliases_for_hash(hash).await } #[tracing::instrument(skip(self))] @@ -291,13 +291,17 @@ where #[async_trait::async_trait(?Send)] pub(crate) trait UploadRepo: BaseRepo { - async fn create_upload(&self, upload_id: UploadId) -> Result<(), RepoError>; + async fn create_upload(&self) -> Result; async fn wait(&self, upload_id: UploadId) -> Result; async fn claim(&self, upload_id: UploadId) -> Result<(), RepoError>; - async fn complete(&self, upload_id: UploadId, result: UploadResult) -> Result<(), RepoError>; + async fn complete_upload( + &self, + upload_id: UploadId, + result: UploadResult, + ) -> Result<(), RepoError>; } #[async_trait::async_trait(?Send)] @@ -305,8 +309,8 @@ impl UploadRepo for Arc where T: UploadRepo, { - async fn create_upload(&self, upload_id: UploadId) -> Result<(), RepoError> { - T::create_upload(self, upload_id).await + async fn create_upload(&self) -> Result { + T::create_upload(self).await } async fn wait(&self, upload_id: UploadId) -> Result { @@ -317,8 +321,12 @@ where T::claim(self, upload_id).await } - async fn complete(&self, upload_id: UploadId, result: UploadResult) -> Result<(), RepoError> { - T::complete(self, upload_id, result).await + async fn complete_upload( + &self, + upload_id: UploadId, + result: UploadResult, + ) -> Result<(), RepoError> { + T::complete_upload(self, upload_id, result).await } } @@ -727,7 +735,7 @@ pub(crate) trait AliasRepo: BaseRepo { async fn hash(&self, alias: &Alias) -> Result, RepoError>; - async fn for_hash(&self, hash: Hash) -> Result, RepoError>; + async fn aliases_for_hash(&self, hash: Hash) -> Result, RepoError>; async fn cleanup_alias(&self, alias: &Alias) -> Result<(), RepoError>; } @@ -754,8 +762,8 @@ where T::hash(self, alias).await } - async fn for_hash(&self, hash: Hash) -> Result, RepoError> { - T::for_hash(self, hash).await + async fn aliases_for_hash(&self, hash: Hash) -> Result, RepoError> { + T::aliases_for_hash(self, hash).await } async fn cleanup_alias(&self, alias: &Alias) -> Result<(), RepoError> { diff --git a/src/repo/migrate.rs b/src/repo/migrate.rs index 408cc31..d27dd24 100644 --- a/src/repo/migrate.rs +++ b/src/repo/migrate.rs @@ -220,7 +220,7 @@ async fn do_migrate_hash(old_repo: &ArcRepo, new_repo: &ArcRepo, hash: Hash) -> } } - for alias in old_repo.for_hash(hash.clone()).await? { + for alias in old_repo.aliases_for_hash(hash.clone()).await? { let delete_token = old_repo .delete_token(&alias) .await? @@ -275,7 +275,7 @@ async fn do_migrate_hash_04( let hash_details = set_details(old_repo, new_repo, store, config, &identifier).await?; - let aliases = old_repo.for_hash(old_hash.clone()).await?; + let aliases = old_repo.aliases_for_hash(old_hash.clone()).await?; let variants = old_repo.variants::(old_hash.clone()).await?; let motion_identifier = old_repo .motion_identifier::(old_hash.clone()) diff --git a/src/repo/sled.rs b/src/repo/sled.rs index ec4f63b..244e6a7 100644 --- a/src/repo/sled.rs +++ b/src/repo/sled.rs @@ -520,9 +520,12 @@ impl Drop for PopMetricsGuard { #[async_trait::async_trait(?Send)] impl UploadRepo for SledRepo { #[tracing::instrument(level = "trace", skip(self))] - async fn create_upload(&self, upload_id: UploadId) -> Result<(), RepoError> { + async fn create_upload(&self) -> Result { + let upload_id = UploadId::generate(); + b!(self.uploads, uploads.insert(upload_id.as_bytes(), b"1")); - Ok(()) + + Ok(upload_id) } #[tracing::instrument(skip(self))] @@ -567,7 +570,11 @@ impl UploadRepo for SledRepo { } #[tracing::instrument(level = "trace", skip(self, result))] - async fn complete(&self, upload_id: UploadId, result: UploadResult) -> Result<(), RepoError> { + async fn complete_upload( + &self, + upload_id: UploadId, + result: UploadResult, + ) -> Result<(), RepoError> { let result: InnerUploadResult = result.into(); let result = serde_json::to_vec(&result).map_err(SledError::from)?; @@ -1496,7 +1503,7 @@ impl AliasRepo for SledRepo { } #[tracing::instrument(skip_all)] - async fn for_hash(&self, hash: Hash) -> Result, RepoError> { + async fn aliases_for_hash(&self, hash: Hash) -> Result, RepoError> { let hash = hash.to_ivec(); let v = b!(self.hash_aliases, { diff --git a/src/repo_04.rs b/src/repo_04.rs index 3e0f9e3..174882a 100644 --- a/src/repo_04.rs +++ b/src/repo_04.rs @@ -77,5 +77,5 @@ pub(crate) trait HashRepo: BaseRepo { pub(crate) trait AliasRepo: BaseRepo { async fn delete_token(&self, alias: &Alias) -> Result, RepoError>; - async fn for_hash(&self, hash: Self::Bytes) -> Result, RepoError>; + async fn aliases_for_hash(&self, hash: Self::Bytes) -> Result, RepoError>; } diff --git a/src/repo_04/sled.rs b/src/repo_04/sled.rs index 299f4e5..36595b7 100644 --- a/src/repo_04/sled.rs +++ b/src/repo_04/sled.rs @@ -294,7 +294,7 @@ impl AliasRepo for SledRepo { } #[tracing::instrument(skip_all)] - async fn for_hash(&self, hash: Self::Bytes) -> Result, RepoError> { + async fn aliases_for_hash(&self, hash: Self::Bytes) -> Result, RepoError> { let v = b!(self.hash_aliases, { Ok(hash_aliases .scan_prefix(hash)