diff --git a/README.md b/README.md index 2642f59..775d179 100644 --- a/README.md +++ b/README.md @@ -17,11 +17,12 @@ _a simple image hosting service_ 1. [Backups](#backups) 2. [0.4 to 0.5 Migration Guide](#0-4-to-0-5-migration-guide) 1. [Overview](#overview) - 2. [Configuration Updates](#configuration-updates) + 2. [Upgrade Configuration](#upgrade-configuration) + 3. [Configuration Updates](#configuration-updates) 1. [Image Changes](#image-changes) 2. [Animation Changes](#animation-changes) 3. [Video Changes](#video-changes) - 3. [Upgrading Directly to Postgres](#upgrading-directly-to-postgres) + 4. [Upgrading Directly to Postgres](#upgrading-directly-to-postgres) 3. [Filesystem to Object Storage Migration](#filesystem-to-object-storage-migration) 1. [Troubleshooting](#migration-troubleshooting) 4. [Sled to Postgres Migration](#sled-to-postgres-migration) @@ -645,6 +646,30 @@ uploaded before this was standard may not have ever had their details records ge _This upgrade must be performed while pict-rs is offline._ +#### Upgrade Configuration +Because upgrades may take so long, there is a new configuration option introduced to attempt to +improve their speed. + +```toml +[upgrade] +concurrency = 32 +``` +or +``` +PICTRS__UPGRADE__CONCURRENCY=32 +``` +or +```bash +$ pict-rs run --upgrade-concurrency 32 +``` + +This value dictates how many hashes pict-rs will attempt to migrate at the same time. Since this +value will increase the number of concurrent connections to the Repo and the Store, as well as +potentially increase CPU and memory use, it should be considered carefully before increasing. + +For large-scale deployments, it is likely this value should be bumped to 128, 256, or even 512. The +default value is 32. + #### Configuration Updates Previously, pict-rs only had two categories for files: images and videos. pict-rs 0.5 adds a third category: animation. 
With the new explicit support for animated file types, some configuration diff --git a/defaults.toml b/defaults.toml index 3938649..61f8f3b 100644 --- a/defaults.toml +++ b/defaults.toml @@ -6,6 +6,9 @@ max_file_count = 1 [client] timeout = 30 +[upgrade] +concurrency = 32 + [tracing.logging] format = "normal" targets = "info" diff --git a/pict-rs.toml b/pict-rs.toml index b8ea222..c82037e 100644 --- a/pict-rs.toml +++ b/pict-rs.toml @@ -27,6 +27,15 @@ max_file_count = 1 # distinct from the object storage client timeout, which can be configured separately timeout = 30 +[upgrade] +## Optional: How many hashes will be migrated from the previous version of pict-rs at the same time +# environment variable: PICTRS__UPGRADE__CONCURRENCY +# default: 32 +# +# Increasing this value will increase the number of concurrent connections to the Repo, and to the +# Store, so make sure your deployment can handle more throughput before tweaking this. +concurrency = 32 + ## Logging configuration [tracing.logging] diff --git a/src/details.rs b/src/details.rs index 582e075..c22c2dd 100644 --- a/src/details.rs +++ b/src/details.rs @@ -1,15 +1,10 @@ -use std::sync::Arc; - use crate::{ - bytes_stream::BytesStream, discover::Discovery, error::Error, formats::{InternalFormat, InternalVideoFormat}, serde_str::Serde, - store::Store, }; use actix_web::web; -use streem::IntoStreamer; use time::{format_description::well_known::Rfc3339, OffsetDateTime}; #[derive(Copy, Clone, Debug, serde::Deserialize, serde::Serialize)] @@ -83,6 +78,7 @@ impl Details { self.inner.created_at.timestamp } + #[tracing::instrument(level = "DEBUG")] pub(crate) async fn from_bytes(timeout: u64, input: web::Bytes) -> Result { let Discovery { input, @@ -99,28 +95,6 @@ impl Details { )) } - #[tracing::instrument(level = "DEBUG")] - pub(crate) async fn from_store( - store: &S, - identifier: &Arc, - timeout: u64, - ) -> Result { - let mut buf = BytesStream::new(); - - let mut stream = store - .to_stream(identifier, None, 
None) - .await? - .into_streamer(); - - while let Some(res) = stream.next().await { - buf.add_bytes(res?); - } - - let bytes = buf.into_bytes(); - - Self::from_bytes(timeout, bytes).await - } - pub(crate) fn internal_format(&self) -> InternalFormat { self.inner.format } diff --git a/src/generate.rs b/src/generate.rs index 33c8d8a..851124e 100644 --- a/src/generate.rs +++ b/src/generate.rs @@ -99,7 +99,9 @@ async fn process( let input_details = if let Some(details) = repo.details(&identifier).await? { details } else { - let details = Details::from_store(store, &identifier, media.process_timeout).await?; + let bytes_stream = store.to_bytes(&identifier, None, None).await?; + + let details = Details::from_bytes(media.process_timeout, bytes_stream.into_bytes()).await?; repo.relate_details(&identifier, &details).await?; diff --git a/src/ingest.rs b/src/ingest.rs index b3b168b..58e95ee 100644 --- a/src/ingest.rs +++ b/src/ingest.rs @@ -106,7 +106,8 @@ where .save_async_read(hasher_reader, input_type.media_type()) .await?; - let details = Details::from_store(store, &identifier, media.process_timeout).await?; + let bytes_stream = store.to_bytes(&identifier, None, None).await?; + let details = Details::from_bytes(media.process_timeout, bytes_stream.into_bytes()).await?; drop(permit); diff --git a/src/lib.rs b/src/lib.rs index 0ff61f4..33881a5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -122,8 +122,9 @@ async fn ensure_details( } tracing::debug!("generating new details from {:?}", identifier); + let bytes_stream = store.to_bytes(&identifier, None, None).await?; let new_details = - Details::from_store(store, &identifier, config.media.process_timeout).await?; + Details::from_bytes(config.media.process_timeout, bytes_stream.into_bytes()).await?; tracing::debug!("storing details for {:?}", identifier); repo.relate_details(&identifier, &new_details).await?; tracing::debug!("stored"); @@ -897,8 +898,10 @@ async fn process( } tracing::debug!("generating new details from {:?}", 
identifier); + let bytes_stream = store.to_bytes(&identifier, None, None).await?; let new_details = - Details::from_store(&store, &identifier, config.media.process_timeout).await?; + Details::from_bytes(config.media.process_timeout, bytes_stream.into_bytes()) + .await?; tracing::debug!("storing details for {:?}", identifier); repo.relate_details(&identifier, &new_details).await?; tracing::debug!("stored"); @@ -1015,8 +1018,10 @@ async fn process_head( } tracing::debug!("generating new details from {:?}", identifier); + let bytes_stream = store.to_bytes(&identifier, None, None).await?; let new_details = - Details::from_store(&store, &identifier, config.media.process_timeout).await?; + Details::from_bytes(config.media.process_timeout, bytes_stream.into_bytes()) + .await?; tracing::debug!("storing details for {:?}", identifier); repo.relate_details(&identifier, &new_details).await?; tracing::debug!("stored"); diff --git a/src/migrate_store.rs b/src/migrate_store.rs index 6c62a66..0a8f95d 100644 --- a/src/migrate_store.rs +++ b/src/migrate_store.rs @@ -407,7 +407,12 @@ where let details = if let Some(details) = details_opt { details } else { - let new_details = Details::from_store(from, identifier, timeout) + let bytes_stream = from + .to_bytes(&identifier, None, None) + .await + .map_err(From::from) + .map_err(MigrateError::Details)?; + let new_details = Details::from_bytes(timeout, bytes_stream.into_bytes()) .await .map_err(MigrateError::Details)?; repo.relate_details(identifier, &new_details) diff --git a/src/repo/migrate.rs b/src/repo/migrate.rs index 4dba2cb..209c4ca 100644 --- a/src/repo/migrate.rs +++ b/src/repo/migrate.rs @@ -1,7 +1,7 @@ -use std::sync::Arc; +use std::sync::{Arc, OnceLock}; use streem::IntoStreamer; -use tokio::task::JoinSet; +use tokio::{sync::Semaphore, task::JoinSet}; use crate::{ config::Configuration, @@ -341,6 +341,18 @@ async fn set_details( } } +static DETAILS_SEMAPHORE: OnceLock = OnceLock::new(); + +fn details_semaphore() -> &'static 
Semaphore { + DETAILS_SEMAPHORE.get_or_init(|| { + let parallelism = std::thread::available_parallelism() + .map(usize::from) + .unwrap_or(1); + + crate::sync::bare_semaphore(parallelism * 2) + }) +} + #[tracing::instrument(skip_all)] async fn fetch_or_generate_details( old_repo: &OldSledRepo, @@ -353,8 +365,13 @@ async fn fetch_or_generate_details( if let Some(details) = details_opt { Ok(details) } else { - Details::from_store(store, identifier, config.media.process_timeout) - .await - .map_err(From::from) + let bytes_stream = store.to_bytes(identifier, None, None).await?; + let bytes = bytes_stream.into_bytes(); + + let guard = details_semaphore().acquire().await?; + let details = Details::from_bytes(config.media.process_timeout, bytes).await?; + drop(guard); + + Ok(details) } } diff --git a/src/store.rs b/src/store.rs index b6f4d1e..af411b4 100644 --- a/src/store.rs +++ b/src/store.rs @@ -1,9 +1,10 @@ use actix_web::web::Bytes; use futures_core::Stream; use std::{fmt::Debug, sync::Arc}; +use streem::IntoStreamer; use tokio::io::{AsyncRead, AsyncWrite}; -use crate::{error_code::ErrorCode, stream::LocalBoxStream}; +use crate::{bytes_stream::BytesStream, error_code::ErrorCode, stream::LocalBoxStream}; pub(crate) mod file_store; pub(crate) mod object_store; @@ -22,6 +23,9 @@ pub(crate) enum StoreError { #[error("Error in 0.4 DB")] Repo04(#[from] crate::repo_04::RepoError), + #[error("Error reading bytes stream")] + ReadStream(#[source] std::io::Error), + #[error("Requested file is not found")] FileNotFound(#[source] std::io::Error), @@ -35,6 +39,7 @@ impl StoreError { Self::FileStore(e) => e.error_code(), Self::ObjectStore(e) => e.error_code(), Self::Repo(e) => e.error_code(), + Self::ReadStream(_) => ErrorCode::IO_ERROR, Self::Repo04(_) => ErrorCode::OLD_REPO_ERROR, Self::FileNotFound(_) | Self::ObjectNotFound(_) => ErrorCode::NOT_FOUND, } @@ -112,6 +117,26 @@ pub(crate) trait Store: Clone + Debug { len: Option, ) -> Result>, StoreError>; + async fn to_bytes( + 
&self, + identifier: &Arc, + from_start: Option, + len: Option, + ) -> Result { + let mut buf = BytesStream::new(); + + let mut streamer = self + .to_stream(identifier, from_start, len) + .await? + .into_streamer(); + + while let Some(bytes) = streamer.try_next().await.map_err(StoreError::ReadStream)? { + buf.add_bytes(bytes); + } + + Ok(buf) + } + async fn read_into( &self, identifier: &Arc,