diff --git a/dev.toml b/dev.toml
index ae32642..6cdd71b 100644
--- a/dev.toml
+++ b/dev.toml
@@ -62,6 +62,10 @@ crf_max = 12
 type = 'postgres'
 url = 'postgres://postgres:1234@localhost:5432/postgres'
 
+# [repo]
+# type = 'sled'
+# path = 'data/sled-repo-local'
+
 [store]
 type = 'filesystem'
 path = 'data/files-local'
diff --git a/src/queue/process.rs b/src/queue/process.rs
index e257f10..5454dc8 100644
--- a/src/queue/process.rs
+++ b/src/queue/process.rs
@@ -68,7 +68,7 @@ where
     })
 }
 
-#[tracing::instrument(skip_all)]
+#[tracing::instrument(skip(repo, store, media))]
 async fn process_ingest<S>(
     repo: &ArcRepo,
     store: &S,
@@ -126,7 +126,7 @@ where
 }
 
 #[allow(clippy::too_many_arguments)]
-#[tracing::instrument(skip_all)]
+#[tracing::instrument(skip(repo, store, process_map, process_path, process_args, config))]
 async fn generate<S>(
     repo: &ArcRepo,
     store: &S,
diff --git a/src/repo.rs b/src/repo.rs
index a177745..fdf5ce1 100644
--- a/src/repo.rs
+++ b/src/repo.rs
@@ -524,7 +524,7 @@ where
     }
 }
 
-#[derive(Clone, PartialEq, Eq, PartialOrd, Ord)]
+#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
 pub(crate) struct OrderedHash {
     timestamp: time::OffsetDateTime,
     hash: Hash,
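A note on the `skip_all` → `skip(...)` changes in src/queue/process.rs above: `#[tracing::instrument]` records every argument that is not skipped as a span field via its `Debug` implementation, which is presumably also why `OrderedHash` gains a `Debug` derive here — `hashes_ordered` below now records its `bound: Option<OrderedHash>` argument. A minimal standalone sketch of the difference, with a hypothetical function and fields (assuming the `tracing` and `tracing-subscriber` crates; this is not code from the patch):

```rust
use tracing::instrument;

// Hypothetical function: `repo` is skipped, so only `upload_id` is recorded
// on the span (using its Debug impl). With `skip_all`, the span would carry
// no fields at all.
#[instrument(skip(repo))]
fn process_ingest(repo: &str, upload_id: u64) {
    // Skipped arguments can still be logged explicitly on an event.
    tracing::info!(repo, "ingesting");
}

fn main() {
    tracing_subscriber::fmt().init();

    // Output looks roughly like: process_ingest{upload_id=42}: ingesting repo="postgres"
    process_ingest("postgres", 42);
}
```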
diff --git a/src/repo/postgres.rs b/src/repo/postgres.rs
index d03a7c5..ab08fef 100644
--- a/src/repo/postgres.rs
+++ b/src/repo/postgres.rs
@@ -231,6 +231,7 @@ impl BaseRepo for PostgresRepo {}
 
 #[async_trait::async_trait(?Send)]
 impl HashRepo for PostgresRepo {
+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn size(&self) -> Result<u64, RepoError> {
         use schema::hashes::dsl::*;
 
@@ -245,6 +246,7 @@ impl HashRepo for PostgresRepo {
         Ok(count.try_into().expect("non-negative count"))
     }
 
+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn bound(&self, input_hash: Hash) -> Result<Option<OrderedHash>, RepoError> {
         use schema::hashes::dsl::*;
 
@@ -265,6 +267,7 @@ impl HashRepo for PostgresRepo {
         }))
     }
 
+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn hash_page_by_date(
         &self,
         date: time::OffsetDateTime,
@@ -292,6 +295,7 @@ impl HashRepo for PostgresRepo {
         self.hashes_ordered(ordered_hash, limit).await
     }
 
+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn hashes_ordered(
         &self,
         bound: Option<OrderedHash>,
@@ -355,6 +359,7 @@ impl HashRepo for PostgresRepo {
         })
     }
 
+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn create_hash_with_timestamp(
         &self,
         input_hash: Hash,
@@ -386,6 +391,7 @@ impl HashRepo for PostgresRepo {
         }
     }
 
+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn update_identifier(
         &self,
         input_hash: Hash,
@@ -405,6 +411,7 @@ impl HashRepo for PostgresRepo {
         Ok(())
     }
 
+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn identifier(&self, input_hash: Hash) -> Result<Option<Arc<str>>, RepoError> {
         use schema::hashes::dsl::*;
 
@@ -421,6 +428,7 @@ impl HashRepo for PostgresRepo {
         Ok(opt.map(Arc::from))
     }
 
+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn relate_variant_identifier(
         &self,
         input_hash: Hash,
@@ -450,6 +458,7 @@ impl HashRepo for PostgresRepo {
         }
     }
 
+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn variant_identifier(
         &self,
         input_hash: Hash,
@@ -472,6 +481,7 @@ impl HashRepo for PostgresRepo {
         Ok(opt)
     }
 
+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn variants(&self, input_hash: Hash) -> Result<Vec<(String, Arc<str>)>, RepoError> {
         use schema::variants::dsl::*;
 
@@ -490,6 +500,7 @@ impl HashRepo for PostgresRepo {
         Ok(vec)
     }
 
+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn remove_variant(
         &self,
         input_hash: Hash,
@@ -509,6 +520,7 @@ impl HashRepo for PostgresRepo {
         Ok(())
     }
 
+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn relate_motion_identifier(
         &self,
         input_hash: Hash,
@@ -528,6 +540,7 @@ impl HashRepo for PostgresRepo {
         Ok(())
     }
 
+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn motion_identifier(&self, input_hash: Hash) -> Result<Option<Arc<str>>, RepoError> {
         use schema::hashes::dsl::*;
 
@@ -546,18 +559,19 @@ impl HashRepo for PostgresRepo {
         Ok(opt)
     }
 
+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn cleanup_hash(&self, input_hash: Hash) -> Result<(), RepoError> {
         let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
 
         conn.transaction(|conn| {
             Box::pin(async move {
-                diesel::delete(schema::hashes::dsl::hashes)
-                    .filter(schema::hashes::dsl::hash.eq(&input_hash))
+                diesel::delete(schema::variants::dsl::variants)
+                    .filter(schema::variants::dsl::hash.eq(&input_hash))
                     .execute(conn)
                     .await?;
 
-                diesel::delete(schema::variants::dsl::variants)
-                    .filter(schema::variants::dsl::hash.eq(&input_hash))
+                diesel::delete(schema::hashes::dsl::hashes)
+                    .filter(schema::hashes::dsl::hash.eq(&input_hash))
                     .execute(conn)
                     .await
             })
@@ -571,6 +585,7 @@ impl HashRepo for PostgresRepo {
 
 #[async_trait::async_trait(?Send)]
 impl AliasRepo for PostgresRepo {
+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn create_alias(
         &self,
         input_alias: &Alias,
@@ -600,6 +615,7 @@ impl AliasRepo for PostgresRepo {
         }
     }
 
+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn delete_token(&self, input_alias: &Alias) -> Result<Option<DeleteToken>, RepoError> {
         use schema::aliases::dsl::*;
 
@@ -616,6 +632,7 @@ impl AliasRepo for PostgresRepo {
         Ok(opt)
     }
 
+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn hash(&self, input_alias: &Alias) -> Result<Option<Hash>, RepoError> {
         use schema::aliases::dsl::*;
 
@@ -632,6 +649,7 @@ impl AliasRepo for PostgresRepo {
         Ok(opt)
     }
 
+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn aliases_for_hash(&self, input_hash: Hash) -> Result<Vec<Alias>, RepoError> {
         use schema::aliases::dsl::*;
 
@@ -647,6 +665,7 @@ impl AliasRepo for PostgresRepo {
         Ok(vec)
     }
 
+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn cleanup_alias(&self, input_alias: &Alias) -> Result<(), RepoError> {
         use schema::aliases::dsl::*;
 
@@ -664,6 +683,7 @@ impl AliasRepo for PostgresRepo {
 
 #[async_trait::async_trait(?Send)]
 impl SettingsRepo for PostgresRepo {
+    #[tracing::instrument(level = "DEBUG", skip(self, input_value))]
     async fn set(&self, input_key: &'static str, input_value: Arc<[u8]>) -> Result<(), RepoError> {
         use schema::settings::dsl::*;
 
@@ -672,7 +692,10 @@ impl SettingsRepo for PostgresRepo {
         let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
 
         diesel::insert_into(settings)
-            .values((key.eq(input_key), value.eq(input_value)))
+            .values((key.eq(input_key), value.eq(&input_value)))
+            .on_conflict(key)
+            .do_update()
+            .set(value.eq(&input_value))
             .execute(&mut conn)
             .await
             .map_err(PostgresError::Diesel)?;
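The `set` change above turns a plain `INSERT` into an upsert keyed on the settings primary key, so writing the same key twice updates the stored value instead of failing on the unique constraint. A self-contained sketch of the same pattern against a stand-in `settings` table (pict-rs' real schema module is generated elsewhere; the column types here are assumptions):

```rust
use diesel::ExpressionMethods;
use diesel_async::{AsyncPgConnection, RunQueryDsl};

// Stand-in table definition for the sketch only.
diesel::table! {
    settings (key) {
        key -> Text,
        value -> Binary,
    }
}

// Roughly the SQL diesel builds for this:
//   INSERT INTO settings (key, value) VALUES ($1, $2)
//   ON CONFLICT (key) DO UPDATE SET value = $3
async fn set(
    conn: &mut AsyncPgConnection,
    input_key: &str,
    input_value: &[u8],
) -> diesel::QueryResult<usize> {
    use self::settings::dsl::*;

    diesel::insert_into(settings)
        .values((key.eq(input_key), value.eq(input_value)))
        .on_conflict(key)
        .do_update()
        .set(value.eq(input_value))
        .execute(conn)
        .await
}
```

Binding the value a second time in the `DO UPDATE` arm is what the patch does; `diesel::upsert::excluded(value)` would be an equivalent way to reuse the inserted value without a third bind parameter.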
@@ -680,6 +703,7 @@ impl SettingsRepo for PostgresRepo {
         Ok(())
     }
 
+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn get(&self, input_key: &'static str) -> Result<Option<Arc<[u8]>>, RepoError> {
         use schema::settings::dsl::*;
 
@@ -700,6 +724,7 @@ impl SettingsRepo for PostgresRepo {
         Ok(opt)
     }
 
+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn remove(&self, input_key: &'static str) -> Result<(), RepoError> {
         use schema::settings::dsl::*;
 
@@ -717,6 +742,7 @@ impl SettingsRepo for PostgresRepo {
 
 #[async_trait::async_trait(?Send)]
 impl DetailsRepo for PostgresRepo {
+    #[tracing::instrument(level = "DEBUG", skip(self, input_details))]
     async fn relate_details(
         &self,
         input_identifier: &Arc<str>,
@@ -738,6 +764,7 @@ impl DetailsRepo for PostgresRepo {
         Ok(())
     }
 
+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn details(&self, input_identifier: &Arc<str>) -> Result<Option<Details>, RepoError> {
         use schema::details::dsl::*;
 
@@ -758,6 +785,7 @@ impl DetailsRepo for PostgresRepo {
         Ok(opt)
     }
 
+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn cleanup_details(&self, input_identifier: &Arc<str>) -> Result<(), RepoError> {
         use schema::details::dsl::*;
 
@@ -775,6 +803,7 @@ impl DetailsRepo for PostgresRepo {
 
 #[async_trait::async_trait(?Send)]
 impl QueueRepo for PostgresRepo {
+    #[tracing::instrument(level = "DEBUG", skip(self, job_json))]
     async fn push(
         &self,
         queue_name: &'static str,
@@ -794,6 +823,7 @@ impl QueueRepo for PostgresRepo {
         Ok(JobId(job_id))
     }
 
+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn pop(
         &self,
         queue_name: &'static str,
@@ -869,6 +899,7 @@ impl QueueRepo for PostgresRepo {
         }
     }
 
+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn heartbeat(
         &self,
         queue_name: &'static str,
@@ -895,6 +926,7 @@ impl QueueRepo for PostgresRepo {
         Ok(())
     }
 
+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn complete_job(
         &self,
         queue_name: &'static str,
@@ -921,6 +953,7 @@ impl QueueRepo for PostgresRepo {
 
 #[async_trait::async_trait(?Send)]
 impl StoreMigrationRepo for PostgresRepo {
+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn is_continuing_migration(&self) -> Result<bool, RepoError> {
         use schema::store_migrations::dsl::*;
 
@@ -935,6 +968,7 @@ impl StoreMigrationRepo for PostgresRepo {
         Ok(count > 0)
     }
 
+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn mark_migrated(
         &self,
         input_old_identifier: &Arc<str>,
@@ -958,6 +992,7 @@ impl StoreMigrationRepo for PostgresRepo {
         Ok(())
     }
 
+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn is_migrated(&self, input_old_identifier: &Arc<str>) -> Result<bool, RepoError> {
         use schema::store_migrations::dsl::*;
 
@@ -973,6 +1008,7 @@ impl StoreMigrationRepo for PostgresRepo {
         Ok(b)
     }
 
+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn clear(&self) -> Result<(), RepoError> {
         use schema::store_migrations::dsl::*;
 
@@ -989,6 +1025,7 @@ impl StoreMigrationRepo for PostgresRepo {
 
 #[async_trait::async_trait(?Send)]
 impl ProxyRepo for PostgresRepo {
+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn relate_url(&self, input_url: Url, input_alias: Alias) -> Result<(), RepoError> {
         use schema::proxies::dsl::*;
 
@@ -1003,6 +1040,7 @@ impl ProxyRepo for PostgresRepo {
         Ok(())
     }
 
+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn related(&self, input_url: Url) -> Result<Option<Alias>, RepoError> {
         use schema::proxies::dsl::*;
 
@@ -1019,6 +1057,7 @@ impl ProxyRepo for PostgresRepo {
         Ok(opt)
     }
 
+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn remove_relation(&self, input_alias: Alias) -> Result<(), RepoError> {
         use schema::proxies::dsl::*;
 
@@ -1036,6 +1075,7 @@ impl ProxyRepo for PostgresRepo {
 
 #[async_trait::async_trait(?Send)]
 impl AliasAccessRepo for PostgresRepo {
+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn set_accessed_alias(
         &self,
         input_alias: Alias,
@@ -1057,6 +1097,7 @@ impl AliasAccessRepo for PostgresRepo {
         Ok(())
     }
 
+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn alias_accessed_at(
         &self,
         input_alias: Alias,
@@ -1077,6 +1118,7 @@ impl AliasAccessRepo for PostgresRepo {
         Ok(opt)
     }
 
+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn older_aliases(
         &self,
         timestamp: time::OffsetDateTime,
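All of these repo spans are created at DEBUG level, so they stay out of the usual INFO-level output and only appear when explicitly enabled for the relevant target. A sketch of a subscriber setup that would surface them, assuming `tracing-subscriber` with its `env-filter` feature (the `pict_rs` target prefix is an assumption based on the crate name; the project's real logging setup lives elsewhere and may differ):

```rust
use tracing_subscriber::{fmt::format::FmtSpan, EnvFilter};

fn main() {
    // Keep everything else at INFO, but enable the DEBUG-level repo spans.
    let filter = EnvFilter::new("info,pict_rs=debug");

    tracing_subscriber::fmt()
        .with_env_filter(filter)
        // Emit an event when each instrumented repo call closes, including
        // its busy/idle timing.
        .with_span_events(FmtSpan::CLOSE)
        .init();
}
```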
@@ -1115,6 +1157,7 @@ impl AliasAccessRepo for PostgresRepo {
 
 #[async_trait::async_trait(?Send)]
 impl VariantAccessRepo for PostgresRepo {
+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn set_accessed_variant(
         &self,
         input_hash: Hash,
@@ -1137,6 +1180,7 @@ impl VariantAccessRepo for PostgresRepo {
         Ok(())
     }
 
+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn variant_accessed_at(
         &self,
         input_hash: Hash,
@@ -1158,6 +1202,7 @@ impl VariantAccessRepo for PostgresRepo {
         Ok(opt)
     }
 
+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn older_variants(
         &self,
         timestamp: time::OffsetDateTime,
@@ -1232,6 +1277,7 @@ impl From<InnerUploadResult> for UploadResult {
 
 #[async_trait::async_trait(?Send)]
 impl UploadRepo for PostgresRepo {
+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn create_upload(&self) -> Result<UploadId, RepoError> {
         use schema::uploads::dsl::*;
 
@@ -1247,6 +1293,7 @@ impl UploadRepo for PostgresRepo {
         Ok(UploadId { id: uuid })
     }
 
+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn wait(&self, upload_id: UploadId) -> Result<UploadResult, RepoError> {
         use schema::uploads::dsl::*;
 
@@ -1293,6 +1340,7 @@ impl UploadRepo for PostgresRepo {
         }
     }
 
+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn claim(&self, upload_id: UploadId) -> Result<(), RepoError> {
         use schema::uploads::dsl::*;
 
@@ -1307,6 +1355,7 @@ impl UploadRepo for PostgresRepo {
         Ok(())
     }
 
+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn complete_upload(
         &self,
         upload_id: UploadId,
@@ -1333,6 +1382,7 @@ impl UploadRepo for PostgresRepo {
 
 #[async_trait::async_trait(?Send)]
 impl FullRepo for PostgresRepo {
+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn health_check(&self) -> Result<(), RepoError> {
         let next = self.inner.health_count.fetch_add(1, Ordering::Relaxed);
diff --git a/src/repo/postgres/migrations/V0000__enable_pgcrypto.rs b/src/repo/postgres/migrations/V0000__enable_pgcrypto.rs
index 44eb85c..4c8bd57 100644
--- a/src/repo/postgres/migrations/V0000__enable_pgcrypto.rs
+++ b/src/repo/postgres/migrations/V0000__enable_pgcrypto.rs
@@ -5,7 +5,7 @@ use barrel::{types, Migration};
 pub(crate) fn migration() -> String {
     let mut m = Migration::new();
 
-    m.inject_custom("CREATE EXTENSION pgcrypto;");
+    m.inject_custom("CREATE EXTENSION IF NOT EXISTS pgcrypto;");
 
     m.make::<Pg>().to_string()
 }
diff --git a/src/repo/postgres/migrations/V0006__create_queue.rs b/src/repo/postgres/migrations/V0006__create_queue.rs
index ace111f..f9a96e0 100644
--- a/src/repo/postgres/migrations/V0006__create_queue.rs
+++ b/src/repo/postgres/migrations/V0006__create_queue.rs
@@ -12,7 +12,10 @@ pub(crate) fn migration() -> String {
         t.add_column("queue", types::text().size(50).nullable(false));
         t.add_column("job", types::custom("jsonb").nullable(false));
         t.add_column("worker", types::uuid().nullable(true));
-        t.add_column("status", types::custom("job_status").nullable(false));
+        t.add_column(
+            "status",
+            types::custom("job_status").nullable(false).default("new"),
+        );
         t.add_column(
             "queue_time",
             types::datetime()
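Both migration tweaks make schema setup more forgiving: `CREATE EXTENSION IF NOT EXISTS` lets V0000 run against databases where pgcrypto is already installed, and the `default("new")` means rows inserted without an explicit `status` (the queue's `push` doesn't appear to set one) are still accepted by the NOT NULL column. A standalone sketch of the defaulted-column pattern with barrel, using a made-up table name; the `job_status` enum type is assumed to be created earlier in the real migration:

```rust
use barrel::{backend::Pg, types, Migration};

// Hypothetical migration, not the real V0006 file: a NOT NULL column with a
// database-side default, so INSERTs that omit `status` still succeed and the
// new row starts out in the 'new' state.
pub(crate) fn migration() -> String {
    let mut m = Migration::new();

    m.create_table("example_queue", |t| {
        t.add_column("job", types::custom("jsonb").nullable(false));
        t.add_column(
            "status",
            types::custom("job_status").nullable(false).default("new"),
        );
    });

    m.make::<Pg>().to_string()
}
```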
diff --git a/src/store/file_store.rs b/src/store/file_store.rs
index dafa8f7..e208261 100644
--- a/src/store/file_store.rs
+++ b/src/store/file_store.rs
@@ -27,6 +27,9 @@ pub(crate) enum FileError {
     #[error("Failed to generate path")]
     PathGenerator(#[from] storage_path_generator::PathError),
 
+    #[error("Couldn't strip root dir")]
+    PrefixError,
+
     #[error("Couldn't convert Path to String")]
     StringError,
 
@@ -40,7 +43,7 @@ impl FileError {
             Self::Io(_) => ErrorCode::FILE_IO_ERROR,
             Self::PathGenerator(_) => ErrorCode::PARSE_PATH_ERROR,
             Self::FileExists => ErrorCode::FILE_EXISTS,
-            Self::StringError => ErrorCode::FORMAT_FILE_ID_ERROR,
+            Self::StringError | Self::PrefixError => ErrorCode::FORMAT_FILE_ID_ERROR,
         }
     }
 }
@@ -54,6 +57,7 @@ pub(crate) struct FileStore {
 
 #[async_trait::async_trait(?Send)]
 impl Store for FileStore {
+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn health_check(&self) -> Result<(), StoreError> {
         tokio::fs::metadata(&self.root_dir)
             .await
@@ -62,7 +66,7 @@ impl Store for FileStore {
         Ok(())
     }
 
-    #[tracing::instrument(skip(reader))]
+    #[tracing::instrument(skip(self, reader))]
     async fn save_async_read<Reader>(
         &self,
         mut reader: Reader,
@@ -93,7 +97,7 @@ impl Store for FileStore {
             .await
     }
 
-    #[tracing::instrument(skip(bytes))]
+    #[tracing::instrument(skip(self, bytes))]
     async fn save_bytes(
         &self,
         bytes: Bytes,
@@ -113,7 +117,7 @@ impl Store for FileStore {
         None
     }
 
-    #[tracing::instrument]
+    #[tracing::instrument(skip(self))]
     async fn to_stream(
         &self,
         identifier: &Arc<str>,
@@ -137,7 +141,7 @@ impl Store for FileStore {
         Ok(Box::pin(stream))
     }
 
-    #[tracing::instrument(skip(writer))]
+    #[tracing::instrument(skip(self, writer))]
     async fn read_into(
         &self,
         identifier: &Arc<str>,
@@ -153,7 +157,7 @@ impl Store for FileStore {
         Ok(())
     }
 
-    #[tracing::instrument]
+    #[tracing::instrument(skip(self))]
     async fn len(&self, identifier: &Arc<str>) -> Result<u64, StoreError> {
         let path = self.path_from_file_id(identifier);
 
@@ -165,7 +169,7 @@ impl Store for FileStore {
         Ok(len)
     }
 
-    #[tracing::instrument]
+    #[tracing::instrument(skip(self))]
     async fn remove(&self, identifier: &Arc<str>) -> Result<(), StoreError> {
         let path = self.path_from_file_id(identifier);
 
@@ -190,7 +194,11 @@ impl FileStore {
     }
 
     fn file_id_from_path(&self, path: PathBuf) -> Result<Arc<str>, FileError> {
-        path.to_str().ok_or(FileError::StringError).map(Into::into)
+        path.strip_prefix(&self.root_dir)
+            .map_err(|_| FileError::PrefixError)?
+            .to_str()
+            .ok_or(FileError::StringError)
+            .map(Into::into)
     }
 
     fn path_from_file_id(&self, file_id: &Arc<str>) -> PathBuf {
@@ -219,6 +227,7 @@ impl FileStore {
         Ok(target_path.join(filename))
     }
 
+    #[tracing::instrument(level = "DEBUG", skip(self, path), fields(path = ?path.as_ref()))]
     async fn safe_remove_file<P: AsRef<Path>>(&self, path: P) -> Result<(), FileError> {
         tokio::fs::remove_file(&path).await?;
         self.try_remove_parents(path.as_ref()).await;
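The `file_id_from_path` change is the substantive fix in this file: identifiers handed back to callers are now relative to the store root instead of containing it, so `path_from_file_id` (which joins the root back on) no longer re-adds the prefix. A standalone sketch of the round-trip, not pict-rs code:

```rust
use std::path::{Path, PathBuf};

// Relative identifier: Err from strip_prefix means the path isn't under root.
fn file_id_from_path(root_dir: &Path, path: &Path) -> Option<String> {
    path.strip_prefix(root_dir).ok()?.to_str().map(String::from)
}

// Rebuilding the on-disk path joins the root back onto the identifier.
fn path_from_file_id(root_dir: &Path, file_id: &str) -> PathBuf {
    root_dir.join(file_id)
}

fn main() {
    let root = Path::new("data/files-local");
    let on_disk = Path::new("data/files-local/00/ab/cdef");

    let id = file_id_from_path(root, on_disk).expect("path lives under the root");
    assert_eq!(id, "00/ab/cdef");

    // Round-trips cleanly; with the old behaviour the identifier kept the
    // root prefix, so joining the root on again would re-add it.
    assert_eq!(path_from_file_id(root, &id).as_path(), on_disk);
}
```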