diff --git a/Cargo.lock b/Cargo.lock index 33fed7a..3a45ff7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1643,6 +1643,7 @@ dependencies = [ "tokio", "tokio-uring", "tokio-util 0.7.0", + "toml", "tracing", "tracing-actix-web", "tracing-awc", diff --git a/Cargo.toml b/Cargo.toml index 4857e63..0ed1dec 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -56,6 +56,7 @@ time = { version = "0.3.0", features = ["serde"] } tokio = { version = "1", features = ["full", "tracing"] } tokio-uring = { version = "0.3", optional = true, features = ["bytes"] } tokio-util = { version = "0.7", default-features = false, features = ["codec"] } +toml = "0.5.8" tracing = "0.1.15" tracing-error = "0.2.0" tracing-futures = "0.2.4" diff --git a/README.md b/README.md index dfb7266..c45163a 100644 --- a/README.md +++ b/README.md @@ -9,83 +9,145 @@ _a simple image hosting service_ ## Usage ### Running ``` -pict-rs 0.3.0-rc.7 +pict-rs USAGE: - pict-rs [FLAGS] [OPTIONS] [SUBCOMMAND] - -FLAGS: - -h, --help Prints help information - -s, --skip-validate-imports Whether to skip validating images uploaded via the internal import API - -V, --version Prints version information + pict-rs [OPTIONS] OPTIONS: - -a, --addr The address and port the server binds to. - --api-key + -a, --addr + The address and port the server binds to. + + --api-key An optional string to be checked on requests to privileged endpoints - -c, --config-file Path to the pict-rs configuration file - --console-buffer-capacity + -c, --config-file + Path to the pict-rs configuration file + + --console-buffer-capacity Specify the number of events the console subscriber is allowed to buffer - -f, --filters ... - An optional list of filters to permit, supports 'identity', 'thumbnail', 'resize', 'crop', and 'blur' + -f, --filters + An optional list of filters to permit, supports 'identity', 'thumbnail', 'resize', + 'crop', and 'blur' - -i, --image-format - An optional image format to convert all uploaded files into, supports 'jpg', 'png', and 'webp' + --filesystem-storage-path + Path in which pict-rs will create it's 'files' directory - -m, --max-file-size + -h, --help + Print help information + + -i, --image-format + An optional image format to convert all uploaded files into, supports 'jpg', 'png', and + 'webp' + + -m, --max-file-size Specify the maximum allowed uploaded file size (in Megabytes) - --max-image-area Specify the maximum area in pixels allowed in an image - --max-image-height Specify the maximum width in pixels allowed on an image - --max-image-width Specify the maximum width in pixels allowed on an image - --migrate-file Path to a file defining a store migration - -o, --opentelemetry-url + --max-image-area + Specify the maximum area in pixels allowed in an image + + --max-image-height + Specify the maximum width in pixels allowed on an image + + --max-image-width + Specify the maximum width in pixels allowed on an image + + -o, --opentelemetry-url Enable OpenTelemetry Tracing exports to the given OpenTelemetry collector - -p, --path The path to the data directory, e.g. data/ - --sled-cache-capacity - Specify the number of bytes sled is allowed to use for it's cache + --object-store-access-key + + --object-store-bucket-name + Name of the bucket in which pict-rs will store images + + --object-store-region + Region in which the bucket exists, can be an http endpoint + + --object-store-secret-key + + + --object-store-security-token + + + --object-store-session-token + + + -p, --path + The path to the data directory, e.g. data/ + + -R, --repo + Set the database implementation. Available options are 'sled'. Default is 'sled' + + -s, --skip-validate-imports + Whether to skip validating images uploaded via the internal import API + + -S, --store + Set the image store. Available options are 'object-storage' or 'filesystem'. Default is + 'filesystem' + + --sled-cache-capacity + The number of bytes sled is allowed to use for it's in-memory cache + + --sled-path + Path in which pict-rs will create it's 'repo' directory SUBCOMMANDS: - file-store - help Prints this message or the help of the given subcommand(s) - s3-store + dump + help Print this message or the help of the given subcommand(s) + migrate-repo + migrate-store + run ``` ``` -pict-rs-file-store 0.3.0-rc.1 +pict-rs-dump USAGE: - pict-rs file-store [OPTIONS] + pict-rs dump -FLAGS: - -h, --help Prints help information - -V, --version Prints version information +ARGS: + OPTIONS: - --path Path in which pict-rs will create it's 'files' directory + -h, --help Print help information ``` ``` -pict-rs-s3-store 0.3.0-rc.1 +pict-rs-migrate-repo USAGE: - pict-rs s3-store [OPTIONS] --bucket-name --region + pict-rs migrate-repo -FLAGS: - -h, --help Prints help information - -V, --version Prints version information +ARGS: + OPTIONS: - --access-key - --bucket-name Name of the bucket in which pict-rs will store images - --region Region in which the bucket exists, can be an http endpoint - --secret-key - --security-token - --session-token + -h, --help Print help information +``` + +``` +pict-rs-migrate-store + +USAGE: + pict-rs migrate-store + +ARGS: + + +OPTIONS: + -h, --help Print help information +``` + +``` +pict-rs-run + +USAGE: + pict-rs run + +OPTIONS: + -h, --help Print help information ``` See [`pict-rs.toml`](https://git.asonix.dog/asonix/pict-rs/src/branch/main/pict-rs.toml) and @@ -95,23 +157,27 @@ configuration #### Example: Running on all interfaces, port 8080, storing data in /opt/data ``` -$ ./pict-rs -a 0.0.0.0:8080 -p /opt/data +$ ./pict-rs -a 0.0.0.0:8080 -p /opt/data run ``` Running locally, port 9000, storing data in data/, and converting all uploads to PNG ``` -$ ./pict-rs -a 127.0.0.1:9000 -p data/ -f png +$ ./pict-rs -a 127.0.0.1:9000 -p data/ -f png run ``` Running locally, port 8080, storing data in data/, and only allowing the `thumbnail` and `identity` filters ``` -$ ./pict-rs -a 127.0.0.1:8080 -p data/ -w thumbnail identity +$ ./pict-rs -a 127.0.0.1:8080 -p data/ -w thumbnail identity run ``` Running from a configuration file ``` -$ ./pict-rs -c ./pict-rs.toml +$ ./pict-rs -c ./pict-rs.toml run ``` -Migrating between storage backends +Migrating to object storage from filesystem storage (both storages must be configured in pict-rs.toml) ``` -$ ./pict-rs -p ./data --migrate-file ./migrate.toml +$ ./pict-rs -c ./pict-rs.toml --store filesystem migrate-store object-storage +``` +Dumping commandline flags to a toml file +``` +$ ./pict-rs -p data/ --store object-storage --object-storage-bucket-name pict-rs --object-storage-region us-east-1 dump pict-rs.toml ``` #### Docker diff --git a/docker/object-storage/Dockerfile b/docker/object-storage/Dockerfile index 9b07035..fcb932e 100644 --- a/docker/object-storage/Dockerfile +++ b/docker/object-storage/Dockerfile @@ -1,25 +1,17 @@ -FROM archlinux:latest +FROM alpine:edge ARG UID=1000 ARG GID=1000 RUN \ - pacman -Syu --noconfirm \ - perl-image-exiftool \ - imagemagick \ - ffmpeg && \ - groupadd -g 1000 app && \ - useradd -m \ - -d /opt/app \ - -u $UID \ - -g $GID \ - app + apk add exiftool imagemagick ffmpeg && \ + addgroup -g $GID app && \ + adduser -h /opt/app -g "" -G app -u $UID -D app && \ + chown -R app:app /mnt COPY root/ / COPY ./pict-rs.toml /etc/pict-rs.toml -ENV PATH=$PATH:/usr/bin/vendor_perl - WORKDIR /opt/app USER app diff --git a/docker/object-storage/pict-rs.toml b/docker/object-storage/pict-rs.toml index 29c3c9d..04fc42a 100644 --- a/docker/object-storage/pict-rs.toml +++ b/docker/object-storage/pict-rs.toml @@ -1,9 +1,17 @@ -path = '/mnt' +path = 'data/' addr = '0.0.0.0:8080' -[store] -type = 's3_store' -bucket_name = 'pict-rs' -region = 'http://minio:9000' -access_key = '09ODZ3BGBISV4U92JLIM' -secret_key = 'j35YE9RrxhBP0dpiD5mmdXRXvPkEJR4k6zK12q3o' +repo = 'sled' +store = 'object_storage' + +[sled] +sled_cache_capacity = 67108864 + +[filesystem_storage] +filesystem_storage_path = '/mnt/files' + +[object_storage] +object_store_bucket_name = 'pict-rs' +object_store_region = 'http://minio:9000' +object_store_access_key = 'XZEZ5B8Y3UCINU1KCVF6' +object_store_secret_key = 'cWbE5LcCK9YH8j1NvhOZocl+vH+b6T5Zvy3z+BZu' diff --git a/docker/object-storage/root/usr/lib/ImageMagick-7.1.0/config-Q16HDRI/policy.xml b/docker/object-storage/root/usr/lib/ImageMagick-7.1.0/config-Q16HDRI/policy.xml index 06166a9..9586799 100644 --- a/docker/object-storage/root/usr/lib/ImageMagick-7.1.0/config-Q16HDRI/policy.xml +++ b/docker/object-storage/root/usr/lib/ImageMagick-7.1.0/config-Q16HDRI/policy.xml @@ -8,12 +8,14 @@ + + + - - + diff --git a/docker/prod/root/usr/lib/ImageMagick-7.1.0/config-Q16HDRI/policy.xml b/docker/prod/root/usr/lib/ImageMagick-7.1.0/config-Q16HDRI/policy.xml index 06166a9..9586799 100644 --- a/docker/prod/root/usr/lib/ImageMagick-7.1.0/config-Q16HDRI/policy.xml +++ b/docker/prod/root/usr/lib/ImageMagick-7.1.0/config-Q16HDRI/policy.xml @@ -8,12 +8,14 @@ + + + - - + diff --git a/pict-rs.toml b/pict-rs.toml index 645d002..16c3182 100644 --- a/pict-rs.toml +++ b/pict-rs.toml @@ -43,13 +43,6 @@ max_image_area = 40_000_000 # in Pixels # default: false skip_validate_imports = false -## Optional: set sled's cache capacity to a given number of bytes -# environment variable: PICTRS_SLED_CACHE_CAPACITY -# default: 67_108_864 (1024 * 1024 * 64) e.g. 64MB -# -# Increasing this value can improve performance by keeping more of the database in RAM -sled_cache_capacity = 67_108_864 # in bytes - ## Optional: enable tokio-console and set the event buffer size # environment variable: PICTRS_CONSOLE_BUFFER_CAPACITY # default: empty @@ -95,58 +88,65 @@ api_key = 'API_KEY' # Not specifying opentelemetry_url means no traces will be exported opentelemetry_url = 'http://localhost:4317/' -## Optional: store definition -# default store: file_store -# -# Not specifying a store means a file_store will be used with the top-level pict-rs' path -[store] -type = "file_store" +## Optional: the data repository to use +# environment variable: PICTRS_REPO +# default: 'sled' +# available options: 'sled' +repo = 'sled' -## Example file store -# [store] -# -# # environment variable: PICTRS_STORE__TYPE -# type = 'file_store' -# -# # Optional: file path -# # environment variable: PICTRS_STORE__PATH -# # default: empty -# # -# # Not specifying path means pict-rs' top-level `path` config is used -# path = './data' +## Optional: the file storage to use +# environment variable: PICTRS_STORE +# default: 'filesystem' +# available options: 'filesystem', 'object_storage' +store = 'filesystem' -## Example s3 store -# [store] + +## Optional: Sled store configration definition +[sled] +## Optional: set sled's cache capacity to a given number of bytes +# environment variable: PICTRS_SLED__SLED_CACHE_CAPACITY +# default: 67_108_864 (1024 * 1024 * 64) e.g. 64MB # -# # environment variable: PICTRS_STORE__TYPE -# type = 's3_store' +# Increasing this value can improve performance by keeping more of the database in RAM +sled_cache_capacity = 67_108_864 # in bytes + + +## Optional: Filesystem storage configuration +[filesystem_storage] +## Optional: set the path for pict-rs filesystem file storage +# environment variable: PICTRS_FILESYSTEM_STORAGE__FILESYSTEM_STORAGE_PATH +# default '${path}/files' +filesystem_storage_path = 'data/files' + + +## Optional: Object Storage configuration +[object_storage] +## Required: bucket name +# environment variable: PICTRS_OBJECT_STORAGE__OBJECT_STORE_BUCKET_NAME +object_store_bucket_name = 'pict-rs' + +## Required: bucket region +# environment variable: PICTRS_OBJECT_STORAGE__OBJECT_STORE_REGION # -# # Required: bucket name -# # environment variable: PICTRS_STORE__BUCKET_NAME -# bucket_name = 'rust_s3' -# -# # Required: bucket region -# # environment variable: PICTRS_STORE__REGION -# # -# # can also be endpoint of local s3 store, e.g. 'http://minio:9000' -# region = 'eu-central-1' -# -# # Optional: bucket access key -# # environment variable: PICTRS_STORE__ACCESS_KEY -# # default: empty -# access_key = 'ACCESS_KEY' -# -# # Optional: bucket secret key -# # environment variable: PICTRS_STORE__SECRET_KEY -# # default: empty -# secret_key = 'SECRET_KEY' -# -# # Optional: bucket security token -# # environment variable: PICTRS_STORE__SECURITY_TOKEN -# # default: empty -# security_token = 'SECURITY_TOKEN' -# -# # Optional: bucket session token -# # environment variable: PICTRS_STORE__SESSION_TOKEN -# # default: empty -# session_token = 'SESSION_TOKEN' +# can also be endpoint of local s3 store, e.g. 'http://minio:9000' +object_store_region = 'eu-central-1' + +## Optional: bucket access key +# environment variable: PICTRS_OBJECT_STORAGE__OBJECT_STORE_ACCESS_KEY +# default: empty +object_store_access_key = '09ODZ3BGBISV4U92JLIM' + +## Optional: bucket secret key +# environment variable: PICTRS_OBJECT_STORAGE__OBJECT_STORE_SECRET_KEY +# default: empty +object_store_secret_key = 'j35YE9RrxhBP0dpiD5mmdXRXvPkEJR4k6zK12q3o' + +## Optional: bucket security token +# environment variable: PICTRS_OBJECT_STORAGE__OBJECT_STORE_SECURITY_TOKEN +# default: empty +object_store_security_token = 'SECURITY_TOKEN' + +## Optional: bucket session token +# environment variable: PICTRS_OBJECT_STORAGE__OBJECT_STORE_SESSION_TOKEN +# default: empty +object_store_session_token = 'SESSION_TOKEN' diff --git a/src/config.rs b/src/config.rs index 1c68b96..c7cb2be 100644 --- a/src/config.rs +++ b/src/config.rs @@ -170,12 +170,16 @@ impl Overrides { #[serde(tag = "type")] pub(crate) enum Command { Run, + Dump { path: PathBuf }, MigrateStore { to: Store }, MigrateRepo { to: Repo }, } pub(crate) enum CommandConfig { Run, + Dump { + path: PathBuf, + }, MigrateStore { to: Storage, }, @@ -287,7 +291,6 @@ pub(crate) enum Repository { #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] #[serde(rename_all = "snake_case")] pub(crate) struct Config { - command: Command, skip_validate_imports: bool, addr: SocketAddr, path: PathBuf, @@ -301,8 +304,9 @@ pub(crate) struct Config { api_key: Option, opentelemetry_url: Option, repo: Repo, - sled: Option, store: Store, + command: Command, + sled: Option, filesystem_storage: Option, object_storage: Option, } @@ -329,9 +333,9 @@ struct SledDefaults { } impl Defaults { - fn new() -> Self { + fn new(command: Command) -> Self { Defaults { - command: Command::Run, + command, skip_validate_imports: false, addr: ([0, 0, 0, 0], 8080).into(), max_file_size: 40, @@ -351,8 +355,8 @@ impl Config { pub(crate) fn build() -> anyhow::Result { let args = Args::parse(); - let mut base_config = - config::Config::builder().add_source(config::Config::try_from(&Defaults::new())?); + let mut base_config = config::Config::builder() + .add_source(config::Config::try_from(&Defaults::new(args.command))?); if let Some(path) = args.config_file { base_config = base_config.add_source(config::File::from(path)); @@ -375,6 +379,7 @@ impl Config { pub(crate) fn command(&self) -> anyhow::Result { Ok(match &self.command { Command::Run => CommandConfig::Run, + Command::Dump { path } => CommandConfig::Dump { path: path.clone() }, Command::MigrateStore { to } => CommandConfig::MigrateStore { to: match to { Store::ObjectStorage => Storage::ObjectStorage( diff --git a/src/details.rs b/src/details.rs index 0730689..1a9e9b8 100644 --- a/src/details.rs +++ b/src/details.rs @@ -34,10 +34,7 @@ impl Details { store: S, identifier: S::Identifier, expected_format: Option, - ) -> Result - where - Error: From, - { + ) -> Result { let details = crate::magick::details_store(store, identifier, expected_format).await?; Ok(Details::now( diff --git a/src/error.rs b/src/error.rs index faca216..bfb336b 100644 --- a/src/error.rs +++ b/src/error.rs @@ -15,6 +15,20 @@ impl std::fmt::Debug for Error { impl std::fmt::Display for Error { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { writeln!(f, "{}", self.kind)?; + writeln!(f)?; + let mut count = 0; + let mut source = std::error::Error::source(self); + if source.is_some() { + writeln!(f, "Chain:")?; + } + while let Some(err) = source { + write!(f, "{}. ", count)?; + writeln!(f, "{}", err)?; + + count += 1; + source = std::error::Error::source(err); + } + std::fmt::Display::fmt(&self.context, f) } } @@ -43,7 +57,7 @@ pub(crate) enum UploadError { Upload(#[from] actix_form_data::Error), #[error("Error in DB")] - Sled(#[from] crate::repo::sled::Error), + Sled(#[from] crate::repo::sled::SledError), #[error("Error in old sled DB")] OldSled(#[from] ::sled::Error), diff --git a/src/ffmpeg.rs b/src/ffmpeg.rs index 1ec9c6a..3dd9124 100644 --- a/src/ffmpeg.rs +++ b/src/ffmpeg.rs @@ -44,7 +44,7 @@ impl ThumbnailFormat { fn as_format(&self) -> &'static str { match self { - ThumbnailFormat::Jpeg => "singlejpeg", + ThumbnailFormat::Jpeg => "image2", // ThumbnailFormat::Webp => "webp", } } @@ -101,10 +101,7 @@ pub(crate) async fn thumbnail( from: S::Identifier, input_format: InputFormat, format: ThumbnailFormat, -) -> Result -where - Error: From, -{ +) -> Result { let input_file = crate::tmp_file::tmp_file(Some(input_format.to_ext())); let input_file_str = input_file.to_str().ok_or(UploadError::Path)?; crate::store::file_store::safe_create_parent(&input_file).await?; diff --git a/src/magick.rs b/src/magick.rs index 772b333..1d9f3c6 100644 --- a/src/magick.rs +++ b/src/magick.rs @@ -139,14 +139,12 @@ pub(crate) async fn details_bytes( parse_details(s) } +#[tracing::instrument(skip(store))] pub(crate) async fn details_store( store: S, identifier: S::Identifier, hint: Option, -) -> Result -where - Error: From, -{ +) -> Result { if hint.as_ref().map(|h| h.is_mp4()).unwrap_or(false) { let input_file = crate::tmp_file::tmp_file(Some(".mp4")); let input_file_str = input_file.to_str().ok_or(UploadError::Path)?; @@ -182,6 +180,7 @@ where parse_details(s) } +#[tracing::instrument] pub(crate) async fn details_file(path_str: &str) -> Result { let process = Process::run( "magick", diff --git a/src/main.rs b/src/main.rs index d7acc11..88daa92 100644 --- a/src/main.rs +++ b/src/main.rs @@ -77,10 +77,7 @@ async fn upload( value: Value>, manager: web::Data, store: web::Data, -) -> Result -where - Error: From, -{ +) -> Result { let images = value .map() .and_then(|mut m| m.remove("images")) @@ -196,10 +193,7 @@ async fn download( manager: web::Data, store: web::Data, query: web::Query, -) -> Result -where - Error: From, -{ +) -> Result { let res = client.get(&query.url).send().await?; if !res.status().is_success() { @@ -249,14 +243,11 @@ async fn delete( manager: web::Data, store: web::Data, path_entries: web::Path<(String, String)>, -) -> Result -where - Error: From, -{ - let (alias, token) = path_entries.into_inner(); +) -> Result { + let (token, alias) = path_entries.into_inner(); - let alias = Alias::from_existing(&alias); let token = DeleteToken::from_existing(&token); + let alias = Alias::from_existing(&alias); manager.delete((**store).clone(), alias, token).await?; @@ -314,10 +305,7 @@ async fn process_details( manager: web::Data, store: web::Data, filters: web::Data>>, -) -> Result -where - Error: From, -{ +) -> Result { let (_, alias, thumbnail_path, _) = prepare_process(query, ext.as_str(), &filters)?; let identifier = manager @@ -341,10 +329,7 @@ async fn process( manager: web::Data, store: web::Data, filters: web::Data>>, -) -> Result -where - Error: From, -{ +) -> Result { let (format, alias, thumbnail_path, thumbnail_args) = prepare_process(query, ext.as_str(), &filters)?; @@ -468,10 +453,7 @@ async fn details( alias: web::Path, manager: web::Data, store: web::Data, -) -> Result -where - Error: From, -{ +) -> Result { let alias = alias.into_inner(); let alias = Alias::from_existing(&alias); @@ -498,10 +480,7 @@ async fn serve( alias: web::Path, manager: web::Data, store: web::Data, -) -> Result -where - Error: From, -{ +) -> Result { let alias = alias.into_inner(); let alias = Alias::from_existing(&alias); let identifier = manager.identifier_from_alias::(&alias).await?; @@ -525,10 +504,7 @@ async fn ranged_file_resp( identifier: S::Identifier, range: Option>, details: Details, -) -> Result -where - Error: From, -{ +) -> Result { let (builder, stream) = if let Some(web::Header(range_header)) = range { //Range header exists - return as ranged if let Some(range) = range::single_bytes_range(&range_header) { @@ -602,10 +578,7 @@ async fn purge( query: web::Query, upload_manager: web::Data, store: web::Data, -) -> Result -where - Error: From, -{ +) -> Result { let alias = Alias::from_existing(&query.alias); let aliases = upload_manager.aliases_by_alias(&alias).await?; @@ -626,10 +599,7 @@ async fn aliases( query: web::Query, upload_manager: web::Data, store: web::Data, -) -> Result -where - Error: From, -{ +) -> Result { let alias = Alias::from_existing(&query.alias); let aliases = upload_manager.aliases_by_alias(&alias).await?; @@ -658,11 +628,10 @@ fn build_reqwest_client() -> reqwest::Result { .build() } -async fn launch(manager: UploadManager, store: S) -> anyhow::Result<()> -where - S::Error: Unpin, - Error: From, -{ +async fn launch( + manager: UploadManager, + store: S, +) -> anyhow::Result<()> { // Create a new Multipart Form validator // // This form is expecting a single array field, 'images' with at most 10 files in it @@ -797,7 +766,6 @@ async fn migrate_inner( ) -> anyhow::Result<()> where S1: Store, - Error: From, { match to { config::Storage::Filesystem(RequiredFilesystemStorage { path }) => { @@ -848,6 +816,11 @@ async fn main() -> anyhow::Result<()> { match CONFIG.command()? { CommandConfig::Run => (), + CommandConfig::Dump { path } => { + let configuration = toml::to_string_pretty(&*CONFIG)?; + tokio::fs::write(path, configuration).await?; + return Ok(()); + } CommandConfig::MigrateRepo { to: _ } => { unimplemented!("Repo migrations are currently unsupported") } diff --git a/src/range.rs b/src/range.rs index e5d9efa..51066e9 100644 --- a/src/range.rs +++ b/src/range.rs @@ -15,7 +15,9 @@ pub(crate) fn chop_bytes( length: u64, ) -> Result>, Error> { if let Some((start, end)) = byte_range.to_satisfiable_range(length) { - return Ok(once(ready(Ok(bytes.slice(start as usize..end as usize))))); + // END IS INCLUSIVE + let end = end as usize + 1; + return Ok(once(ready(Ok(bytes.slice(start as usize..end))))); } Err(UploadError::Range.into()) @@ -26,14 +28,13 @@ pub(crate) async fn chop_store( store: &S, identifier: &S::Identifier, length: u64, -) -> Result>, Error> -where - Error: From, -{ +) -> Result>, Error> { if let Some((start, end)) = byte_range.to_satisfiable_range(length) { - return Ok(store + // END IS INCLUSIVE + let end = end + 1; + return store .to_stream(identifier, Some(start), Some(end.saturating_sub(start))) - .await?); + .await; } Err(UploadError::Range.into()) diff --git a/src/repo.rs b/src/repo.rs index b7fcf37..c1af53d 100644 --- a/src/repo.rs +++ b/src/repo.rs @@ -1,5 +1,9 @@ -use crate::config::RequiredSledRepo; -use crate::{config::Repository, details::Details, store::Identifier}; +use crate::{ + config::{Repository, RequiredSledRepo}, + details::Details, + error::Error, + store::Identifier, +}; use futures_util::Stream; use tracing::debug; use uuid::Uuid; @@ -34,98 +38,90 @@ pub(crate) struct AlreadyExists; #[async_trait::async_trait(?Send)] pub(crate) trait SettingsRepo { type Bytes: AsRef<[u8]> + From>; - type Error: std::error::Error + Send + Sync + 'static; - async fn set(&self, key: &'static [u8], value: Self::Bytes) -> Result<(), Self::Error>; - async fn get(&self, key: &'static [u8]) -> Result, Self::Error>; - async fn remove(&self, key: &'static [u8]) -> Result<(), Self::Error>; + async fn set(&self, key: &'static [u8], value: Self::Bytes) -> Result<(), Error>; + async fn get(&self, key: &'static [u8]) -> Result, Error>; + async fn remove(&self, key: &'static [u8]) -> Result<(), Error>; } #[async_trait::async_trait(?Send)] pub(crate) trait IdentifierRepo { - type Error: std::error::Error + Send + Sync + 'static; - async fn relate_details( &self, identifier: &I, details: &Details, - ) -> Result<(), Self::Error>; - async fn details(&self, identifier: &I) -> Result, Self::Error>; + ) -> Result<(), Error>; + async fn details(&self, identifier: &I) -> Result, Error>; - async fn cleanup(&self, identifier: &I) -> Result<(), Self::Error>; + async fn cleanup(&self, identifier: &I) -> Result<(), Error>; } #[async_trait::async_trait(?Send)] pub(crate) trait HashRepo { type Bytes: AsRef<[u8]> + From>; - type Error: std::error::Error + Send + Sync + 'static; - type Stream: Stream>; + type Stream: Stream>; async fn hashes(&self) -> Self::Stream; - async fn create(&self, hash: Self::Bytes) -> Result, Self::Error>; + async fn create(&self, hash: Self::Bytes) -> Result, Error>; - async fn relate_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), Self::Error>; - async fn remove_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), Self::Error>; - async fn aliases(&self, hash: Self::Bytes) -> Result, Self::Error>; + async fn relate_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), Error>; + async fn remove_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), Error>; + async fn aliases(&self, hash: Self::Bytes) -> Result, Error>; async fn relate_identifier( &self, hash: Self::Bytes, identifier: &I, - ) -> Result<(), Self::Error>; - async fn identifier( - &self, - hash: Self::Bytes, - ) -> Result; + ) -> Result<(), Error>; + async fn identifier(&self, hash: Self::Bytes) -> Result; async fn relate_variant_identifier( &self, hash: Self::Bytes, variant: String, identifier: &I, - ) -> Result<(), Self::Error>; + ) -> Result<(), Error>; async fn variant_identifier( &self, hash: Self::Bytes, variant: String, - ) -> Result, Self::Error>; + ) -> Result, Error>; async fn variants( &self, hash: Self::Bytes, - ) -> Result, Self::Error>; + ) -> Result, Error>; async fn relate_motion_identifier( &self, hash: Self::Bytes, identifier: &I, - ) -> Result<(), Self::Error>; + ) -> Result<(), Error>; async fn motion_identifier( &self, hash: Self::Bytes, - ) -> Result, Self::Error>; + ) -> Result, Error>; - async fn cleanup(&self, hash: Self::Bytes) -> Result<(), Self::Error>; + async fn cleanup(&self, hash: Self::Bytes) -> Result<(), Error>; } #[async_trait::async_trait(?Send)] pub(crate) trait AliasRepo { type Bytes: AsRef<[u8]> + From>; - type Error: std::error::Error + Send + Sync + 'static; - async fn create(&self, alias: &Alias) -> Result, Self::Error>; + async fn create(&self, alias: &Alias) -> Result, Error>; async fn relate_delete_token( &self, alias: &Alias, delete_token: &DeleteToken, - ) -> Result, Self::Error>; - async fn delete_token(&self, alias: &Alias) -> Result; + ) -> Result, Error>; + async fn delete_token(&self, alias: &Alias) -> Result; - async fn relate_hash(&self, alias: &Alias, hash: Self::Bytes) -> Result<(), Self::Error>; - async fn hash(&self, alias: &Alias) -> Result; + async fn relate_hash(&self, alias: &Alias, hash: Self::Bytes) -> Result<(), Error>; + async fn hash(&self, alias: &Alias) -> Result; - async fn cleanup(&self, alias: &Alias) -> Result<(), Self::Error>; + async fn cleanup(&self, alias: &Alias) -> Result<(), Error>; } impl Repo { @@ -389,16 +385,14 @@ impl std::fmt::Display for Alias { } impl Identifier for Vec { - type Error = std::convert::Infallible; - - fn from_bytes(bytes: Vec) -> Result + fn from_bytes(bytes: Vec) -> Result where Self: Sized, { Ok(bytes) } - fn to_bytes(&self) -> Result, Self::Error> { + fn to_bytes(&self) -> Result, Error> { Ok(self.clone()) } } diff --git a/src/repo/sled.rs b/src/repo/sled.rs index d7a3f2f..c1f323f 100644 --- a/src/repo/sled.rs +++ b/src/repo/sled.rs @@ -2,24 +2,24 @@ use super::{ Alias, AliasRepo, AlreadyExists, DeleteToken, Details, HashRepo, Identifier, IdentifierRepo, SettingsRepo, }; +use crate::error::Error; use sled::{Db, IVec, Tree}; macro_rules! b { ($self:ident.$ident:ident, $expr:expr) => {{ let $ident = $self.$ident.clone(); - actix_rt::task::spawn_blocking(move || $expr).await?? + actix_rt::task::spawn_blocking(move || $expr) + .await + .map_err(SledError::from)?? }}; } #[derive(Debug, thiserror::Error)] -pub(crate) enum Error { +pub(crate) enum SledError { #[error("Error in database")] Sled(#[from] sled::Error), - #[error("Invalid identifier")] - Identifier(#[source] Box), - #[error("Invalid details json")] Details(#[from] serde_json::Error), @@ -46,7 +46,7 @@ pub(crate) struct SledRepo { } impl SledRepo { - pub(crate) fn new(db: Db) -> Result { + pub(crate) fn new(db: Db) -> Result { Ok(SledRepo { settings: db.open_tree("pict-rs-settings-tree")?, identifier_details: db.open_tree("pict-rs-identifier-details-tree")?, @@ -66,36 +66,29 @@ impl SledRepo { #[async_trait::async_trait(?Send)] impl SettingsRepo for SledRepo { type Bytes = IVec; - type Error = Error; - async fn set(&self, key: &'static [u8], value: Self::Bytes) -> Result<(), Self::Error> { + #[tracing::instrument(skip(value))] + async fn set(&self, key: &'static [u8], value: Self::Bytes) -> Result<(), Error> { b!(self.settings, settings.insert(key, value)); Ok(()) } - async fn get(&self, key: &'static [u8]) -> Result, Self::Error> { + #[tracing::instrument] + async fn get(&self, key: &'static [u8]) -> Result, Error> { let opt = b!(self.settings, settings.get(key)); Ok(opt) } - async fn remove(&self, key: &'static [u8]) -> Result<(), Self::Error> { + #[tracing::instrument] + async fn remove(&self, key: &'static [u8]) -> Result<(), Error> { b!(self.settings, settings.remove(key)); Ok(()) } } -fn identifier_bytes(identifier: &I) -> Result, Error> -where - I: Identifier, -{ - identifier - .to_bytes() - .map_err(|e| Error::Identifier(Box::new(e))) -} - fn variant_key(hash: &[u8], variant: &str) -> Vec { let mut bytes = hash.to_vec(); bytes.push(b'/'); @@ -111,14 +104,13 @@ fn variant_from_key(hash: &[u8], key: &[u8]) -> Option { #[async_trait::async_trait(?Send)] impl IdentifierRepo for SledRepo { - type Error = Error; - + #[tracing::instrument] async fn relate_details( &self, identifier: &I, details: &Details, - ) -> Result<(), Self::Error> { - let key = identifier_bytes(identifier)?; + ) -> Result<(), Error> { + let key = identifier.to_bytes()?; let details = serde_json::to_vec(&details)?; b!( @@ -129,8 +121,9 @@ impl IdentifierRepo for SledRepo { Ok(()) } - async fn details(&self, identifier: &I) -> Result, Self::Error> { - let key = identifier_bytes(identifier)?; + #[tracing::instrument] + async fn details(&self, identifier: &I) -> Result, Error> { + let key = identifier.to_bytes()?; let opt = b!(self.identifier_details, identifier_details.get(key)); @@ -141,8 +134,9 @@ impl IdentifierRepo for SledRepo { } } - async fn cleanup(&self, identifier: &I) -> Result<(), Self::Error> { - let key = identifier_bytes(identifier)?; + #[tracing::instrument] + async fn cleanup(&self, identifier: &I) -> Result<(), Error> { + let key = identifier.to_bytes()?; b!(self.identifier_details, identifier_details.remove(key)); @@ -192,7 +186,13 @@ impl futures_util::Stream for HashStream { (iter, opt) }) .await - .map(|(iter, opt)| (iter, opt.map(|res| res.map_err(Error::from)))) + .map(|(iter, opt)| { + ( + iter, + opt.map(|res| res.map_err(SledError::from).map_err(Error::from)), + ) + }) + .map_err(SledError::from) .map_err(Error::from) }); @@ -213,7 +213,6 @@ fn hash_alias_key(hash: &IVec, alias: &Alias) -> Vec { #[async_trait::async_trait(?Send)] impl HashRepo for SledRepo { type Bytes = IVec; - type Error = Error; type Stream = HashStream; async fn hashes(&self) -> Self::Stream { @@ -225,7 +224,8 @@ impl HashRepo for SledRepo { } } - async fn create(&self, hash: Self::Bytes) -> Result, Self::Error> { + #[tracing::instrument] + async fn create(&self, hash: Self::Bytes) -> Result, Error> { let res = b!(self.hashes, { let hash2 = hash.clone(); hashes.compare_and_swap(hash, None as Option, Some(hash2)) @@ -234,7 +234,8 @@ impl HashRepo for SledRepo { Ok(res.map_err(|_| AlreadyExists)) } - async fn relate_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), Self::Error> { + #[tracing::instrument] + async fn relate_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), Error> { let key = hash_alias_key(&hash, alias); let value = alias.to_bytes(); @@ -243,7 +244,8 @@ impl HashRepo for SledRepo { Ok(()) } - async fn remove_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), Self::Error> { + #[tracing::instrument] + async fn remove_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), Error> { let key = hash_alias_key(&hash, alias); b!(self.hash_aliases, hash_aliases.remove(key)); @@ -251,7 +253,8 @@ impl HashRepo for SledRepo { Ok(()) } - async fn aliases(&self, hash: Self::Bytes) -> Result, Self::Error> { + #[tracing::instrument] + async fn aliases(&self, hash: Self::Bytes) -> Result, Error> { let v = b!(self.hash_aliases, { Ok(hash_aliases .scan_prefix(hash) @@ -264,37 +267,37 @@ impl HashRepo for SledRepo { Ok(v) } + #[tracing::instrument] async fn relate_identifier( &self, hash: Self::Bytes, identifier: &I, - ) -> Result<(), Self::Error> { - let bytes = identifier_bytes(identifier)?; + ) -> Result<(), Error> { + let bytes = identifier.to_bytes()?; b!(self.hash_identifiers, hash_identifiers.insert(hash, bytes)); Ok(()) } - async fn identifier( - &self, - hash: Self::Bytes, - ) -> Result { + #[tracing::instrument] + async fn identifier(&self, hash: Self::Bytes) -> Result { let opt = b!(self.hash_identifiers, hash_identifiers.get(hash)); - opt.ok_or(Error::Missing).and_then(|ivec| { - I::from_bytes(ivec.to_vec()).map_err(|e| Error::Identifier(Box::new(e))) - }) + opt.ok_or(SledError::Missing) + .map_err(Error::from) + .and_then(|ivec| I::from_bytes(ivec.to_vec())) } + #[tracing::instrument] async fn relate_variant_identifier( &self, hash: Self::Bytes, variant: String, identifier: &I, - ) -> Result<(), Self::Error> { + ) -> Result<(), Error> { let key = variant_key(&hash, &variant); - let value = identifier_bytes(identifier)?; + let value = identifier.to_bytes()?; b!( self.hash_variant_identifiers, @@ -304,11 +307,12 @@ impl HashRepo for SledRepo { Ok(()) } + #[tracing::instrument] async fn variant_identifier( &self, hash: Self::Bytes, variant: String, - ) -> Result, Self::Error> { + ) -> Result, Error> { let key = variant_key(&hash, &variant); let opt = b!( @@ -317,18 +321,17 @@ impl HashRepo for SledRepo { ); if let Some(ivec) = opt { - Ok(Some( - I::from_bytes(ivec.to_vec()).map_err(|e| Error::Identifier(Box::new(e)))?, - )) + Ok(Some(I::from_bytes(ivec.to_vec())?)) } else { Ok(None) } } + #[tracing::instrument] async fn variants( &self, hash: Self::Bytes, - ) -> Result, Self::Error> { + ) -> Result, Error> { let vec = b!( self.hash_variant_identifiers, Ok(hash_variant_identifiers @@ -346,12 +349,13 @@ impl HashRepo for SledRepo { Ok(vec) } + #[tracing::instrument] async fn relate_motion_identifier( &self, hash: Self::Bytes, identifier: &I, - ) -> Result<(), Self::Error> { - let bytes = identifier_bytes(identifier)?; + ) -> Result<(), Error> { + let bytes = identifier.to_bytes()?; b!( self.hash_motion_identifiers, @@ -361,25 +365,25 @@ impl HashRepo for SledRepo { Ok(()) } + #[tracing::instrument] async fn motion_identifier( &self, hash: Self::Bytes, - ) -> Result, Self::Error> { + ) -> Result, Error> { let opt = b!( self.hash_motion_identifiers, hash_motion_identifiers.get(hash) ); if let Some(ivec) = opt { - Ok(Some( - I::from_bytes(ivec.to_vec()).map_err(|e| Error::Identifier(Box::new(e)))?, - )) + Ok(Some(I::from_bytes(ivec.to_vec())?)) } else { Ok(None) } } - async fn cleanup(&self, hash: Self::Bytes) -> Result<(), Self::Error> { + #[tracing::instrument] + async fn cleanup(&self, hash: Self::Bytes) -> Result<(), Error> { let hash2 = hash.clone(); b!(self.hashes, hashes.remove(hash2)); @@ -426,9 +430,9 @@ impl HashRepo for SledRepo { #[async_trait::async_trait(?Send)] impl AliasRepo for SledRepo { type Bytes = sled::IVec; - type Error = Error; - async fn create(&self, alias: &Alias) -> Result, Self::Error> { + #[tracing::instrument] + async fn create(&self, alias: &Alias) -> Result, Error> { let bytes = alias.to_bytes(); let bytes2 = bytes.clone(); @@ -440,11 +444,12 @@ impl AliasRepo for SledRepo { Ok(res.map_err(|_| AlreadyExists)) } + #[tracing::instrument] async fn relate_delete_token( &self, alias: &Alias, delete_token: &DeleteToken, - ) -> Result, Self::Error> { + ) -> Result, Error> { let key = alias.to_bytes(); let token = delete_token.to_bytes(); @@ -456,16 +461,19 @@ impl AliasRepo for SledRepo { Ok(res.map_err(|_| AlreadyExists)) } - async fn delete_token(&self, alias: &Alias) -> Result { + #[tracing::instrument] + async fn delete_token(&self, alias: &Alias) -> Result { let key = alias.to_bytes(); let opt = b!(self.alias_delete_tokens, alias_delete_tokens.get(key)); opt.and_then(|ivec| DeleteToken::from_slice(&ivec)) - .ok_or(Error::Missing) + .ok_or(SledError::Missing) + .map_err(Error::from) } - async fn relate_hash(&self, alias: &Alias, hash: Self::Bytes) -> Result<(), Self::Error> { + #[tracing::instrument] + async fn relate_hash(&self, alias: &Alias, hash: Self::Bytes) -> Result<(), Error> { let key = alias.to_bytes(); b!(self.alias_hashes, alias_hashes.insert(key, hash)); @@ -473,15 +481,17 @@ impl AliasRepo for SledRepo { Ok(()) } - async fn hash(&self, alias: &Alias) -> Result { + #[tracing::instrument] + async fn hash(&self, alias: &Alias) -> Result { let key = alias.to_bytes(); let opt = b!(self.alias_hashes, alias_hashes.get(key)); - opt.ok_or(Error::Missing) + opt.ok_or(SledError::Missing).map_err(Error::from) } - async fn cleanup(&self, alias: &Alias) -> Result<(), Self::Error> { + #[tracing::instrument] + async fn cleanup(&self, alias: &Alias) -> Result<(), Error> { let key = alias.to_bytes(); let key2 = key.clone(); @@ -502,8 +512,8 @@ impl std::fmt::Debug for SledRepo { } } -impl From for Error { +impl From for SledError { fn from(_: actix_rt::task::JoinError) -> Self { - Error::Panic + SledError::Panic } } diff --git a/src/store.rs b/src/store.rs index 668950b..fea8374 100644 --- a/src/store.rs +++ b/src/store.rs @@ -1,43 +1,37 @@ -use std::fmt::Debug; - +use crate::error::Error; use actix_web::web::Bytes; use futures_util::stream::Stream; +use std::fmt::Debug; use tokio::io::{AsyncRead, AsyncWrite}; pub(crate) mod file_store; pub(crate) mod object_store; pub(crate) trait Identifier: Send + Sync + Clone + Debug { - type Error: std::error::Error + Send + Sync + 'static; + fn to_bytes(&self) -> Result, Error>; - fn to_bytes(&self) -> Result, Self::Error>; - - fn from_bytes(bytes: Vec) -> Result + fn from_bytes(bytes: Vec) -> Result where Self: Sized; } #[async_trait::async_trait(?Send)] pub(crate) trait Store: Send + Sync + Clone + Debug + 'static { - type Error: std::error::Error + Send + Sync + 'static; - type Identifier: Identifier; + type Identifier: Identifier; type Stream: Stream>; - async fn save_async_read( - &self, - reader: &mut Reader, - ) -> Result + async fn save_async_read(&self, reader: &mut Reader) -> Result where Reader: AsyncRead + Unpin; - async fn save_bytes(&self, bytes: Bytes) -> Result; + async fn save_bytes(&self, bytes: Bytes) -> Result; async fn to_stream( &self, identifier: &Self::Identifier, from_start: Option, len: Option, - ) -> Result; + ) -> Result; async fn read_into( &self, @@ -47,7 +41,7 @@ pub(crate) trait Store: Send + Sync + Clone + Debug + 'static { where Writer: AsyncWrite + Send + Unpin; - async fn len(&self, identifier: &Self::Identifier) -> Result; + async fn len(&self, identifier: &Self::Identifier) -> Result; - async fn remove(&self, identifier: &Self::Identifier) -> Result<(), Self::Error>; + async fn remove(&self, identifier: &Self::Identifier) -> Result<(), Error>; } diff --git a/src/store/file_store.rs b/src/store/file_store.rs index c923731..4145d98 100644 --- a/src/store/file_store.rs +++ b/src/store/file_store.rs @@ -1,4 +1,5 @@ use crate::{ + error::Error, file::File, repo::{Repo, SettingsRepo}, store::Store, @@ -23,9 +24,6 @@ const GENERATOR_KEY: &[u8] = b"last-path"; #[derive(Debug, thiserror::Error)] pub(crate) enum FileError { - #[error("Failed to interact with sled db")] - Sled(#[from] crate::repo::sled::Error), - #[error("Failed to read or write file")] Io(#[from] std::io::Error), @@ -51,15 +49,11 @@ pub(crate) struct FileStore { #[async_trait::async_trait(?Send)] impl Store for FileStore { - type Error = FileError; type Identifier = FileId; type Stream = Pin>>>; #[tracing::instrument(skip(reader))] - async fn save_async_read( - &self, - reader: &mut Reader, - ) -> Result + async fn save_async_read(&self, reader: &mut Reader) -> Result where Reader: AsyncRead + Unpin, { @@ -67,22 +61,22 @@ impl Store for FileStore { if let Err(e) = self.safe_save_reader(&path, reader).await { self.safe_remove_file(&path).await?; - return Err(e); + return Err(e.into()); } - self.file_id_from_path(path) + Ok(self.file_id_from_path(path)?) } #[tracing::instrument(skip(bytes))] - async fn save_bytes(&self, bytes: Bytes) -> Result { + async fn save_bytes(&self, bytes: Bytes) -> Result { let path = self.next_file().await?; if let Err(e) = self.safe_save_bytes(&path, bytes).await { self.safe_remove_file(&path).await?; - return Err(e); + return Err(e.into()); } - self.file_id_from_path(path) + Ok(self.file_id_from_path(path)?) } #[tracing::instrument] @@ -91,7 +85,7 @@ impl Store for FileStore { identifier: &Self::Identifier, from_start: Option, len: Option, - ) -> Result { + ) -> Result { let path = self.path_from_file_id(identifier); let stream = File::open(path) @@ -119,7 +113,7 @@ impl Store for FileStore { } #[tracing::instrument] - async fn len(&self, identifier: &Self::Identifier) -> Result { + async fn len(&self, identifier: &Self::Identifier) -> Result { let path = self.path_from_file_id(identifier); let len = tokio::fs::metadata(path).await?.len(); @@ -128,7 +122,7 @@ impl Store for FileStore { } #[tracing::instrument] - async fn remove(&self, identifier: &Self::Identifier) -> Result<(), Self::Error> { + async fn remove(&self, identifier: &Self::Identifier) -> Result<(), Error> { let path = self.path_from_file_id(identifier); self.safe_remove_file(path).await?; @@ -138,7 +132,7 @@ impl Store for FileStore { } impl FileStore { - pub(crate) async fn build(root_dir: PathBuf, repo: Repo) -> Result { + pub(crate) async fn build(root_dir: PathBuf, repo: Repo) -> Result { let path_gen = init_generator(&repo).await?; Ok(FileStore { @@ -148,7 +142,7 @@ impl FileStore { }) } - async fn next_directory(&self) -> Result { + async fn next_directory(&self) -> Result { let path = self.path_gen.next(); match self.repo { @@ -167,7 +161,7 @@ impl FileStore { Ok(target_path) } - async fn next_file(&self) -> Result { + async fn next_file(&self) -> Result { let target_path = self.next_directory().await?; let filename = uuid::Uuid::new_v4().to_string(); @@ -290,7 +284,7 @@ pub(crate) async fn safe_create_parent>(path: P) -> Result<(), Fi Ok(()) } -async fn init_generator(repo: &Repo) -> Result { +async fn init_generator(repo: &Repo) -> Result { match repo { Repo::Sled(sled_repo) => { if let Some(ivec) = sled_repo.get(GENERATOR_KEY).await? { diff --git a/src/store/file_store/file_id.rs b/src/store/file_store/file_id.rs index e811466..bb7a3ec 100644 --- a/src/store/file_store/file_id.rs +++ b/src/store/file_store/file_id.rs @@ -1,6 +1,9 @@ -use crate::store::{ - file_store::{FileError, FileStore}, - Identifier, +use crate::{ + error::Error, + store::{ + file_store::{FileError, FileStore}, + Identifier, + }, }; use std::path::PathBuf; @@ -8,9 +11,7 @@ use std::path::PathBuf; pub(crate) struct FileId(PathBuf); impl Identifier for FileId { - type Error = FileError; - - fn to_bytes(&self) -> Result, Self::Error> { + fn to_bytes(&self) -> Result, Error> { let vec = self .0 .to_str() @@ -21,7 +22,7 @@ impl Identifier for FileId { Ok(vec) } - fn from_bytes(bytes: Vec) -> Result + fn from_bytes(bytes: Vec) -> Result where Self: Sized, { diff --git a/src/store/object_store.rs b/src/store/object_store.rs index 8fea267..4a30165 100644 --- a/src/store/object_store.rs +++ b/src/store/object_store.rs @@ -1,4 +1,5 @@ use crate::{ + error::Error, repo::{Repo, SettingsRepo}, store::Store, }; @@ -28,9 +29,6 @@ pub(crate) enum ObjectError { #[error("Failed to generate path")] PathGenerator(#[from] storage_path_generator::PathError), - #[error("Failed to interact with sled repo")] - Sled(#[from] crate::repo::sled::Error), - #[error("Failed to parse string")] Utf8(#[from] FromUtf8Error), @@ -58,15 +56,11 @@ pin_project_lite::pin_project! { #[async_trait::async_trait(?Send)] impl Store for ObjectStore { - type Error = ObjectError; type Identifier = ObjectId; type Stream = Pin>>>; #[tracing::instrument(skip(reader))] - async fn save_async_read( - &self, - reader: &mut Reader, - ) -> Result + async fn save_async_read(&self, reader: &mut Reader) -> Result where Reader: AsyncRead + Unpin, { @@ -74,16 +68,20 @@ impl Store for ObjectStore { self.bucket .put_object_stream(&self.client, reader, &path) - .await?; + .await + .map_err(ObjectError::from)?; Ok(ObjectId::from_string(path)) } #[tracing::instrument(skip(bytes))] - async fn save_bytes(&self, bytes: Bytes) -> Result { + async fn save_bytes(&self, bytes: Bytes) -> Result { let path = self.next_file().await?; - self.bucket.put_object(&self.client, &path, &bytes).await?; + self.bucket + .put_object(&self.client, &path, &bytes) + .await + .map_err(ObjectError::from)?; Ok(ObjectId::from_string(path)) } @@ -94,7 +92,7 @@ impl Store for ObjectStore { identifier: &Self::Identifier, from_start: Option, len: Option, - ) -> Result { + ) -> Result { let path = identifier.as_str(); let start = from_start.unwrap_or(0); @@ -107,7 +105,7 @@ impl Store for ObjectStore { Command::GetObjectRange { start, end }, ); - let response = request.response().await?; + let response = request.response().await.map_err(ObjectError::from)?; Ok(Box::pin(io_error(response.bytes_stream()))) } @@ -126,26 +124,34 @@ impl Store for ObjectStore { self.bucket .get_object_stream(&self.client, path, writer) .await - .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, Self::Error::from(e)))?; + .map_err(ObjectError::from) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, Error::from(e)))?; Ok(()) } #[tracing::instrument] - async fn len(&self, identifier: &Self::Identifier) -> Result { + async fn len(&self, identifier: &Self::Identifier) -> Result { let path = identifier.as_str(); - let (head, _) = self.bucket.head_object(&self.client, path).await?; + let (head, _) = self + .bucket + .head_object(&self.client, path) + .await + .map_err(ObjectError::from)?; let length = head.content_length.ok_or(ObjectError::Length)?; Ok(length as u64) } #[tracing::instrument] - async fn remove(&self, identifier: &Self::Identifier) -> Result<(), Self::Error> { + async fn remove(&self, identifier: &Self::Identifier) -> Result<(), Error> { let path = identifier.as_str(); - self.bucket.delete_object(&self.client, path).await?; + self.bucket + .delete_object(&self.client, path) + .await + .map_err(ObjectError::from)?; Ok(()) } } @@ -161,7 +167,7 @@ impl ObjectStore { session_token: Option, repo: Repo, client: reqwest::Client, - ) -> Result { + ) -> Result { let path_gen = init_generator(&repo).await?; Ok(ObjectStore { @@ -182,12 +188,13 @@ impl ObjectStore { security_token, session_token, }, - )?, + ) + .map_err(ObjectError::from)?, client, }) } - async fn next_directory(&self) -> Result { + async fn next_directory(&self) -> Result { let path = self.path_gen.next(); match self.repo { @@ -201,7 +208,7 @@ impl ObjectStore { Ok(path) } - async fn next_file(&self) -> Result { + async fn next_file(&self) -> Result { let path = self.next_directory().await?.to_strings().join("/"); let filename = uuid::Uuid::new_v4().to_string(); @@ -209,7 +216,7 @@ impl ObjectStore { } } -async fn init_generator(repo: &Repo) -> Result { +async fn init_generator(repo: &Repo) -> Result { match repo { Repo::Sled(sled_repo) => { if let Some(ivec) = sled_repo.get(GENERATOR_KEY).await? { @@ -250,7 +257,7 @@ where impl std::fmt::Debug for ObjectStore { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("ObjectStore") - .field("path_gen", &self.path_gen) + .field("path_gen", &"generator") .field("bucket", &self.bucket.name) .field("region", &self.bucket.region) .finish() diff --git a/src/store/object_store/object_id.rs b/src/store/object_store/object_id.rs index 6b3bb32..d9c8a4e 100644 --- a/src/store/object_store/object_id.rs +++ b/src/store/object_store/object_id.rs @@ -1,17 +1,20 @@ -use crate::store::{object_store::ObjectError, Identifier}; +use crate::{ + error::Error, + store::{object_store::ObjectError, Identifier}, +}; #[derive(Debug, Clone)] pub(crate) struct ObjectId(String); impl Identifier for ObjectId { - type Error = ObjectError; - - fn to_bytes(&self) -> Result, Self::Error> { + fn to_bytes(&self) -> Result, Error> { Ok(self.0.as_bytes().to_vec()) } - fn from_bytes(bytes: Vec) -> Result { - Ok(ObjectId(String::from_utf8(bytes)?)) + fn from_bytes(bytes: Vec) -> Result { + Ok(ObjectId( + String::from_utf8(bytes).map_err(ObjectError::from)?, + )) } } diff --git a/src/upload_manager.rs b/src/upload_manager.rs index 3052414..5261227 100644 --- a/src/upload_manager.rs +++ b/src/upload_manager.rs @@ -51,7 +51,6 @@ impl UploadManager { where S1: Store, S2: Store, - Error: From + From, { match self.inner.repo { Repo::Sled(ref sled_repo) => do_migrate_store(sled_repo, from, to).await, @@ -62,10 +61,7 @@ impl UploadManager { &self, store: S, alias: &Alias, - ) -> Result - where - Error: From, - { + ) -> Result { let identifier = self.identifier_from_alias::(alias).await?; let details = if let Some(details) = self.details(&identifier).await? { details @@ -205,10 +201,7 @@ impl UploadManager { pub(crate) async fn details( &self, identifier: &I, - ) -> Result, Error> - where - Error: From, - { + ) -> Result, Error> { match self.inner.repo { Repo::Sled(ref sled_repo) => Ok(sled_repo.details(identifier).await?), } @@ -240,10 +233,7 @@ impl UploadManager { &self, store: S, alias: Alias, - ) -> Result<(), Error> - where - Error: From, - { + ) -> Result<(), Error> { let token = match self.inner.repo { Repo::Sled(ref sled_repo) => sled_repo.delete_token(&alias).await?, }; @@ -258,10 +248,7 @@ impl UploadManager { store: S, alias: Alias, token: DeleteToken, - ) -> Result<(), Error> - where - Error: From, - { + ) -> Result<(), Error> { let hash = match self.inner.repo { Repo::Sled(ref sled_repo) => { let saved_delete_token = sled_repo.delete_token(&alias).await?; @@ -282,10 +269,7 @@ impl UploadManager { &self, store: S, hash: Vec, - ) -> Result<(), Error> - where - Error: From, - { + ) -> Result<(), Error> { match self.inner.repo { Repo::Sled(ref sled_repo) => { let hash: ::Bytes = hash.into(); @@ -309,7 +293,7 @@ impl UploadManager { HashRepo::cleanup(sled_repo, hash).await?; - let cleanup_span = tracing::info_span!("Cleaning files"); + let cleanup_span = tracing::info_span!(parent: None, "Cleaning files"); cleanup_span.follows_from(Span::current()); actix_rt::spawn( @@ -323,12 +307,10 @@ impl UploadManager { { debug!("Deleting {:?}", identifier); if let Err(e) = store.remove(identifier).await { - let e: Error = e.into(); errors.push(e); } if let Err(e) = IdentifierRepo::cleanup(&repo, identifier).await { - let e: Error = e.into(); errors.push(e); } } @@ -350,10 +332,7 @@ impl UploadManager { Ok(()) } - pub(crate) fn session(&self, store: S) -> UploadManagerSession - where - Error: From, - { + pub(crate) fn session(&self, store: S) -> UploadManagerSession { UploadManagerSession::new(self.clone(), store) } } @@ -366,7 +345,6 @@ async fn migrate_file( where S1: Store, S2: Store, - Error: From + From, { let stream = from.to_stream(identifier, None, None).await?; futures_util::pin_mut!(stream); @@ -382,7 +360,6 @@ where R: IdentifierRepo, I1: Identifier, I2: Identifier, - Error: From<::Error>, { if let Some(details) = repo.details(&from).await? { repo.relate_details(to, &details).await?; @@ -396,11 +373,7 @@ async fn do_migrate_store(repo: &R, from: S1, to: S2) -> Result<(), E where S1: Store, S2: Store, - Error: From + From, R: IdentifierRepo + HashRepo + SettingsRepo, - Error: From<::Error>, - Error: From<::Error>, - Error: From<::Error>, { let stream = repo.hashes().await; let mut stream = Box::pin(stream); diff --git a/src/upload_manager/session.rs b/src/upload_manager/session.rs index dca15a1..726ac77 100644 --- a/src/upload_manager/session.rs +++ b/src/upload_manager/session.rs @@ -13,20 +13,14 @@ use futures_util::stream::{Stream, StreamExt}; use tracing::{debug, instrument, Span}; use tracing_futures::Instrument; -pub(crate) struct UploadManagerSession -where - Error: From, -{ +pub(crate) struct UploadManagerSession { store: S, manager: UploadManager, alias: Option, finished: bool, } -impl UploadManagerSession -where - Error: From, -{ +impl UploadManagerSession { pub(super) fn new(manager: UploadManager, store: S) -> Self { UploadManagerSession { store, @@ -45,10 +39,7 @@ where } } -impl Drop for UploadManagerSession -where - Error: From, -{ +impl Drop for UploadManagerSession { fn drop(&mut self) { if self.finished { return; @@ -91,10 +82,7 @@ where } } -impl UploadManagerSession -where - Error: From, -{ +impl UploadManagerSession { /// Generate a delete token for an alias #[instrument(skip(self))] pub(crate) async fn delete_token(&self) -> Result {