mirror of
https://git.asonix.dog/asonix/pict-rs
synced 2024-12-22 03:11:24 +00:00
Better handle concurrent proxies
This commit is contained in:
parent
c38375ee92
commit
b569607854
4 changed files with 73 additions and 21 deletions
28
src/lib.rs
28
src/lib.rs
|
@ -74,7 +74,10 @@ use self::{
|
|||
middleware::{Deadline, Internal, Log, Metrics, Payload},
|
||||
migrate_store::migrate_store,
|
||||
queue::queue_generate,
|
||||
repo::{sled::SledRepo, Alias, ArcRepo, DeleteToken, Hash, Repo, UploadId, UploadResult},
|
||||
repo::{
|
||||
sled::SledRepo, Alias, ArcRepo, DeleteToken, Hash, ProxyAlreadyExists, Repo, UploadId,
|
||||
UploadResult,
|
||||
},
|
||||
serde_str::Serde,
|
||||
state::State,
|
||||
store::{file_store::FileStore, object_store::ObjectStore, Store},
|
||||
|
@ -1286,11 +1289,28 @@ async fn proxy_alias_from_query<S: Store + 'static>(
|
|||
} else if !state.config.server.read_only {
|
||||
let stream = download_stream(proxy.as_str(), state).await?;
|
||||
|
||||
let (alias, _, _) = ingest_inline(stream, state, &Default::default()).await?;
|
||||
// some time has passed, see if we've proxied elsewhere
|
||||
if let Some(alias) = state.repo.related(proxy.clone()).await? {
|
||||
alias
|
||||
} else {
|
||||
let (alias, token, _) =
|
||||
ingest_inline(stream, state, &Default::default()).await?;
|
||||
|
||||
state.repo.relate_url(proxy, alias.clone()).await?;
|
||||
// last check, do we succeed or fail to relate the proxy alias
|
||||
if let Err(ProxyAlreadyExists) =
|
||||
state.repo.relate_url(proxy.clone(), alias.clone()).await?
|
||||
{
|
||||
queue::cleanup_alias(&state.repo, alias, token).await?;
|
||||
|
||||
alias
|
||||
state
|
||||
.repo
|
||||
.related(proxy)
|
||||
.await?
|
||||
.ok_or(UploadError::MissingAlias)?
|
||||
} else {
|
||||
alias
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return Err(UploadError::ReadOnly.into());
|
||||
};
|
||||
|
|
14
src/repo.rs
14
src/repo.rs
|
@ -46,6 +46,8 @@ pub(crate) struct HashAlreadyExists;
|
|||
pub(crate) struct AliasAlreadyExists;
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct VariantAlreadyExists;
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct ProxyAlreadyExists;
|
||||
|
||||
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
|
||||
pub(crate) struct UploadId {
|
||||
|
@ -151,7 +153,11 @@ impl<T> BaseRepo for Arc<T> where T: BaseRepo {}
|
|||
|
||||
#[async_trait::async_trait(?Send)]
|
||||
pub(crate) trait ProxyRepo: BaseRepo {
|
||||
async fn relate_url(&self, url: Url, alias: Alias) -> Result<(), RepoError>;
|
||||
async fn relate_url(
|
||||
&self,
|
||||
url: Url,
|
||||
alias: Alias,
|
||||
) -> Result<Result<(), ProxyAlreadyExists>, RepoError>;
|
||||
|
||||
async fn related(&self, url: Url) -> Result<Option<Alias>, RepoError>;
|
||||
|
||||
|
@ -163,7 +169,11 @@ impl<T> ProxyRepo for Arc<T>
|
|||
where
|
||||
T: ProxyRepo,
|
||||
{
|
||||
async fn relate_url(&self, url: Url, alias: Alias) -> Result<(), RepoError> {
|
||||
async fn relate_url(
|
||||
&self,
|
||||
url: Url,
|
||||
alias: Alias,
|
||||
) -> Result<Result<(), ProxyAlreadyExists>, RepoError> {
|
||||
T::relate_url(self, url, alias).await
|
||||
}
|
||||
|
||||
|
|
|
@ -47,8 +47,8 @@ use super::{
|
|||
notification_map::{NotificationEntry, NotificationMap},
|
||||
Alias, AliasAccessRepo, AliasAlreadyExists, AliasRepo, BaseRepo, DeleteToken, DetailsRepo,
|
||||
FullRepo, Hash, HashAlreadyExists, HashPage, HashRepo, JobId, JobResult, OrderedHash,
|
||||
ProxyRepo, QueueRepo, RepoError, SettingsRepo, StoreMigrationRepo, UploadId, UploadRepo,
|
||||
UploadResult, VariantAccessRepo, VariantAlreadyExists, VariantRepo,
|
||||
ProxyAlreadyExists, ProxyRepo, QueueRepo, RepoError, SettingsRepo, StoreMigrationRepo,
|
||||
UploadId, UploadRepo, UploadResult, VariantAccessRepo, VariantAlreadyExists, VariantRepo,
|
||||
};
|
||||
|
||||
#[derive(Clone)]
|
||||
|
@ -1884,21 +1884,31 @@ impl StoreMigrationRepo for PostgresRepo {
|
|||
#[async_trait::async_trait(?Send)]
|
||||
impl ProxyRepo for PostgresRepo {
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
async fn relate_url(&self, input_url: Url, input_alias: Alias) -> Result<(), RepoError> {
|
||||
async fn relate_url(
|
||||
&self,
|
||||
input_url: Url,
|
||||
input_alias: Alias,
|
||||
) -> Result<Result<(), ProxyAlreadyExists>, RepoError> {
|
||||
use schema::proxies::dsl::*;
|
||||
|
||||
let mut conn = self.get_connection().await?;
|
||||
|
||||
diesel::insert_into(proxies)
|
||||
let res = diesel::insert_into(proxies)
|
||||
.values((url.eq(input_url.as_str()), alias.eq(&input_alias)))
|
||||
.execute(&mut conn)
|
||||
.with_metrics(crate::init_metrics::POSTGRES_PROXY_RELATE_URL)
|
||||
.with_timeout(Duration::from_secs(5))
|
||||
.await
|
||||
.map_err(|_| PostgresError::DbTimeout)?
|
||||
.map_err(PostgresError::Diesel)?;
|
||||
.map_err(|_| PostgresError::DbTimeout)?;
|
||||
|
||||
Ok(())
|
||||
match res {
|
||||
Ok(_) => Ok(Ok(())),
|
||||
Err(diesel::result::Error::DatabaseError(
|
||||
diesel::result::DatabaseErrorKind::UniqueViolation,
|
||||
_,
|
||||
)) => Ok(Err(ProxyAlreadyExists)),
|
||||
Err(e) => Err(PostgresError::Diesel(e).into()),
|
||||
}
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
|
|
|
@ -26,8 +26,8 @@ use super::{
|
|||
notification_map::{NotificationEntry, NotificationMap},
|
||||
Alias, AliasAccessRepo, AliasAlreadyExists, AliasRepo, BaseRepo, DeleteToken, Details,
|
||||
DetailsRepo, FullRepo, HashAlreadyExists, HashPage, HashRepo, JobId, JobResult, OrderedHash,
|
||||
ProxyRepo, QueueRepo, RepoError, SettingsRepo, StoreMigrationRepo, UploadId, UploadRepo,
|
||||
UploadResult, VariantAccessRepo, VariantAlreadyExists, VariantRepo,
|
||||
ProxyAlreadyExists, ProxyRepo, QueueRepo, RepoError, SettingsRepo, StoreMigrationRepo,
|
||||
UploadId, UploadRepo, UploadResult, VariantAccessRepo, VariantAlreadyExists, VariantRepo,
|
||||
};
|
||||
|
||||
macro_rules! b {
|
||||
|
@ -218,20 +218,32 @@ impl FullRepo for SledRepo {
|
|||
|
||||
#[async_trait::async_trait(?Send)]
|
||||
impl ProxyRepo for SledRepo {
|
||||
async fn relate_url(&self, url: Url, alias: Alias) -> Result<(), RepoError> {
|
||||
async fn relate_url(
|
||||
&self,
|
||||
url: Url,
|
||||
alias: Alias,
|
||||
) -> Result<Result<(), ProxyAlreadyExists>, RepoError> {
|
||||
let proxy = self.proxy.clone();
|
||||
let inverse_proxy = self.inverse_proxy.clone();
|
||||
|
||||
crate::sync::spawn_blocking("sled-io", move || {
|
||||
proxy.insert(url.as_str().as_bytes(), alias.to_bytes())?;
|
||||
inverse_proxy.insert(alias.to_bytes(), url.as_str().as_bytes())?;
|
||||
let res = crate::sync::spawn_blocking("sled-io", move || {
|
||||
match proxy.compare_and_swap(
|
||||
url.as_str().as_bytes(),
|
||||
Option::<sled::IVec>::None,
|
||||
Some(alias.to_bytes()),
|
||||
)? {
|
||||
Ok(_) => {
|
||||
inverse_proxy.insert(alias.to_bytes(), url.as_str().as_bytes())?;
|
||||
|
||||
Ok(()) as Result<(), SledError>
|
||||
Ok(Ok(())) as Result<Result<(), ProxyAlreadyExists>, SledError>
|
||||
}
|
||||
Err(_) => Ok(Err(ProxyAlreadyExists)),
|
||||
}
|
||||
})
|
||||
.await
|
||||
.map_err(|_| RepoError::Canceled)??;
|
||||
|
||||
Ok(())
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
async fn related(&self, url: Url) -> Result<Option<Alias>, RepoError> {
|
||||
|
|
Loading…
Reference in a new issue