Fix Range, consolidate errors, test object storage

This commit is contained in:
Aode (lion) 2022-03-26 20:45:12 -05:00
parent 15b52ba6ec
commit 37e6b21b55
24 changed files with 458 additions and 436 deletions

1
Cargo.lock generated
View File

@ -1643,6 +1643,7 @@ dependencies = [
"tokio",
"tokio-uring",
"tokio-util 0.7.0",
"toml",
"tracing",
"tracing-actix-web",
"tracing-awc",

View File

@ -56,6 +56,7 @@ time = { version = "0.3.0", features = ["serde"] }
tokio = { version = "1", features = ["full", "tracing"] }
tokio-uring = { version = "0.3", optional = true, features = ["bytes"] }
tokio-util = { version = "0.7", default-features = false, features = ["codec"] }
toml = "0.5.8"
tracing = "0.1.15"
tracing-error = "0.2.0"
tracing-futures = "0.2.4"

166
README.md
View File

@ -9,83 +9,145 @@ _a simple image hosting service_
## Usage
### Running
```
pict-rs 0.3.0-rc.7
pict-rs
USAGE:
pict-rs [FLAGS] [OPTIONS] [SUBCOMMAND]
FLAGS:
-h, --help Prints help information
-s, --skip-validate-imports Whether to skip validating images uploaded via the internal import API
-V, --version Prints version information
pict-rs [OPTIONS] <SUBCOMMAND>
OPTIONS:
-a, --addr <addr> The address and port the server binds to.
--api-key <api-key>
-a, --addr <ADDR>
The address and port the server binds to.
--api-key <API_KEY>
An optional string to be checked on requests to privileged endpoints
-c, --config-file <config-file> Path to the pict-rs configuration file
--console-buffer-capacity <console-buffer-capacity>
-c, --config-file <CONFIG_FILE>
Path to the pict-rs configuration file
--console-buffer-capacity <CONSOLE_BUFFER_CAPACITY>
Specify the number of events the console subscriber is allowed to buffer
-f, --filters <filters>...
An optional list of filters to permit, supports 'identity', 'thumbnail', 'resize', 'crop', and 'blur'
-f, --filters <FILTERS>
An optional list of filters to permit, supports 'identity', 'thumbnail', 'resize',
'crop', and 'blur'
-i, --image-format <image-format>
An optional image format to convert all uploaded files into, supports 'jpg', 'png', and 'webp'
--filesystem-storage-path <FILESYSTEM_STORAGE_PATH>
Path in which pict-rs will create it's 'files' directory
-m, --max-file-size <max-file-size>
-h, --help
Print help information
-i, --image-format <IMAGE_FORMAT>
An optional image format to convert all uploaded files into, supports 'jpg', 'png', and
'webp'
-m, --max-file-size <MAX_FILE_SIZE>
Specify the maximum allowed uploaded file size (in Megabytes)
--max-image-area <max-image-area> Specify the maximum area in pixels allowed in an image
--max-image-height <max-image-height> Specify the maximum width in pixels allowed on an image
--max-image-width <max-image-width> Specify the maximum width in pixels allowed on an image
--migrate-file <migrate-file> Path to a file defining a store migration
-o, --opentelemetry-url <opentelemetry-url>
--max-image-area <MAX_IMAGE_AREA>
Specify the maximum area in pixels allowed in an image
--max-image-height <MAX_IMAGE_HEIGHT>
Specify the maximum width in pixels allowed on an image
--max-image-width <MAX_IMAGE_WIDTH>
Specify the maximum width in pixels allowed on an image
-o, --opentelemetry-url <OPENTELEMETRY_URL>
Enable OpenTelemetry Tracing exports to the given OpenTelemetry collector
-p, --path <path> The path to the data directory, e.g. data/
--sled-cache-capacity <sled-cache-capacity>
Specify the number of bytes sled is allowed to use for it's cache
--object-store-access-key <OBJECT_STORE_ACCESS_KEY>
--object-store-bucket-name <OBJECT_STORE_BUCKET_NAME>
Name of the bucket in which pict-rs will store images
--object-store-region <OBJECT_STORE_REGION>
Region in which the bucket exists, can be an http endpoint
--object-store-secret-key <OBJECT_STORE_SECRET_KEY>
--object-store-security-token <OBJECT_STORE_SECURITY_TOKEN>
--object-store-session-token <OBJECT_STORE_SESSION_TOKEN>
-p, --path <PATH>
The path to the data directory, e.g. data/
-R, --repo <REPO>
Set the database implementation. Available options are 'sled'. Default is 'sled'
-s, --skip-validate-imports
Whether to skip validating images uploaded via the internal import API
-S, --store <STORE>
Set the image store. Available options are 'object-storage' or 'filesystem'. Default is
'filesystem'
--sled-cache-capacity <SLED_CACHE_CAPACITY>
The number of bytes sled is allowed to use for it's in-memory cache
--sled-path <SLED_PATH>
Path in which pict-rs will create it's 'repo' directory
SUBCOMMANDS:
file-store
help Prints this message or the help of the given subcommand(s)
s3-store
dump
help Print this message or the help of the given subcommand(s)
migrate-repo
migrate-store
run
```
```
pict-rs-file-store 0.3.0-rc.1
pict-rs-dump
USAGE:
pict-rs file-store [OPTIONS]
pict-rs dump <PATH>
FLAGS:
-h, --help Prints help information
-V, --version Prints version information
ARGS:
<PATH>
OPTIONS:
--path <path> Path in which pict-rs will create it's 'files' directory
-h, --help Print help information
```
```
pict-rs-s3-store 0.3.0-rc.1
pict-rs-migrate-repo
USAGE:
pict-rs s3-store [OPTIONS] --bucket-name <bucket-name> --region <region>
pict-rs migrate-repo <TO>
FLAGS:
-h, --help Prints help information
-V, --version Prints version information
ARGS:
<TO>
OPTIONS:
--access-key <access-key>
--bucket-name <bucket-name> Name of the bucket in which pict-rs will store images
--region <region> Region in which the bucket exists, can be an http endpoint
--secret-key <secret-key>
--security-token <security-token>
--session-token <session-token>
-h, --help Print help information
```
```
pict-rs-migrate-store
USAGE:
pict-rs migrate-store <TO>
ARGS:
<TO>
OPTIONS:
-h, --help Print help information
```
```
pict-rs-run
USAGE:
pict-rs run
OPTIONS:
-h, --help Print help information
```
See [`pict-rs.toml`](https://git.asonix.dog/asonix/pict-rs/src/branch/main/pict-rs.toml) and
@ -95,23 +157,27 @@ configuration
#### Example:
Running on all interfaces, port 8080, storing data in /opt/data
```
$ ./pict-rs -a 0.0.0.0:8080 -p /opt/data
$ ./pict-rs -a 0.0.0.0:8080 -p /opt/data run
```
Running locally, port 9000, storing data in data/, and converting all uploads to PNG
```
$ ./pict-rs -a 127.0.0.1:9000 -p data/ -f png
$ ./pict-rs -a 127.0.0.1:9000 -p data/ -f png run
```
Running locally, port 8080, storing data in data/, and only allowing the `thumbnail` and `identity` filters
```
$ ./pict-rs -a 127.0.0.1:8080 -p data/ -w thumbnail identity
$ ./pict-rs -a 127.0.0.1:8080 -p data/ -w thumbnail identity run
```
Running from a configuration file
```
$ ./pict-rs -c ./pict-rs.toml
$ ./pict-rs -c ./pict-rs.toml run
```
Migrating between storage backends
Migrating to object storage from filesystem storage (both storages must be configured in pict-rs.toml)
```
$ ./pict-rs -p ./data --migrate-file ./migrate.toml
$ ./pict-rs -c ./pict-rs.toml --store filesystem migrate-store object-storage
```
Dumping commandline flags to a toml file
```
$ ./pict-rs -p data/ --store object-storage --object-storage-bucket-name pict-rs --object-storage-region us-east-1 dump pict-rs.toml
```
#### Docker

View File

@ -1,25 +1,17 @@
FROM archlinux:latest
FROM alpine:edge
ARG UID=1000
ARG GID=1000
RUN \
pacman -Syu --noconfirm \
perl-image-exiftool \
imagemagick \
ffmpeg && \
groupadd -g 1000 app && \
useradd -m \
-d /opt/app \
-u $UID \
-g $GID \
app
apk add exiftool imagemagick ffmpeg && \
addgroup -g $GID app && \
adduser -h /opt/app -g "" -G app -u $UID -D app && \
chown -R app:app /mnt
COPY root/ /
COPY ./pict-rs.toml /etc/pict-rs.toml
ENV PATH=$PATH:/usr/bin/vendor_perl
WORKDIR /opt/app
USER app

View File

@ -1,9 +1,17 @@
path = '/mnt'
path = 'data/'
addr = '0.0.0.0:8080'
[store]
type = 's3_store'
bucket_name = 'pict-rs'
region = 'http://minio:9000'
access_key = '09ODZ3BGBISV4U92JLIM'
secret_key = 'j35YE9RrxhBP0dpiD5mmdXRXvPkEJR4k6zK12q3o'
repo = 'sled'
store = 'object_storage'
[sled]
sled_cache_capacity = 67108864
[filesystem_storage]
filesystem_storage_path = '/mnt/files'
[object_storage]
object_store_bucket_name = 'pict-rs'
object_store_region = 'http://minio:9000'
object_store_access_key = 'XZEZ5B8Y3UCINU1KCVF6'
object_store_secret_key = 'cWbE5LcCK9YH8j1NvhOZocl+vH+b6T5Zvy3z+BZu'

View File

@ -8,12 +8,14 @@
<policy domain="resource" name="disk" value="1GiB" />
<policy domain="resource" name="file" value="768" />
<policy domain="resource" name="thread" value="2" />
<policy domain="path" rights="none" pattern="@*" />
<policy domain="coder" rights="none" pattern="*" />
<policy domain="coder" rights="read | write" pattern="{GIF,JPEG,PNG,WEBP,MP4,TMP,PAM}" />
<policy domain="delegate" rights="none" pattern="*" />
<policy domain="delegate" rights="execute" pattern="ffmpeg" />
<policy domain="filter" rights="none" pattern="*" />
<policy domain="path" rights="none" pattern="@*" />
<policy domain="module" rights="none" pattern="*" />
<policy domain="module" rights="read | write" pattern="{GIF,JPEG,PNG,WEBP,MP4,TMP,PAM}" />
<policy domain="module" rights="read | write" pattern="{GIF,JPEG,PNG,WEBP,TMP,PAM,VIDEO}" />
<!-- indirect reads not permitted -->
<policy domain="system" name="precision" value="6" />
</policymap>

View File

@ -8,12 +8,14 @@
<policy domain="resource" name="disk" value="1GiB" />
<policy domain="resource" name="file" value="768" />
<policy domain="resource" name="thread" value="2" />
<policy domain="path" rights="none" pattern="@*" />
<policy domain="coder" rights="none" pattern="*" />
<policy domain="coder" rights="read | write" pattern="{GIF,JPEG,PNG,WEBP,MP4,TMP,PAM}" />
<policy domain="delegate" rights="none" pattern="*" />
<policy domain="delegate" rights="execute" pattern="ffmpeg" />
<policy domain="filter" rights="none" pattern="*" />
<policy domain="path" rights="none" pattern="@*" />
<policy domain="module" rights="none" pattern="*" />
<policy domain="module" rights="read | write" pattern="{GIF,JPEG,PNG,WEBP,MP4,TMP,PAM}" />
<policy domain="module" rights="read | write" pattern="{GIF,JPEG,PNG,WEBP,TMP,PAM,VIDEO}" />
<!-- indirect reads not permitted -->
<policy domain="system" name="precision" value="6" />
</policymap>

View File

@ -43,13 +43,6 @@ max_image_area = 40_000_000 # in Pixels
# default: false
skip_validate_imports = false
## Optional: set sled's cache capacity to a given number of bytes
# environment variable: PICTRS_SLED_CACHE_CAPACITY
# default: 67_108_864 (1024 * 1024 * 64) e.g. 64MB
#
# Increasing this value can improve performance by keeping more of the database in RAM
sled_cache_capacity = 67_108_864 # in bytes
## Optional: enable tokio-console and set the event buffer size
# environment variable: PICTRS_CONSOLE_BUFFER_CAPACITY
# default: empty
@ -95,58 +88,65 @@ api_key = 'API_KEY'
# Not specifying opentelemetry_url means no traces will be exported
opentelemetry_url = 'http://localhost:4317/'
## Optional: store definition
# default store: file_store
#
# Not specifying a store means a file_store will be used with the top-level pict-rs' path
[store]
type = "file_store"
## Optional: the data repository to use
# environment variable: PICTRS_REPO
# default: 'sled'
# available options: 'sled'
repo = 'sled'
## Example file store
# [store]
#
# # environment variable: PICTRS_STORE__TYPE
# type = 'file_store'
#
# # Optional: file path
# # environment variable: PICTRS_STORE__PATH
# # default: empty
# #
# # Not specifying path means pict-rs' top-level `path` config is used
# path = './data'
## Optional: the file storage to use
# environment variable: PICTRS_STORE
# default: 'filesystem'
# available options: 'filesystem', 'object_storage'
store = 'filesystem'
## Example s3 store
# [store]
## Optional: Sled store configration definition
[sled]
## Optional: set sled's cache capacity to a given number of bytes
# environment variable: PICTRS_SLED__SLED_CACHE_CAPACITY
# default: 67_108_864 (1024 * 1024 * 64) e.g. 64MB
#
# # environment variable: PICTRS_STORE__TYPE
# type = 's3_store'
# Increasing this value can improve performance by keeping more of the database in RAM
sled_cache_capacity = 67_108_864 # in bytes
## Optional: Filesystem storage configuration
[filesystem_storage]
## Optional: set the path for pict-rs filesystem file storage
# environment variable: PICTRS_FILESYSTEM_STORAGE__FILESYSTEM_STORAGE_PATH
# default '${path}/files'
filesystem_storage_path = 'data/files'
## Optional: Object Storage configuration
[object_storage]
## Required: bucket name
# environment variable: PICTRS_OBJECT_STORAGE__OBJECT_STORE_BUCKET_NAME
object_store_bucket_name = 'pict-rs'
## Required: bucket region
# environment variable: PICTRS_OBJECT_STORAGE__OBJECT_STORE_REGION
#
# # Required: bucket name
# # environment variable: PICTRS_STORE__BUCKET_NAME
# bucket_name = 'rust_s3'
#
# # Required: bucket region
# # environment variable: PICTRS_STORE__REGION
# #
# # can also be endpoint of local s3 store, e.g. 'http://minio:9000'
# region = 'eu-central-1'
#
# # Optional: bucket access key
# # environment variable: PICTRS_STORE__ACCESS_KEY
# # default: empty
# access_key = 'ACCESS_KEY'
#
# # Optional: bucket secret key
# # environment variable: PICTRS_STORE__SECRET_KEY
# # default: empty
# secret_key = 'SECRET_KEY'
#
# # Optional: bucket security token
# # environment variable: PICTRS_STORE__SECURITY_TOKEN
# # default: empty
# security_token = 'SECURITY_TOKEN'
#
# # Optional: bucket session token
# # environment variable: PICTRS_STORE__SESSION_TOKEN
# # default: empty
# session_token = 'SESSION_TOKEN'
# can also be endpoint of local s3 store, e.g. 'http://minio:9000'
object_store_region = 'eu-central-1'
## Optional: bucket access key
# environment variable: PICTRS_OBJECT_STORAGE__OBJECT_STORE_ACCESS_KEY
# default: empty
object_store_access_key = '09ODZ3BGBISV4U92JLIM'
## Optional: bucket secret key
# environment variable: PICTRS_OBJECT_STORAGE__OBJECT_STORE_SECRET_KEY
# default: empty
object_store_secret_key = 'j35YE9RrxhBP0dpiD5mmdXRXvPkEJR4k6zK12q3o'
## Optional: bucket security token
# environment variable: PICTRS_OBJECT_STORAGE__OBJECT_STORE_SECURITY_TOKEN
# default: empty
object_store_security_token = 'SECURITY_TOKEN'
## Optional: bucket session token
# environment variable: PICTRS_OBJECT_STORAGE__OBJECT_STORE_SESSION_TOKEN
# default: empty
object_store_session_token = 'SESSION_TOKEN'

View File

@ -170,12 +170,16 @@ impl Overrides {
#[serde(tag = "type")]
pub(crate) enum Command {
Run,
Dump { path: PathBuf },
MigrateStore { to: Store },
MigrateRepo { to: Repo },
}
pub(crate) enum CommandConfig {
Run,
Dump {
path: PathBuf,
},
MigrateStore {
to: Storage,
},
@ -287,7 +291,6 @@ pub(crate) enum Repository {
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "snake_case")]
pub(crate) struct Config {
command: Command,
skip_validate_imports: bool,
addr: SocketAddr,
path: PathBuf,
@ -301,8 +304,9 @@ pub(crate) struct Config {
api_key: Option<String>,
opentelemetry_url: Option<Url>,
repo: Repo,
sled: Option<Sled>,
store: Store,
command: Command,
sled: Option<Sled>,
filesystem_storage: Option<FilesystemStorage>,
object_storage: Option<ObjectStorage>,
}
@ -329,9 +333,9 @@ struct SledDefaults {
}
impl Defaults {
fn new() -> Self {
fn new(command: Command) -> Self {
Defaults {
command: Command::Run,
command,
skip_validate_imports: false,
addr: ([0, 0, 0, 0], 8080).into(),
max_file_size: 40,
@ -351,8 +355,8 @@ impl Config {
pub(crate) fn build() -> anyhow::Result<Self> {
let args = Args::parse();
let mut base_config =
config::Config::builder().add_source(config::Config::try_from(&Defaults::new())?);
let mut base_config = config::Config::builder()
.add_source(config::Config::try_from(&Defaults::new(args.command))?);
if let Some(path) = args.config_file {
base_config = base_config.add_source(config::File::from(path));
@ -375,6 +379,7 @@ impl Config {
pub(crate) fn command(&self) -> anyhow::Result<CommandConfig> {
Ok(match &self.command {
Command::Run => CommandConfig::Run,
Command::Dump { path } => CommandConfig::Dump { path: path.clone() },
Command::MigrateStore { to } => CommandConfig::MigrateStore {
to: match to {
Store::ObjectStorage => Storage::ObjectStorage(

View File

@ -34,10 +34,7 @@ impl Details {
store: S,
identifier: S::Identifier,
expected_format: Option<ValidInputType>,
) -> Result<Self, Error>
where
Error: From<S::Error>,
{
) -> Result<Self, Error> {
let details = crate::magick::details_store(store, identifier, expected_format).await?;
Ok(Details::now(

View File

@ -15,6 +15,20 @@ impl std::fmt::Debug for Error {
impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
writeln!(f, "{}", self.kind)?;
writeln!(f)?;
let mut count = 0;
let mut source = std::error::Error::source(self);
if source.is_some() {
writeln!(f, "Chain:")?;
}
while let Some(err) = source {
write!(f, "{}. ", count)?;
writeln!(f, "{}", err)?;
count += 1;
source = std::error::Error::source(err);
}
std::fmt::Display::fmt(&self.context, f)
}
}
@ -43,7 +57,7 @@ pub(crate) enum UploadError {
Upload(#[from] actix_form_data::Error),
#[error("Error in DB")]
Sled(#[from] crate::repo::sled::Error),
Sled(#[from] crate::repo::sled::SledError),
#[error("Error in old sled DB")]
OldSled(#[from] ::sled::Error),

View File

@ -44,7 +44,7 @@ impl ThumbnailFormat {
fn as_format(&self) -> &'static str {
match self {
ThumbnailFormat::Jpeg => "singlejpeg",
ThumbnailFormat::Jpeg => "image2",
// ThumbnailFormat::Webp => "webp",
}
}
@ -101,10 +101,7 @@ pub(crate) async fn thumbnail<S: Store>(
from: S::Identifier,
input_format: InputFormat,
format: ThumbnailFormat,
) -> Result<impl AsyncRead + Unpin, Error>
where
Error: From<S::Error>,
{
) -> Result<impl AsyncRead + Unpin, Error> {
let input_file = crate::tmp_file::tmp_file(Some(input_format.to_ext()));
let input_file_str = input_file.to_str().ok_or(UploadError::Path)?;
crate::store::file_store::safe_create_parent(&input_file).await?;

View File

@ -139,14 +139,12 @@ pub(crate) async fn details_bytes(
parse_details(s)
}
#[tracing::instrument(skip(store))]
pub(crate) async fn details_store<S: Store>(
store: S,
identifier: S::Identifier,
hint: Option<ValidInputType>,
) -> Result<Details, Error>
where
Error: From<S::Error>,
{
) -> Result<Details, Error> {
if hint.as_ref().map(|h| h.is_mp4()).unwrap_or(false) {
let input_file = crate::tmp_file::tmp_file(Some(".mp4"));
let input_file_str = input_file.to_str().ok_or(UploadError::Path)?;
@ -182,6 +180,7 @@ where
parse_details(s)
}
#[tracing::instrument]
pub(crate) async fn details_file(path_str: &str) -> Result<Details, Error> {
let process = Process::run(
"magick",

View File

@ -77,10 +77,7 @@ async fn upload<S: Store>(
value: Value<UploadManagerSession<S>>,
manager: web::Data<UploadManager>,
store: web::Data<S>,
) -> Result<HttpResponse, Error>
where
Error: From<S::Error>,
{
) -> Result<HttpResponse, Error> {
let images = value
.map()
.and_then(|mut m| m.remove("images"))
@ -196,10 +193,7 @@ async fn download<S: Store>(
manager: web::Data<UploadManager>,
store: web::Data<S>,
query: web::Query<UrlQuery>,
) -> Result<HttpResponse, Error>
where
Error: From<S::Error>,
{
) -> Result<HttpResponse, Error> {
let res = client.get(&query.url).send().await?;
if !res.status().is_success() {
@ -249,14 +243,11 @@ async fn delete<S: Store>(
manager: web::Data<UploadManager>,
store: web::Data<S>,
path_entries: web::Path<(String, String)>,
) -> Result<HttpResponse, Error>
where
Error: From<S::Error>,
{
let (alias, token) = path_entries.into_inner();
) -> Result<HttpResponse, Error> {
let (token, alias) = path_entries.into_inner();
let alias = Alias::from_existing(&alias);
let token = DeleteToken::from_existing(&token);
let alias = Alias::from_existing(&alias);
manager.delete((**store).clone(), alias, token).await?;
@ -314,10 +305,7 @@ async fn process_details<S: Store>(
manager: web::Data<UploadManager>,
store: web::Data<S>,
filters: web::Data<Option<HashSet<String>>>,
) -> Result<HttpResponse, Error>
where
Error: From<S::Error>,
{
) -> Result<HttpResponse, Error> {
let (_, alias, thumbnail_path, _) = prepare_process(query, ext.as_str(), &filters)?;
let identifier = manager
@ -341,10 +329,7 @@ async fn process<S: Store + 'static>(
manager: web::Data<UploadManager>,
store: web::Data<S>,
filters: web::Data<Option<HashSet<String>>>,
) -> Result<HttpResponse, Error>
where
Error: From<S::Error>,
{
) -> Result<HttpResponse, Error> {
let (format, alias, thumbnail_path, thumbnail_args) =
prepare_process(query, ext.as_str(), &filters)?;
@ -468,10 +453,7 @@ async fn details<S: Store>(
alias: web::Path<String>,
manager: web::Data<UploadManager>,
store: web::Data<S>,
) -> Result<HttpResponse, Error>
where
Error: From<S::Error>,
{
) -> Result<HttpResponse, Error> {
let alias = alias.into_inner();
let alias = Alias::from_existing(&alias);
@ -498,10 +480,7 @@ async fn serve<S: Store>(
alias: web::Path<String>,
manager: web::Data<UploadManager>,
store: web::Data<S>,
) -> Result<HttpResponse, Error>
where
Error: From<S::Error>,
{
) -> Result<HttpResponse, Error> {
let alias = alias.into_inner();
let alias = Alias::from_existing(&alias);
let identifier = manager.identifier_from_alias::<S>(&alias).await?;
@ -525,10 +504,7 @@ async fn ranged_file_resp<S: Store>(
identifier: S::Identifier,
range: Option<web::Header<Range>>,
details: Details,
) -> Result<HttpResponse, Error>
where
Error: From<S::Error>,
{
) -> Result<HttpResponse, Error> {
let (builder, stream) = if let Some(web::Header(range_header)) = range {
//Range header exists - return as ranged
if let Some(range) = range::single_bytes_range(&range_header) {
@ -602,10 +578,7 @@ async fn purge<S: Store>(
query: web::Query<AliasQuery>,
upload_manager: web::Data<UploadManager>,
store: web::Data<S>,
) -> Result<HttpResponse, Error>
where
Error: From<S::Error>,
{
) -> Result<HttpResponse, Error> {
let alias = Alias::from_existing(&query.alias);
let aliases = upload_manager.aliases_by_alias(&alias).await?;
@ -626,10 +599,7 @@ async fn aliases<S: Store>(
query: web::Query<AliasQuery>,
upload_manager: web::Data<UploadManager>,
store: web::Data<S>,
) -> Result<HttpResponse, Error>
where
Error: From<S::Error>,
{
) -> Result<HttpResponse, Error> {
let alias = Alias::from_existing(&query.alias);
let aliases = upload_manager.aliases_by_alias(&alias).await?;
@ -658,11 +628,10 @@ fn build_reqwest_client() -> reqwest::Result<reqwest::Client> {
.build()
}
async fn launch<S: Store + Clone + 'static>(manager: UploadManager, store: S) -> anyhow::Result<()>
where
S::Error: Unpin,
Error: From<S::Error>,
{
async fn launch<S: Store + Clone + 'static>(
manager: UploadManager,
store: S,
) -> anyhow::Result<()> {
// Create a new Multipart Form validator
//
// This form is expecting a single array field, 'images' with at most 10 files in it
@ -797,7 +766,6 @@ async fn migrate_inner<S1>(
) -> anyhow::Result<()>
where
S1: Store,
Error: From<S1::Error>,
{
match to {
config::Storage::Filesystem(RequiredFilesystemStorage { path }) => {
@ -848,6 +816,11 @@ async fn main() -> anyhow::Result<()> {
match CONFIG.command()? {
CommandConfig::Run => (),
CommandConfig::Dump { path } => {
let configuration = toml::to_string_pretty(&*CONFIG)?;
tokio::fs::write(path, configuration).await?;
return Ok(());
}
CommandConfig::MigrateRepo { to: _ } => {
unimplemented!("Repo migrations are currently unsupported")
}

View File

@ -15,7 +15,9 @@ pub(crate) fn chop_bytes(
length: u64,
) -> Result<impl Stream<Item = Result<Bytes, Error>>, Error> {
if let Some((start, end)) = byte_range.to_satisfiable_range(length) {
return Ok(once(ready(Ok(bytes.slice(start as usize..end as usize)))));
// END IS INCLUSIVE
let end = end as usize + 1;
return Ok(once(ready(Ok(bytes.slice(start as usize..end)))));
}
Err(UploadError::Range.into())
@ -26,14 +28,13 @@ pub(crate) async fn chop_store<S: Store>(
store: &S,
identifier: &S::Identifier,
length: u64,
) -> Result<impl Stream<Item = std::io::Result<Bytes>>, Error>
where
Error: From<S::Error>,
{
) -> Result<impl Stream<Item = std::io::Result<Bytes>>, Error> {
if let Some((start, end)) = byte_range.to_satisfiable_range(length) {
return Ok(store
// END IS INCLUSIVE
let end = end + 1;
return store
.to_stream(identifier, Some(start), Some(end.saturating_sub(start)))
.await?);
.await;
}
Err(UploadError::Range.into())

View File

@ -1,5 +1,9 @@
use crate::config::RequiredSledRepo;
use crate::{config::Repository, details::Details, store::Identifier};
use crate::{
config::{Repository, RequiredSledRepo},
details::Details,
error::Error,
store::Identifier,
};
use futures_util::Stream;
use tracing::debug;
use uuid::Uuid;
@ -34,98 +38,90 @@ pub(crate) struct AlreadyExists;
#[async_trait::async_trait(?Send)]
pub(crate) trait SettingsRepo {
type Bytes: AsRef<[u8]> + From<Vec<u8>>;
type Error: std::error::Error + Send + Sync + 'static;
async fn set(&self, key: &'static [u8], value: Self::Bytes) -> Result<(), Self::Error>;
async fn get(&self, key: &'static [u8]) -> Result<Option<Self::Bytes>, Self::Error>;
async fn remove(&self, key: &'static [u8]) -> Result<(), Self::Error>;
async fn set(&self, key: &'static [u8], value: Self::Bytes) -> Result<(), Error>;
async fn get(&self, key: &'static [u8]) -> Result<Option<Self::Bytes>, Error>;
async fn remove(&self, key: &'static [u8]) -> Result<(), Error>;
}
#[async_trait::async_trait(?Send)]
pub(crate) trait IdentifierRepo {
type Error: std::error::Error + Send + Sync + 'static;
async fn relate_details<I: Identifier>(
&self,
identifier: &I,
details: &Details,
) -> Result<(), Self::Error>;
async fn details<I: Identifier>(&self, identifier: &I) -> Result<Option<Details>, Self::Error>;
) -> Result<(), Error>;
async fn details<I: Identifier>(&self, identifier: &I) -> Result<Option<Details>, Error>;
async fn cleanup<I: Identifier>(&self, identifier: &I) -> Result<(), Self::Error>;
async fn cleanup<I: Identifier>(&self, identifier: &I) -> Result<(), Error>;
}
#[async_trait::async_trait(?Send)]
pub(crate) trait HashRepo {
type Bytes: AsRef<[u8]> + From<Vec<u8>>;
type Error: std::error::Error + Send + Sync + 'static;
type Stream: Stream<Item = Result<Self::Bytes, Self::Error>>;
type Stream: Stream<Item = Result<Self::Bytes, Error>>;
async fn hashes(&self) -> Self::Stream;
async fn create(&self, hash: Self::Bytes) -> Result<Result<(), AlreadyExists>, Self::Error>;
async fn create(&self, hash: Self::Bytes) -> Result<Result<(), AlreadyExists>, Error>;
async fn relate_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), Self::Error>;
async fn remove_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), Self::Error>;
async fn aliases(&self, hash: Self::Bytes) -> Result<Vec<Alias>, Self::Error>;
async fn relate_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), Error>;
async fn remove_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), Error>;
async fn aliases(&self, hash: Self::Bytes) -> Result<Vec<Alias>, Error>;
async fn relate_identifier<I: Identifier>(
&self,
hash: Self::Bytes,
identifier: &I,
) -> Result<(), Self::Error>;
async fn identifier<I: Identifier + 'static>(
&self,
hash: Self::Bytes,
) -> Result<I, Self::Error>;
) -> Result<(), Error>;
async fn identifier<I: Identifier + 'static>(&self, hash: Self::Bytes) -> Result<I, Error>;
async fn relate_variant_identifier<I: Identifier>(
&self,
hash: Self::Bytes,
variant: String,
identifier: &I,
) -> Result<(), Self::Error>;
) -> Result<(), Error>;
async fn variant_identifier<I: Identifier + 'static>(
&self,
hash: Self::Bytes,
variant: String,
) -> Result<Option<I>, Self::Error>;
) -> Result<Option<I>, Error>;
async fn variants<I: Identifier + 'static>(
&self,
hash: Self::Bytes,
) -> Result<Vec<(String, I)>, Self::Error>;
) -> Result<Vec<(String, I)>, Error>;
async fn relate_motion_identifier<I: Identifier>(
&self,
hash: Self::Bytes,
identifier: &I,
) -> Result<(), Self::Error>;
) -> Result<(), Error>;
async fn motion_identifier<I: Identifier + 'static>(
&self,
hash: Self::Bytes,
) -> Result<Option<I>, Self::Error>;
) -> Result<Option<I>, Error>;
async fn cleanup(&self, hash: Self::Bytes) -> Result<(), Self::Error>;
async fn cleanup(&self, hash: Self::Bytes) -> Result<(), Error>;
}
#[async_trait::async_trait(?Send)]
pub(crate) trait AliasRepo {
type Bytes: AsRef<[u8]> + From<Vec<u8>>;
type Error: std::error::Error + Send + Sync + 'static;
async fn create(&self, alias: &Alias) -> Result<Result<(), AlreadyExists>, Self::Error>;
async fn create(&self, alias: &Alias) -> Result<Result<(), AlreadyExists>, Error>;
async fn relate_delete_token(
&self,
alias: &Alias,
delete_token: &DeleteToken,
) -> Result<Result<(), AlreadyExists>, Self::Error>;
async fn delete_token(&self, alias: &Alias) -> Result<DeleteToken, Self::Error>;
) -> Result<Result<(), AlreadyExists>, Error>;
async fn delete_token(&self, alias: &Alias) -> Result<DeleteToken, Error>;
async fn relate_hash(&self, alias: &Alias, hash: Self::Bytes) -> Result<(), Self::Error>;
async fn hash(&self, alias: &Alias) -> Result<Self::Bytes, Self::Error>;
async fn relate_hash(&self, alias: &Alias, hash: Self::Bytes) -> Result<(), Error>;
async fn hash(&self, alias: &Alias) -> Result<Self::Bytes, Error>;
async fn cleanup(&self, alias: &Alias) -> Result<(), Self::Error>;
async fn cleanup(&self, alias: &Alias) -> Result<(), Error>;
}
impl Repo {
@ -389,16 +385,14 @@ impl std::fmt::Display for Alias {
}
impl Identifier for Vec<u8> {
type Error = std::convert::Infallible;
fn from_bytes(bytes: Vec<u8>) -> Result<Self, Self::Error>
fn from_bytes(bytes: Vec<u8>) -> Result<Self, Error>
where
Self: Sized,
{
Ok(bytes)
}
fn to_bytes(&self) -> Result<Vec<u8>, Self::Error> {
fn to_bytes(&self) -> Result<Vec<u8>, Error> {
Ok(self.clone())
}
}

View File

@ -2,24 +2,24 @@ use super::{
Alias, AliasRepo, AlreadyExists, DeleteToken, Details, HashRepo, Identifier, IdentifierRepo,
SettingsRepo,
};
use crate::error::Error;
use sled::{Db, IVec, Tree};
macro_rules! b {
($self:ident.$ident:ident, $expr:expr) => {{
let $ident = $self.$ident.clone();
actix_rt::task::spawn_blocking(move || $expr).await??
actix_rt::task::spawn_blocking(move || $expr)
.await
.map_err(SledError::from)??
}};
}
#[derive(Debug, thiserror::Error)]
pub(crate) enum Error {
pub(crate) enum SledError {
#[error("Error in database")]
Sled(#[from] sled::Error),
#[error("Invalid identifier")]
Identifier(#[source] Box<dyn std::error::Error + Sync + Send>),
#[error("Invalid details json")]
Details(#[from] serde_json::Error),
@ -46,7 +46,7 @@ pub(crate) struct SledRepo {
}
impl SledRepo {
pub(crate) fn new(db: Db) -> Result<Self, Error> {
pub(crate) fn new(db: Db) -> Result<Self, SledError> {
Ok(SledRepo {
settings: db.open_tree("pict-rs-settings-tree")?,
identifier_details: db.open_tree("pict-rs-identifier-details-tree")?,
@ -66,36 +66,29 @@ impl SledRepo {
#[async_trait::async_trait(?Send)]
impl SettingsRepo for SledRepo {
type Bytes = IVec;
type Error = Error;
async fn set(&self, key: &'static [u8], value: Self::Bytes) -> Result<(), Self::Error> {
#[tracing::instrument(skip(value))]
async fn set(&self, key: &'static [u8], value: Self::Bytes) -> Result<(), Error> {
b!(self.settings, settings.insert(key, value));
Ok(())
}
async fn get(&self, key: &'static [u8]) -> Result<Option<Self::Bytes>, Self::Error> {
#[tracing::instrument]
async fn get(&self, key: &'static [u8]) -> Result<Option<Self::Bytes>, Error> {
let opt = b!(self.settings, settings.get(key));
Ok(opt)
}
async fn remove(&self, key: &'static [u8]) -> Result<(), Self::Error> {
#[tracing::instrument]
async fn remove(&self, key: &'static [u8]) -> Result<(), Error> {
b!(self.settings, settings.remove(key));
Ok(())
}
}
fn identifier_bytes<I>(identifier: &I) -> Result<Vec<u8>, Error>
where
I: Identifier,
{
identifier
.to_bytes()
.map_err(|e| Error::Identifier(Box::new(e)))
}
fn variant_key(hash: &[u8], variant: &str) -> Vec<u8> {
let mut bytes = hash.to_vec();
bytes.push(b'/');
@ -111,14 +104,13 @@ fn variant_from_key(hash: &[u8], key: &[u8]) -> Option<String> {
#[async_trait::async_trait(?Send)]
impl IdentifierRepo for SledRepo {
type Error = Error;
#[tracing::instrument]
async fn relate_details<I: Identifier>(
&self,
identifier: &I,
details: &Details,
) -> Result<(), Self::Error> {
let key = identifier_bytes(identifier)?;
) -> Result<(), Error> {
let key = identifier.to_bytes()?;
let details = serde_json::to_vec(&details)?;
b!(
@ -129,8 +121,9 @@ impl IdentifierRepo for SledRepo {
Ok(())
}
async fn details<I: Identifier>(&self, identifier: &I) -> Result<Option<Details>, Self::Error> {
let key = identifier_bytes(identifier)?;
#[tracing::instrument]
async fn details<I: Identifier>(&self, identifier: &I) -> Result<Option<Details>, Error> {
let key = identifier.to_bytes()?;
let opt = b!(self.identifier_details, identifier_details.get(key));
@ -141,8 +134,9 @@ impl IdentifierRepo for SledRepo {
}
}
async fn cleanup<I: Identifier>(&self, identifier: &I) -> Result<(), Self::Error> {
let key = identifier_bytes(identifier)?;
#[tracing::instrument]
async fn cleanup<I: Identifier>(&self, identifier: &I) -> Result<(), Error> {
let key = identifier.to_bytes()?;
b!(self.identifier_details, identifier_details.remove(key));
@ -192,7 +186,13 @@ impl futures_util::Stream for HashStream {
(iter, opt)
})
.await
.map(|(iter, opt)| (iter, opt.map(|res| res.map_err(Error::from))))
.map(|(iter, opt)| {
(
iter,
opt.map(|res| res.map_err(SledError::from).map_err(Error::from)),
)
})
.map_err(SledError::from)
.map_err(Error::from)
});
@ -213,7 +213,6 @@ fn hash_alias_key(hash: &IVec, alias: &Alias) -> Vec<u8> {
#[async_trait::async_trait(?Send)]
impl HashRepo for SledRepo {
type Bytes = IVec;
type Error = Error;
type Stream = HashStream;
async fn hashes(&self) -> Self::Stream {
@ -225,7 +224,8 @@ impl HashRepo for SledRepo {
}
}
async fn create(&self, hash: Self::Bytes) -> Result<Result<(), AlreadyExists>, Self::Error> {
#[tracing::instrument]
async fn create(&self, hash: Self::Bytes) -> Result<Result<(), AlreadyExists>, Error> {
let res = b!(self.hashes, {
let hash2 = hash.clone();
hashes.compare_and_swap(hash, None as Option<Self::Bytes>, Some(hash2))
@ -234,7 +234,8 @@ impl HashRepo for SledRepo {
Ok(res.map_err(|_| AlreadyExists))
}
async fn relate_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), Self::Error> {
#[tracing::instrument]
async fn relate_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), Error> {
let key = hash_alias_key(&hash, alias);
let value = alias.to_bytes();
@ -243,7 +244,8 @@ impl HashRepo for SledRepo {
Ok(())
}
async fn remove_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), Self::Error> {
#[tracing::instrument]
async fn remove_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), Error> {
let key = hash_alias_key(&hash, alias);
b!(self.hash_aliases, hash_aliases.remove(key));
@ -251,7 +253,8 @@ impl HashRepo for SledRepo {
Ok(())
}
async fn aliases(&self, hash: Self::Bytes) -> Result<Vec<Alias>, Self::Error> {
#[tracing::instrument]
async fn aliases(&self, hash: Self::Bytes) -> Result<Vec<Alias>, Error> {
let v = b!(self.hash_aliases, {
Ok(hash_aliases
.scan_prefix(hash)
@ -264,37 +267,37 @@ impl HashRepo for SledRepo {
Ok(v)
}
#[tracing::instrument]
async fn relate_identifier<I: Identifier>(
&self,
hash: Self::Bytes,
identifier: &I,
) -> Result<(), Self::Error> {
let bytes = identifier_bytes(identifier)?;
) -> Result<(), Error> {
let bytes = identifier.to_bytes()?;
b!(self.hash_identifiers, hash_identifiers.insert(hash, bytes));
Ok(())
}
async fn identifier<I: Identifier + 'static>(
&self,
hash: Self::Bytes,
) -> Result<I, Self::Error> {
#[tracing::instrument]
async fn identifier<I: Identifier + 'static>(&self, hash: Self::Bytes) -> Result<I, Error> {
let opt = b!(self.hash_identifiers, hash_identifiers.get(hash));
opt.ok_or(Error::Missing).and_then(|ivec| {
I::from_bytes(ivec.to_vec()).map_err(|e| Error::Identifier(Box::new(e)))
})
opt.ok_or(SledError::Missing)
.map_err(Error::from)
.and_then(|ivec| I::from_bytes(ivec.to_vec()))
}
#[tracing::instrument]
async fn relate_variant_identifier<I: Identifier>(
&self,
hash: Self::Bytes,
variant: String,
identifier: &I,
) -> Result<(), Self::Error> {
) -> Result<(), Error> {
let key = variant_key(&hash, &variant);
let value = identifier_bytes(identifier)?;
let value = identifier.to_bytes()?;
b!(
self.hash_variant_identifiers,
@ -304,11 +307,12 @@ impl HashRepo for SledRepo {
Ok(())
}
#[tracing::instrument]
async fn variant_identifier<I: Identifier + 'static>(
&self,
hash: Self::Bytes,
variant: String,
) -> Result<Option<I>, Self::Error> {
) -> Result<Option<I>, Error> {
let key = variant_key(&hash, &variant);
let opt = b!(
@ -317,18 +321,17 @@ impl HashRepo for SledRepo {
);
if let Some(ivec) = opt {
Ok(Some(
I::from_bytes(ivec.to_vec()).map_err(|e| Error::Identifier(Box::new(e)))?,
))
Ok(Some(I::from_bytes(ivec.to_vec())?))
} else {
Ok(None)
}
}
#[tracing::instrument]
async fn variants<I: Identifier + 'static>(
&self,
hash: Self::Bytes,
) -> Result<Vec<(String, I)>, Self::Error> {
) -> Result<Vec<(String, I)>, Error> {
let vec = b!(
self.hash_variant_identifiers,
Ok(hash_variant_identifiers
@ -346,12 +349,13 @@ impl HashRepo for SledRepo {
Ok(vec)
}
#[tracing::instrument]
async fn relate_motion_identifier<I: Identifier>(
&self,
hash: Self::Bytes,
identifier: &I,
) -> Result<(), Self::Error> {
let bytes = identifier_bytes(identifier)?;
) -> Result<(), Error> {
let bytes = identifier.to_bytes()?;
b!(
self.hash_motion_identifiers,
@ -361,25 +365,25 @@ impl HashRepo for SledRepo {
Ok(())
}
#[tracing::instrument]
async fn motion_identifier<I: Identifier + 'static>(
&self,
hash: Self::Bytes,
) -> Result<Option<I>, Self::Error> {
) -> Result<Option<I>, Error> {
let opt = b!(
self.hash_motion_identifiers,
hash_motion_identifiers.get(hash)
);
if let Some(ivec) = opt {
Ok(Some(
I::from_bytes(ivec.to_vec()).map_err(|e| Error::Identifier(Box::new(e)))?,
))
Ok(Some(I::from_bytes(ivec.to_vec())?))
} else {
Ok(None)
}
}
async fn cleanup(&self, hash: Self::Bytes) -> Result<(), Self::Error> {
#[tracing::instrument]
async fn cleanup(&self, hash: Self::Bytes) -> Result<(), Error> {
let hash2 = hash.clone();
b!(self.hashes, hashes.remove(hash2));
@ -426,9 +430,9 @@ impl HashRepo for SledRepo {
#[async_trait::async_trait(?Send)]
impl AliasRepo for SledRepo {
type Bytes = sled::IVec;
type Error = Error;
async fn create(&self, alias: &Alias) -> Result<Result<(), AlreadyExists>, Self::Error> {
#[tracing::instrument]
async fn create(&self, alias: &Alias) -> Result<Result<(), AlreadyExists>, Error> {
let bytes = alias.to_bytes();
let bytes2 = bytes.clone();
@ -440,11 +444,12 @@ impl AliasRepo for SledRepo {
Ok(res.map_err(|_| AlreadyExists))
}
#[tracing::instrument]
async fn relate_delete_token(
&self,
alias: &Alias,
delete_token: &DeleteToken,
) -> Result<Result<(), AlreadyExists>, Self::Error> {
) -> Result<Result<(), AlreadyExists>, Error> {
let key = alias.to_bytes();
let token = delete_token.to_bytes();
@ -456,16 +461,19 @@ impl AliasRepo for SledRepo {
Ok(res.map_err(|_| AlreadyExists))
}
async fn delete_token(&self, alias: &Alias) -> Result<DeleteToken, Self::Error> {
#[tracing::instrument]
async fn delete_token(&self, alias: &Alias) -> Result<DeleteToken, Error> {
let key = alias.to_bytes();
let opt = b!(self.alias_delete_tokens, alias_delete_tokens.get(key));
opt.and_then(|ivec| DeleteToken::from_slice(&ivec))
.ok_or(Error::Missing)
.ok_or(SledError::Missing)
.map_err(Error::from)
}
async fn relate_hash(&self, alias: &Alias, hash: Self::Bytes) -> Result<(), Self::Error> {
#[tracing::instrument]
async fn relate_hash(&self, alias: &Alias, hash: Self::Bytes) -> Result<(), Error> {
let key = alias.to_bytes();
b!(self.alias_hashes, alias_hashes.insert(key, hash));
@ -473,15 +481,17 @@ impl AliasRepo for SledRepo {
Ok(())
}
async fn hash(&self, alias: &Alias) -> Result<Self::Bytes, Self::Error> {
#[tracing::instrument]
async fn hash(&self, alias: &Alias) -> Result<Self::Bytes, Error> {
let key = alias.to_bytes();
let opt = b!(self.alias_hashes, alias_hashes.get(key));
opt.ok_or(Error::Missing)
opt.ok_or(SledError::Missing).map_err(Error::from)
}
async fn cleanup(&self, alias: &Alias) -> Result<(), Self::Error> {
#[tracing::instrument]
async fn cleanup(&self, alias: &Alias) -> Result<(), Error> {
let key = alias.to_bytes();
let key2 = key.clone();
@ -502,8 +512,8 @@ impl std::fmt::Debug for SledRepo {
}
}
impl From<actix_rt::task::JoinError> for Error {
impl From<actix_rt::task::JoinError> for SledError {
fn from(_: actix_rt::task::JoinError) -> Self {
Error::Panic
SledError::Panic
}
}

View File

@ -1,43 +1,37 @@
use std::fmt::Debug;
use crate::error::Error;
use actix_web::web::Bytes;
use futures_util::stream::Stream;
use std::fmt::Debug;
use tokio::io::{AsyncRead, AsyncWrite};
pub(crate) mod file_store;
pub(crate) mod object_store;
pub(crate) trait Identifier: Send + Sync + Clone + Debug {
type Error: std::error::Error + Send + Sync + 'static;
fn to_bytes(&self) -> Result<Vec<u8>, Error>;
fn to_bytes(&self) -> Result<Vec<u8>, Self::Error>;
fn from_bytes(bytes: Vec<u8>) -> Result<Self, Self::Error>
fn from_bytes(bytes: Vec<u8>) -> Result<Self, Error>
where
Self: Sized;
}
#[async_trait::async_trait(?Send)]
pub(crate) trait Store: Send + Sync + Clone + Debug + 'static {
type Error: std::error::Error + Send + Sync + 'static;
type Identifier: Identifier<Error = Self::Error>;
type Identifier: Identifier;
type Stream: Stream<Item = std::io::Result<Bytes>>;
async fn save_async_read<Reader>(
&self,
reader: &mut Reader,
) -> Result<Self::Identifier, Self::Error>
async fn save_async_read<Reader>(&self, reader: &mut Reader) -> Result<Self::Identifier, Error>
where
Reader: AsyncRead + Unpin;
async fn save_bytes(&self, bytes: Bytes) -> Result<Self::Identifier, Self::Error>;
async fn save_bytes(&self, bytes: Bytes) -> Result<Self::Identifier, Error>;
async fn to_stream(
&self,
identifier: &Self::Identifier,
from_start: Option<u64>,
len: Option<u64>,
) -> Result<Self::Stream, Self::Error>;
) -> Result<Self::Stream, Error>;
async fn read_into<Writer>(
&self,
@ -47,7 +41,7 @@ pub(crate) trait Store: Send + Sync + Clone + Debug + 'static {
where
Writer: AsyncWrite + Send + Unpin;
async fn len(&self, identifier: &Self::Identifier) -> Result<u64, Self::Error>;
async fn len(&self, identifier: &Self::Identifier) -> Result<u64, Error>;
async fn remove(&self, identifier: &Self::Identifier) -> Result<(), Self::Error>;
async fn remove(&self, identifier: &Self::Identifier) -> Result<(), Error>;
}

View File

@ -1,4 +1,5 @@
use crate::{
error::Error,
file::File,
repo::{Repo, SettingsRepo},
store::Store,
@ -23,9 +24,6 @@ const GENERATOR_KEY: &[u8] = b"last-path";
#[derive(Debug, thiserror::Error)]
pub(crate) enum FileError {
#[error("Failed to interact with sled db")]
Sled(#[from] crate::repo::sled::Error),
#[error("Failed to read or write file")]
Io(#[from] std::io::Error),
@ -51,15 +49,11 @@ pub(crate) struct FileStore {
#[async_trait::async_trait(?Send)]
impl Store for FileStore {
type Error = FileError;
type Identifier = FileId;
type Stream = Pin<Box<dyn Stream<Item = std::io::Result<Bytes>>>>;
#[tracing::instrument(skip(reader))]
async fn save_async_read<Reader>(
&self,
reader: &mut Reader,
) -> Result<Self::Identifier, Self::Error>
async fn save_async_read<Reader>(&self, reader: &mut Reader) -> Result<Self::Identifier, Error>
where
Reader: AsyncRead + Unpin,
{
@ -67,22 +61,22 @@ impl Store for FileStore {
if let Err(e) = self.safe_save_reader(&path, reader).await {
self.safe_remove_file(&path).await?;
return Err(e);
return Err(e.into());
}
self.file_id_from_path(path)
Ok(self.file_id_from_path(path)?)
}
#[tracing::instrument(skip(bytes))]
async fn save_bytes(&self, bytes: Bytes) -> Result<Self::Identifier, Self::Error> {
async fn save_bytes(&self, bytes: Bytes) -> Result<Self::Identifier, Error> {
let path = self.next_file().await?;
if let Err(e) = self.safe_save_bytes(&path, bytes).await {
self.safe_remove_file(&path).await?;
return Err(e);
return Err(e.into());
}
self.file_id_from_path(path)
Ok(self.file_id_from_path(path)?)
}
#[tracing::instrument]
@ -91,7 +85,7 @@ impl Store for FileStore {
identifier: &Self::Identifier,
from_start: Option<u64>,
len: Option<u64>,
) -> Result<Self::Stream, Self::Error> {
) -> Result<Self::Stream, Error> {
let path = self.path_from_file_id(identifier);
let stream = File::open(path)
@ -119,7 +113,7 @@ impl Store for FileStore {
}
#[tracing::instrument]
async fn len(&self, identifier: &Self::Identifier) -> Result<u64, Self::Error> {
async fn len(&self, identifier: &Self::Identifier) -> Result<u64, Error> {
let path = self.path_from_file_id(identifier);
let len = tokio::fs::metadata(path).await?.len();
@ -128,7 +122,7 @@ impl Store for FileStore {
}
#[tracing::instrument]
async fn remove(&self, identifier: &Self::Identifier) -> Result<(), Self::Error> {
async fn remove(&self, identifier: &Self::Identifier) -> Result<(), Error> {
let path = self.path_from_file_id(identifier);
self.safe_remove_file(path).await?;
@ -138,7 +132,7 @@ impl Store for FileStore {
}
impl FileStore {
pub(crate) async fn build(root_dir: PathBuf, repo: Repo) -> Result<Self, FileError> {
pub(crate) async fn build(root_dir: PathBuf, repo: Repo) -> Result<Self, Error> {
let path_gen = init_generator(&repo).await?;
Ok(FileStore {
@ -148,7 +142,7 @@ impl FileStore {
})
}
async fn next_directory(&self) -> Result<PathBuf, FileError> {
async fn next_directory(&self) -> Result<PathBuf, Error> {
let path = self.path_gen.next();
match self.repo {
@ -167,7 +161,7 @@ impl FileStore {
Ok(target_path)
}
async fn next_file(&self) -> Result<PathBuf, FileError> {
async fn next_file(&self) -> Result<PathBuf, Error> {
let target_path = self.next_directory().await?;
let filename = uuid::Uuid::new_v4().to_string();
@ -290,7 +284,7 @@ pub(crate) async fn safe_create_parent<P: AsRef<Path>>(path: P) -> Result<(), Fi
Ok(())
}
async fn init_generator(repo: &Repo) -> Result<Generator, FileError> {
async fn init_generator(repo: &Repo) -> Result<Generator, Error> {
match repo {
Repo::Sled(sled_repo) => {
if let Some(ivec) = sled_repo.get(GENERATOR_KEY).await? {

View File

@ -1,6 +1,9 @@
use crate::store::{
file_store::{FileError, FileStore},
Identifier,
use crate::{
error::Error,
store::{
file_store::{FileError, FileStore},
Identifier,
},
};
use std::path::PathBuf;
@ -8,9 +11,7 @@ use std::path::PathBuf;
pub(crate) struct FileId(PathBuf);
impl Identifier for FileId {
type Error = FileError;
fn to_bytes(&self) -> Result<Vec<u8>, Self::Error> {
fn to_bytes(&self) -> Result<Vec<u8>, Error> {
let vec = self
.0
.to_str()
@ -21,7 +22,7 @@ impl Identifier for FileId {
Ok(vec)
}
fn from_bytes(bytes: Vec<u8>) -> Result<Self, Self::Error>
fn from_bytes(bytes: Vec<u8>) -> Result<Self, Error>
where
Self: Sized,
{

View File

@ -1,4 +1,5 @@
use crate::{
error::Error,
repo::{Repo, SettingsRepo},
store::Store,
};
@ -28,9 +29,6 @@ pub(crate) enum ObjectError {
#[error("Failed to generate path")]
PathGenerator(#[from] storage_path_generator::PathError),
#[error("Failed to interact with sled repo")]
Sled(#[from] crate::repo::sled::Error),
#[error("Failed to parse string")]
Utf8(#[from] FromUtf8Error),
@ -58,15 +56,11 @@ pin_project_lite::pin_project! {
#[async_trait::async_trait(?Send)]
impl Store for ObjectStore {
type Error = ObjectError;
type Identifier = ObjectId;
type Stream = Pin<Box<dyn Stream<Item = std::io::Result<Bytes>>>>;
#[tracing::instrument(skip(reader))]
async fn save_async_read<Reader>(
&self,
reader: &mut Reader,
) -> Result<Self::Identifier, Self::Error>
async fn save_async_read<Reader>(&self, reader: &mut Reader) -> Result<Self::Identifier, Error>
where
Reader: AsyncRead + Unpin,
{
@ -74,16 +68,20 @@ impl Store for ObjectStore {
self.bucket
.put_object_stream(&self.client, reader, &path)
.await?;
.await
.map_err(ObjectError::from)?;
Ok(ObjectId::from_string(path))
}
#[tracing::instrument(skip(bytes))]
async fn save_bytes(&self, bytes: Bytes) -> Result<Self::Identifier, Self::Error> {
async fn save_bytes(&self, bytes: Bytes) -> Result<Self::Identifier, Error> {
let path = self.next_file().await?;
self.bucket.put_object(&self.client, &path, &bytes).await?;
self.bucket
.put_object(&self.client, &path, &bytes)
.await
.map_err(ObjectError::from)?;
Ok(ObjectId::from_string(path))
}
@ -94,7 +92,7 @@ impl Store for ObjectStore {
identifier: &Self::Identifier,
from_start: Option<u64>,
len: Option<u64>,
) -> Result<Self::Stream, Self::Error> {
) -> Result<Self::Stream, Error> {
let path = identifier.as_str();
let start = from_start.unwrap_or(0);
@ -107,7 +105,7 @@ impl Store for ObjectStore {
Command::GetObjectRange { start, end },
);
let response = request.response().await?;
let response = request.response().await.map_err(ObjectError::from)?;
Ok(Box::pin(io_error(response.bytes_stream())))
}
@ -126,26 +124,34 @@ impl Store for ObjectStore {
self.bucket
.get_object_stream(&self.client, path, writer)
.await
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, Self::Error::from(e)))?;
.map_err(ObjectError::from)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, Error::from(e)))?;
Ok(())
}
#[tracing::instrument]
async fn len(&self, identifier: &Self::Identifier) -> Result<u64, Self::Error> {
async fn len(&self, identifier: &Self::Identifier) -> Result<u64, Error> {
let path = identifier.as_str();
let (head, _) = self.bucket.head_object(&self.client, path).await?;
let (head, _) = self
.bucket
.head_object(&self.client, path)
.await
.map_err(ObjectError::from)?;
let length = head.content_length.ok_or(ObjectError::Length)?;
Ok(length as u64)
}
#[tracing::instrument]
async fn remove(&self, identifier: &Self::Identifier) -> Result<(), Self::Error> {
async fn remove(&self, identifier: &Self::Identifier) -> Result<(), Error> {
let path = identifier.as_str();
self.bucket.delete_object(&self.client, path).await?;
self.bucket
.delete_object(&self.client, path)
.await
.map_err(ObjectError::from)?;
Ok(())
}
}
@ -161,7 +167,7 @@ impl ObjectStore {
session_token: Option<String>,
repo: Repo,
client: reqwest::Client,
) -> Result<ObjectStore, ObjectError> {
) -> Result<ObjectStore, Error> {
let path_gen = init_generator(&repo).await?;
Ok(ObjectStore {
@ -182,12 +188,13 @@ impl ObjectStore {
security_token,
session_token,
},
)?,
)
.map_err(ObjectError::from)?,
client,
})
}
async fn next_directory(&self) -> Result<Path, ObjectError> {
async fn next_directory(&self) -> Result<Path, Error> {
let path = self.path_gen.next();
match self.repo {
@ -201,7 +208,7 @@ impl ObjectStore {
Ok(path)
}
async fn next_file(&self) -> Result<String, ObjectError> {
async fn next_file(&self) -> Result<String, Error> {
let path = self.next_directory().await?.to_strings().join("/");
let filename = uuid::Uuid::new_v4().to_string();
@ -209,7 +216,7 @@ impl ObjectStore {
}
}
async fn init_generator(repo: &Repo) -> Result<Generator, ObjectError> {
async fn init_generator(repo: &Repo) -> Result<Generator, Error> {
match repo {
Repo::Sled(sled_repo) => {
if let Some(ivec) = sled_repo.get(GENERATOR_KEY).await? {
@ -250,7 +257,7 @@ where
impl std::fmt::Debug for ObjectStore {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ObjectStore")
.field("path_gen", &self.path_gen)
.field("path_gen", &"generator")
.field("bucket", &self.bucket.name)
.field("region", &self.bucket.region)
.finish()

View File

@ -1,17 +1,20 @@
use crate::store::{object_store::ObjectError, Identifier};
use crate::{
error::Error,
store::{object_store::ObjectError, Identifier},
};
#[derive(Debug, Clone)]
pub(crate) struct ObjectId(String);
impl Identifier for ObjectId {
type Error = ObjectError;
fn to_bytes(&self) -> Result<Vec<u8>, Self::Error> {
fn to_bytes(&self) -> Result<Vec<u8>, Error> {
Ok(self.0.as_bytes().to_vec())
}
fn from_bytes(bytes: Vec<u8>) -> Result<Self, Self::Error> {
Ok(ObjectId(String::from_utf8(bytes)?))
fn from_bytes(bytes: Vec<u8>) -> Result<Self, Error> {
Ok(ObjectId(
String::from_utf8(bytes).map_err(ObjectError::from)?,
))
}
}

View File

@ -51,7 +51,6 @@ impl UploadManager {
where
S1: Store,
S2: Store,
Error: From<S1::Error> + From<S2::Error>,
{
match self.inner.repo {
Repo::Sled(ref sled_repo) => do_migrate_store(sled_repo, from, to).await,
@ -62,10 +61,7 @@ impl UploadManager {
&self,
store: S,
alias: &Alias,
) -> Result<S::Identifier, Error>
where
Error: From<S::Error>,
{
) -> Result<S::Identifier, Error> {
let identifier = self.identifier_from_alias::<S>(alias).await?;
let details = if let Some(details) = self.details(&identifier).await? {
details
@ -205,10 +201,7 @@ impl UploadManager {
pub(crate) async fn details<I: Identifier>(
&self,
identifier: &I,
) -> Result<Option<Details>, Error>
where
Error: From<I::Error>,
{
) -> Result<Option<Details>, Error> {
match self.inner.repo {
Repo::Sled(ref sled_repo) => Ok(sled_repo.details(identifier).await?),
}
@ -240,10 +233,7 @@ impl UploadManager {
&self,
store: S,
alias: Alias,
) -> Result<(), Error>
where
Error: From<S::Error>,
{
) -> Result<(), Error> {
let token = match self.inner.repo {
Repo::Sled(ref sled_repo) => sled_repo.delete_token(&alias).await?,
};
@ -258,10 +248,7 @@ impl UploadManager {
store: S,
alias: Alias,
token: DeleteToken,
) -> Result<(), Error>
where
Error: From<S::Error>,
{
) -> Result<(), Error> {
let hash = match self.inner.repo {
Repo::Sled(ref sled_repo) => {
let saved_delete_token = sled_repo.delete_token(&alias).await?;
@ -282,10 +269,7 @@ impl UploadManager {
&self,
store: S,
hash: Vec<u8>,
) -> Result<(), Error>
where
Error: From<S::Error>,
{
) -> Result<(), Error> {
match self.inner.repo {
Repo::Sled(ref sled_repo) => {
let hash: <SledRepo as HashRepo>::Bytes = hash.into();
@ -309,7 +293,7 @@ impl UploadManager {
HashRepo::cleanup(sled_repo, hash).await?;
let cleanup_span = tracing::info_span!("Cleaning files");
let cleanup_span = tracing::info_span!(parent: None, "Cleaning files");
cleanup_span.follows_from(Span::current());
actix_rt::spawn(
@ -323,12 +307,10 @@ impl UploadManager {
{
debug!("Deleting {:?}", identifier);
if let Err(e) = store.remove(identifier).await {
let e: Error = e.into();
errors.push(e);
}
if let Err(e) = IdentifierRepo::cleanup(&repo, identifier).await {
let e: Error = e.into();
errors.push(e);
}
}
@ -350,10 +332,7 @@ impl UploadManager {
Ok(())
}
pub(crate) fn session<S: Store + Clone + 'static>(&self, store: S) -> UploadManagerSession<S>
where
Error: From<S::Error>,
{
pub(crate) fn session<S: Store + Clone + 'static>(&self, store: S) -> UploadManagerSession<S> {
UploadManagerSession::new(self.clone(), store)
}
}
@ -366,7 +345,6 @@ async fn migrate_file<S1, S2>(
where
S1: Store,
S2: Store,
Error: From<S1::Error> + From<S2::Error>,
{
let stream = from.to_stream(identifier, None, None).await?;
futures_util::pin_mut!(stream);
@ -382,7 +360,6 @@ where
R: IdentifierRepo,
I1: Identifier,
I2: Identifier,
Error: From<<R as IdentifierRepo>::Error>,
{
if let Some(details) = repo.details(&from).await? {
repo.relate_details(to, &details).await?;
@ -396,11 +373,7 @@ async fn do_migrate_store<R, S1, S2>(repo: &R, from: S1, to: S2) -> Result<(), E
where
S1: Store,
S2: Store,
Error: From<S1::Error> + From<S2::Error>,
R: IdentifierRepo + HashRepo + SettingsRepo,
Error: From<<R as IdentifierRepo>::Error>,
Error: From<<R as HashRepo>::Error>,
Error: From<<R as SettingsRepo>::Error>,
{
let stream = repo.hashes().await;
let mut stream = Box::pin(stream);

View File

@ -13,20 +13,14 @@ use futures_util::stream::{Stream, StreamExt};
use tracing::{debug, instrument, Span};
use tracing_futures::Instrument;
pub(crate) struct UploadManagerSession<S: Store + Clone + 'static>
where
Error: From<S::Error>,
{
pub(crate) struct UploadManagerSession<S: Store + Clone + 'static> {
store: S,
manager: UploadManager,
alias: Option<Alias>,
finished: bool,
}
impl<S: Store + Clone + 'static> UploadManagerSession<S>
where
Error: From<S::Error>,
{
impl<S: Store + Clone + 'static> UploadManagerSession<S> {
pub(super) fn new(manager: UploadManager, store: S) -> Self {
UploadManagerSession {
store,
@ -45,10 +39,7 @@ where
}
}
impl<S: Store + Clone + 'static> Drop for UploadManagerSession<S>
where
Error: From<S::Error>,
{
impl<S: Store + Clone + 'static> Drop for UploadManagerSession<S> {
fn drop(&mut self) {
if self.finished {
return;
@ -91,10 +82,7 @@ where
}
}
impl<S: Store> UploadManagerSession<S>
where
Error: From<S::Error>,
{
impl<S: Store> UploadManagerSession<S> {
/// Generate a delete token for an alias
#[instrument(skip(self))]
pub(crate) async fn delete_token(&self) -> Result<DeleteToken, Error> {