From e59483c12c738d8504ad4637c6c5105a0e38dcc2 Mon Sep 17 00:00:00 2001 From: asonix Date: Sun, 23 Jul 2023 10:23:37 -0500 Subject: [PATCH] Add ProxyRepo --- src/repo.rs | 29 ++++++++++++++++++++++++++++ src/repo/sled.rs | 49 +++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 77 insertions(+), 1 deletion(-) diff --git a/src/repo.rs b/src/repo.rs index ec085a9..1291a9b 100644 --- a/src/repo.rs +++ b/src/repo.rs @@ -7,6 +7,7 @@ use base64::{prelude::BASE64_STANDARD, Engine}; use futures_util::Stream; use std::{fmt::Debug, path::PathBuf}; use tracing::Instrument; +use url::Url; use uuid::Uuid; mod old; @@ -75,6 +76,7 @@ pub(crate) trait FullRepo: + MigrationRepo + AliasAccessRepo + VariantAccessRepo + + ProxyRepo + Send + Sync + Clone @@ -145,6 +147,33 @@ where type Bytes = T::Bytes; } +#[async_trait::async_trait(?Send)] +pub(crate) trait ProxyRepo: BaseRepo { + async fn relate_url(&self, url: Url, alias: Alias) -> Result<(), RepoError>; + + async fn related(&self, url: Url) -> Result, RepoError>; + + async fn remove_relation(&self, alias: Alias) -> Result<(), RepoError>; +} + +#[async_trait::async_trait(?Send)] +impl ProxyRepo for actix_web::web::Data +where + T: ProxyRepo, +{ + async fn relate_url(&self, url: Url, alias: Alias) -> Result<(), RepoError> { + T::relate_url(self, url, alias).await + } + + async fn related(&self, url: Url) -> Result, RepoError> { + T::related(self, url).await + } + + async fn remove_relation(&self, alias: Alias) -> Result<(), RepoError> { + T::remove_relation(self, alias).await + } +} + #[async_trait::async_trait(?Send)] pub(crate) trait AliasAccessRepo: BaseRepo { type AliasAccessStream: Stream>; diff --git a/src/repo/sled.rs b/src/repo/sled.rs index 0d0f28a..3d044e0 100644 --- a/src/repo/sled.rs +++ b/src/repo/sled.rs @@ -22,8 +22,9 @@ use std::{ time::Instant, }; use tokio::{sync::Notify, task::JoinHandle}; +use url::Url; -use super::{AliasAccessRepo, RepoError, VariantAccessRepo}; +use super::{AliasAccessRepo, ProxyRepo, RepoError, VariantAccessRepo}; macro_rules! b { ($self:ident.$ident:ident, $expr:expr) => {{ @@ -77,6 +78,8 @@ pub(crate) struct SledRepo { inverse_alias_access: Tree, variant_access: Tree, inverse_variant_access: Tree, + proxy: Tree, + inverse_proxy: Tree, in_progress_queue: Tree, queue_notifier: Arc>>>, uploads: Tree, @@ -113,6 +116,8 @@ impl SledRepo { inverse_alias_access: db.open_tree("pict-rs-inverse-alias-access-tree")?, variant_access: db.open_tree("pict-rs-variant-access-tree")?, inverse_variant_access: db.open_tree("pict-rs-inverse-variant-access-tree")?, + proxy: db.open_tree("pict-rs-proxy-tree")?, + inverse_proxy: db.open_tree("pict-rs-inverse-proxy-tree")?, in_progress_queue: db.open_tree("pict-rs-in-progress-queue-tree")?, queue_notifier: Arc::new(RwLock::new(HashMap::new())), uploads: db.open_tree("pict-rs-uploads-tree")?, @@ -191,6 +196,48 @@ impl FullRepo for SledRepo { } } +#[async_trait::async_trait(?Send)] +impl ProxyRepo for SledRepo { + async fn relate_url(&self, url: Url, alias: Alias) -> Result<(), RepoError> { + let proxy = self.proxy.clone(); + let inverse_proxy = self.inverse_proxy.clone(); + + actix_web::web::block(move || { + proxy.insert(url.as_str().as_bytes(), alias.to_bytes())?; + inverse_proxy.insert(alias.to_bytes(), url.as_str().as_bytes())?; + + Ok(()) as Result<(), SledError> + }) + .await + .map_err(|_| RepoError::Canceled)??; + + Ok(()) + } + + async fn related(&self, url: Url) -> Result, RepoError> { + let opt = b!(self.proxy, proxy.get(url.as_str().as_bytes())); + + Ok(opt.and_then(|ivec| Alias::from_slice(&ivec))) + } + + async fn remove_relation(&self, alias: Alias) -> Result<(), RepoError> { + let proxy = self.proxy.clone(); + let inverse_proxy = self.inverse_proxy.clone(); + + actix_web::web::block(move || { + if let Some(url) = inverse_proxy.remove(alias.to_bytes())? { + proxy.remove(url)?; + } + + Ok(()) as Result<(), SledError> + }) + .await + .map_err(|_| RepoError::Canceled)??; + + Ok(()) + } +} + type IterValue = Option<(sled::Iter, Result)>; pub(crate) struct IterStream {