diff --git a/src/ingest.rs b/src/ingest.rs
new file mode 100644
index 0000000..bae7bb6
--- /dev/null
+++ b/src/ingest.rs
@@ -0,0 +1,218 @@
+use crate::{
+    error::{Error, UploadError},
+    magick::ValidInputType,
+    repo::{Alias, AliasRepo, DeleteToken, FullRepo, HashRepo},
+    store::Store,
+    CONFIG,
+};
+use actix_web::web::{Bytes, BytesMut};
+use futures_util::{Stream, StreamExt};
+use once_cell::sync::Lazy;
+use sha2::{Digest, Sha256};
+use tokio::sync::Semaphore;
+use tracing::debug;
+
+mod hasher;
+use hasher::Hasher;
+
+static PROCESS_SEMAPHORE: Lazy<Semaphore> = Lazy::new(|| Semaphore::new(num_cpus::get()));
+
+pub(crate) struct Session<R, S>
+where
+    R: FullRepo + 'static,
+    S: Store,
+{
+    repo: R,
+    hash: Option<Vec<u8>>,
+    alias: Option<Alias>,
+    identifier: Option<S::Identifier>,
+}
+
+pub(crate) async fn ingest<R, S>(
+    repo: &R,
+    store: &S,
+    stream: impl Stream<Item = Result<Bytes, Error>>,
+    declared_alias: Option<Alias>,
+    should_validate: bool,
+) -> Result<Session<R, S>, Error>
+where
+    R: FullRepo + 'static,
+    S: Store,
+{
+    let permit = PROCESS_SEMAPHORE.acquire().await;
+
+    let mut bytes_mut = BytesMut::new();
+
+    futures_util::pin_mut!(stream);
+
+    debug!("Reading stream to memory");
+    while let Some(res) = stream.next().await {
+        let bytes = res?;
+        bytes_mut.extend_from_slice(&bytes);
+    }
+
+    debug!("Validating bytes");
+    let (input_type, validated_reader) = crate::validate::validate_image_bytes(
+        bytes_mut.freeze(),
+        CONFIG.media.format,
+        CONFIG.media.enable_silent_video,
+        should_validate,
+    )
+    .await?;
+
+    let mut hasher_reader = Hasher::new(validated_reader, Sha256::new());
+
+    let identifier = store.save_async_read(&mut hasher_reader).await?;
+
+    drop(permit);
+
+    let mut session = Session {
+        repo: repo.clone(),
+        hash: None,
+        alias: None,
+        identifier: Some(identifier.clone()),
+    };
+
+    let hash = hasher_reader.finalize_reset().await?;
+
+    session.hash = Some(hash.clone());
+
+    debug!("Saving upload");
+
+    save_upload(repo, store, &hash, &identifier).await?;
+
+    debug!("Adding alias");
+
+    if let Some(alias) = declared_alias {
+        session.add_existing_alias(&hash, alias).await?
+    } else {
+        session.create_alias(&hash, input_type).await?;
+    }
+
+    Ok(session)
+}
+
+async fn save_upload<R, S>(
+    repo: &R,
+    store: &S,
+    hash: &[u8],
+    identifier: &S::Identifier,
+) -> Result<(), Error>
+where
+    S: Store,
+    R: FullRepo,
+{
+    if HashRepo::create(repo, hash.to_vec().into()).await?.is_err() {
+        store.remove(identifier).await?;
+        return Ok(());
+    }
+
+    repo.relate_identifier(hash.to_vec().into(), identifier)
+        .await?;
+
+    Ok(())
+}
+
+impl<R, S> Session<R, S>
+where
+    R: FullRepo + 'static,
+    S: Store,
+{
+    pub(crate) fn disarm(&mut self) {
+        let _ = self.alias.take();
+        let _ = self.identifier.take();
+    }
+
+    pub(crate) fn alias(&self) -> Option<&Alias> {
+        self.alias.as_ref()
+    }
+
+    pub(crate) async fn delete_token(&self) -> Result<DeleteToken, Error> {
+        let alias = self.alias.clone().ok_or(UploadError::MissingAlias)?;
+
+        debug!("Generating delete token");
+        let delete_token = DeleteToken::generate();
+
+        debug!("Saving delete token");
+        let res = self.repo.relate_delete_token(&alias, &delete_token).await?;
+
+        if res.is_err() {
+            let delete_token = self.repo.delete_token(&alias).await?;
+            debug!("Returning existing delete token, {:?}", delete_token);
+            return Ok(delete_token);
+        }
+
+        debug!("Returning new delete token, {:?}", delete_token);
+        Ok(delete_token)
+    }
+
+    async fn add_existing_alias(&mut self, hash: &[u8], alias: Alias) -> Result<(), Error> {
+        AliasRepo::create(&self.repo, &alias)
+            .await?
+            .map_err(|_| UploadError::DuplicateAlias)?;
+
+        self.alias = Some(alias.clone());
+
+        self.repo.relate_hash(&alias, hash.to_vec().into()).await?;
+        self.repo.relate_alias(hash.to_vec().into(), &alias).await?;
+
+        Ok(())
+    }
+
+    async fn create_alias(&mut self, hash: &[u8], input_type: ValidInputType) -> Result<(), Error> {
+        debug!("Alias gen loop");
+
+        loop {
+            let alias = Alias::generate(input_type.as_ext().to_string());
+
+            if AliasRepo::create(&self.repo, &alias).await?.is_ok() {
+                self.alias = Some(alias.clone());
+
+                self.repo.relate_hash(&alias, hash.to_vec().into()).await?;
+                self.repo.relate_alias(hash.to_vec().into(), &alias).await?;
+
+                return Ok(());
+            }
+
+            debug!("Alias exists, regenerating");
+        }
+    }
+}
+
+impl<R, S> Drop for Session<R, S>
+where
+    R: FullRepo + 'static,
+    S: Store,
+{
+    fn drop(&mut self) {
+        if let Some(hash) = self.hash.take() {
+            let repo = self.repo.clone();
+            actix_rt::spawn(async move {
+                let _ = crate::queue::cleanup_hash(&repo, hash.into()).await;
+            });
+        }
+
+        if let Some(alias) = self.alias.take() {
+            let repo = self.repo.clone();
+
+            actix_rt::spawn(async move {
+                if let Ok(token) = repo.delete_token(&alias).await {
+                    let _ = crate::queue::cleanup_alias(&repo, alias, token).await;
+                } else {
+                    let token = DeleteToken::generate();
+                    if let Ok(Ok(())) = repo.relate_delete_token(&alias, &token).await {
+                        let _ = crate::queue::cleanup_alias(&repo, alias, token).await;
+                    }
+                }
+            });
+        }
+
+        if let Some(identifier) = self.identifier.take() {
+            let repo = self.repo.clone();
+
+            actix_rt::spawn(async move {
+                let _ = crate::queue::cleanup_identifier(&repo, identifier).await;
+            });
+        }
+    }
+}
diff --git a/src/upload_manager/hasher.rs b/src/ingest/hasher.rs
similarity index 79%
rename from src/upload_manager/hasher.rs
rename to src/ingest/hasher.rs
index 0ae9a8b..e1a1551 100644
--- a/src/upload_manager/hasher.rs
+++ b/src/ingest/hasher.rs
@@ -16,10 +16,6 @@ pin_project_lite::pin_project! {
     }
 }
 
-pub(super) struct Hash {
-    inner: Vec<u8>,
-}
-
 impl<I, D> Hasher<I, D>
 where
     D: Digest + FixedOutputReset + Send + 'static,
@@ -31,27 +27,13 @@ where
         }
     }
 
-    pub(super) async fn finalize_reset(self) -> Result<Hash, Error> {
+    pub(super) async fn finalize_reset(self) -> Result<Vec<u8>, Error> {
         let mut hasher = self.hasher;
-        let hash = web::block(move || Hash::new(hasher.finalize_reset().to_vec())).await?;
+        let hash = web::block(move || hasher.finalize_reset().to_vec()).await?;
         Ok(hash)
     }
 }
 
-impl Hash {
-    fn new(inner: Vec<u8>) -> Self {
-        Hash { inner }
-    }
-
-    pub(super) fn as_slice(&self) -> &[u8] {
-        &self.inner
-    }
-
-    pub(super) fn into_inner(self) -> Vec<u8> {
-        self.inner
-    }
-}
-
 impl<I, D> AsyncRead for Hasher<I, D>
 where
     I: AsyncRead,
@@ -77,12 +59,6 @@ where
     }
 }
 
-impl std::fmt::Debug for Hash {
-    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
-        write!(f, "{}", base64::encode(&self.inner))
-    }
-}
-
 #[cfg(test)]
 mod test {
     use super::Hasher;
@@ -127,6 +103,6 @@ mod test {
         hasher.update(vec);
         let correct_hash = hasher.finalize_reset().to_vec();
 
-        assert_eq!(hash.inner, correct_hash);
+        assert_eq!(hash, correct_hash);
     }
 }
diff --git a/src/main.rs b/src/main.rs
index f243f58..15f635a 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -30,6 +30,7 @@ mod error;
 mod exiftool;
 mod ffmpeg;
 mod file;
+mod ingest;
 mod init_tracing;
 mod magick;
 mod middleware;
@@ -43,26 +44,24 @@ mod serde_str;
 mod store;
 mod stream;
 mod tmp_file;
-mod upload_manager;
 mod validate;
 
-use crate::stream::StreamTimeout;
-
 use self::{
     concurrent_processor::CancelSafeProcessor,
     config::{Configuration, ImageFormat, Operation},
     details::Details,
     either::Either,
     error::{Error, UploadError},
+    ffmpeg::{InputFormat, ThumbnailFormat},
+    ingest::Session,
     init_tracing::init_tracing,
     magick::details_hint,
     middleware::{Deadline, Internal},
     migrate::LatestDb,
-    repo::{Alias, DeleteToken, Repo},
+    repo::{Alias, DeleteToken, FullRepo, HashRepo, IdentifierRepo, Repo, SettingsRepo},
     serde_str::Serde,
-    store::{file_store::FileStore, object_store::ObjectStore, Store},
-    stream::StreamLimit,
-    upload_manager::{UploadManager, UploadManagerSession},
+    store::{file_store::FileStore, object_store::ObjectStore, Identifier, Store},
+    stream::{StreamLimit, StreamTimeout},
 };
 
 const MEGABYTES: usize = 1024 * 1024;
@@ -78,10 +77,10 @@ static PROCESS_SEMAPHORE: Lazy<Semaphore> =
     Lazy::new(|| Semaphore::new(num_cpus::get().saturating_sub(1).max(1)));
 
 /// Handle responding to succesful uploads
-#[instrument(name = "Uploaded files", skip(value, manager))]
-async fn upload<S: Store>(
-    value: Value<UploadManagerSession<S>>,
-    manager: web::Data<UploadManager>,
+#[instrument(name = "Uploaded files", skip(value))]
+async fn upload<R: FullRepo + 'static, S: Store + 'static>(
+    value: Value<Session<R, S>>,
+    repo: web::Data<R>,
     store: web::Data<S>,
 ) -> Result<HttpResponse, Error> {
     let images = value
@@ -100,8 +99,8 @@ async fn upload<S: Store>(
             info!("Uploaded {} as {:?}", image.filename, alias);
             let delete_token = image.result.delete_token().await?;
 
-            let identifier = manager.identifier_from_alias::<S::Identifier>(alias).await?;
-            let details = manager.details(&identifier).await?;
+            let identifier = repo.identifier_from_alias::<S::Identifier>(alias).await?;
+            let details = repo.details(&identifier).await?;
 
             let details = if let Some(details) = details {
                 debug!("details exist");
@@ -112,7 +111,7 @@ async fn upload<S: Store>(
                 let new_details =
                     Details::from_store((**store).clone(), identifier.clone(), hint).await?;
                 debug!("storing details for {:?}", identifier);
-                manager.store_details(&identifier, &new_details).await?;
+                repo.relate_details(&identifier, &new_details).await?;
                 debug!("stored");
                 new_details
             };
@@ -125,8 +124,8 @@ async fn upload<S: Store>(
         }
     }
 
-    for image in images {
-        image.result.succeed();
+    for mut image in images {
+        image.result.disarm();
     }
 
     Ok(HttpResponse::Created().json(&serde_json::json!({
         "msg": "ok",
@@ -140,10 +139,10 @@ struct UrlQuery {
     url: String,
 }
 
 /// download an image from a URL
-#[instrument(name = "Downloading file", skip(client, manager))]
-async fn download<S: Store>(
+#[instrument(name = "Downloading file", skip(client, repo))]
+async fn download<R: FullRepo + 'static, S: Store + 'static>(
     client: web::Data<Client>,
-    manager: web::Data<UploadManager>,
+    repo: web::Data<R>,
     store: web::Data<S>,
     query: web::Query<UrlQuery>,
 ) -> Result<HttpResponse, Error> {
@@ -157,31 +156,25 @@ async fn download<S: Store>(
         .map_err(Error::from)
         .limit((CONFIG.media.max_file_size * MEGABYTES) as u64);
 
-    futures_util::pin_mut!(stream);
+    let mut session = ingest::ingest(&**repo, &**store, stream, None, true).await?;
 
-    let permit = PROCESS_SEMAPHORE.acquire().await?;
-    let session = manager
-        .session((**store).clone())
-        .upload(CONFIG.media.enable_silent_video, stream)
-        .await?;
-    let alias = session.alias().unwrap().to_owned();
-    drop(permit);
+    let alias = session.alias().expect("alias should exist").to_owned();
 
     let delete_token = session.delete_token().await?;
 
-    let identifier = manager.identifier_from_alias::<S::Identifier>(&alias).await?;
+    let identifier = repo.identifier_from_alias::<S::Identifier>(&alias).await?;
 
-    let details = manager.details(&identifier).await?;
+    let details = repo.details(&identifier).await?;
 
     let details = if let Some(details) = details {
         details
     } else {
         let hint = details_hint(&alias);
         let new_details = Details::from_store((**store).clone(), identifier.clone(), hint).await?;
-        manager.store_details(&identifier, &new_details).await?;
+        repo.relate_details(&identifier, &new_details).await?;
        new_details
    };
 
-    session.succeed();
+    session.disarm();
 
     Ok(HttpResponse::Created().json(&serde_json::json!({
         "msg": "ok",
         "files": [{
@@ -193,9 +186,9 @@ async fn download<S: Store>(
 }
 
 /// Delete aliases and files
-#[instrument(name = "Deleting file", skip(manager))]
-async fn delete(
-    manager: web::Data<UploadManager>,
+#[instrument(name = "Deleting file", skip(repo))]
+async fn delete<R: FullRepo>(
+    repo: web::Data<R>,
     path_entries: web::Path<(String, String)>,
 ) -> Result<HttpResponse, Error> {
     let (token, alias) = path_entries.into_inner();
@@ -203,7 +196,7 @@ async fn delete<R: FullRepo>(
     let token = DeleteToken::from_existing(&token);
     let alias = Alias::from_existing(&alias);
 
-    manager.delete(alias, token).await?;
+    queue::cleanup_alias(&**repo, alias, token).await?;
 
     Ok(HttpResponse::NoContent().finish())
 }
@@ -249,20 +242,21 @@ fn prepare_process(
     Ok((format, alias, thumbnail_path, thumbnail_args))
 }
 
-#[instrument(name = "Fetching derived details", skip(manager))]
-async fn process_details<S: Store>(
+#[instrument(name = "Fetching derived details", skip(repo))]
+async fn process_details<R: FullRepo, S: Store>(
     query: web::Query<ProcessQuery>,
     ext: web::Path<String>,
-    manager: web::Data<UploadManager>,
+    repo: web::Data<R>,
 ) -> Result<HttpResponse, Error> {
     let (_, alias, thumbnail_path, _) = prepare_process(query, ext.as_str())?;
 
-    let identifier = manager
-        .variant_identifier::<S::Identifier>(&alias, &thumbnail_path)
+    let hash = repo.hash(&alias).await?;
+    let identifier = repo
+        .variant_identifier::<S::Identifier>(hash, thumbnail_path.to_string_lossy().to_string())
         .await?
         .ok_or(UploadError::MissingAlias)?;
 
-    let details = manager.details(&identifier).await?;
+    let details = repo.details(&identifier).await?;
 
     let details = details.ok_or(UploadError::NoFiles)?;
 
@@ -270,38 +264,60 @@ async fn process_details<S: Store>(
 }
 
 /// Process files
-#[instrument(name = "Serving processed image", skip(manager))]
-async fn process<S: Store + 'static>(
+#[instrument(name = "Serving processed image", skip(repo))]
+async fn process<R: FullRepo, S: Store + 'static>(
     range: Option<web::Header<Range>>,
     query: web::Query<ProcessQuery>,
     ext: web::Path<String>,
-    manager: web::Data<UploadManager>,
+    repo: web::Data<R>,
     store: web::Data<S>,
 ) -> Result<HttpResponse, Error> {
     let (format, alias, thumbnail_path, thumbnail_args) = prepare_process(query, ext.as_str())?;
 
-    let identifier_opt = manager
-        .variant_identifier::<S::Identifier>(&alias, &thumbnail_path)
+    let path_string = thumbnail_path.to_string_lossy().to_string();
+    let hash = repo.hash(&alias).await?;
+    let identifier_opt = repo
+        .variant_identifier::<S::Identifier>(hash.clone(), path_string)
         .await?;
 
     if let Some(identifier) = identifier_opt {
-        let details_opt = manager.details(&identifier).await?;
+        let details_opt = repo.details(&identifier).await?;
 
         let details = if let Some(details) = details_opt {
             details
         } else {
             let hint = details_hint(&alias);
             let details = Details::from_store((**store).clone(), identifier.clone(), hint).await?;
-            manager.store_details(&identifier, &details).await?;
+            repo.relate_details(&identifier, &details).await?;
             details
         };
 
         return ranged_file_resp(&**store, identifier, range, details).await;
     }
 
-    let identifier = manager
-        .still_identifier_from_alias((**store).clone(), &alias)
+    let identifier = if let Some(identifier) = repo
+        .still_identifier_from_alias::<S::Identifier>(&alias)
+        .await?
+    {
+        identifier
+    } else {
+        let identifier = repo.identifier(hash.clone()).await?;
+        let permit = PROCESS_SEMAPHORE.acquire().await;
+        let mut reader = crate::ffmpeg::thumbnail(
+            (**store).clone(),
+            identifier,
+            InputFormat::Mp4,
+            ThumbnailFormat::Jpeg,
+        )
         .await?;
+        let motion_identifier = store.save_async_read(&mut reader).await?;
+        drop(permit);
+
+        repo.relate_motion_identifier(hash.clone(), &motion_identifier)
+            .await?;
+
+        motion_identifier
+    };
 
     let thumbnail_path2 = thumbnail_path.clone();
     let identifier2 = identifier.clone();
@@ -326,10 +342,13 @@ async fn process<S: Store + 'static>(
         let details = Details::from_bytes(bytes.clone(), format.as_hint()).await?;
 
         let identifier = store.save_bytes(bytes.clone()).await?;
-        manager.store_details(&identifier, &details).await?;
-        manager
-            .store_variant(&alias, &thumbnail_path, &identifier)
-            .await?;
+        repo.relate_details(&identifier, &details).await?;
+        repo.relate_variant_identifier(
+            hash,
+            thumbnail_path.to_string_lossy().to_string(),
+            &identifier,
+        )
+        .await?;
 
         Ok((details, bytes)) as Result<(Details, web::Bytes), Error>
     };
@@ -370,25 +389,25 @@ async fn process<S: Store + 'static>(
 }
 
 /// Fetch file details
-#[instrument(name = "Fetching details", skip(manager))]
-async fn details<S: Store>(
+#[instrument(name = "Fetching details", skip(repo))]
+async fn details<R: FullRepo, S: Store + 'static>(
     alias: web::Path<String>,
-    manager: web::Data<UploadManager>,
+    repo: web::Data<R>,
     store: web::Data<S>,
 ) -> Result<HttpResponse, Error> {
     let alias = alias.into_inner();
     let alias = Alias::from_existing(&alias);
 
-    let identifier = manager.identifier_from_alias::<S::Identifier>(&alias).await?;
+    let identifier = repo.identifier_from_alias::<S::Identifier>(&alias).await?;
 
-    let details = manager.details(&identifier).await?;
+    let details = repo.details(&identifier).await?;
 
     let details = if let Some(details) = details {
         details
     } else {
         let hint = details_hint(&alias);
         let new_details = Details::from_store((**store).clone(), identifier.clone(), hint).await?;
-        manager.store_details(&identifier, &new_details).await?;
+        repo.relate_details(&identifier, &new_details).await?;
         new_details
     };
 
@@ -396,25 +415,25 @@ async fn details<S: Store>(
 }
 
 /// Serve files
-#[instrument(name = "Serving file", skip(manager))]
-async fn serve<S: Store + 'static>(
+#[instrument(name = "Serving file", skip(repo))]
+async fn serve<R: FullRepo, S: Store + 'static>(
     range: Option<web::Header<Range>>,
     alias: web::Path<String>,
-    manager: web::Data<UploadManager>,
+    repo: web::Data<R>,
     store: web::Data<S>,
 ) -> Result<HttpResponse, Error> {
     let alias = alias.into_inner();
     let alias = Alias::from_existing(&alias);
 
-    let identifier = manager.identifier_from_alias::<S::Identifier>(&alias).await?;
+    let identifier = repo.identifier_from_alias::<S::Identifier>(&alias).await?;
 
-    let details = manager.details(&identifier).await?;
+    let details = repo.details(&identifier).await?;
 
     let details = if let Some(details) = details {
         details
     } else {
         let hint = details_hint(&alias);
         let details = Details::from_store((**store).clone(), identifier.clone(), hint).await?;
-        manager.store_details(&identifier, &details).await?;
+        repo.relate_details(&identifier, &details).await?;
         details
     };
 
@@ -506,18 +525,17 @@ struct AliasQuery {
     alias: String,
 }
 
-#[instrument(name = "Purging file", skip(upload_manager))]
-async fn purge(
+#[instrument(name = "Purging file", skip(repo))]
+async fn purge<R: FullRepo>(
     query: web::Query<AliasQuery>,
-    upload_manager: web::Data<UploadManager>,
+    repo: web::Data<R>,
 ) -> Result<HttpResponse, Error> {
     let alias = Alias::from_existing(&query.alias);
-    let aliases = upload_manager.aliases_by_alias(&alias).await?;
+    let aliases = repo.aliases_from_alias(&alias).await?;
 
     for alias in aliases.iter() {
-        upload_manager
-            .delete_without_token(alias.to_owned())
-            .await?;
+        let token = repo.delete_token(alias).await?;
+        queue::cleanup_alias(&**repo, alias.clone(), token).await?;
     }
 
     Ok(HttpResponse::Ok().json(&serde_json::json!({
@@ -526,13 +544,13 @@ async fn purge<R: FullRepo>(
     })))
 }
 
-#[instrument(name = "Fetching aliases", skip(upload_manager))]
-async fn aliases(
+#[instrument(name = "Fetching aliases", skip(repo))]
+async fn aliases<R: FullRepo>(
     query: web::Query<AliasQuery>,
-    upload_manager: web::Data<UploadManager>,
+    repo: web::Data<R>,
 ) -> Result<HttpResponse, Error> {
     let alias = Alias::from_existing(&query.alias);
-    let aliases = upload_manager.aliases_by_alias(&alias).await?;
+    let aliases = repo.aliases_from_alias(&alias).await?;
 
     Ok(HttpResponse::Ok().json(&serde_json::json!({
         "msg": "ok",
@@ -567,14 +585,14 @@ fn next_worker_id() -> String {
     format!("{}-{}", CONFIG.server.worker_id, next_id)
 }
 
-async fn launch<S: Store + Clone + 'static>(
-    manager: UploadManager,
+async fn launch<R: FullRepo + Clone + 'static, S: Store + Clone + 'static>(
+    repo: R,
     store: S,
 ) -> color_eyre::Result<()> {
     // Create a new Multipart Form validator
     //
     // This form is expecting a single array field, 'images' with at most 10 files in it
-    let manager2 = manager.clone();
+    let repo2 = repo.clone();
     let store2 = store.clone();
     let form = Form::new()
         .max_files(10)
@@ -583,33 +601,24 @@ async fn launch<S: Store + Clone + 'static>(
         .field(
             "images",
             Field::array(Field::file(move |filename, _, stream| {
+                let repo = repo2.clone();
                 let store = store2.clone();
-                let manager = manager2.clone();
 
                 let span = tracing::info_span!("file-upload", ?filename);
 
-                async move {
-                    let permit = PROCESS_SEMAPHORE.acquire().await?;
+                let stream = stream.map_err(Error::from);
 
-                    let res = manager
-                        .session(store)
-                        .upload(
-                            CONFIG.media.enable_silent_video,
-                            stream.map_err(Error::from),
-                        )
-                        .await;
-
-                    drop(permit);
-                    res
-                }
-                .instrument(span)
+                Box::pin(
+                    async move { ingest::ingest(&repo, &store, stream, None, true).await }
+                        .instrument(span),
+                )
             })),
         );
 
     // Create a new Multipart Form validator for internal imports
     //
     // This form is expecting a single array field, 'images' with at most 10 files in it
-    let manager2 = manager.clone();
+    let repo2 = repo.clone();
     let store2 = store.clone();
     let import_form = Form::new()
         .max_files(10)
@@ -618,42 +627,40 @@ async fn launch<S: Store + Clone + 'static>(
         .field(
             "images",
             Field::array(Field::file(move |filename, _, stream| {
+                let repo = repo2.clone();
                 let store = store2.clone();
-                let manager = manager2.clone();
 
                 let span = tracing::info_span!("file-import", ?filename);
 
-                async move {
-                    let permit = PROCESS_SEMAPHORE.acquire().await?;
+                let stream = stream.map_err(Error::from);
 
-                    let res = manager
-                        .session(store)
-                        .import(
-                            filename,
+                Box::pin(
+                    async move {
+                        ingest::ingest(
+                            &repo,
+                            &store,
+                            stream,
+                            Some(Alias::from_existing(&filename)),
                             !CONFIG.media.skip_validate_imports,
-                            CONFIG.media.enable_silent_video,
-                            stream.map_err(Error::from),
                         )
-                        .await;
-
-                    drop(permit);
-                    res
-                }
-                .instrument(span)
+                        .await
+                    }
+                    .instrument(span),
+                )
             })),
         );
 
     HttpServer::new(move || {
-        let manager = manager.clone();
         let store = store.clone();
+        let repo = repo.clone();
 
         actix_rt::spawn(queue::process_cleanup(
-            manager.repo().clone(),
+            repo.clone(),
             store.clone(),
             next_worker_id(),
         ));
         actix_rt::spawn(queue::process_images(
-            manager.repo().clone(),
+            repo.clone(),
             store.clone(),
             next_worker_id(),
         ));
@@ -661,8 +668,8 @@ async fn launch<S: Store + Clone + 'static>(
         App::new()
             .wrap(TracingLogger::default())
             .wrap(Deadline)
+            .app_data(web::Data::new(repo))
             .app_data(web::Data::new(store))
-            .app_data(web::Data::new(manager))
             .app_data(web::Data::new(build_client()))
             .service(
                 web::scope("/image")
@@ -670,25 +677,27 @@ async fn launch<S: Store + Clone + 'static>(
                         web::resource("")
                             .guard(guard::Post())
                             .wrap(form.clone())
-                            .route(web::post().to(upload::<S>)),
+                            .route(web::post().to(upload::<R, S>)),
                     )
-                    .service(web::resource("/download").route(web::get().to(download::<S>)))
+                    .service(web::resource("/download").route(web::get().to(download::<R, S>)))
                     .service(
                         web::resource("/delete/{delete_token}/{filename}")
-                            .route(web::delete().to(delete))
-                            .route(web::get().to(delete)),
+                            .route(web::delete().to(delete::<R>))
+                            .route(web::get().to(delete::<R>)),
                     )
-                    .service(web::resource("/original/{filename}").route(web::get().to(serve::<S>)))
-                    .service(web::resource("/process.{ext}").route(web::get().to(process::<S>)))
+                    .service(
+                        web::resource("/original/{filename}").route(web::get().to(serve::<R, S>)),
+                    )
+                    .service(web::resource("/process.{ext}").route(web::get().to(process::<R, S>)))
                     .service(
                         web::scope("/details")
                             .service(
                                 web::resource("/original/{filename}")
-                                    .route(web::get().to(details::<S>)),
+                                    .route(web::get().to(details::<R, S>)),
                             )
                             .service(
                                 web::resource("/process.{ext}")
-                                    .route(web::get().to(process_details::<S>)),
+                                    .route(web::get().to(process_details::<R, S>)),
                             ),
                     ),
             )
@@ -700,10 +709,10 @@ async fn launch<S: Store + Clone + 'static>(
             .service(
                 web::resource("/import")
                     .wrap(import_form.clone())
-                    .route(web::post().to(upload::<S>)),
+                    .route(web::post().to(upload::<R, S>)),
             )
-            .service(web::resource("/purge").route(web::post().to(purge)))
-            .service(web::resource("/aliases").route(web::get().to(aliases))),
+            .service(web::resource("/purge").route(web::post().to(purge::<R>)))
+            .service(web::resource("/aliases").route(web::get().to(aliases::<R>))),
         )
     })
     .bind(CONFIG.server.address)?
@@ -715,19 +724,16 @@ async fn launch<S: Store + Clone + 'static>(
 
     Ok(())
 }
 
-async fn migrate_inner<S1>(
-    manager: &UploadManager,
-    repo: &Repo,
-    from: S1,
-    to: &config::Store,
-) -> color_eyre::Result<()>
+async fn migrate_inner<S1>(repo: &Repo, from: S1, to: &config::Store) -> color_eyre::Result<()>
 where
     S1: Store,
 {
     match to {
         config::Store::Filesystem(config::Filesystem { path }) => {
             let to = FileStore::build(path.clone(), repo.clone()).await?;
-            manager.migrate_store::<S1, FileStore>(from, to).await?;
+            match repo {
+                Repo::Sled(repo) => migrate_store(repo, from, to).await?,
+            }
         }
         config::Store::ObjectStorage(config::ObjectStorage {
             bucket_name,
@@ -749,7 +755,9 @@ where
             )
             .await?;
 
-            manager.migrate_store::<S1, ObjectStore>(from, to).await?;
+            match repo {
+                Repo::Sled(repo) => migrate_store(repo, from, to).await?,
+            }
         }
     }
 
@@ -765,15 +773,13 @@ async fn main() -> color_eyre::Result<()> {
     let db = LatestDb::exists(CONFIG.old_db.path.clone()).migrate()?;
     repo.from_db(db).await?;
 
-    let manager = UploadManager::new(repo.clone(), CONFIG.media.format).await?;
-
     match (*OPERATION).clone() {
         Operation::Run => (),
         Operation::MigrateStore { from, to } => {
             match from {
                 config::Store::Filesystem(config::Filesystem { path }) => {
                     let from = FileStore::build(path.clone(), repo.clone()).await?;
-                    migrate_inner(&manager, &repo, from, &to).await?;
+                    migrate_inner(&repo, from, &to).await?;
                 }
                 config::Store::ObjectStorage(config::ObjectStorage {
                     bucket_name,
@@ -795,7 +801,7 @@ async fn main() -> color_eyre::Result<()> {
                     )
                     .await?;
 
-                    migrate_inner(&manager, &repo, from, &to).await?;
+                    migrate_inner(&repo, from, &to).await?;
                 }
             }
 
@@ -805,8 +811,10 @@ async fn main() -> color_eyre::Result<()> {
 
     match CONFIG.store.clone() {
         config::Store::Filesystem(config::Filesystem { path }) => {
-            let store = FileStore::build(path, repo).await?;
-            launch(manager, store).await
+            let store = FileStore::build(path, repo.clone()).await?;
+            match repo {
+                Repo::Sled(sled_repo) => launch(sled_repo, store).await,
+            }
         }
         config::Store::ObjectStorage(config::ObjectStorage {
             bucket_name,
@@ -823,12 +831,92 @@ async fn main() -> color_eyre::Result<()> {
                 Some(secret_key),
                 security_token,
                 session_token,
-                repo,
+                repo.clone(),
                 build_reqwest_client()?,
             )
             .await?;
 
-            launch(manager, store).await
+            match repo {
+                Repo::Sled(sled_repo) => launch(sled_repo, store).await,
+            }
         }
     }
 }
+
+const STORE_MIGRATION_PROGRESS: &str = "store-migration-progress";
+
+async fn migrate_store<R, S1, S2>(repo: &R, from: S1, to: S2) -> Result<(), Error>
+where
+    S1: Store,
+    S2: Store,
+    R: IdentifierRepo + HashRepo + SettingsRepo,
+{
+    let stream = repo.hashes().await;
+    let mut stream = Box::pin(stream);
+
+    while let Some(hash) = stream.next().await {
+        let hash = hash?;
+        if let Some(identifier) = repo
+            .motion_identifier(hash.as_ref().to_vec().into())
+            .await?
+        {
+            let new_identifier = migrate_file(&from, &to, &identifier).await?;
+            migrate_details(repo, identifier, &new_identifier).await?;
+            repo.relate_motion_identifier(hash.as_ref().to_vec().into(), &new_identifier)
+                .await?;
+        }
+
+        for (variant, identifier) in repo.variants(hash.as_ref().to_vec().into()).await? {
+            let new_identifier = migrate_file(&from, &to, &identifier).await?;
+            migrate_details(repo, identifier, &new_identifier).await?;
+            repo.relate_variant_identifier(hash.as_ref().to_vec().into(), variant, &new_identifier)
+                .await?;
+        }
+
+        let identifier = repo.identifier(hash.as_ref().to_vec().into()).await?;
+        let new_identifier = migrate_file(&from, &to, &identifier).await?;
+        migrate_details(repo, identifier, &new_identifier).await?;
+        repo.relate_identifier(hash.as_ref().to_vec().into(), &new_identifier)
+            .await?;
+
+        repo.set(STORE_MIGRATION_PROGRESS, hash.as_ref().to_vec().into())
+            .await?;
+    }
+
+    // clean up the migration key to avoid interfering with future migrations
+    repo.remove(STORE_MIGRATION_PROGRESS).await?;
+
+    Ok(())
+}
+
+async fn migrate_file<S1, S2>(
+    from: &S1,
+    to: &S2,
+    identifier: &S1::Identifier,
+) -> Result<S2::Identifier, Error>
+where
+    S1: Store,
+    S2: Store,
+{
+    let stream = from.to_stream(identifier, None, None).await?;
+    futures_util::pin_mut!(stream);
+    let mut reader = tokio_util::io::StreamReader::new(stream);
+
+    let new_identifier = to.save_async_read(&mut reader).await?;
+
+    Ok(new_identifier)
+}
+
+async fn migrate_details<R, I1, I2>(repo: &R, from: I1, to: &I2) -> Result<(), Error>
+where
+    R: IdentifierRepo,
+    I1: Identifier,
+    I2: Identifier,
+{
+    if let Some(details) = repo.details(&from).await? {
+        repo.relate_details(to, &details).await?;
+        repo.cleanup(&from).await?;
+    }
+
+    Ok(())
+}
diff --git a/src/queue.rs b/src/queue.rs
index 9eceb7c..8398f23 100644
--- a/src/queue.rs
+++ b/src/queue.rs
@@ -1,9 +1,9 @@
 use crate::{
     config::ImageFormat,
     error::Error,
-    repo::{Alias, AliasRepo, HashRepo, IdentifierRepo, QueueRepo, Repo},
+    repo::{Alias, AliasRepo, DeleteToken, FullRepo, HashRepo, IdentifierRepo, QueueRepo},
     serde_str::Serde,
-    store::Store,
+    store::{Identifier, Store},
 };
 use std::{future::Future, path::PathBuf, pin::Pin};
 use uuid::Uuid;
@@ -16,8 +16,16 @@ const PROCESS_QUEUE: &str = "process";
 
 #[derive(Debug, serde::Deserialize, serde::Serialize)]
 enum Cleanup {
-    CleanupHash { hash: Vec<u8> },
-    CleanupIdentifier { identifier: Vec<u8> },
+    Hash {
+        hash: Vec<u8>,
+    },
+    Identifier {
+        identifier: Vec<u8>,
+    },
+    Alias {
+        alias: Serde<Alias>,
+        token: Serde<DeleteToken>,
+    },
 }
 
 #[derive(Debug, serde::Deserialize, serde::Serialize)]
 enum Process {
@@ -36,14 +44,38 @@ enum Process {
     },
 }
 
-pub(crate) async fn queue_cleanup<R: QueueRepo>(repo: &R, hash: R::Bytes) -> Result<(), Error> {
-    let job = serde_json::to_vec(&Cleanup::CleanupHash {
+pub(crate) async fn cleanup_alias<R: QueueRepo>(
+    repo: &R,
+    alias: Alias,
+    token: DeleteToken,
+) -> Result<(), Error> {
+    let job = serde_json::to_vec(&Cleanup::Alias {
+        alias: Serde::new(alias),
+        token: Serde::new(token),
+    })?;
+    repo.push(CLEANUP_QUEUE, job.into()).await?;
+    Ok(())
+}
+
+pub(crate) async fn cleanup_hash<R: QueueRepo>(repo: &R, hash: R::Bytes) -> Result<(), Error> {
+    let job = serde_json::to_vec(&Cleanup::Hash {
         hash: hash.as_ref().to_vec(),
     })?;
     repo.push(CLEANUP_QUEUE, job.into()).await?;
     Ok(())
 }
 
+pub(crate) async fn cleanup_identifier<R: QueueRepo, I: Identifier>(
+    repo: &R,
+    identifier: I,
+) -> Result<(), Error> {
+    let job = serde_json::to_vec(&Cleanup::Identifier {
+        identifier: identifier.to_bytes()?,
+    })?;
+    repo.push(CLEANUP_QUEUE, job.into()).await?;
+    Ok(())
+}
+
 pub(crate) async fn queue_ingest<R: QueueRepo>(
     repo: &R,
     identifier: Vec<u8>,
@@ -78,16 +110,16 @@ pub(crate) async fn queue_generate<R: QueueRepo>(
     Ok(())
 }
 
-pub(crate) async fn process_cleanup<S: Store>(repo: Repo, store: S, worker_id: String) {
-    match repo {
-        Repo::Sled(repo) => process_jobs(&repo, &store, worker_id, cleanup::perform).await,
-    }
+pub(crate) async fn process_cleanup<R: FullRepo, S: Store>(repo: R, store: S, worker_id: String) {
+    process_jobs(&repo, &store, worker_id, cleanup::perform).await
 }
 
-pub(crate) async fn process_images<S: Store>(repo: Repo, store: S, worker_id: String) {
-    match repo {
-        Repo::Sled(repo) => process_jobs(&repo, &store, worker_id, process::perform).await,
-    }
+pub(crate) async fn process_images<R: FullRepo + 'static, S: Store>(
+    repo: R,
+    store: S,
+    worker_id: String,
+) {
+    process_jobs(&repo, &store, worker_id, process::perform).await
 }
 
 type LocalBoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>;
diff --git a/src/queue/cleanup.rs b/src/queue/cleanup.rs
index 1abec81..0496695 100644
--- a/src/queue/cleanup.rs
+++ b/src/queue/cleanup.rs
@@ -1,7 +1,8 @@
 use crate::{
-    error::Error,
-    queue::{Cleanup, LocalBoxFuture, CLEANUP_QUEUE},
-    repo::{AliasRepo, HashRepo, IdentifierRepo, QueueRepo},
+    error::{Error, UploadError},
+    queue::{Cleanup, LocalBoxFuture},
+    repo::{Alias, AliasRepo, DeleteToken, FullRepo, HashRepo, IdentifierRepo},
+    serde_str::Serde,
     store::{Identifier, Store},
 };
 use tracing::error;
@@ -12,17 +13,27 @@ pub(super) fn perform<'a, R, S>(
     job: &'a [u8],
 ) -> LocalBoxFuture<'a, Result<(), Error>>
 where
-    R: QueueRepo + HashRepo + IdentifierRepo + AliasRepo,
-    R::Bytes: Clone,
+    R: FullRepo,
     S: Store,
 {
     Box::pin(async move {
         match serde_json::from_slice(job) {
             Ok(job) => match job {
-                Cleanup::CleanupHash { hash: in_hash } => hash::<R, S>(repo, in_hash).await?,
-                Cleanup::CleanupIdentifier {
+                Cleanup::Hash { hash: in_hash } => hash::<R, S>(repo, in_hash).await?,
+                Cleanup::Identifier {
                     identifier: in_identifier,
                 } => identifier(repo, &store, in_identifier).await?,
+                Cleanup::Alias {
+                    alias: stored_alias,
+                    token,
+                } => {
+                    alias(
+                        repo,
+                        Serde::into_inner(stored_alias),
+                        Serde::into_inner(token),
+                    )
+                    .await?
+                }
             },
             Err(e) => {
                 tracing::warn!("Invalid job: {}", e);
@@ -36,8 +47,7 @@ where
 #[tracing::instrument(skip(repo, store))]
 async fn identifier<R, S>(repo: &R, store: &S, identifier: Vec<u8>) -> Result<(), Error>
 where
-    R: QueueRepo + HashRepo + IdentifierRepo,
-    R::Bytes: Clone,
+    R: FullRepo,
     S: Store,
 {
     let identifier = S::Identifier::from_bytes(identifier)?;
@@ -67,8 +77,7 @@ where
 #[tracing::instrument(skip(repo))]
 async fn hash<R, S>(repo: &R, hash: Vec<u8>) -> Result<(), Error>
 where
-    R: QueueRepo + AliasRepo + HashRepo + IdentifierRepo,
-    R::Bytes: Clone,
+    R: FullRepo,
     S: Store,
 {
     let hash: R::Bytes = hash.into();
@@ -89,13 +98,31 @@ where
     idents.extend(repo.motion_identifier(hash.clone()).await?);
 
     for identifier in idents {
-        if let Ok(identifier) = identifier.to_bytes() {
-            let job = serde_json::to_vec(&Cleanup::CleanupIdentifier { identifier })?;
-            repo.push(CLEANUP_QUEUE, job.into()).await?;
-        }
+        let _ = crate::queue::cleanup_identifier(repo, identifier).await;
     }
 
     HashRepo::cleanup(repo, hash).await?;
 
     Ok(())
 }
+
+async fn alias<R>(repo: &R, alias: Alias, token: DeleteToken) -> Result<(), Error>
+where
+    R: FullRepo,
+{
+    let saved_delete_token = repo.delete_token(&alias).await?;
+    if saved_delete_token != token {
+        return Err(UploadError::InvalidToken.into());
+    }
+
+    let hash = repo.hash(&alias).await?;
+
+    AliasRepo::cleanup(repo, &alias).await?;
+    repo.remove_alias(hash.clone(), &alias).await?;
+
+    if repo.aliases(hash.clone()).await?.is_empty() {
+        crate::queue::cleanup_hash(repo, hash).await?;
+    }
+
+    Ok(())
+}
diff --git a/src/queue/process.rs b/src/queue/process.rs
index 261f095..76e4c8d 100644
--- a/src/queue/process.rs
+++ b/src/queue/process.rs
@@ -1,13 +1,14 @@
 use crate::{
     config::ImageFormat,
     error::Error,
+    ingest::Session,
     queue::{LocalBoxFuture, Process},
-    repo::{Alias, AliasRepo, HashRepo, IdentifierRepo, QueueRepo},
+    repo::{Alias, DeleteToken, FullRepo, UploadId, UploadResult},
     serde_str::Serde,
-    store::Store,
+    store::{Identifier, Store},
 };
+use futures_util::TryStreamExt;
 use std::path::PathBuf;
-use uuid::Uuid;
 
 pub(super) fn perform<'a, R, S>(
     repo: &'a R,
@@ -15,8 +16,7 @@ pub(super) fn perform<'a, R, S>(
     store: &'a S,
     job: &'a [u8],
 ) -> LocalBoxFuture<'a, Result<(), Error>>
 where
-    R: QueueRepo + HashRepo + IdentifierRepo + AliasRepo,
-    R::Bytes: Clone,
+    R: FullRepo + 'static,
     S: Store,
 {
     Box::pin(async move {
@@ -28,11 +28,11 @@ where
                 declared_alias,
                 should_validate,
             } => {
-                ingest(
+                process_ingest(
                     repo,
                     store,
                     identifier,
-                    upload_id,
+                    upload_id.into(),
                     declared_alias.map(Serde::into_inner),
                     should_validate,
                 )
                 .await?
@@ -64,15 +64,54 @@ where
     })
 }
 
-async fn ingest<R, S>(
+#[tracing::instrument(skip(repo, store))]
+async fn process_ingest<R, S>(
     repo: &R,
     store: &S,
-    identifier: Vec<u8>,
-    upload_id: Uuid,
+    unprocessed_identifier: Vec<u8>,
+    upload_id: UploadId,
     declared_alias: Option<Alias>,
     should_validate: bool,
-) -> Result<(), Error> {
-    unimplemented!("do this")
+) -> Result<(), Error>
+where
+    R: FullRepo + 'static,
+    S: Store,
+{
+    let fut = async {
+        let unprocessed_identifier = S::Identifier::from_bytes(unprocessed_identifier)?;
+
+        let stream = store
+            .to_stream(&unprocessed_identifier, None, None)
+            .await?
+            .map_err(Error::from);
+
+        let session =
+            crate::ingest::ingest(repo, store, stream, declared_alias, should_validate).await?;
+
+        let token = session.delete_token().await?;
+
+        Ok((session, token)) as Result<(Session<R, S>, DeleteToken), Error>
+    };
+
+    let result = match fut.await {
+        Ok((mut session, token)) => {
+            let alias = session.alias().take().expect("Alias should exist").clone();
+            let result = UploadResult::Success { alias, token };
+            session.disarm();
+            result
+        }
+        Err(e) => {
+            tracing::warn!("Failed to ingest {}, {:?}", e, e);
+
+            UploadResult::Failure {
+                message: e.to_string(),
+            }
+        }
+    };
+
+    repo.complete(upload_id, result).await?;
+
+    Ok(())
 }
 
 async fn generate<R, S>(
diff --git a/src/repo.rs b/src/repo.rs
index e77a482..934576e 100644
--- a/src/repo.rs
+++ b/src/repo.rs
@@ -1,5 +1,6 @@
 use crate::{config, details::Details, error::Error, store::Identifier};
 use futures_util::Stream;
+use std::fmt::Debug;
 use tracing::debug;
 use uuid::Uuid;
 
@@ -40,8 +41,49 @@ pub(crate) enum UploadResult {
     Failure { message: String },
 }
 
+#[async_trait::async_trait(?Send)]
+pub(crate) trait FullRepo:
+    UploadRepo
+    + SettingsRepo
+    + IdentifierRepo
+    + AliasRepo
+    + QueueRepo
+    + HashRepo
+    + Send
+    + Sync
+    + Clone
+    + Debug
+{
+    async fn identifier_from_alias<I: Identifier + 'static>(
+        &self,
+        alias: &Alias,
+    ) -> Result<I, Error> {
+        let hash = self.hash(alias).await?;
+        self.identifier(hash).await
+    }
+
+    async fn aliases_from_alias(&self, alias: &Alias) -> Result<Vec<Alias>, Error> {
+        let hash = self.hash(alias).await?;
+        self.aliases(hash).await
+    }
+
+    async fn still_identifier_from_alias<I: Identifier + 'static>(
+        &self,
+        alias: &Alias,
+    ) -> Result<Option<I>, Error> {
+        let hash = self.hash(alias).await?;
+        let identifier = self.identifier::<I>(hash.clone()).await?;
+
+        match self.details(&identifier).await? {
+            Some(details) if details.is_motion() => self.motion_identifier::<I>(hash).await,
+            Some(_) => Ok(Some(identifier)),
+            None => Ok(None),
+        }
+    }
+}
+
 pub(crate) trait BaseRepo {
-    type Bytes: AsRef<[u8]> + From<Vec<u8>>;
+    type Bytes: AsRef<[u8]> + From<Vec<u8>> + Clone;
 }
 
 #[async_trait::async_trait(?Send)]
@@ -396,6 +438,12 @@ impl UploadId {
     }
 }
 
+impl From<Uuid> for UploadId {
+    fn from(id: Uuid) -> Self {
+        Self { id }
+    }
+}
+
 impl std::fmt::Display for MaybeUuid {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         match self {
@@ -405,6 +453,14 @@ impl std::fmt::Display for MaybeUuid {
     }
 }
 
+impl std::str::FromStr for DeleteToken {
+    type Err = std::convert::Infallible;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        Ok(DeleteToken::from_existing(s))
+    }
+}
+
 impl std::fmt::Display for DeleteToken {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         write!(f, "{}", self.id)
diff --git a/src/repo/sled.rs b/src/repo/sled.rs
index c699158..262fa81 100644
--- a/src/repo/sled.rs
+++ b/src/repo/sled.rs
@@ -1,8 +1,8 @@
 use crate::{
     error::Error,
     repo::{
-        Alias, AliasRepo, AlreadyExists, DeleteToken, Details, HashRepo, Identifier,
-        IdentifierRepo, QueueRepo, SettingsRepo,
+        Alias, AliasRepo, AlreadyExists, BaseRepo, DeleteToken, Details, FullRepo, HashRepo,
+        Identifier, IdentifierRepo, QueueRepo, SettingsRepo, UploadId, UploadRepo, UploadResult,
     },
     stream::from_iterator,
 };
@@ -15,8 +15,6 @@ use std::{
 };
 use tokio::sync::Notify;
 
-use super::BaseRepo;
-
 macro_rules! b {
     ($self:ident.$ident:ident, $expr:expr) => {{
         let $ident = $self.$ident.clone();
@@ -85,6 +83,23 @@ impl BaseRepo for SledRepo {
     type Bytes = IVec;
 }
 
+impl FullRepo for SledRepo {}
+
+#[async_trait::async_trait(?Send)]
+impl UploadRepo for SledRepo {
+    async fn wait(&self, upload_id: UploadId) -> Result<UploadResult, Error> {
+        unimplemented!("DO THIS")
+    }
+
+    async fn claim(&self, upload_id: UploadId) -> Result<(), Error> {
+        unimplemented!("DO THIS")
+    }
+
+    async fn complete(&self, upload_id: UploadId, result: UploadResult) -> Result<(), Error> {
+        unimplemented!("DO THIS")
+    }
+}
+
 #[async_trait::async_trait(?Send)]
 impl QueueRepo for SledRepo {
     async fn in_progress(&self, worker_id: Vec<u8>) -> Result<Option<IVec>, Error> {
diff --git a/src/upload_manager.rs b/src/upload_manager.rs
deleted file mode 100644
index a5c916c..0000000
--- a/src/upload_manager.rs
+++ /dev/null
@@ -1,366 +0,0 @@
-use crate::{
-    config::ImageFormat,
-    details::Details,
-    error::{Error, UploadError},
-    ffmpeg::{InputFormat, ThumbnailFormat},
-    magick::details_hint,
-    repo::{
-        sled::SledRepo, Alias, AliasRepo, BaseRepo, DeleteToken, HashRepo, IdentifierRepo, Repo,
-        SettingsRepo,
-    },
-    store::{Identifier, Store},
-};
-use futures_util::StreamExt;
-use sha2::Digest;
-use std::sync::Arc;
-use tracing::instrument;
-
-mod hasher;
-mod session;
-
-pub(super) use session::UploadManagerSession;
-
-const STORE_MIGRATION_PROGRESS: &str = "store-migration-progress";
-
-#[derive(Clone)]
-pub(crate) struct UploadManager {
-    inner: Arc<UploadManagerInner>,
-}
-
-pub(crate) struct UploadManagerInner {
-    format: Option<ImageFormat>,
-    hasher: sha2::Sha256,
-    repo: Repo,
-}
-
-impl UploadManager {
-    pub(crate) fn repo(&self) -> &Repo {
-        &self.inner.repo
-    }
-
-    /// Create a new UploadManager
-    pub(crate) async fn new(repo: Repo, format: Option<ImageFormat>) -> Result<Self, Error> {
-        let manager = UploadManager {
-            inner: Arc::new(UploadManagerInner {
-                format,
-                hasher: sha2::Sha256::new(),
-                repo,
-            }),
-        };
-
-        Ok(manager)
-    }
-
-    pub(crate) async fn migrate_store<S1, S2>(&self, from: S1, to: S2) -> Result<(), Error>
-    where
-        S1: Store,
-        S2: Store,
-    {
-        match self.inner.repo {
-            Repo::Sled(ref sled_repo) => do_migrate_store(sled_repo, from, to).await,
-        }
-    }
-
-    pub(crate) async fn still_identifier_from_alias<S: Store + Clone>(
-        &self,
-        store: S,
-        alias: &Alias,
-    ) -> Result<S::Identifier, Error> {
-        let identifier = self.identifier_from_alias::<S::Identifier>(alias).await?;
-        let details = if let Some(details) = self.details(&identifier).await? {
-            details
-        } else {
-            let hint = details_hint(alias);
-            Details::from_store(store.clone(), identifier.clone(), hint).await?
-        };
-
-        if !details.is_motion() {
-            return Ok(identifier);
-        }
-
-        if let Some(motion_identifier) = self.motion_identifier::<S::Identifier>(alias).await? {
-            return Ok(motion_identifier);
-        }
-
-        let permit = crate::PROCESS_SEMAPHORE.acquire().await;
-        let mut reader = crate::ffmpeg::thumbnail(
-            store.clone(),
-            identifier,
-            InputFormat::Mp4,
-            ThumbnailFormat::Jpeg,
-        )
-        .await?;
-        let motion_identifier = store.save_async_read(&mut reader).await?;
-        drop(permit);
-
-        self.store_motion_identifier(alias, &motion_identifier)
-            .await?;
-        Ok(motion_identifier)
-    }
-
-    async fn motion_identifier<I: Identifier + 'static>(
-        &self,
-        alias: &Alias,
-    ) -> Result<Option<I>, Error> {
-        match self.inner.repo {
-            Repo::Sled(ref sled_repo) => {
-                let hash = sled_repo.hash(alias).await?;
-                Ok(sled_repo.motion_identifier(hash).await?)
-            }
-        }
-    }
-
-    async fn store_motion_identifier<I: Identifier>(
-        &self,
-        alias: &Alias,
-        identifier: &I,
-    ) -> Result<(), Error> {
-        match self.inner.repo {
-            Repo::Sled(ref sled_repo) => {
-                let hash = sled_repo.hash(alias).await?;
-                Ok(sled_repo.relate_motion_identifier(hash, identifier).await?)
-            }
-        }
-    }
-
-    #[instrument(skip(self))]
-    pub(crate) async fn identifier_from_alias<I: Identifier + 'static>(
-        &self,
-        alias: &Alias,
-    ) -> Result<I, Error> {
-        match self.inner.repo {
-            Repo::Sled(ref sled_repo) => {
-                let hash = sled_repo.hash(alias).await?;
-                Ok(sled_repo.identifier(hash).await?)
-            }
-        }
-    }
-
-    #[instrument(skip(self))]
-    async fn store_identifier<I: Identifier>(
-        &self,
-        hash: Vec<u8>,
-        identifier: &I,
-    ) -> Result<(), Error> {
-        match self.inner.repo {
-            Repo::Sled(ref sled_repo) => {
-                Ok(sled_repo.relate_identifier(hash.into(), identifier).await?)
-            }
-        }
-    }
-
-    #[instrument(skip(self))]
-    pub(crate) async fn variant_identifier<I: Identifier + 'static>(
-        &self,
-        alias: &Alias,
-        process_path: &std::path::Path,
-    ) -> Result<Option<I>, Error> {
-        let variant = process_path.to_string_lossy().to_string();
-
-        match self.inner.repo {
-            Repo::Sled(ref sled_repo) => {
-                let hash = sled_repo.hash(alias).await?;
-                Ok(sled_repo.variant_identifier(hash, variant).await?)
-            }
-        }
-    }
-
-    /// Store the path to a generated image variant so we can easily clean it up later
-    #[instrument(skip(self))]
-    pub(crate) async fn store_full_res<I: Identifier>(
-        &self,
-        alias: &Alias,
-        identifier: &I,
-    ) -> Result<(), Error> {
-        match self.inner.repo {
-            Repo::Sled(ref sled_repo) => {
-                let hash = sled_repo.hash(alias).await?;
-                Ok(sled_repo.relate_identifier(hash, identifier).await?)
-            }
-        }
-    }
-
-    /// Store the path to a generated image variant so we can easily clean it up later
-    #[instrument(skip(self))]
-    pub(crate) async fn store_variant<I: Identifier>(
-        &self,
-        alias: &Alias,
-        variant_process_path: &std::path::Path,
-        identifier: &I,
-    ) -> Result<(), Error> {
-        let variant = variant_process_path.to_string_lossy().to_string();
-
-        match self.inner.repo {
-            Repo::Sled(ref sled_repo) => {
-                let hash = sled_repo.hash(alias).await?;
-                Ok(sled_repo
-                    .relate_variant_identifier(hash, variant, identifier)
-                    .await?)
-            }
-        }
-    }
-
-    /// Get the image details for a given variant
-    #[instrument(skip(self))]
-    pub(crate) async fn details<I: Identifier>(
-        &self,
-        identifier: &I,
-    ) -> Result<Option<Details>, Error> {
-        match self.inner.repo {
-            Repo::Sled(ref sled_repo) => Ok(sled_repo.details(identifier).await?),
-        }
-    }
-
-    #[instrument(skip(self))]
-    pub(crate) async fn store_details<I: Identifier>(
-        &self,
-        identifier: &I,
-        details: &Details,
-    ) -> Result<(), Error> {
-        match self.inner.repo {
-            Repo::Sled(ref sled_repo) => Ok(sled_repo.relate_details(identifier, details).await?),
-        }
-    }
-
-    /// Get a list of aliases for a given alias
-    pub(crate) async fn aliases_by_alias(&self, alias: &Alias) -> Result<Vec<Alias>, Error> {
-        match self.inner.repo {
-            Repo::Sled(ref sled_repo) => {
-                let hash = sled_repo.hash(alias).await?;
-                Ok(sled_repo.aliases(hash).await?)
-            }
-        }
-    }
-
-    /// Delete an alias without a delete token
-    pub(crate) async fn delete_without_token(&self, alias: Alias) -> Result<(), Error> {
-        let token = match self.inner.repo {
-            Repo::Sled(ref sled_repo) => sled_repo.delete_token(&alias).await?,
-        };
-
-        self.delete(alias, token).await
-    }
-
-    /// Delete the alias, and the file & variants if no more aliases exist
-    #[instrument(skip(self, alias, token))]
-    pub(crate) async fn delete(&self, alias: Alias, token: DeleteToken) -> Result<(), Error> {
-        let hash = match self.inner.repo {
-            Repo::Sled(ref sled_repo) => {
-                let saved_delete_token = sled_repo.delete_token(&alias).await?;
-                if saved_delete_token != token {
-                    return Err(UploadError::InvalidToken.into());
-                }
-                let hash = sled_repo.hash(&alias).await?;
-                AliasRepo::cleanup(sled_repo, &alias).await?;
-                sled_repo.remove_alias(hash.clone(), &alias).await?;
-                hash.to_vec()
-            }
-        };
-
-        self.check_delete_files(hash).await
-    }
-
-    async fn check_delete_files(&self, hash: Vec<u8>) -> Result<(), Error> {
-        match self.inner.repo {
-            Repo::Sled(ref sled_repo) => {
-                let hash: <SledRepo as BaseRepo>::Bytes = hash.into();
-
-                let aliases = sled_repo.aliases(hash.clone()).await?;
-
-                if !aliases.is_empty() {
-                    return Ok(());
-                }
-
-                crate::queue::queue_cleanup(sled_repo, hash).await?;
-            }
-        }
-
-        Ok(())
-    }
-
-    pub(crate) fn session<S: Store + Clone + 'static>(&self, store: S) -> UploadManagerSession<S> {
-        UploadManagerSession::new(self.clone(), store)
-    }
-}
-
-async fn migrate_file<S1, S2>(
-    from: &S1,
-    to: &S2,
-    identifier: &S1::Identifier,
-) -> Result<S2::Identifier, Error>
-where
-    S1: Store,
-    S2: Store,
-{
-    let stream = from.to_stream(identifier, None, None).await?;
-    futures_util::pin_mut!(stream);
-    let mut reader = tokio_util::io::StreamReader::new(stream);
-
-    let new_identifier = to.save_async_read(&mut reader).await?;
-
-    Ok(new_identifier)
-}
-
-async fn migrate_details<R, I1, I2>(repo: &R, from: I1, to: &I2) -> Result<(), Error>
-where
-    R: IdentifierRepo,
-    I1: Identifier,
-    I2: Identifier,
-{
-    if let Some(details) = repo.details(&from).await? {
-        repo.relate_details(to, &details).await?;
-        repo.cleanup(&from).await?;
-    }
-
-    Ok(())
-}
-
-async fn do_migrate_store<R, S1, S2>(repo: &R, from: S1, to: S2) -> Result<(), Error>
-where
-    S1: Store,
-    S2: Store,
-    R: IdentifierRepo + HashRepo + SettingsRepo,
-{
-    let stream = repo.hashes().await;
-    let mut stream = Box::pin(stream);
-
-    while let Some(hash) = stream.next().await {
-        let hash = hash?;
-        if let Some(identifier) = repo
-            .motion_identifier(hash.as_ref().to_vec().into())
-            .await?
-        {
-            let new_identifier = migrate_file(&from, &to, &identifier).await?;
-            migrate_details(repo, identifier, &new_identifier).await?;
-            repo.relate_motion_identifier(hash.as_ref().to_vec().into(), &new_identifier)
-                .await?;
-        }
-
-        for (variant, identifier) in repo.variants(hash.as_ref().to_vec().into()).await? {
-            let new_identifier = migrate_file(&from, &to, &identifier).await?;
-            migrate_details(repo, identifier, &new_identifier).await?;
-            repo.relate_variant_identifier(hash.as_ref().to_vec().into(), variant, &new_identifier)
-                .await?;
-        }
-
-        let identifier = repo.identifier(hash.as_ref().to_vec().into()).await?;
-        let new_identifier = migrate_file(&from, &to, &identifier).await?;
-        migrate_details(repo, identifier, &new_identifier).await?;
-        repo.relate_identifier(hash.as_ref().to_vec().into(), &new_identifier)
-            .await?;
-
-        repo.set(STORE_MIGRATION_PROGRESS, hash.as_ref().to_vec().into())
-            .await?;
-    }
-
-    // clean up the migration key to avoid interfering with future migrations
-    repo.remove(STORE_MIGRATION_PROGRESS).await?;
-
-    Ok(())
-}
-
-impl std::fmt::Debug for UploadManager {
-    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
-        f.debug_struct("UploadManager").finish()
-    }
-}
diff --git a/src/upload_manager/session.rs b/src/upload_manager/session.rs
deleted file mode 100644
index 390d758..0000000
--- a/src/upload_manager/session.rs
+++ /dev/null
@@ -1,266 +0,0 @@
-use crate::{
-    error::{Error, UploadError},
-    magick::ValidInputType,
-    repo::{Alias, AliasRepo, AlreadyExists, DeleteToken, HashRepo, IdentifierRepo, Repo},
-    store::Store,
-    upload_manager::{
-        hasher::{Hash, Hasher},
-        UploadManager,
-    },
-};
-use actix_web::web;
-use futures_util::stream::{Stream, StreamExt};
-use tracing::{debug, instrument, Span};
-use tracing_futures::Instrument;
-
-pub(crate) struct UploadManagerSession<S: Store + Clone + 'static> {
-    store: S,
-    manager: UploadManager,
-    alias: Option<Alias>,
-    finished: bool,
-}
-
-impl<S: Store + Clone + 'static> UploadManagerSession<S> {
-    pub(super) fn new(manager: UploadManager, store: S) -> Self {
-        UploadManagerSession {
-            store,
-            manager,
-            alias: None,
-            finished: false,
-        }
-    }
-
-    pub(crate) fn succeed(mut self) {
-        self.finished = true;
-    }
-
-    pub(crate) fn alias(&self) -> Option<&Alias> {
-        self.alias.as_ref()
-    }
-}
-
-impl<S: Store + Clone + 'static> Drop for UploadManagerSession<S> {
-    fn drop(&mut self) {
-        if self.finished {
-            return;
-        }
-
-        if let Some(alias) = self.alias.take() {
-            let store = self.store.clone();
-            let manager = self.manager.clone();
-            let cleanup_span = tracing::info_span!(
-                parent: None,
-                "Upload cleanup",
-                alias = &tracing::field::display(&alias),
-            );
-            cleanup_span.follows_from(Span::current());
-            actix_rt::spawn(
-                async move {
-                    // undo alias -> hash mapping
-                    match manager.inner.repo {
-                        Repo::Sled(ref sled_repo) => {
-                            if let Ok(hash) = sled_repo.hash(&alias).await {
-                                debug!("Clean alias repo");
-                                let _ = AliasRepo::cleanup(sled_repo, &alias).await;
-
-                                if let Ok(identifier) = sled_repo.identifier(hash.clone()).await {
-                                    debug!("Clean identifier repo");
-                                    let _ = IdentifierRepo::cleanup(sled_repo, &identifier).await;
-
-                                    debug!("Remove stored files");
-                                    let _ = store.remove(&identifier).await;
-                                }
-                                debug!("Clean hash repo");
-                                let _ = HashRepo::cleanup(sled_repo, hash).await;
-                            }
-                        }
-                    }
-                }
-                .instrument(cleanup_span),
-            );
-        }
-    }
-}
-
-impl<S: Store + Clone + 'static> UploadManagerSession<S> {
-    /// Generate a delete token for an alias
-    #[instrument(skip(self))]
-    pub(crate) async fn delete_token(&self) -> Result<DeleteToken, Error> {
-        let alias = self.alias.clone().ok_or(UploadError::MissingAlias)?;
-
delete token"); - let delete_token = DeleteToken::generate(); - - debug!("Saving delete token"); - match self.manager.inner.repo { - Repo::Sled(ref sled_repo) => { - let res = sled_repo.relate_delete_token(&alias, &delete_token).await?; - - Ok(if res.is_err() { - let delete_token = sled_repo.delete_token(&alias).await?; - debug!("Returning existing delete token, {:?}", delete_token); - delete_token - } else { - debug!("Returning new delete token, {:?}", delete_token); - delete_token - }) - } - } - } - - /// Import the file, discarding bytes if it's already present, or saving if it's new - pub(crate) async fn import( - mut self, - alias: String, - validate: bool, - enable_silent_video: bool, - mut stream: impl Stream> + Unpin, - ) -> Result { - let mut bytes_mut = actix_web::web::BytesMut::new(); - - debug!("Reading stream to memory"); - while let Some(res) = stream.next().await { - let bytes = res?; - bytes_mut.extend_from_slice(&bytes); - } - - debug!("Validating bytes"); - let (_, validated_reader) = crate::validate::validate_image_bytes( - bytes_mut.freeze(), - self.manager.inner.format, - enable_silent_video, - validate, - ) - .await?; - - let mut hasher_reader = Hasher::new(validated_reader, self.manager.inner.hasher.clone()); - - let identifier = self.store.save_async_read(&mut hasher_reader).await?; - let hash = hasher_reader.finalize_reset().await?; - - debug!("Adding alias"); - self.add_existing_alias(&hash, alias).await?; - - debug!("Saving file"); - self.save_upload(&identifier, hash).await?; - - // Return alias to file - Ok(self) - } - - /// Upload the file, discarding bytes if it's already present, or saving if it's new - #[instrument(skip(self, stream))] - pub(crate) async fn upload( - mut self, - enable_silent_video: bool, - mut stream: impl Stream> + Unpin, - ) -> Result { - let mut bytes_mut = actix_web::web::BytesMut::new(); - - debug!("Reading stream to memory"); - while let Some(res) = stream.next().await { - let bytes = res?; - bytes_mut.extend_from_slice(&bytes); - } - - debug!("Validating bytes"); - let (input_type, validated_reader) = crate::validate::validate_image_bytes( - bytes_mut.freeze(), - self.manager.inner.format, - enable_silent_video, - true, - ) - .await?; - - let mut hasher_reader = Hasher::new(validated_reader, self.manager.inner.hasher.clone()); - - let identifier = self.store.save_async_read(&mut hasher_reader).await?; - let hash = hasher_reader.finalize_reset().await?; - - debug!("Adding alias"); - self.add_alias(&hash, input_type).await?; - - debug!("Saving file"); - self.save_upload(&identifier, hash).await?; - - // Return alias to file - Ok(self) - } - - // check duplicates & store image if new - #[instrument(skip(self, hash))] - async fn save_upload(&self, identifier: &S::Identifier, hash: Hash) -> Result<(), Error> { - let res = self.check_duplicate(&hash).await?; - - // bail early with alias to existing file if this is a duplicate - if res.is_err() { - debug!("Duplicate exists, removing file"); - - self.store.remove(identifier).await?; - return Ok(()); - } - - self.manager - .store_identifier(hash.into_inner(), identifier) - .await?; - - Ok(()) - } - - // check for an already-uploaded image with this hash, returning the path to the target file - #[instrument(skip(self, hash))] - async fn check_duplicate(&self, hash: &Hash) -> Result, Error> { - let hash = hash.as_slice().to_vec(); - - match self.manager.inner.repo { - Repo::Sled(ref sled_repo) => Ok(HashRepo::create(sled_repo, hash.into()).await?), - } - } - - // Add an alias from an existing 
-    // Add an alias from an existing filename
-    async fn add_existing_alias(&mut self, hash: &Hash, filename: String) -> Result<(), Error> {
-        let alias = Alias::from_existing(&filename);
-
-        match self.manager.inner.repo {
-            Repo::Sled(ref sled_repo) => {
-                AliasRepo::create(sled_repo, &alias)
-                    .await?
-                    .map_err(|_| UploadError::DuplicateAlias)?;
-                self.alias = Some(alias.clone());
-
-                let hash = hash.as_slice().to_vec();
-                sled_repo.relate_hash(&alias, hash.clone().into()).await?;
-                sled_repo.relate_alias(hash.into(), &alias).await?;
-            }
-        }
-
-        Ok(())
-    }
-
-    // Add an alias to an existing file
-    //
-    // This will help if multiple 'users' upload the same file, and one of them wants to delete it
-    #[instrument(skip(self, hash, input_type))]
-    async fn add_alias(&mut self, hash: &Hash, input_type: ValidInputType) -> Result<(), Error> {
-        loop {
-            debug!("Alias gen loop");
-            let alias = Alias::generate(input_type.as_ext().to_string());
-
-            match self.manager.inner.repo {
-                Repo::Sled(ref sled_repo) => {
-                    let res = AliasRepo::create(sled_repo, &alias).await?;
-
-                    if res.is_ok() {
-                        self.alias = Some(alias.clone());
-                        let hash = hash.as_slice().to_vec();
-                        sled_repo.relate_hash(&alias, hash.clone().into()).await?;
-                        sled_repo.relate_alias(hash.into(), &alias).await?;
-                        return Ok(());
-                    }
-                }
-            };
-
-            debug!("Alias exists, regenning");
-        }
-    }
-}
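
Reviewer note, not part of the patch: below is a minimal sketch of the call pattern the new `ingest` module produces, written against the `FullRepo` and `Store` traits exactly as they appear in this diff. The helper name `save_and_report` is hypothetical, and the error handling is illustrative only, not a prescribed API.

    // Hypothetical crate-internal caller, sketched against the traits in this diff.
    use crate::{error::Error, ingest, repo::FullRepo, store::Store};
    use actix_web::web::Bytes;
    use futures_util::Stream;

    async fn save_and_report<R, S>(
        repo: &R,
        store: &S,
        stream: impl Stream<Item = Result<Bytes, Error>>,
    ) -> Result<String, Error>
    where
        R: FullRepo + 'static,
        S: Store,
    {
        // On success, ingest() returns a Session guard; if we bail out early
        // with `?`, the guard's Drop impl queues cleanup of the partial upload.
        let mut session = ingest::ingest(repo, store, stream, None, true).await?;

        let alias = session.alias().expect("alias is set on success").to_owned();
        let token = session.delete_token().await?;

        // Everything we need is persisted; disarm the guard so Drop does not
        // enqueue cleanup jobs for a completed upload.
        session.disarm();

        Ok(format!("{}: {}", alias, token))
    }

This guard-plus-disarm shape mirrors how `download` and `process_ingest` consume sessions in the diff; the main behavioral change of the refactor is that failure cleanup now goes through the job queue (`queue::cleanup_hash` / `cleanup_alias` / `cleanup_identifier`) instead of being performed inline against the sled repo in Drop.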