Add ProxyRepo

This commit is contained in:
asonix 2023-07-23 10:23:37 -05:00
parent dbdeb94621
commit e59483c12c
2 changed files with 77 additions and 1 deletions

View File

@ -7,6 +7,7 @@ use base64::{prelude::BASE64_STANDARD, Engine};
use futures_util::Stream; use futures_util::Stream;
use std::{fmt::Debug, path::PathBuf}; use std::{fmt::Debug, path::PathBuf};
use tracing::Instrument; use tracing::Instrument;
use url::Url;
use uuid::Uuid; use uuid::Uuid;
mod old; mod old;
@ -75,6 +76,7 @@ pub(crate) trait FullRepo:
+ MigrationRepo + MigrationRepo
+ AliasAccessRepo + AliasAccessRepo
+ VariantAccessRepo + VariantAccessRepo
+ ProxyRepo
+ Send + Send
+ Sync + Sync
+ Clone + Clone
@ -145,6 +147,33 @@ where
type Bytes = T::Bytes; type Bytes = T::Bytes;
} }
#[async_trait::async_trait(?Send)]
pub(crate) trait ProxyRepo: BaseRepo {
async fn relate_url(&self, url: Url, alias: Alias) -> Result<(), RepoError>;
async fn related(&self, url: Url) -> Result<Option<Alias>, RepoError>;
async fn remove_relation(&self, alias: Alias) -> Result<(), RepoError>;
}
#[async_trait::async_trait(?Send)]
impl<T> ProxyRepo for actix_web::web::Data<T>
where
T: ProxyRepo,
{
async fn relate_url(&self, url: Url, alias: Alias) -> Result<(), RepoError> {
T::relate_url(self, url, alias).await
}
async fn related(&self, url: Url) -> Result<Option<Alias>, RepoError> {
T::related(self, url).await
}
async fn remove_relation(&self, alias: Alias) -> Result<(), RepoError> {
T::remove_relation(self, alias).await
}
}
#[async_trait::async_trait(?Send)] #[async_trait::async_trait(?Send)]
pub(crate) trait AliasAccessRepo: BaseRepo { pub(crate) trait AliasAccessRepo: BaseRepo {
type AliasAccessStream: Stream<Item = Result<Alias, RepoError>>; type AliasAccessStream: Stream<Item = Result<Alias, RepoError>>;

View File

@ -22,8 +22,9 @@ use std::{
time::Instant, time::Instant,
}; };
use tokio::{sync::Notify, task::JoinHandle}; use tokio::{sync::Notify, task::JoinHandle};
use url::Url;
use super::{AliasAccessRepo, RepoError, VariantAccessRepo}; use super::{AliasAccessRepo, ProxyRepo, RepoError, VariantAccessRepo};
macro_rules! b { macro_rules! b {
($self:ident.$ident:ident, $expr:expr) => {{ ($self:ident.$ident:ident, $expr:expr) => {{
@ -77,6 +78,8 @@ pub(crate) struct SledRepo {
inverse_alias_access: Tree, inverse_alias_access: Tree,
variant_access: Tree, variant_access: Tree,
inverse_variant_access: Tree, inverse_variant_access: Tree,
proxy: Tree,
inverse_proxy: Tree,
in_progress_queue: Tree, in_progress_queue: Tree,
queue_notifier: Arc<RwLock<HashMap<&'static str, Arc<Notify>>>>, queue_notifier: Arc<RwLock<HashMap<&'static str, Arc<Notify>>>>,
uploads: Tree, uploads: Tree,
@ -113,6 +116,8 @@ impl SledRepo {
inverse_alias_access: db.open_tree("pict-rs-inverse-alias-access-tree")?, inverse_alias_access: db.open_tree("pict-rs-inverse-alias-access-tree")?,
variant_access: db.open_tree("pict-rs-variant-access-tree")?, variant_access: db.open_tree("pict-rs-variant-access-tree")?,
inverse_variant_access: db.open_tree("pict-rs-inverse-variant-access-tree")?, inverse_variant_access: db.open_tree("pict-rs-inverse-variant-access-tree")?,
proxy: db.open_tree("pict-rs-proxy-tree")?,
inverse_proxy: db.open_tree("pict-rs-inverse-proxy-tree")?,
in_progress_queue: db.open_tree("pict-rs-in-progress-queue-tree")?, in_progress_queue: db.open_tree("pict-rs-in-progress-queue-tree")?,
queue_notifier: Arc::new(RwLock::new(HashMap::new())), queue_notifier: Arc::new(RwLock::new(HashMap::new())),
uploads: db.open_tree("pict-rs-uploads-tree")?, uploads: db.open_tree("pict-rs-uploads-tree")?,
@ -191,6 +196,48 @@ impl FullRepo for SledRepo {
} }
} }
#[async_trait::async_trait(?Send)]
impl ProxyRepo for SledRepo {
async fn relate_url(&self, url: Url, alias: Alias) -> Result<(), RepoError> {
let proxy = self.proxy.clone();
let inverse_proxy = self.inverse_proxy.clone();
actix_web::web::block(move || {
proxy.insert(url.as_str().as_bytes(), alias.to_bytes())?;
inverse_proxy.insert(alias.to_bytes(), url.as_str().as_bytes())?;
Ok(()) as Result<(), SledError>
})
.await
.map_err(|_| RepoError::Canceled)??;
Ok(())
}
async fn related(&self, url: Url) -> Result<Option<Alias>, RepoError> {
let opt = b!(self.proxy, proxy.get(url.as_str().as_bytes()));
Ok(opt.and_then(|ivec| Alias::from_slice(&ivec)))
}
async fn remove_relation(&self, alias: Alias) -> Result<(), RepoError> {
let proxy = self.proxy.clone();
let inverse_proxy = self.inverse_proxy.clone();
actix_web::web::block(move || {
if let Some(url) = inverse_proxy.remove(alias.to_bytes())? {
proxy.remove(url)?;
}
Ok(()) as Result<(), SledError>
})
.await
.map_err(|_| RepoError::Canceled)??;
Ok(())
}
}
type IterValue = Option<(sled::Iter, Result<IVec, RepoError>)>; type IterValue = Option<(sled::Iter, Result<IVec, RepoError>)>;
pub(crate) struct IterStream { pub(crate) struct IterStream {