From 786f583d986e9e9cf228bec327fbc30f0e46f587 Mon Sep 17 00:00:00 2001 From: asonix Date: Fri, 14 Jul 2023 14:53:37 -0500 Subject: [PATCH] Add public_url to Store for optional redirects to public object store urls Add details generation to file migration to set the content-type for uploads --- src/backgrounded.rs | 4 +- src/config/file.rs | 3 + src/config/primitives.rs | 5 ++ src/details.rs | 2 +- src/ffmpeg.rs | 7 +++ src/generate.rs | 13 ++++- src/ingest.rs | 4 +- src/lib.rs | 115 +++++++++++++++++++++++++++++++++----- src/store.rs | 70 ++++++++++++++++++----- src/store/file_store.rs | 16 +++++- src/store/object_store.rs | 53 ++++++++++++++---- 11 files changed, 242 insertions(+), 50 deletions(-) diff --git a/src/backgrounded.rs b/src/backgrounded.rs index 1028e82..86ae39e 100644 --- a/src/backgrounded.rs +++ b/src/backgrounded.rs @@ -5,6 +5,7 @@ use crate::{ }; use actix_web::web::Bytes; use futures_util::{Stream, TryStreamExt}; +use mime::APPLICATION_OCTET_STREAM; use tracing::{Instrument, Span}; pub(crate) struct Backgrounded @@ -58,7 +59,8 @@ where let stream = stream.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)); - let identifier = store.save_stream(stream).await?; + // use octet-stream, we don't know the upload's real type yet + let identifier = store.save_stream(stream, APPLICATION_OCTET_STREAM).await?; self.identifier = Some(identifier); diff --git a/src/config/file.rs b/src/config/file.rs index 549e122..b09140b 100644 --- a/src/config/file.rs +++ b/src/config/file.rs @@ -74,6 +74,9 @@ pub(crate) struct ObjectStorage { /// /// This defaults to 30 seconds pub(crate) client_timeout: u64, + + #[serde(skip_serializing_if = "Option::is_none")] + pub(crate) public_endpoint: Option, } #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] diff --git a/src/config/primitives.rs b/src/config/primitives.rs index c5c001f..293d78f 100644 --- a/src/config/primitives.rs +++ b/src/config/primitives.rs @@ -91,6 +91,11 @@ pub(crate) struct ObjectStorage { #[arg(long)] #[serde(skip_serializing_if = "Option::is_none")] pub(crate) client_timeout: Option, + + /// Base endpoint at which object storage images are publicly accessible + #[arg(long)] + #[serde(skip_serializing_if = "Option::is_none")] + pub(crate) public_endpoint: Option, } #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] diff --git a/src/details.rs b/src/details.rs index e40c0ae..bd4e945 100644 --- a/src/details.rs +++ b/src/details.rs @@ -64,7 +64,7 @@ impl Details { InternalFormat::maybe_from_media_type(&self.content_type, self.frames.is_some()) } - pub(crate) fn content_type(&self) -> mime::Mime { + pub(crate) fn media_type(&self) -> mime::Mime { (*self.content_type).clone() } diff --git a/src/ffmpeg.rs b/src/ffmpeg.rs index 03ddafc..90593de 100644 --- a/src/ffmpeg.rs +++ b/src/ffmpeg.rs @@ -86,6 +86,13 @@ impl ThumbnailFormat { // Self::Webp => "webp", } } + + pub(crate) fn media_type(self) -> mime::Mime { + match self { + Self::Jpeg => mime::IMAGE_JPEG, + // Self::Webp => crate::formats::mimes::image_webp(), + } + } } #[tracing::instrument(skip(store))] diff --git a/src/generate.rs b/src/generate.rs index 0a27ff0..0f2d51b 100644 --- a/src/generate.rs +++ b/src/generate.rs @@ -68,14 +68,19 @@ async fn process( return Err(UploadError::MissingIdentifier.into()); }; + let thumbnail_format = thumbnail_format.unwrap_or(ThumbnailFormat::Jpeg); + let reader = crate::ffmpeg::thumbnail( store.clone(), identifier, input_format.unwrap_or(InternalVideoFormat::Mp4), - thumbnail_format.unwrap_or(ThumbnailFormat::Jpeg), + thumbnail_format, ) .await?; - let motion_identifier = store.save_async_read(reader).await?; + + let motion_identifier = store + .save_async_read(reader, thumbnail_format.media_type()) + .await?; repo.relate_motion_identifier(hash.clone(), &motion_identifier) .await?; @@ -122,7 +127,9 @@ async fn process( let details = Details::from_bytes(bytes.clone()).await?; - let identifier = store.save_bytes(bytes.clone()).await?; + let identifier = store + .save_bytes(bytes.clone(), details.media_type()) + .await?; repo.relate_details(&identifier, &details).await?; repo.relate_variant_identifier( hash, diff --git a/src/ingest.rs b/src/ingest.rs index 88c5fdb..714cd86 100644 --- a/src/ingest.rs +++ b/src/ingest.rs @@ -89,7 +89,9 @@ where let hasher_reader = Hasher::new(processed_reader, Sha256::new()); let hasher = hasher_reader.hasher(); - let identifier = store.save_async_read(hasher_reader).await?; + let identifier = store + .save_async_read(hasher_reader, input_type.media_type()) + .await?; drop(permit); diff --git a/src/lib.rs b/src/lib.rs index e36ca31..509cfab 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -672,6 +672,12 @@ async fn process( new_details }; + if let Some(public_url) = store.public_url(&identifier) { + return Ok(HttpResponse::SeeOther() + .insert_header((actix_web::http::header::LOCATION, public_url.as_str())) + .finish()); + } + return ranged_file_resp(&store, identifier, range, details, not_found).await; } @@ -721,7 +727,7 @@ async fn process( Ok(srv_response( builder, stream, - details.content_type(), + details.media_type(), 7 * DAYS, details.system_time(), )) @@ -768,6 +774,12 @@ async fn process_head( new_details }; + if let Some(public_url) = store.public_url(&identifier) { + return Ok(HttpResponse::SeeOther() + .insert_header((actix_web::http::header::LOCATION, public_url.as_str())) + .finish()); + } + return ranged_file_head_resp(&store, identifier, range, details).await; } @@ -847,6 +859,12 @@ async fn serve( let details = ensure_details(&repo, &store, &alias).await?; + if let Some(public_url) = store.public_url(&identifier) { + return Ok(HttpResponse::SeeOther() + .insert_header((actix_web::http::header::LOCATION, public_url.as_str())) + .finish()); + } + ranged_file_resp(&store, identifier, range, details, not_found).await } @@ -866,6 +884,12 @@ async fn serve_head( let details = ensure_details(&repo, &store, &alias).await?; + if let Some(public_url) = store.public_url(&identifier) { + return Ok(HttpResponse::SeeOther() + .insert_header((actix_web::http::header::LOCATION, public_url.as_str())) + .finish()); + } + ranged_file_head_resp(&store, identifier, range, details).await } @@ -897,7 +921,7 @@ async fn ranged_file_head_resp( Ok(srv_head( builder, - details.content_type(), + details.media_type(), 7 * DAYS, details.system_time(), ) @@ -953,7 +977,7 @@ async fn ranged_file_resp( Ok(srv_response( builder, stream, - details.content_type(), + details.media_type(), 7 * DAYS, details.system_time(), )) @@ -1280,7 +1304,7 @@ async fn migrate_inner( skip_missing_files: bool, ) -> color_eyre::Result<()> where - S1: Store, + S1: Store + 'static, { match to { config::primitives::Store::Filesystem(config::Filesystem { path }) => { @@ -1300,6 +1324,7 @@ where session_token, signature_duration, client_timeout, + public_endpoint, }) => { let to = ObjectStore::build( endpoint.clone(), @@ -1315,6 +1340,7 @@ where session_token, signature_duration.unwrap_or(15), client_timeout.unwrap_or(30), + public_endpoint, repo.clone(), ) .await? @@ -1427,6 +1453,7 @@ pub async fn run() -> color_eyre::Result<()> { session_token, signature_duration, client_timeout, + public_endpoint, }) => { let from = ObjectStore::build( endpoint, @@ -1442,6 +1469,7 @@ pub async fn run() -> color_eyre::Result<()> { session_token, signature_duration.unwrap_or(15), client_timeout.unwrap_or(30), + public_endpoint, repo.clone(), ) .await? @@ -1480,6 +1508,7 @@ pub async fn run() -> color_eyre::Result<()> { session_token, signature_duration, client_timeout, + public_endpoint, }) => { let store = ObjectStore::build( endpoint, @@ -1495,6 +1524,7 @@ pub async fn run() -> color_eyre::Result<()> { session_token, signature_duration, client_timeout, + public_endpoint, repo.clone(), ) .await?; @@ -1527,7 +1557,7 @@ async fn migrate_store( skip_missing_files: bool, ) -> Result<(), Error> where - S1: Store + Clone, + S1: Store + Clone + 'static, S2: Store + Clone, R: IdentifierRepo + HashRepo + SettingsRepo + QueueRepo, { @@ -1570,7 +1600,7 @@ async fn do_migrate_store( skip_missing_files: bool, ) -> Result<(), Error> where - S1: Store, + S1: Store + 'static, S2: Store, R: IdentifierRepo + HashRepo + SettingsRepo + QueueRepo, { @@ -1639,7 +1669,7 @@ where .await? { if repo.get(STORE_MIGRATION_MOTION).await?.is_none() { - match migrate_file(&from, &to, &identifier, skip_missing_files).await { + match migrate_file(repo, &from, &to, &identifier, skip_missing_files).await { Ok(new_identifier) => { migrate_details(repo, identifier, &new_identifier).await?; repo.relate_motion_identifier( @@ -1653,6 +1683,13 @@ where Err(MigrateError::From(e)) if e.is_not_found() && skip_missing_files => { tracing::warn!("Skipping motion file for hash {}", hex::encode(&hash)); } + Err(MigrateError::Details(e)) => { + tracing::warn!( + "Error generating details for motion file for hash {}", + hex::encode(&hash) + ); + return Err(e.into()); + } Err(MigrateError::From(e)) => { tracing::warn!("Error migrating motion file from old store"); return Err(e.into()); @@ -1675,7 +1712,7 @@ where continue; } - match migrate_file(&from, &to, &identifier, skip_missing_files).await { + match migrate_file(repo, &from, &to, &identifier, skip_missing_files).await { Ok(new_identifier) => { migrate_details(repo, identifier, &new_identifier).await?; repo.remove_variant(hash.as_ref().to_vec().into(), variant.clone()) @@ -1697,6 +1734,13 @@ where hex::encode(&hash) ); } + Err(MigrateError::Details(e)) => { + tracing::warn!( + "Error generating details for variant file for hash {}", + hex::encode(&hash) + ); + return Err(e.into()); + } Err(MigrateError::From(e)) => { tracing::warn!("Error migrating variant file from old store"); return Err(e.into()); @@ -1708,7 +1752,7 @@ where } } - match migrate_file(&from, &to, &original_identifier, skip_missing_files).await { + match migrate_file(repo, &from, &to, &original_identifier, skip_missing_files).await { Ok(new_identifier) => { migrate_details(repo, original_identifier, &new_identifier).await?; repo.relate_identifier(hash.as_ref().to_vec().into(), &new_identifier) @@ -1717,6 +1761,13 @@ where Err(MigrateError::From(e)) if e.is_not_found() && skip_missing_files => { tracing::warn!("Skipping original file for hash {}", hex::encode(&hash)); } + Err(MigrateError::Details(e)) => { + tracing::warn!( + "Error generating details for original file for hash {}", + hex::encode(&hash) + ); + return Err(e.into()); + } Err(MigrateError::From(e)) => { tracing::warn!("Error migrating original file from old store"); return Err(e.into()); @@ -1756,20 +1807,22 @@ where Ok(()) } -async fn migrate_file( +async fn migrate_file( + repo: &R, from: &S1, to: &S2, identifier: &S1::Identifier, skip_missing_files: bool, ) -> Result where - S1: Store, + R: IdentifierRepo, + S1: Store + 'static, S2: Store, { let mut failure_count = 0; loop { - match do_migrate_file(from, to, identifier).await { + match do_migrate_file(repo, from, to, identifier).await { Ok(identifier) => return Ok(identifier), Err(MigrateError::From(e)) if e.is_not_found() && skip_missing_files => { return Err(MigrateError::From(e)); @@ -1793,16 +1846,19 @@ where #[derive(Debug)] enum MigrateError { From(crate::store::StoreError), + Details(crate::error::Error), To(crate::store::StoreError), } -async fn do_migrate_file( +async fn do_migrate_file( + repo: &R, from: &S1, to: &S2, identifier: &S1::Identifier, ) -> Result where - S1: Store, + R: IdentifierRepo, + S1: Store + 'static, S2: Store, { let stream = from @@ -1810,7 +1866,36 @@ where .await .map_err(MigrateError::From)?; - let new_identifier = to.save_stream(stream).await.map_err(MigrateError::To)?; + let details_opt = repo + .details(identifier) + .await + .map_err(Error::from) + .map_err(MigrateError::Details)? + .and_then(|details| { + if details.internal_format().is_some() { + Some(details) + } else { + None + } + }); + + let details = if let Some(details) = details_opt { + details + } else { + let new_details = Details::from_store(from, identifier) + .await + .map_err(MigrateError::Details)?; + repo.relate_details(identifier, &new_details) + .await + .map_err(Error::from) + .map_err(MigrateError::Details)?; + new_details + }; + + let new_identifier = to + .save_stream(stream, details.media_type()) + .await + .map_err(MigrateError::To)?; Ok(new_identifier) } diff --git a/src/store.rs b/src/store.rs index 180c1af..feec060 100644 --- a/src/store.rs +++ b/src/store.rs @@ -72,15 +72,31 @@ pub(crate) trait Store: Clone + Debug { async fn health_check(&self) -> Result<(), StoreError>; - async fn save_async_read(&self, reader: Reader) -> Result + async fn save_async_read( + &self, + reader: Reader, + content_type: mime::Mime, + ) -> Result where Reader: AsyncRead + Unpin + 'static; - async fn save_stream(&self, stream: S) -> Result + async fn save_stream( + &self, + stream: S, + content_type: mime::Mime, + ) -> Result where S: Stream> + Unpin + 'static; - async fn save_bytes(&self, bytes: Bytes) -> Result; + async fn save_bytes( + &self, + bytes: Bytes, + content_type: mime::Mime, + ) -> Result; + + fn public_url(&self, _: &Self::Identifier) -> Option { + None + } async fn to_stream( &self, @@ -114,22 +130,34 @@ where T::health_check(self).await } - async fn save_async_read(&self, reader: Reader) -> Result + async fn save_async_read( + &self, + reader: Reader, + content_type: mime::Mime, + ) -> Result where Reader: AsyncRead + Unpin + 'static, { - T::save_async_read(self, reader).await + T::save_async_read(self, reader, content_type).await } - async fn save_stream(&self, stream: S) -> Result + async fn save_stream( + &self, + stream: S, + content_type: mime::Mime, + ) -> Result where S: Stream> + Unpin + 'static, { - T::save_stream(self, stream).await + T::save_stream(self, stream, content_type).await } - async fn save_bytes(&self, bytes: Bytes) -> Result { - T::save_bytes(self, bytes).await + async fn save_bytes( + &self, + bytes: Bytes, + content_type: mime::Mime, + ) -> Result { + T::save_bytes(self, bytes, content_type).await } async fn to_stream( @@ -173,22 +201,34 @@ where T::health_check(self).await } - async fn save_async_read(&self, reader: Reader) -> Result + async fn save_async_read( + &self, + reader: Reader, + content_type: mime::Mime, + ) -> Result where Reader: AsyncRead + Unpin + 'static, { - T::save_async_read(self, reader).await + T::save_async_read(self, reader, content_type).await } - async fn save_stream(&self, stream: S) -> Result + async fn save_stream( + &self, + stream: S, + content_type: mime::Mime, + ) -> Result where S: Stream> + Unpin + 'static, { - T::save_stream(self, stream).await + T::save_stream(self, stream, content_type).await } - async fn save_bytes(&self, bytes: Bytes) -> Result { - T::save_bytes(self, bytes).await + async fn save_bytes( + &self, + bytes: Bytes, + content_type: mime::Mime, + ) -> Result { + T::save_bytes(self, bytes, content_type).await } async fn to_stream( diff --git a/src/store/file_store.rs b/src/store/file_store.rs index 90e07a0..7b40a00 100644 --- a/src/store/file_store.rs +++ b/src/store/file_store.rs @@ -66,6 +66,7 @@ impl Store for FileStore { async fn save_async_read( &self, mut reader: Reader, + _content_type: mime::Mime, ) -> Result where Reader: AsyncRead + Unpin + 'static, @@ -80,15 +81,24 @@ impl Store for FileStore { Ok(self.file_id_from_path(path)?) } - async fn save_stream(&self, stream: S) -> Result + async fn save_stream( + &self, + stream: S, + content_type: mime::Mime, + ) -> Result where S: Stream> + Unpin + 'static, { - self.save_async_read(StreamReader::new(stream)).await + self.save_async_read(StreamReader::new(stream), content_type) + .await } #[tracing::instrument(skip(bytes))] - async fn save_bytes(&self, bytes: Bytes) -> Result { + async fn save_bytes( + &self, + bytes: Bytes, + _content_type: mime::Mime, + ) -> Result { let path = self.next_file().await?; if let Err(e) = self.safe_save_bytes(&path, bytes).await { diff --git a/src/store/object_store.rs b/src/store/object_store.rs index bf4c686..287c2ae 100644 --- a/src/store/object_store.rs +++ b/src/store/object_store.rs @@ -98,6 +98,7 @@ pub(crate) struct ObjectStore { client: Client, signature_expiration: Duration, client_timeout: Duration, + public_endpoint: Option, } #[derive(Clone)] @@ -108,6 +109,7 @@ pub(crate) struct ObjectStoreConfig { credentials: Credentials, signature_expiration: u64, client_timeout: u64, + public_endpoint: Option, } #[derive(serde::Deserialize, Debug)] @@ -130,6 +132,7 @@ impl ObjectStoreConfig { client, signature_expiration: Duration::from_secs(self.signature_expiration), client_timeout: Duration::from_secs(self.client_timeout), + public_endpoint: self.public_endpoint, } } } @@ -190,15 +193,24 @@ impl Store for ObjectStore { Ok(()) } - async fn save_async_read(&self, reader: Reader) -> Result + async fn save_async_read( + &self, + reader: Reader, + content_type: mime::Mime, + ) -> Result where Reader: AsyncRead + Unpin + 'static, { - self.save_stream(ReaderStream::new(reader)).await + self.save_stream(ReaderStream::new(reader), content_type) + .await } #[tracing::instrument(skip_all)] - async fn save_stream(&self, mut stream: S) -> Result + async fn save_stream( + &self, + mut stream: S, + content_type: mime::Mime, + ) -> Result where S: Stream> + Unpin + 'static, { @@ -206,7 +218,7 @@ impl Store for ObjectStore { if first_chunk.len() < CHUNK_SIZE { drop(stream); - let (req, object_id) = self.put_object_request().await?; + let (req, object_id) = self.put_object_request(content_type).await?; let response = req .send_body(first_chunk) .await @@ -221,7 +233,7 @@ impl Store for ObjectStore { let mut first_chunk = Some(first_chunk); - let (req, object_id) = self.create_multipart_request().await?; + let (req, object_id) = self.create_multipart_request(content_type).await?; let mut response = req.send().await.map_err(ObjectError::from)?; if !response.status().is_success() { @@ -329,8 +341,12 @@ impl Store for ObjectStore { } #[tracing::instrument(skip_all)] - async fn save_bytes(&self, bytes: Bytes) -> Result { - let (req, object_id) = self.put_object_request().await?; + async fn save_bytes( + &self, + bytes: Bytes, + content_type: mime::Mime, + ) -> Result { + let (req, object_id) = self.put_object_request(content_type).await?; let response = req.send_body(bytes).await.map_err(ObjectError::from)?; @@ -341,6 +357,13 @@ impl Store for ObjectStore { Ok(object_id) } + fn public_url(&self, identifier: &Self::Identifier) -> Option { + self.public_endpoint.clone().map(|mut endpoint| { + endpoint.set_path(identifier.as_str()); + endpoint + }) + } + #[tracing::instrument(skip(self))] async fn to_stream( &self, @@ -445,6 +468,7 @@ impl ObjectStore { session_token: Option, signature_expiration: u64, client_timeout: u64, + public_endpoint: Option, repo: Repo, ) -> Result { let path_gen = init_generator(&repo).await?; @@ -461,6 +485,7 @@ impl ObjectStore { }, signature_expiration, client_timeout, + public_endpoint, }) } @@ -470,19 +495,25 @@ impl ObjectStore { Ok(self.build_request(action)) } - async fn put_object_request(&self) -> Result<(ClientRequest, ObjectId), StoreError> { + async fn put_object_request( + &self, + content_type: mime::Mime, + ) -> Result<(ClientRequest, ObjectId), StoreError> { let path = self.next_file().await?; let mut action = self.bucket.put_object(Some(&self.credentials), &path); action .headers_mut() - .insert("content-type", "application/octet-stream"); + .insert("content-type", content_type.as_ref()); Ok((self.build_request(action), ObjectId::from_string(path))) } - async fn create_multipart_request(&self) -> Result<(ClientRequest, ObjectId), StoreError> { + async fn create_multipart_request( + &self, + content_type: mime::Mime, + ) -> Result<(ClientRequest, ObjectId), StoreError> { let path = self.next_file().await?; let mut action = self @@ -491,7 +522,7 @@ impl ObjectStore { action .headers_mut() - .insert("content-type", "application/octet-stream"); + .insert("content-type", content_type.as_ref()); Ok((self.build_request(action), ObjectId::from_string(path))) }