diff --git a/src/queue.rs b/src/queue.rs
index 3f84fae..a7d3f25 100644
--- a/src/queue.rs
+++ b/src/queue.rs
@@ -64,7 +64,7 @@ pub(crate) async fn cleanup_alias(
     alias: Alias,
     token: DeleteToken,
 ) -> Result<(), Error> {
-    let job = serde_json::to_value(&Cleanup::Alias {
+    let job = serde_json::to_value(Cleanup::Alias {
         alias: Serde::new(alias),
         token: Serde::new(token),
     })
     .map_err(UploadError::PushJob)?;
@@ -74,7 +74,7 @@ pub(crate) async fn cleanup_alias(
 }
 
 pub(crate) async fn cleanup_hash(repo: &Arc<dyn FullRepo>, hash: Hash) -> Result<(), Error> {
-    let job = serde_json::to_value(&Cleanup::Hash { hash }).map_err(UploadError::PushJob)?;
+    let job = serde_json::to_value(Cleanup::Hash { hash }).map_err(UploadError::PushJob)?;
     repo.push(CLEANUP_QUEUE, job).await?;
     Ok(())
 }
@@ -83,7 +83,7 @@ pub(crate) async fn cleanup_identifier(
     repo: &Arc<dyn FullRepo>,
     identifier: &Arc<str>,
 ) -> Result<(), Error> {
-    let job = serde_json::to_value(&Cleanup::Identifier {
+    let job = serde_json::to_value(Cleanup::Identifier {
         identifier: identifier.to_string(),
     })
     .map_err(UploadError::PushJob)?;
@@ -97,25 +97,25 @@ async fn cleanup_variants(
     variant: Option<String>,
 ) -> Result<(), Error> {
     let job =
-        serde_json::to_value(&Cleanup::Variant { hash, variant }).map_err(UploadError::PushJob)?;
+        serde_json::to_value(Cleanup::Variant { hash, variant }).map_err(UploadError::PushJob)?;
     repo.push(CLEANUP_QUEUE, job).await?;
     Ok(())
 }
 
 pub(crate) async fn cleanup_outdated_proxies(repo: &Arc<dyn FullRepo>) -> Result<(), Error> {
-    let job = serde_json::to_value(&Cleanup::OutdatedProxies).map_err(UploadError::PushJob)?;
+    let job = serde_json::to_value(Cleanup::OutdatedProxies).map_err(UploadError::PushJob)?;
     repo.push(CLEANUP_QUEUE, job).await?;
     Ok(())
 }
 
 pub(crate) async fn cleanup_outdated_variants(repo: &Arc<dyn FullRepo>) -> Result<(), Error> {
-    let job = serde_json::to_value(&Cleanup::OutdatedVariants).map_err(UploadError::PushJob)?;
+    let job = serde_json::to_value(Cleanup::OutdatedVariants).map_err(UploadError::PushJob)?;
     repo.push(CLEANUP_QUEUE, job).await?;
     Ok(())
 }
 
 pub(crate) async fn cleanup_all_variants(repo: &Arc<dyn FullRepo>) -> Result<(), Error> {
-    let job = serde_json::to_value(&Cleanup::AllVariants).map_err(UploadError::PushJob)?;
+    let job = serde_json::to_value(Cleanup::AllVariants).map_err(UploadError::PushJob)?;
     repo.push(CLEANUP_QUEUE, job).await?;
     Ok(())
 }
@@ -126,7 +126,7 @@ pub(crate) async fn queue_ingest(
     upload_id: UploadId,
     declared_alias: Option<Alias>,
 ) -> Result<(), Error> {
-    let job = serde_json::to_value(&Process::Ingest {
+    let job = serde_json::to_value(Process::Ingest {
         identifier: identifier.to_string(),
         declared_alias: declared_alias.map(Serde::new),
         upload_id: Serde::new(upload_id),
@@ -143,7 +143,7 @@ pub(crate) async fn queue_generate(
     process_path: PathBuf,
     process_args: Vec<String>,
 ) -> Result<(), Error> {
-    let job = serde_json::to_value(&Process::Generate {
+    let job = serde_json::to_value(Process::Generate {
         target_format,
         source: Serde::new(source),
         process_path,
diff --git a/src/repo.rs b/src/repo.rs
index e49f84a..6d3b2e3 100644
--- a/src/repo.rs
+++ b/src/repo.rs
@@ -562,10 +562,13 @@ impl HashPage {
     }
 }
 
+type LocalBoxFuture<'a, T> = std::pin::Pin<Box<dyn std::future::Future<Output = T> + 'a>>;
+
+type PageFuture = LocalBoxFuture<'static, Result<HashPage, RepoError>>;
+
 pub(crate) struct HashStream {
     repo: Option<Arc<dyn FullRepo>>,
-    page_future:
-        Option<std::pin::Pin<Box<dyn std::future::Future<Output = Result<HashPage, RepoError>>>>>,
+    page_future: Option<PageFuture>,
     page: Option<HashPage>,
 }
diff --git a/src/repo/hash.rs b/src/repo/hash.rs
index b3c8036..23d9f8b 100644
--- a/src/repo/hash.rs
+++ b/src/repo/hash.rs
@@ -33,7 +33,7 @@ where
     fn from_sql(bytes: <B as diesel::backend::Backend>::RawValue<'_>) -> diesel::deserialize::Result<Self> {
         let s = String::from_sql(bytes)?;
 
-        Self::from_base64(s).ok_or_else(|| format!("Invalid base64 hash").into())
+        Self::from_base64(s).ok_or_else(|| "Invalid base64 hash".to_string().into())
    }
 }
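A note on the queue.rs changes: `serde_json::to_value` is generic over `T: Serialize`, and references to `Serialize` types are themselves `Serialize`, so dropping the `&` is purely cosmetic. A minimal sketch (the `Job` type here is hypothetical, not from this crate):

```rust
use serde::Serialize;

#[derive(Serialize)]
struct Job {
    kind: &'static str,
}

fn main() -> Result<(), serde_json::Error> {
    // Both forms compile and produce identical JSON; passing by value just
    // avoids a needless borrow of a value that is consumed right away.
    let by_ref = serde_json::to_value(&Job { kind: "cleanup" })?;
    let by_val = serde_json::to_value(Job { kind: "cleanup" })?;
    assert_eq!(by_ref, by_val);
    Ok(())
}
```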
diff --git a/src/repo/postgres.rs b/src/repo/postgres.rs
index eeea24d..61835d9 100644
--- a/src/repo/postgres.rs
+++ b/src/repo/postgres.rs
@@ -2,9 +2,15 @@ mod embedded;
 mod job_status;
 mod schema;
 
-use std::sync::Arc;
+use std::{
+    sync::{
+        atomic::{AtomicU64, Ordering},
+        Arc,
+    },
+    time::Duration,
+};
 
-use dashmap::{DashMap, DashSet};
+use dashmap::DashMap;
 use diesel::prelude::*;
 use diesel_async::{
     pooled_connection::{
@@ -18,26 +24,33 @@ use tokio_postgres::{AsyncMessage, Notification};
 use url::Url;
 use uuid::Uuid;
 
-use crate::{details::Details, error_code::ErrorCode};
+use crate::{
+    details::Details,
+    error_code::{ErrorCode, OwnedErrorCode},
+    serde_str::Serde,
+    stream::LocalBoxStream,
+};
 
 use self::job_status::JobStatus;
 
 use super::{
-    Alias, AliasAlreadyExists, AliasRepo, BaseRepo, DeleteToken, DetailsRepo, Hash,
-    HashAlreadyExists, HashPage, HashRepo, JobId, OrderedHash, QueueRepo, RepoError, SettingsRepo,
-    StoreMigrationRepo, UploadId, VariantAlreadyExists,
+    Alias, AliasAccessRepo, AliasAlreadyExists, AliasRepo, BaseRepo, DeleteToken, DetailsRepo,
+    FullRepo, Hash, HashAlreadyExists, HashPage, HashRepo, JobId, OrderedHash, ProxyRepo,
+    QueueRepo, RepoError, SettingsRepo, StoreMigrationRepo, UploadId, UploadRepo, UploadResult,
+    VariantAccessRepo, VariantAlreadyExists,
 };
 
 #[derive(Clone)]
 pub(crate) struct PostgresRepo {
     inner: Arc<Inner>,
+    #[allow(dead_code)]
     notifications: Arc<actix_rt::task::JoinHandle<()>>,
 }
 
 struct Inner {
+    health_count: AtomicU64,
     pool: Pool<AsyncPgConnection>,
     queue_notifications: DashMap<String, Arc<Notify>>,
-    completed_uploads: DashSet<UploadId>,
     upload_notifier: Notify,
 }
@@ -54,6 +67,9 @@ async fn delegate_notifications(receiver: flume::Receiver<Notification>, inner: Arc<Inner>) {
                     .or_insert_with(|| Arc::new(Notify::new()))
                     .notify_waiters();
             }
+            "upload_completion_channel" => {
+                inner.upload_notifier.notify_waiters();
+            }
             channel => {
                 tracing::info!(
                     "Unhandled postgres notification: {channel}: {}",
@@ -61,7 +77,6 @@ async fn delegate_notifications(receiver: flume::Receiver<Notification>, inner: Arc<Inner>) {
                 );
             }
         }
-        todo!()
     }
 
     tracing::warn!("Notification delegator shutting down");
@@ -95,6 +110,12 @@ pub(crate) enum PostgresError {
 
     #[error("Error deserializing details")]
     DeserializeDetails(#[source] serde_json::Error),
+
+    #[error("Error serializing upload result")]
+    SerializeUploadResult(#[source] serde_json::Error),
+
+    #[error("Error deserializing upload result")]
+    DeserializeUploadResult(#[source] serde_json::Error),
 }
 
 impl PostgresError {
@@ -134,9 +155,9 @@ impl PostgresRepo {
             .map_err(ConnectPostgresError::BuildPool)?;
 
         let inner = Arc::new(Inner {
+            health_count: AtomicU64::new(0),
             pool,
             queue_notifications: DashMap::new(),
-            completed_uploads: DashSet::new(),
             upload_notifier: Notify::new(),
         });
 
@@ -829,8 +850,7 @@ impl QueueRepo for PostgresRepo {
                 return Ok((JobId(job_id), job_json));
             }
 
-            let _ = actix_rt::time::timeout(std::time::Duration::from_secs(5), notifier.notified())
-                .await;
+            let _ = actix_rt::time::timeout(Duration::from_secs(5), notifier.notified()).await;
 
             diesel::sql_query("UNLISTEN queue_status_channel;")
                 .execute(&mut conn)
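For context on the new `upload_completion_channel` arm: the delegator task drains postgres notifications and wakes any waiting tasks. A minimal sketch of that fan-out pattern using `tokio::sync::Notify` and a plain `HashMap` (the crate itself uses `DashMap`; this helper is hypothetical):

```rust
use std::{collections::HashMap, sync::Arc};
use tokio::sync::Notify;

// Wake every task currently awaiting `notified()` on the channel's Notify,
// or log the channel name when nothing is registered for it.
fn dispatch(channels: &HashMap<&str, Arc<Notify>>, channel: &str) {
    match channels.get(channel) {
        Some(notify) => notify.notify_waiters(),
        None => eprintln!("Unhandled postgres notification: {channel}"),
    }
}
```

The removed `todo!()` also matters here: it sat at the end of the delegator's loop body, so the task would have panicked after handling its first message.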
@@ -959,6 +979,422 @@ impl StoreMigrationRepo for PostgresRepo {
     }
 }
 
+#[async_trait::async_trait(?Send)]
+impl ProxyRepo for PostgresRepo {
+    async fn relate_url(&self, input_url: Url, input_alias: Alias) -> Result<(), RepoError> {
+        use schema::proxies::dsl::*;
+
+        let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
+
+        diesel::insert_into(proxies)
+            .values((url.eq(input_url.as_str()), alias.eq(&input_alias)))
+            .execute(&mut conn)
+            .await
+            .map_err(PostgresError::Diesel)?;
+
+        Ok(())
+    }
+
+    async fn related(&self, input_url: Url) -> Result<Option<Alias>, RepoError> {
+        use schema::proxies::dsl::*;
+
+        let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
+
+        let opt = proxies
+            .select(alias)
+            .filter(url.eq(input_url.as_str()))
+            .get_result(&mut conn)
+            .await
+            .optional()
+            .map_err(PostgresError::Diesel)?;
+
+        Ok(opt)
+    }
+
+    async fn remove_relation(&self, input_alias: Alias) -> Result<(), RepoError> {
+        use schema::proxies::dsl::*;
+
+        let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
+
+        diesel::delete(proxies)
+            .filter(alias.eq(&input_alias))
+            .execute(&mut conn)
+            .await
+            .map_err(PostgresError::Diesel)?;
+
+        Ok(())
+    }
+}
+
+#[async_trait::async_trait(?Send)]
+impl AliasAccessRepo for PostgresRepo {
+    async fn set_accessed_alias(
+        &self,
+        input_alias: Alias,
+        timestamp: time::OffsetDateTime,
+    ) -> Result<(), RepoError> {
+        use schema::proxies::dsl::*;
+
+        let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
+
+        let timestamp = to_primitive(timestamp);
+
+        diesel::update(proxies)
+            .filter(alias.eq(&input_alias))
+            .set(accessed.eq(timestamp))
+            .execute(&mut conn)
+            .await
+            .map_err(PostgresError::Diesel)?;
+
+        Ok(())
+    }
+
+    async fn alias_accessed_at(
+        &self,
+        input_alias: Alias,
+    ) -> Result<Option<time::OffsetDateTime>, RepoError> {
+        use schema::proxies::dsl::*;
+
+        let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
+
+        let opt = proxies
+            .select(accessed)
+            .filter(alias.eq(&input_alias))
+            .get_result::<time::PrimitiveDateTime>(&mut conn)
+            .await
+            .optional()
+            .map_err(PostgresError::Diesel)?
+            .map(time::PrimitiveDateTime::assume_utc);
+
+        Ok(opt)
+    }
+
+    async fn older_aliases(
+        &self,
+        timestamp: time::OffsetDateTime,
+    ) -> Result<LocalBoxStream<'static, Result<Alias, RepoError>>, RepoError> {
+        Ok(Box::pin(PageStream {
+            inner: self.inner.clone(),
+            future: None,
+            current: Vec::new(),
+            older_than: to_primitive(timestamp),
+            next: Box::new(|inner, older_than| {
+                Box::pin(async move {
+                    use schema::proxies::dsl::*;
+
+                    let mut conn = inner.pool.get().await.map_err(PostgresError::Pool)?;
+
+                    let vec = proxies
+                        .select((accessed, alias))
+                        .filter(accessed.lt(older_than))
+                        .order(accessed.desc())
+                        .limit(100)
+                        .get_results(&mut conn)
+                        .await
+                        .map_err(PostgresError::Diesel)?;
+
+                    Ok(vec)
+                })
+            }),
+        }))
+    }
+
+    async fn remove_alias_access(&self, _: Alias) -> Result<(), RepoError> {
+        // Noop - handled by ProxyRepo::remove_relation
+        Ok(())
+    }
+}
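For a sense of how `older_aliases` would be consumed, here is a hypothetical sweep loop. It assumes the `futures` crate's `StreamExt` for `.next()` and that `AliasAccessRepo` is in scope; pages of up to 100 rows are fetched lazily as the stream is polled, so stale aliases are never all loaded at once:

```rust
use futures::StreamExt; // assumed: the futures crate provides .next()

async fn sweep(repo: &PostgresRepo, cutoff: time::OffsetDateTime) -> Result<(), RepoError> {
    let mut aliases = repo.older_aliases(cutoff).await?;

    while let Some(alias) = aliases.next().await {
        let alias = alias?;
        // ... queue a cleanup job for `alias` here ...
    }

    Ok(())
}
```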
+
+#[async_trait::async_trait(?Send)]
+impl VariantAccessRepo for PostgresRepo {
+    async fn set_accessed_variant(
+        &self,
+        input_hash: Hash,
+        input_variant: String,
+        input_accessed: time::OffsetDateTime,
+    ) -> Result<(), RepoError> {
+        use schema::variants::dsl::*;
+
+        let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
+
+        let timestamp = to_primitive(input_accessed);
+
+        diesel::update(variants)
+            .filter(hash.eq(&input_hash).and(variant.eq(&input_variant)))
+            .set(accessed.eq(timestamp))
+            .execute(&mut conn)
+            .await
+            .map_err(PostgresError::Diesel)?;
+
+        Ok(())
+    }
+
+    async fn variant_accessed_at(
+        &self,
+        input_hash: Hash,
+        input_variant: String,
+    ) -> Result<Option<time::OffsetDateTime>, RepoError> {
+        use schema::variants::dsl::*;
+
+        let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
+
+        let opt = variants
+            .select(accessed)
+            .filter(hash.eq(&input_hash).and(variant.eq(&input_variant)))
+            .get_result(&mut conn)
+            .await
+            .optional()
+            .map_err(PostgresError::Diesel)?
+            .map(time::PrimitiveDateTime::assume_utc);
+
+        Ok(opt)
+    }
+
+    async fn older_variants(
+        &self,
+        timestamp: time::OffsetDateTime,
+    ) -> Result<LocalBoxStream<'static, Result<(Hash, String), RepoError>>, RepoError> {
+        Ok(Box::pin(PageStream {
+            inner: self.inner.clone(),
+            future: None,
+            current: Vec::new(),
+            older_than: to_primitive(timestamp),
+            next: Box::new(|inner, older_than| {
+                Box::pin(async move {
+                    use schema::variants::dsl::*;
+
+                    let mut conn = inner.pool.get().await.map_err(PostgresError::Pool)?;
+
+                    let vec = variants
+                        .select((accessed, (hash, variant)))
+                        .filter(accessed.lt(older_than))
+                        .order(accessed.desc())
+                        .limit(100)
+                        .get_results(&mut conn)
+                        .await
+                        .map_err(PostgresError::Diesel)?;
+
+                    Ok(vec)
+                })
+            }),
+        }))
+    }
+
+    async fn remove_variant_access(&self, _: Hash, _: String) -> Result<(), RepoError> {
+        // Noop - handled by HashRepo::remove_variant
+        Ok(())
+    }
+}
+
+#[derive(serde::Deserialize, serde::Serialize)]
+enum InnerUploadResult {
+    Success {
+        alias: Serde<Alias>,
+        token: Serde<DeleteToken>,
+    },
+    Failure {
+        message: String,
+        code: OwnedErrorCode,
+    },
+}
+
+impl From<UploadResult> for InnerUploadResult {
+    fn from(u: UploadResult) -> Self {
+        match u {
+            UploadResult::Success { alias, token } => InnerUploadResult::Success {
+                alias: Serde::new(alias),
+                token: Serde::new(token),
+            },
+            UploadResult::Failure { message, code } => InnerUploadResult::Failure { message, code },
+        }
+    }
+}
+
+impl From<InnerUploadResult> for UploadResult {
+    fn from(i: InnerUploadResult) -> Self {
+        match i {
+            InnerUploadResult::Success { alias, token } => UploadResult::Success {
+                alias: Serde::into_inner(alias),
+                token: Serde::into_inner(token),
+            },
+            InnerUploadResult::Failure { message, code } => UploadResult::Failure { message, code },
+        }
+    }
+}
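Since `InnerUploadResult` derives serde's default representation, the jsonb column stores an externally tagged object. A self-contained sketch of that encoding, with a hypothetical `UploadOutcome` standing in for the real type:

```rust
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
#[allow(dead_code)]
enum UploadOutcome {
    Success { alias: String },
    Failure { message: String },
}

fn main() {
    let v = serde_json::to_value(UploadOutcome::Success { alias: "a".into() }).unwrap();
    // Externally tagged by default: {"Success":{"alias":"a"}}
    assert_eq!(v["Success"]["alias"], "a");
}
```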
+
+#[async_trait::async_trait(?Send)]
+impl UploadRepo for PostgresRepo {
+    async fn create_upload(&self) -> Result<UploadId, RepoError> {
+        use schema::uploads::dsl::*;
+
+        let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
+
+        let uuid = diesel::insert_into(uploads)
+            .default_values()
+            .returning(id)
+            .get_result(&mut conn)
+            .await
+            .map_err(PostgresError::Diesel)?;
+
+        Ok(UploadId { id: uuid })
+    }
+
+    async fn wait(&self, upload_id: UploadId) -> Result<UploadResult, RepoError> {
+        use schema::uploads::dsl::*;
+
+        loop {
+            let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
+
+            diesel::sql_query("LISTEN upload_completion_channel;")
+                .execute(&mut conn)
+                .await
+                .map_err(PostgresError::Diesel)?;
+
+            let opt = uploads
+                .select(result)
+                .filter(id.eq(upload_id.id))
+                .get_result(&mut conn)
+                .await
+                .optional()
+                .map_err(PostgresError::Diesel)?
+                .flatten();
+
+            if let Some(upload_result) = opt {
+                diesel::sql_query("UNLISTEN upload_completion_channel;")
+                    .execute(&mut conn)
+                    .await
+                    .map_err(PostgresError::Diesel)?;
+
+                let upload_result: InnerUploadResult = serde_json::from_value(upload_result)
+                    .map_err(PostgresError::DeserializeUploadResult)?;
+
+                return Ok(upload_result.into());
+            }
+
+            let _ = actix_rt::time::timeout(
+                Duration::from_secs(2),
+                self.inner.upload_notifier.notified(),
+            )
+            .await;
+
+            diesel::sql_query("UNLISTEN upload_completion_channel;")
+                .execute(&mut conn)
+                .await
+                .map_err(PostgresError::Diesel)?;
+
+            drop(conn);
+        }
+    }
+
+    async fn claim(&self, upload_id: UploadId) -> Result<(), RepoError> {
+        use schema::uploads::dsl::*;
+
+        let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
+
+        diesel::delete(uploads)
+            .filter(id.eq(upload_id.id))
+            .execute(&mut conn)
+            .await
+            .map_err(PostgresError::Diesel)?;
+
+        Ok(())
+    }
+
+    async fn complete_upload(
+        &self,
+        upload_id: UploadId,
+        upload_result: UploadResult,
+    ) -> Result<(), RepoError> {
+        use schema::uploads::dsl::*;
+
+        let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
+
+        let upload_result: InnerUploadResult = upload_result.into();
+        let upload_result =
+            serde_json::to_value(&upload_result).map_err(PostgresError::SerializeUploadResult)?;
+
+        diesel::update(uploads)
+            .filter(id.eq(upload_id.id))
+            .set(result.eq(upload_result))
+            .execute(&mut conn)
+            .await
+            .map_err(PostgresError::Diesel)?;
+
+        Ok(())
+    }
+}
+
+#[async_trait::async_trait(?Send)]
+impl FullRepo for PostgresRepo {
+    async fn health_check(&self) -> Result<(), RepoError> {
+        let next = self.inner.health_count.fetch_add(1, Ordering::Relaxed);
+
+        self.set("health-value", Arc::from(next.to_be_bytes()))
+            .await?;
+
+        Ok(())
+    }
+}
+
+type LocalBoxFuture<'a, T> = std::pin::Pin<Box<dyn std::future::Future<Output = T> + 'a>>;
+
+type NextFuture<I> = LocalBoxFuture<'static, Result<Vec<(time::PrimitiveDateTime, I)>, RepoError>>;
+
+struct PageStream<I> {
+    inner: Arc<Inner>,
+    future: Option<NextFuture<I>>,
+    current: Vec<I>,
+    older_than: time::PrimitiveDateTime,
+    next: Box<dyn Fn(Arc<Inner>, time::PrimitiveDateTime) -> NextFuture<I>>,
+}
+
+impl<I> futures_core::Stream for PageStream<I>
+where
+    I: Unpin,
+{
+    type Item = Result<I, RepoError>;
+
+    fn poll_next(
+        self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Option<Self::Item>> {
+        let this = self.get_mut();
+
+        loop {
+            // Pop because we reversed the list
+            if let Some(alias) = this.current.pop() {
+                return std::task::Poll::Ready(Some(Ok(alias)));
+            }
+
+            if let Some(future) = this.future.as_mut() {
+                let res = std::task::ready!(future.as_mut().poll(cx));
+
+                this.future.take();
+
+                match res {
+                    Ok(page) if page.is_empty() => {
+                        return std::task::Poll::Ready(None);
+                    }
+                    Ok(page) => {
+                        let (mut timestamps, mut aliases): (Vec<_>, Vec<_>) =
+                            page.into_iter().unzip();
+                        // reverse because we use .pop() to get next
+                        aliases.reverse();
+
+                        this.current = aliases;
+                        this.older_than = timestamps.pop().expect("Verified nonempty");
+                    }
+                    Err(e) => return std::task::Poll::Ready(Some(Err(e))),
+                }
+            } else {
+                let inner = this.inner.clone();
+                let older_than = this.older_than;
+
+                this.future = Some((this.next)(inner, older_than));
+            }
+        }
+    }
+}
+
 impl std::fmt::Debug for PostgresRepo {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         f.debug_struct("PostgresRepo")
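`PageStream` is keyset pagination: each query re-filters on the oldest timestamp seen so far rather than paging with `OFFSET`, so no rows are skipped when entries are deleted between polls. A synchronous sketch of the same idea (hypothetical `drain` helper, not crate code):

```rust
// K is the pagination key (a timestamp above), I the item type.
fn drain<K: Copy, I>(mut fetch: impl FnMut(K) -> Vec<(K, I)>, mut older_than: K) -> Vec<I> {
    let mut out = Vec::new();
    loop {
        let page = fetch(older_than);
        // An empty page means we've drained everything.
        let Some(&(last, _)) = page.last() else { break };
        // Pages are ordered newest-first, so the last key is the oldest;
        // it becomes the cursor for the next query.
        older_than = last;
        out.extend(page.into_iter().map(|(_, item)| item));
    }
    out
}
```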
diff --git a/src/repo/postgres/migrations/V0009__create_uploads.rs b/src/repo/postgres/migrations/V0009__create_uploads.rs
index 9214ec4..4cfbafa 100644
--- a/src/repo/postgres/migrations/V0009__create_uploads.rs
+++ b/src/repo/postgres/migrations/V0009__create_uploads.rs
@@ -7,7 +7,13 @@ pub(crate) fn migration() -> String {
 
     m.create_table("uploads", |t| {
         t.inject_custom(r#""id" UUID PRIMARY KEY DEFAULT gen_random_uuid() NOT NULL UNIQUE"#);
-        t.add_column("result", types::custom("jsonb"));
+        t.add_column("result", types::custom("jsonb").nullable(true));
+        t.add_column(
+            "created_at",
+            types::datetime()
+                .nullable(false)
+                .default(AutogenFunction::CurrentTimestamp),
+        );
     });
 
     m.inject_custom(
diff --git a/src/repo/postgres/schema.rs b/src/repo/postgres/schema.rs
index da98a1c..80c5d4e 100644
--- a/src/repo/postgres/schema.rs
+++ b/src/repo/postgres/schema.rs
@@ -82,7 +82,7 @@ diesel::table! {
 
 diesel::table! {
     uploads (id) {
         id -> Uuid,
-        result -> Jsonb,
+        result -> Nullable<Jsonb>,
     }
 }
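One note on the schema change: with diesel's `serde_json` feature, a `Nullable<Jsonb>` column deserializes as `Option<serde_json::Value>`, and combining that with `.optional()` on the query is what gives `wait` the nested option it flattens. Sketch of the shape involved:

```rust
// Outer Option: did a row match at all? Inner Option: the nullable jsonb column.
fn unwrap_result_column(row: Option<Option<serde_json::Value>>) -> Option<serde_json::Value> {
    row.flatten()
}
```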