Get it compiling again

This commit is contained in:
Aode (lion) 2022-03-26 16:49:23 -05:00
parent 323016f994
commit 15b52ba6ec
16 changed files with 1032 additions and 1263 deletions

2
Cargo.lock generated
View File

@ -1610,7 +1610,7 @@ dependencies = [
[[package]]
name = "pict-rs"
version = "0.3.0-rc.7"
version = "0.4.0-alpha.1"
dependencies = [
"actix-form-data",
"actix-rt",

View File

@ -1,7 +1,7 @@
[package]
name = "pict-rs"
description = "A simple image hosting service"
version = "0.3.0-rc.7"
version = "0.4.0-alpha.1"
authors = ["asonix <asonix@asonix.dog>"]
license = "AGPL-3.0"
readme = "README.md"

View File

@ -37,39 +37,33 @@ where
}
}
// Unwraps a sled transaction failure into the application `Error` type:
// a transaction aborted by our own closure already carries an `Error`
// payload, while a storage-level failure is a raw `sled::Error` that is
// converted through the existing `From<sled::Error>` impl.
impl From<sled::transaction::TransactionError<Error>> for Error {
    fn from(e: sled::transaction::TransactionError<Error>) -> Self {
        match e {
            // Abort: our code bailed out of the transaction with this error.
            sled::transaction::TransactionError::Abort(t) => t,
            // Storage: sled itself failed; convert the underlying error.
            sled::transaction::TransactionError::Storage(e) => e.into(),
        }
    }
}
#[derive(Debug, thiserror::Error)]
pub(crate) enum UploadError {
#[error("Couln't upload file, {0}")]
#[error("Couln't upload file")]
Upload(#[from] actix_form_data::Error),
#[error("Error in DB, {0}")]
Db(#[from] sled::Error),
#[error("Error in DB")]
Sled(#[from] crate::repo::sled::Error),
#[error("Error parsing string, {0}")]
#[error("Error in old sled DB")]
OldSled(#[from] ::sled::Error),
#[error("Error parsing string")]
ParseString(#[from] std::string::FromUtf8Error),
#[error("Error interacting with filesystem, {0}")]
#[error("Error interacting with filesystem")]
Io(#[from] std::io::Error),
#[error(transparent)]
#[error("Error generating path")]
PathGenerator(#[from] storage_path_generator::PathError),
#[error(transparent)]
#[error("Error stripping prefix")]
StripPrefix(#[from] std::path::StripPrefixError),
#[error(transparent)]
#[error("Error storing file")]
FileStore(#[from] crate::store::file_store::FileError),
#[error(transparent)]
#[error("Error storing object")]
ObjectStore(#[from] crate::store::object_store::ObjectError),
#[error("Provided process path is invalid")]
@ -87,9 +81,6 @@ pub(crate) enum UploadError {
#[error("Requested a file that doesn't exist")]
MissingAlias,
#[error("Alias directed to missing file")]
MissingFile,
#[error("Provided token did not match expected token")]
InvalidToken,
@ -102,7 +93,7 @@ pub(crate) enum UploadError {
#[error("Unable to download image, bad response {0}")]
Download(actix_web::http::StatusCode),
#[error("Unable to download image, {0}")]
#[error("Unable to download image")]
Payload(#[from] awc::error::PayloadError),
#[error("Unable to send request, {0}")]
@ -117,13 +108,13 @@ pub(crate) enum UploadError {
#[error("Tried to save an image with an already-taken name")]
DuplicateAlias,
#[error("{0}")]
#[error("Error in json")]
Json(#[from] serde_json::Error),
#[error("Range header not satisfiable")]
Range,
#[error(transparent)]
#[error("Hit limit")]
Limit(#[from] super::LimitError),
}

View File

@ -2,6 +2,7 @@ use crate::{
config::Format,
error::{Error, UploadError},
process::Process,
repo::Alias,
store::Store,
};
use actix_web::web::Bytes;
@ -11,8 +12,9 @@ use tokio::{
};
use tracing::instrument;
pub(crate) fn details_hint(filename: &str) -> Option<ValidInputType> {
if filename.ends_with(".mp4") {
pub(crate) fn details_hint(alias: &Alias) -> Option<ValidInputType> {
let ext = alias.extension()?;
if ext.ends_with(".mp4") {
Some(ValidInputType::Mp4)
} else {
None

View File

@ -57,6 +57,7 @@ use self::{
magick::details_hint,
middleware::{Deadline, Internal},
migrate::LatestDb,
repo::{Alias, DeleteToken, Repo},
store::{file_store::FileStore, object_store::ObjectStore, Store},
upload_manager::{UploadManager, UploadManagerSession},
};
@ -96,30 +97,26 @@ where
info!("Uploaded {} as {:?}", image.filename, alias);
let delete_token = image.result.delete_token().await?;
let name = manager.from_alias(alias.to_owned()).await?;
let identifier = manager.identifier_from_filename::<S>(name.clone()).await?;
let details = manager.variant_details(&identifier, name.clone()).await?;
let identifier = manager.identifier_from_alias::<S>(alias).await?;
let details = manager.details(&identifier).await?;
let details = if let Some(details) = details {
debug!("details exist");
details
} else {
debug!("generating new details from {:?}", identifier);
let hint = details_hint(&name);
let hint = details_hint(alias);
let new_details =
Details::from_store((**store).clone(), identifier.clone(), hint).await?;
debug!("storing details for {:?} {}", identifier, name);
manager
.store_variant_details(&identifier, name, &new_details)
.await?;
debug!("storing details for {:?}", identifier);
manager.store_details(&identifier, &new_details).await?;
debug!("stored");
new_details
};
files.push(serde_json::json!({
"file": alias,
"delete_token": delete_token,
"file": alias.to_string(),
"delete_token": delete_token.to_string(),
"details": details,
}));
}
@ -222,19 +219,16 @@ where
drop(permit);
let delete_token = session.delete_token().await?;
let name = manager.from_alias(alias.to_owned()).await?;
let identifier = manager.identifier_from_filename::<S>(name.clone()).await?;
let identifier = manager.identifier_from_alias::<S>(&alias).await?;
let details = manager.variant_details(&identifier, name.clone()).await?;
let details = manager.details(&identifier).await?;
let details = if let Some(details) = details {
details
} else {
let hint = details_hint(&name);
let hint = details_hint(&alias);
let new_details = Details::from_store((**store).clone(), identifier.clone(), hint).await?;
manager
.store_variant_details(&identifier, name, &new_details)
.await?;
manager.store_details(&identifier, &new_details).await?;
new_details
};
@ -242,8 +236,8 @@ where
Ok(HttpResponse::Created().json(&serde_json::json!({
"msg": "ok",
"files": [{
"file": alias,
"delete_token": delete_token,
"file": alias.to_string(),
"delete_token": delete_token.to_string(),
"details": details,
}]
})))
@ -261,19 +255,21 @@ where
{
let (alias, token) = path_entries.into_inner();
manager.delete((**store).clone(), token, alias).await?;
let alias = Alias::from_existing(&alias);
let token = DeleteToken::from_existing(&token);
manager.delete((**store).clone(), alias, token).await?;
Ok(HttpResponse::NoContent().finish())
}
type ProcessQuery = Vec<(String, String)>;
async fn prepare_process(
fn prepare_process(
query: web::Query<ProcessQuery>,
ext: &str,
manager: &UploadManager,
filters: &Option<HashSet<String>>,
) -> Result<(Format, String, PathBuf, Vec<String>), Error> {
) -> Result<(Format, Alias, PathBuf, Vec<String>), Error> {
let (alias, operations) =
query
.into_inner()
@ -291,7 +287,7 @@ async fn prepare_process(
return Err(UploadError::MissingFilename.into());
}
let name = manager.from_alias(alias).await?;
let alias = Alias::from_existing(&alias);
let operations = if let Some(filters) = filters.as_ref() {
operations
@ -305,12 +301,10 @@ async fn prepare_process(
let format = ext
.parse::<Format>()
.map_err(|_| UploadError::UnsupportedFormat)?;
let processed_name = format!("{}.{}", name, ext);
let (thumbnail_path, thumbnail_args) =
self::processor::build_chain(&operations, processed_name)?;
let (thumbnail_path, thumbnail_args) = self::processor::build_chain(&operations)?;
Ok((format, name, thumbnail_path, thumbnail_args))
Ok((format, alias, thumbnail_path, thumbnail_args))
}
#[instrument(name = "Fetching derived details", skip(manager, filters))]
@ -324,15 +318,14 @@ async fn process_details<S: Store>(
where
Error: From<S::Error>,
{
let (_, name, thumbnail_path, _) =
prepare_process(query, ext.as_str(), &manager, &filters).await?;
let (_, alias, thumbnail_path, _) = prepare_process(query, ext.as_str(), &filters)?;
let identifier = manager
.variant_identifier::<S>(&thumbnail_path, &name)
.variant_identifier::<S>(&alias, &thumbnail_path)
.await?
.ok_or(UploadError::MissingAlias)?;
let details = manager.variant_details(&identifier, name).await?;
let details = manager.details(&identifier).await?;
let details = details.ok_or(UploadError::NoFiles)?;
@ -352,24 +345,22 @@ async fn process<S: Store + 'static>(
where
Error: From<S::Error>,
{
let (format, name, thumbnail_path, thumbnail_args) =
prepare_process(query, ext.as_str(), &manager, &filters).await?;
let (format, alias, thumbnail_path, thumbnail_args) =
prepare_process(query, ext.as_str(), &filters)?;
let identifier_opt = manager
.variant_identifier::<S>(&thumbnail_path, &name)
.variant_identifier::<S>(&alias, &thumbnail_path)
.await?;
if let Some(identifier) = identifier_opt {
let details_opt = manager.variant_details(&identifier, name.clone()).await?;
let details_opt = manager.details(&identifier).await?;
let details = if let Some(details) = details_opt {
details
} else {
let hint = details_hint(&name);
let hint = details_hint(&alias);
let details = Details::from_store((**store).clone(), identifier.clone(), hint).await?;
manager
.store_variant_details(&identifier, name, &details)
.await?;
manager.store_details(&identifier, &details).await?;
details
};
@ -377,7 +368,7 @@ where
}
let identifier = manager
.still_identifier_from_filename((**store).clone(), name.clone())
.still_identifier_from_alias((**store).clone(), &alias)
.await?;
let thumbnail_path2 = thumbnail_path.clone();
@ -405,29 +396,27 @@ where
parent: None,
"Saving variant information",
path = tracing::field::debug(&thumbnail_path),
name = tracing::field::display(&name),
name = tracing::field::display(&alias),
);
save_span.follows_from(Span::current());
let details2 = details.clone();
let bytes2 = bytes.clone();
let alias2 = alias.clone();
actix_rt::spawn(
async move {
let identifier = match store.save_bytes(bytes2, &name).await {
let identifier = match store.save_bytes(bytes2).await {
Ok(identifier) => identifier,
Err(e) => {
tracing::warn!("Failed to generate directory path: {}", e);
return;
}
};
if let Err(e) = manager
.store_variant_details(&identifier, name.clone(), &details2)
.await
{
if let Err(e) = manager.store_details(&identifier, &details2).await {
tracing::warn!("Error saving variant details: {}", e);
return;
}
if let Err(e) = manager
.store_variant(Some(&thumbnail_path), &identifier, &name)
.store_variant(&alias2, &thumbnail_path, &identifier)
.await
{
tracing::warn!("Error saving variant info: {}", e);
@ -483,19 +472,19 @@ async fn details<S: Store>(
where
Error: From<S::Error>,
{
let name = manager.from_alias(alias.into_inner()).await?;
let identifier = manager.identifier_from_filename::<S>(name.clone()).await?;
let alias = alias.into_inner();
let alias = Alias::from_existing(&alias);
let details = manager.variant_details(&identifier, name.clone()).await?;
let identifier = manager.identifier_from_alias::<S>(&alias).await?;
let details = manager.details(&identifier).await?;
let details = if let Some(details) = details {
details
} else {
let hint = details_hint(&name);
let hint = details_hint(&alias);
let new_details = Details::from_store((**store).clone(), identifier.clone(), hint).await?;
manager
.store_variant_details(&identifier, name, &new_details)
.await?;
manager.store_details(&identifier, &new_details).await?;
new_details
};
@ -513,19 +502,18 @@ async fn serve<S: Store>(
where
Error: From<S::Error>,
{
let name = manager.from_alias(alias.into_inner()).await?;
let identifier = manager.identifier_from_filename::<S>(name.clone()).await?;
let alias = alias.into_inner();
let alias = Alias::from_existing(&alias);
let identifier = manager.identifier_from_alias::<S>(&alias).await?;
let details = manager.variant_details(&identifier, name.clone()).await?;
let details = manager.details(&identifier).await?;
let details = if let Some(details) = details {
details
} else {
let hint = details_hint(&name);
let hint = details_hint(&alias);
let details = Details::from_store((**store).clone(), identifier.clone(), hint).await?;
manager
.store_variant_details(&identifier, name, &details)
.await?;
manager.store_details(&identifier, &details).await?;
details
};
@ -605,25 +593,21 @@ where
}
#[derive(Debug, serde::Deserialize)]
#[serde(untagged)]
enum FileOrAlias {
File { file: String },
Alias { alias: String },
struct AliasQuery {
alias: String,
}
#[instrument(name = "Purging file", skip(upload_manager))]
async fn purge<S: Store>(
query: web::Query<FileOrAlias>,
query: web::Query<AliasQuery>,
upload_manager: web::Data<UploadManager>,
store: web::Data<S>,
) -> Result<HttpResponse, Error>
where
Error: From<S::Error>,
{
let aliases = match query.into_inner() {
FileOrAlias::File { file } => upload_manager.aliases_by_filename(file).await?,
FileOrAlias::Alias { alias } => upload_manager.aliases_by_alias(alias).await?,
};
let alias = Alias::from_existing(&query.alias);
let aliases = upload_manager.aliases_by_alias(&alias).await?;
for alias in aliases.iter() {
upload_manager
@ -633,49 +617,25 @@ where
Ok(HttpResponse::Ok().json(&serde_json::json!({
"msg": "ok",
"aliases": aliases
"aliases": aliases.iter().map(|a| a.to_string()).collect::<Vec<_>>()
})))
}
#[instrument(name = "Fetching aliases", skip(upload_manager))]
async fn aliases<S: Store>(
query: web::Query<FileOrAlias>,
query: web::Query<AliasQuery>,
upload_manager: web::Data<UploadManager>,
store: web::Data<S>,
) -> Result<HttpResponse, Error>
where
Error: From<S::Error>,
{
let aliases = match query.into_inner() {
FileOrAlias::File { file } => upload_manager.aliases_by_filename(file).await?,
FileOrAlias::Alias { alias } => upload_manager.aliases_by_alias(alias).await?,
};
let alias = Alias::from_existing(&query.alias);
let aliases = upload_manager.aliases_by_alias(&alias).await?;
Ok(HttpResponse::Ok().json(&serde_json::json!({
"msg": "ok",
"aliases": aliases,
})))
}
#[derive(Debug, serde::Deserialize)]
struct ByAlias {
alias: String,
}
#[instrument(name = "Fetching filename", skip(upload_manager))]
async fn filename_by_alias<S: Store>(
query: web::Query<ByAlias>,
upload_manager: web::Data<UploadManager>,
store: web::Data<S>,
) -> Result<HttpResponse, Error>
where
Error: From<S::Error>,
{
let filename = upload_manager.from_alias(query.into_inner().alias).await?;
Ok(HttpResponse::Ok().json(&serde_json::json!({
"msg": "ok",
"filename": filename,
"aliases": aliases.iter().map(|a| a.to_string()).collect::<Vec<_>>()
})))
}
@ -817,10 +777,7 @@ where
.route(web::post().to(upload::<S>)),
)
.service(web::resource("/purge").route(web::post().to(purge::<S>)))
.service(web::resource("/aliases").route(web::get().to(aliases::<S>)))
.service(
web::resource("/filename").route(web::get().to(filename_by_alias::<S>)),
),
.service(web::resource("/aliases").route(web::get().to(aliases::<S>))),
)
})
.bind(CONFIG.bind_address())?
@ -834,7 +791,7 @@ where
async fn migrate_inner<S1>(
manager: &UploadManager,
db: &sled::Db,
repo: &Repo,
from: S1,
to: &config::Storage,
) -> anyhow::Result<()>
@ -844,9 +801,7 @@ where
{
match to {
config::Storage::Filesystem(RequiredFilesystemStorage { path }) => {
let to = FileStore::build(path.clone(), db)?;
manager.restructure(&to).await?;
let to = FileStore::build(path.clone(), repo.clone()).await?;
manager.migrate_store::<S1, FileStore>(from, to).await?;
}
config::Storage::ObjectStorage(RequiredObjectStorage {
@ -864,9 +819,10 @@ where
secret_key.clone(),
security_token.clone(),
session_token.clone(),
db,
repo.clone(),
build_reqwest_client()?,
)?;
)
.await?;
manager.migrate_store::<S1, ObjectStore>(from, to).await?;
}
@ -883,13 +839,12 @@ async fn main() -> anyhow::Result<()> {
CONFIG.console_buffer_capacity(),
)?;
let repo = Repo::open(CONFIG.repo())?;
let db = LatestDb::exists(CONFIG.data_dir()).migrate()?;
let repo = self::repo::Repo::open(CONFIG.repo())?;
repo.from_db(db).await?;
let manager = UploadManager::new(db.clone(), CONFIG.format()).await?;
let manager = UploadManager::new(repo.clone(), CONFIG.format()).await?;
match CONFIG.command()? {
CommandConfig::Run => (),
@ -901,10 +856,8 @@ async fn main() -> anyhow::Result<()> {
match from {
config::Storage::Filesystem(RequiredFilesystemStorage { path }) => {
let from = FileStore::build(path.clone(), &db)?;
manager.restructure(&from).await?;
migrate_inner(&manager, &db, from, &to).await?;
let from = FileStore::build(path.clone(), repo.clone()).await?;
migrate_inner(&manager, &repo, from, &to).await?;
}
config::Storage::ObjectStorage(RequiredObjectStorage {
bucket_name,
@ -921,11 +874,12 @@ async fn main() -> anyhow::Result<()> {
secret_key,
security_token,
session_token,
&db,
repo.clone(),
build_reqwest_client()?,
)?;
)
.await?;
migrate_inner(&manager, &db, from, &to).await?;
migrate_inner(&manager, &repo, from, &to).await?;
}
}
@ -935,9 +889,7 @@ async fn main() -> anyhow::Result<()> {
match CONFIG.store()? {
config::Storage::Filesystem(RequiredFilesystemStorage { path }) => {
let store = FileStore::build(path, &db)?;
manager.restructure(&store).await?;
let store = FileStore::build(path, repo).await?;
launch(manager, store).await
}
config::Storage::ObjectStorage(RequiredObjectStorage {
@ -955,9 +907,10 @@ async fn main() -> anyhow::Result<()> {
secret_key,
security_token,
session_token,
&db,
repo,
build_reqwest_client()?,
)?;
)
.await?;
launch(manager, store).await
}

View File

@ -95,26 +95,3 @@ impl DbVersion {
}
}
}
/// Half-open key range covering every alias entry stored under `hash`.
///
/// Alias keys are laid out as `hash ++ 0x00 ++ id` (see `alias_key`), so
/// scanning `[hash ++ 0x00, hash ++ 0x01)` yields exactly the aliases of
/// this hash and nothing else.
pub(crate) fn alias_key_bounds(hash: &[u8]) -> (Vec<u8>, Vec<u8>) {
    let bound = |sep: u8| {
        let mut key = Vec::with_capacity(hash.len() + 1);
        key.extend_from_slice(hash);
        key.push(sep);
        key
    };

    (bound(0), bound(1))
}
/// Key under which the id of `alias` is stored: `"{alias}/id"`.
pub(crate) fn alias_id_key(alias: &str) -> String {
    let mut key = String::with_capacity(alias.len() + 3);
    key.push_str(alias);
    key.push_str("/id");
    key
}
/// Builds the composite key `hash ++ 0x00 ++ id` that stores one alias id
/// under its file hash. The single 0x00 byte separates the hash from the
/// id, which is what makes the `alias_key_bounds` range scan work.
pub(crate) fn alias_key(hash: &[u8], id: &str) -> Vec<u8> {
    let mut key = Vec::with_capacity(hash.len() + 1 + id.len());
    key.extend_from_slice(hash);
    // separator between the hash and the ID
    key.push(0);
    key.extend_from_slice(id.as_bytes());
    key
}

View File

@ -20,10 +20,7 @@ pub(crate) struct Crop(usize, usize);
pub(crate) struct Blur(f64);
#[instrument]
pub(crate) fn build_chain(
args: &[(String, String)],
filename: String,
) -> Result<(PathBuf, Vec<String>), Error> {
pub(crate) fn build_chain(args: &[(String, String)]) -> Result<(PathBuf, Vec<String>), Error> {
fn parse<P: Processor>(key: &str, value: &str) -> Result<Option<P>, UploadError> {
if key == P::NAME {
return Ok(Some(P::parse(key, value).ok_or(UploadError::ParsePath)?));
@ -56,7 +53,7 @@ pub(crate) fn build_chain(
}
})?;
Ok((path.join(filename), args))
Ok((path, args))
}
impl Processor for Identity {

View File

@ -1,105 +1,131 @@
use crate::config::RequiredSledRepo;
use crate::{config::Repository, details::Details, store::Identifier};
use futures_util::Stream;
use tracing::debug;
use uuid::Uuid;
mod old;
pub(crate) mod sled;
#[derive(Debug)]
#[derive(Clone, Debug)]
pub(crate) enum Repo {
Sled(self::sled::SledRepo),
}
#[derive(Clone, Debug)]
pub(crate) struct Alias {
id: Uuid,
extension: String,
/// Identifier that is a parsed UUID when possible, and otherwise an
/// arbitrary string (aliases/tokens imported from the old DB may be
/// free-form names rather than UUIDs — see `MaybeUuid::from_str`).
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
enum MaybeUuid {
    // Successfully parsed as a UUID; serialized as its 16 raw bytes.
    Uuid(Uuid),
    // Any string that does not parse as a UUID; serialized as UTF-8.
    Name(String),
}
/// Public name of an uploaded file: an id plus an optional extension.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub(crate) struct Alias {
    // Either a parsed UUID or a free-form pre-existing name.
    id: MaybeUuid,
    // File extension including its leading dot (e.g. ".mp4"); `None`
    // when the alias contains no dot at all.
    extension: Option<String>,
}
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub(crate) struct DeleteToken {
id: Uuid,
id: MaybeUuid,
}
pub(crate) struct AlreadyExists;
#[async_trait::async_trait]
#[async_trait::async_trait(?Send)]
pub(crate) trait SettingsRepo {
type Bytes: AsRef<[u8]> + From<Vec<u8>>;
type Error: std::error::Error;
type Error: std::error::Error + Send + Sync + 'static;
async fn set(&self, key: &'static [u8], value: Self::Bytes) -> Result<(), Self::Error>;
async fn get(&self, key: &'static [u8]) -> Result<Option<Self::Bytes>, Self::Error>;
async fn remove(&self, key: &'static [u8]) -> Result<(), Self::Error>;
}
#[async_trait::async_trait]
pub(crate) trait IdentifierRepo<I: Identifier> {
type Error: std::error::Error;
#[async_trait::async_trait(?Send)]
pub(crate) trait IdentifierRepo {
type Error: std::error::Error + Send + Sync + 'static;
async fn relate_details(&self, identifier: I, details: Details) -> Result<(), Self::Error>;
async fn details(&self, identifier: I) -> Result<Option<Details>, Self::Error>;
async fn relate_details<I: Identifier>(
&self,
identifier: &I,
details: &Details,
) -> Result<(), Self::Error>;
async fn details<I: Identifier>(&self, identifier: &I) -> Result<Option<Details>, Self::Error>;
async fn cleanup(&self, identifier: I) -> Result<(), Self::Error>;
async fn cleanup<I: Identifier>(&self, identifier: &I) -> Result<(), Self::Error>;
}
#[async_trait::async_trait]
pub(crate) trait HashRepo<I: Identifier> {
#[async_trait::async_trait(?Send)]
pub(crate) trait HashRepo {
type Bytes: AsRef<[u8]> + From<Vec<u8>>;
type Error: std::error::Error;
type Error: std::error::Error + Send + Sync + 'static;
type Stream: Stream<Item = Result<Self::Bytes, Self::Error>>;
async fn hashes(&self) -> Self::Stream;
async fn create(&self, hash: Self::Bytes) -> Result<Result<(), AlreadyExists>, Self::Error>;
async fn relate_alias(&self, hash: Self::Bytes, alias: Alias) -> Result<(), Self::Error>;
async fn remove_alias(&self, hash: Self::Bytes, alias: Alias) -> Result<(), Self::Error>;
async fn relate_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), Self::Error>;
async fn remove_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), Self::Error>;
async fn aliases(&self, hash: Self::Bytes) -> Result<Vec<Alias>, Self::Error>;
async fn relate_identifier(&self, hash: Self::Bytes, identifier: I) -> Result<(), Self::Error>;
async fn identifier(&self, hash: Self::Bytes) -> Result<I, Self::Error>;
async fn relate_identifier<I: Identifier>(
&self,
hash: Self::Bytes,
identifier: &I,
) -> Result<(), Self::Error>;
async fn identifier<I: Identifier + 'static>(
&self,
hash: Self::Bytes,
) -> Result<I, Self::Error>;
async fn relate_variant_identifier(
async fn relate_variant_identifier<I: Identifier>(
&self,
hash: Self::Bytes,
variant: String,
identifier: I,
identifier: &I,
) -> Result<(), Self::Error>;
async fn variant_identifier(
async fn variant_identifier<I: Identifier + 'static>(
&self,
hash: Self::Bytes,
variant: String,
) -> Result<Option<I>, Self::Error>;
async fn relate_motion_identifier(
async fn variants<I: Identifier + 'static>(
&self,
hash: Self::Bytes,
identifier: I,
) -> Result<Vec<(String, I)>, Self::Error>;
async fn relate_motion_identifier<I: Identifier>(
&self,
hash: Self::Bytes,
identifier: &I,
) -> Result<(), Self::Error>;
async fn motion_identifier(&self, hash: Self::Bytes) -> Result<Option<I>, Self::Error>;
async fn motion_identifier<I: Identifier + 'static>(
&self,
hash: Self::Bytes,
) -> Result<Option<I>, Self::Error>;
async fn cleanup(&self, hash: Self::Bytes) -> Result<(), Self::Error>;
}
#[async_trait::async_trait]
#[async_trait::async_trait(?Send)]
pub(crate) trait AliasRepo {
type Bytes: AsRef<[u8]> + From<Vec<u8>>;
type Error: std::error::Error;
type Error: std::error::Error + Send + Sync + 'static;
async fn create(&self, alias: Alias) -> Result<Result<(), AlreadyExists>, Self::Error>;
async fn create(&self, alias: &Alias) -> Result<Result<(), AlreadyExists>, Self::Error>;
async fn relate_delete_token(
&self,
alias: Alias,
delete_token: DeleteToken,
alias: &Alias,
delete_token: &DeleteToken,
) -> Result<Result<(), AlreadyExists>, Self::Error>;
async fn delete_token(&self, alias: Alias) -> Result<DeleteToken, Self::Error>;
async fn delete_token(&self, alias: &Alias) -> Result<DeleteToken, Self::Error>;
async fn relate_hash(&self, alias: Alias, hash: Self::Bytes) -> Result<(), Self::Error>;
async fn hash(&self, alias: Alias) -> Result<Self::Bytes, Self::Error>;
async fn relate_hash(&self, alias: &Alias, hash: Self::Bytes) -> Result<(), Self::Error>;
async fn hash(&self, alias: &Alias) -> Result<Self::Bytes, Self::Error>;
async fn cleanup(&self, alias: Alias) -> Result<(), Self::Error>;
async fn cleanup(&self, alias: &Alias) -> Result<(), Self::Error>;
}
impl Repo {
@ -167,72 +193,134 @@ const GENERATOR_KEY: &[u8] = b"last-path";
async fn migrate_hash<T>(repo: &T, old: &old::Old, hash: ::sled::IVec) -> anyhow::Result<()>
where
T: IdentifierRepo<::sled::IVec>,
<T as IdentifierRepo<::sled::IVec>>::Error: Send + Sync + 'static,
T: HashRepo<::sled::IVec>,
<T as HashRepo<::sled::IVec>>::Error: Send + Sync + 'static,
T: AliasRepo,
<T as AliasRepo>::Error: Send + Sync + 'static,
T: SettingsRepo,
<T as SettingsRepo>::Error: Send + Sync + 'static,
T: IdentifierRepo + HashRepo + AliasRepo + SettingsRepo,
{
HashRepo::create(repo, hash.to_vec().into()).await?;
if HashRepo::create(repo, hash.to_vec().into()).await?.is_err() {
debug!("Duplicate hash detected");
return Ok(());
}
let main_ident = old.main_identifier(&hash)?;
let main_ident = old.main_identifier(&hash)?.to_vec();
HashRepo::relate_identifier(repo, hash.to_vec().into(), main_ident.clone()).await?;
repo.relate_identifier(hash.to_vec().into(), &main_ident)
.await?;
for alias in old.aliases(&hash) {
if let Ok(Ok(())) = AliasRepo::create(repo, alias.clone()).await {
let _ = HashRepo::relate_alias(repo, hash.to_vec().into(), alias.clone()).await;
let _ = AliasRepo::relate_hash(repo, alias.clone(), hash.to_vec().into()).await;
if let Ok(Ok(())) = AliasRepo::create(repo, &alias).await {
let _ = repo.relate_alias(hash.to_vec().into(), &alias).await;
let _ = repo.relate_hash(&alias, hash.to_vec().into()).await;
if let Ok(Some(delete_token)) = old.delete_token(&alias) {
let _ = AliasRepo::relate_delete_token(repo, alias, delete_token).await;
let _ = repo.relate_delete_token(&alias, &delete_token).await;
}
}
}
if let Ok(Some(identifier)) = old.motion_identifier(&hash) {
HashRepo::relate_motion_identifier(repo, hash.to_vec().into(), identifier).await;
let _ = repo
.relate_motion_identifier(hash.to_vec().into(), &identifier.to_vec())
.await;
}
for (variant, identifier) in old.variants(&hash)? {
let _ =
HashRepo::relate_variant_identifier(repo, hash.to_vec().into(), variant, identifier)
for (variant_path, identifier) in old.variants(&hash)? {
let variant = variant_path.to_string_lossy().to_string();
let _ = repo
.relate_variant_identifier(hash.to_vec().into(), variant, &identifier.to_vec())
.await;
}
for (identifier, details) in old.details(&hash)? {
let _ = IdentifierRepo::relate_details(repo, identifier, details).await;
let _ = repo.relate_details(&identifier.to_vec(), &details).await;
}
if let Ok(Some(value)) = old.setting(STORE_MIGRATION_PROGRESS) {
SettingsRepo::set(repo, STORE_MIGRATION_PROGRESS, value.to_vec().into()).await?;
repo.set(STORE_MIGRATION_PROGRESS, value.to_vec().into())
.await?;
}
if let Ok(Some(value)) = old.setting(GENERATOR_KEY) {
SettingsRepo::set(repo, GENERATOR_KEY, value.to_vec().into()).await?;
repo.set(GENERATOR_KEY, value.to_vec().into()).await?;
}
Ok(())
}
impl MaybeUuid {
    /// Parses `s` as a UUID when possible; otherwise keeps it verbatim
    /// as a `Name`, so pre-existing non-UUID identifiers round-trip.
    fn from_str(s: &str) -> Self {
        if let Ok(uuid) = Uuid::parse_str(s) {
            MaybeUuid::Uuid(uuid)
        } else {
            MaybeUuid::Name(s.into())
        }
    }

    /// Byte view of the identifier: the 16 raw bytes of the UUID, or the
    /// UTF-8 bytes of the name.
    fn as_bytes(&self) -> &[u8] {
        match self {
            Self::Uuid(uuid) => &uuid.as_bytes()[..],
            Self::Name(name) => name.as_bytes(),
        }
    }
}
/// Splits `s` at its first `.`, keeping the dot at the start of the
/// second half: `"name.ext"` -> `("name", ".ext")`. Returns `None` when
/// the string contains no dot.
fn split_at_dot(s: &str) -> Option<(&str, &str)> {
    s.find('.').map(|index| s.split_at(index))
}
impl Alias {
pub(crate) fn generate(extension: String) -> Self {
Alias {
id: MaybeUuid::Uuid(Uuid::new_v4()),
extension: Some(extension),
}
}
pub(crate) fn from_existing(alias: &str) -> Self {
if let Some((start, end)) = split_at_dot(alias) {
Alias {
id: MaybeUuid::from_str(start),
extension: Some(end.into()),
}
} else {
Alias {
id: MaybeUuid::from_str(alias),
extension: None,
}
}
}
pub(crate) fn extension(&self) -> Option<&str> {
self.extension.as_deref()
}
fn to_bytes(&self) -> Vec<u8> {
let mut v = self.id.as_bytes().to_vec();
v.extend_from_slice(self.extension.as_bytes());
if let Some(ext) = self.extension() {
v.extend_from_slice(ext.as_bytes());
}
v
}
fn from_slice(bytes: &[u8]) -> Option<Self> {
if bytes.len() > 16 {
if let Ok(s) = std::str::from_utf8(bytes) {
Some(Self::from_existing(s))
} else if bytes.len() >= 16 {
let id = Uuid::from_slice(&bytes[0..16]).expect("Already checked length");
let extension = String::from_utf8_lossy(&bytes[16..]).to_string();
Some(Self { id, extension })
let extension = if bytes.len() > 16 {
Some(String::from_utf8_lossy(&bytes[16..]).to_string())
} else {
None
};
Some(Self {
id: MaybeUuid::Uuid(id),
extension,
})
} else {
None
}
@ -240,20 +328,63 @@ impl Alias {
}
impl DeleteToken {
pub(crate) fn from_existing(existing: &str) -> Self {
if let Ok(uuid) = Uuid::parse_str(existing) {
DeleteToken {
id: MaybeUuid::Uuid(uuid),
}
} else {
DeleteToken {
id: MaybeUuid::Name(existing.into()),
}
}
}
pub(crate) fn generate() -> Self {
Self {
id: MaybeUuid::Uuid(Uuid::new_v4()),
}
}
fn to_bytes(&self) -> Vec<u8> {
self.id.as_bytes().to_vec()
}
fn from_slice(bytes: &[u8]) -> Option<Self> {
if let Ok(s) = std::str::from_utf8(bytes) {
Some(DeleteToken::from_existing(s))
} else if bytes.len() == 16 {
Some(DeleteToken {
id: Uuid::from_slice(bytes).ok()?,
id: MaybeUuid::Uuid(Uuid::from_slice(bytes).ok()?),
})
} else {
None
}
}
}
// Renders the identifier exactly as it will appear inside an alias or
// delete token: the canonical hyphenated UUID form, or the raw name.
impl std::fmt::Display for MaybeUuid {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Uuid(id) => write!(f, "{}", id),
            Self::Name(name) => write!(f, "{}", name),
        }
    }
}
// A delete token displays as its bare id (UUID string or plain name);
// this is the form handed back to API clients.
impl std::fmt::Display for DeleteToken {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.id)
    }
}
impl std::fmt::Display for Alias {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}{}", self.id, self.extension)
if let Some(ext) = self.extension() {
write!(f, "{}{}", self.id, ext)
} else {
write!(f, "{}", self.id)
}
}
}
@ -272,17 +403,220 @@ impl Identifier for Vec<u8> {
}
}
impl Identifier for ::sled::IVec {
type Error = std::convert::Infallible;
#[cfg(test)]
mod tests {
use super::{Alias, DeleteToken, MaybeUuid, Uuid};
fn from_bytes(bytes: Vec<u8>) -> Result<Self, Self::Error>
where
Self: Sized,
{
Ok(bytes.into())
#[test]
fn string_delete_token() {
let delete_token = DeleteToken::from_existing("blah");
assert_eq!(
delete_token,
DeleteToken {
id: MaybeUuid::Name(String::from("blah"))
}
)
}
fn to_bytes(&self) -> Result<Vec<u8>, Self::Error> {
Ok(self.to_vec())
#[test]
fn uuid_string_delete_token() {
let uuid = Uuid::new_v4();
let delete_token = DeleteToken::from_existing(&uuid.to_string());
assert_eq!(
delete_token,
DeleteToken {
id: MaybeUuid::Uuid(uuid),
}
)
}
#[test]
fn bytes_delete_token() {
let delete_token = DeleteToken::from_slice(b"blah").unwrap();
assert_eq!(
delete_token,
DeleteToken {
id: MaybeUuid::Name(String::from("blah"))
}
)
}
#[test]
fn uuid_bytes_delete_token() {
let uuid = Uuid::new_v4();
let delete_token = DeleteToken::from_slice(&uuid.as_bytes()[..]).unwrap();
assert_eq!(
delete_token,
DeleteToken {
id: MaybeUuid::Uuid(uuid),
}
)
}
#[test]
fn uuid_bytes_string_delete_token() {
let uuid = Uuid::new_v4();
let delete_token = DeleteToken::from_slice(uuid.to_string().as_bytes()).unwrap();
assert_eq!(
delete_token,
DeleteToken {
id: MaybeUuid::Uuid(uuid),
}
)
}
#[test]
fn string_alias() {
let alias = Alias::from_existing("blah");
assert_eq!(
alias,
Alias {
id: MaybeUuid::Name(String::from("blah")),
extension: None
}
);
}
#[test]
fn string_alias_ext() {
let alias = Alias::from_existing("blah.mp4");
assert_eq!(
alias,
Alias {
id: MaybeUuid::Name(String::from("blah")),
extension: Some(String::from(".mp4")),
}
);
}
#[test]
fn uuid_string_alias() {
let uuid = Uuid::new_v4();
let alias = Alias::from_existing(&uuid.to_string());
assert_eq!(
alias,
Alias {
id: MaybeUuid::Uuid(uuid),
extension: None,
}
)
}
#[test]
fn uuid_string_alias_ext() {
let uuid = Uuid::new_v4();
let alias_str = format!("{}.mp4", uuid);
let alias = Alias::from_existing(&alias_str);
assert_eq!(
alias,
Alias {
id: MaybeUuid::Uuid(uuid),
extension: Some(String::from(".mp4")),
}
)
}
#[test]
fn bytes_alias() {
let alias = Alias::from_slice(b"blah").unwrap();
assert_eq!(
alias,
Alias {
id: MaybeUuid::Name(String::from("blah")),
extension: None
}
);
}
#[test]
fn bytes_alias_ext() {
let alias = Alias::from_slice(b"blah.mp4").unwrap();
assert_eq!(
alias,
Alias {
id: MaybeUuid::Name(String::from("blah")),
extension: Some(String::from(".mp4")),
}
);
}
#[test]
fn uuid_bytes_alias() {
let uuid = Uuid::new_v4();
let alias = Alias::from_slice(&uuid.as_bytes()[..]).unwrap();
assert_eq!(
alias,
Alias {
id: MaybeUuid::Uuid(uuid),
extension: None,
}
)
}
#[test]
fn uuid_bytes_string_alias() {
    // The textual uuid form, as bytes, also parses into a uuid alias.
    let id = Uuid::new_v4();
    let expected = Alias {
        id: MaybeUuid::Uuid(id),
        extension: None,
    };
    let parsed = Alias::from_slice(id.to_string().as_bytes()).unwrap();
    assert_eq!(parsed, expected);
}
#[test]
fn uuid_bytes_alias_ext() {
    // Raw uuid bytes followed by ".mp4" split into uuid id and extension.
    let id = Uuid::new_v4();
    let mut bytes = id.as_bytes().to_vec();
    bytes.extend_from_slice(b".mp4");
    let expected = Alias {
        id: MaybeUuid::Uuid(id),
        extension: Some(String::from(".mp4")),
    };
    assert_eq!(Alias::from_slice(&bytes).unwrap(), expected);
}
#[test]
fn uuid_bytes_string_alias_ext() {
    // Textual uuid plus ".mp4", viewed as bytes, parses fully.
    let id = Uuid::new_v4();
    let text = format!("{}.mp4", id);
    let expected = Alias {
        id: MaybeUuid::Uuid(id),
        extension: Some(String::from(".mp4")),
    };
    assert_eq!(Alias::from_slice(text.as_bytes()).unwrap(), expected);
}
}

View File

@ -17,8 +17,9 @@
// - Settings Tree
// - store-migration-progress -> Path Tree Key
use std::path::PathBuf;
use super::{Alias, DeleteToken, Details};
use uuid::Uuid;
pub(super) struct Old {
alias_tree: ::sled::Tree,
@ -91,7 +92,7 @@ impl Old {
.ok_or_else(|| anyhow::anyhow!("Missing identifier"))
}
pub(super) fn variants(&self, hash: &sled::IVec) -> anyhow::Result<Vec<(String, sled::IVec)>> {
pub(super) fn variants(&self, hash: &sled::IVec) -> anyhow::Result<Vec<(PathBuf, sled::IVec)>> {
let filename = self
.main_tree
.get(hash)?
@ -106,13 +107,16 @@ impl Old {
.scan_prefix(&variant_prefix)
.filter_map(|res| res.ok())
.filter_map(|(key, value)| {
let key_str = String::from_utf8_lossy(&key);
let variant_path = key_str.trim_start_matches(&variant_prefix);
if variant_path == "motion" {
let variant_path_bytes = &key[variant_prefix.as_bytes().len()..];
if variant_path_bytes == b"motion" {
return None;
}
Some((variant_path.to_string(), value))
let path = String::from_utf8(variant_path_bytes.to_vec()).ok()?;
let mut path = PathBuf::from(path);
path.pop();
Some((path, value))
})
.collect())
}
@ -141,29 +145,15 @@ impl Old {
.scan_prefix(key)
.values()
.filter_map(|res| res.ok())
.filter_map(|alias| {
let alias_str = String::from_utf8_lossy(&alias);
let (uuid, ext) = alias_str.split_once('.')?;
let uuid = uuid.parse::<Uuid>().ok()?;
Some(Alias {
id: uuid,
extension: ext.to_string(),
})
})
.filter_map(|alias| Alias::from_slice(&alias))
.collect()
}
pub(super) fn delete_token(&self, alias: &Alias) -> anyhow::Result<Option<DeleteToken>> {
let key = format!("{}{}/delete", alias.id, alias.extension);
let key = format!("{}/delete", alias);
if let Some(ivec) = self.alias_tree.get(key)? {
let token_str = String::from_utf8_lossy(&ivec);
if let Ok(uuid) = token_str.parse::<Uuid>() {
return Ok(Some(DeleteToken { id: uuid }));
}
return Ok(DeleteToken::from_slice(&ivec));
}
Ok(None)

View File

@ -18,7 +18,7 @@ pub(crate) enum Error {
Sled(#[from] sled::Error),
#[error("Invalid identifier")]
Identifier(#[source] Box<dyn std::error::Error + Send + Sync>),
Identifier(#[source] Box<dyn std::error::Error + Sync + Send>),
#[error("Invalid details json")]
Details(#[from] serde_json::Error),
@ -30,6 +30,7 @@ pub(crate) enum Error {
Panic,
}
#[derive(Clone)]
pub(crate) struct SledRepo {
settings: Tree,
identifier_details: Tree,
@ -62,7 +63,7 @@ impl SledRepo {
}
}
#[async_trait::async_trait]
#[async_trait::async_trait(?Send)]
impl SettingsRepo for SledRepo {
type Bytes = IVec;
type Error = Error;
@ -89,31 +90,35 @@ impl SettingsRepo for SledRepo {
fn identifier_bytes<I>(identifier: &I) -> Result<Vec<u8>, Error>
where
I: Identifier,
I::Error: Send + Sync + 'static,
{
identifier
.to_bytes()
.map_err(|e| Error::Identifier(Box::new(e)))
}
fn variant_key(hash: &[u8], variant: &str) -> Result<Vec<u8>, Error> {
fn variant_key(hash: &[u8], variant: &str) -> Vec<u8> {
let mut bytes = hash.to_vec();
bytes.push(b'/');
bytes.extend_from_slice(variant.as_bytes());
Ok(bytes)
bytes
}
#[async_trait::async_trait]
impl<I> IdentifierRepo<I> for SledRepo
where
I: Identifier + 'static,
I::Error: Send + Sync + 'static,
{
/// Recover the variant name from a `hash ++ b"/" ++ variant` key.
///
/// Returns `None` when the key is too short to contain the hash prefix
/// plus separator, or when the variant bytes are not valid UTF-8.
fn variant_from_key(hash: &[u8], key: &[u8]) -> Option<String> {
    // Skip the hash itself and the single b'/' separator.
    let suffix = key.get(hash.len() + 1..)?;
    String::from_utf8(suffix.to_vec()).ok()
}
#[async_trait::async_trait(?Send)]
impl IdentifierRepo for SledRepo {
type Error = Error;
async fn relate_details(&self, identifier: I, details: Details) -> Result<(), Self::Error> {
let key = identifier_bytes(&identifier)?;
async fn relate_details<I: Identifier>(
&self,
identifier: &I,
details: &Details,
) -> Result<(), Self::Error> {
let key = identifier_bytes(identifier)?;
let details = serde_json::to_vec(&details)?;
b!(
@ -124,8 +129,8 @@ where
Ok(())
}
async fn details(&self, identifier: I) -> Result<Option<Details>, Self::Error> {
let key = identifier_bytes(&identifier)?;
async fn details<I: Identifier>(&self, identifier: &I) -> Result<Option<Details>, Self::Error> {
let key = identifier_bytes(identifier)?;
let opt = b!(self.identifier_details, identifier_details.get(key));
@ -136,8 +141,8 @@ where
}
}
async fn cleanup(&self, identifier: I) -> Result<(), Self::Error> {
let key = identifier_bytes(&identifier)?;
async fn cleanup<I: Identifier>(&self, identifier: &I) -> Result<(), Self::Error> {
let key = identifier_bytes(identifier)?;
b!(self.identifier_details, identifier_details.remove(key));
@ -182,11 +187,12 @@ impl futures_util::Stream for HashStream {
} else if let Some(mut iter) = this.hashes.take() {
let fut = Box::pin(async move {
actix_rt::task::spawn_blocking(move || {
let opt = iter.next().map(|res| res.map_err(Error::from));
let opt = iter.next();
(iter, opt)
})
.await
.map(|(iter, opt)| (iter, opt.map(|res| res.map_err(Error::from))))
.map_err(Error::from)
});
@ -204,12 +210,8 @@ fn hash_alias_key(hash: &IVec, alias: &Alias) -> Vec<u8> {
v
}
#[async_trait::async_trait]
impl<I> HashRepo<I> for SledRepo
where
I: Identifier + 'static,
I::Error: Send + Sync + 'static,
{
#[async_trait::async_trait(?Send)]
impl HashRepo for SledRepo {
type Bytes = IVec;
type Error = Error;
type Stream = HashStream;
@ -232,19 +234,17 @@ where
Ok(res.map_err(|_| AlreadyExists))
}
async fn relate_alias(&self, hash: Self::Bytes, alias: Alias) -> Result<(), Self::Error> {
let key = hash_alias_key(&hash, &alias);
async fn relate_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), Self::Error> {
let key = hash_alias_key(&hash, alias);
let value = alias.to_bytes();
b!(
self.hash_aliases,
hash_aliases.insert(key, alias.to_bytes())
);
b!(self.hash_aliases, hash_aliases.insert(key, value));
Ok(())
}
async fn remove_alias(&self, hash: Self::Bytes, alias: Alias) -> Result<(), Self::Error> {
let key = hash_alias_key(&hash, &alias);
async fn remove_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), Self::Error> {
let key = hash_alias_key(&hash, alias);
b!(self.hash_aliases, hash_aliases.remove(key));
@ -258,21 +258,28 @@ where
.values()
.filter_map(Result::ok)
.filter_map(|ivec| Alias::from_slice(&ivec))
.collect::<Vec<_>>()) as Result<_, Error>
.collect::<Vec<_>>()) as Result<_, sled::Error>
});
Ok(v)
}
async fn relate_identifier(&self, hash: Self::Bytes, identifier: I) -> Result<(), Self::Error> {
let bytes = identifier_bytes(&identifier)?;
async fn relate_identifier<I: Identifier>(
&self,
hash: Self::Bytes,
identifier: &I,
) -> Result<(), Self::Error> {
let bytes = identifier_bytes(identifier)?;
b!(self.hash_identifiers, hash_identifiers.insert(hash, bytes));
Ok(())
}
async fn identifier(&self, hash: Self::Bytes) -> Result<I, Self::Error> {
async fn identifier<I: Identifier + 'static>(
&self,
hash: Self::Bytes,
) -> Result<I, Self::Error> {
let opt = b!(self.hash_identifiers, hash_identifiers.get(hash));
opt.ok_or(Error::Missing).and_then(|ivec| {
@ -280,14 +287,14 @@ where
})
}
async fn relate_variant_identifier(
async fn relate_variant_identifier<I: Identifier>(
&self,
hash: Self::Bytes,
variant: String,
identifier: I,
identifier: &I,
) -> Result<(), Self::Error> {
let key = variant_key(&hash, &variant)?;
let value = identifier_bytes(&identifier)?;
let key = variant_key(&hash, &variant);
let value = identifier_bytes(identifier)?;
b!(
self.hash_variant_identifiers,
@ -297,12 +304,12 @@ where
Ok(())
}
async fn variant_identifier(
async fn variant_identifier<I: Identifier + 'static>(
&self,
hash: Self::Bytes,
variant: String,
) -> Result<Option<I>, Self::Error> {
let key = variant_key(&hash, &variant)?;
let key = variant_key(&hash, &variant);
let opt = b!(
self.hash_variant_identifiers,
@ -318,12 +325,33 @@ where
}
}
async fn relate_motion_identifier(
async fn variants<I: Identifier + 'static>(
&self,
hash: Self::Bytes,
identifier: I,
) -> Result<Vec<(String, I)>, Self::Error> {
let vec = b!(
self.hash_variant_identifiers,
Ok(hash_variant_identifiers
.scan_prefix(&hash)
.filter_map(|res| res.ok())
.filter_map(|(key, ivec)| {
let identifier = I::from_bytes(ivec.to_vec()).ok()?;
let variant = variant_from_key(&hash, &key)?;
Some((variant, identifier))
})
.collect::<Vec<_>>()) as Result<Vec<_>, sled::Error>
);
Ok(vec)
}
async fn relate_motion_identifier<I: Identifier>(
&self,
hash: Self::Bytes,
identifier: &I,
) -> Result<(), Self::Error> {
let bytes = identifier_bytes(&identifier)?;
let bytes = identifier_bytes(identifier)?;
b!(
self.hash_motion_identifiers,
@ -333,7 +361,10 @@ where
Ok(())
}
async fn motion_identifier(&self, hash: Self::Bytes) -> Result<Option<I>, Self::Error> {
async fn motion_identifier<I: Identifier + 'static>(
&self,
hash: Self::Bytes,
) -> Result<Option<I>, Self::Error> {
let opt = b!(
self.hash_motion_identifiers,
hash_motion_identifiers.get(hash)
@ -361,7 +392,7 @@ where
hash_motion_identifiers.remove(hash2)
);
let aliases = HashRepo::<I>::aliases(self, hash.clone()).await?;
let aliases = self.aliases(hash.clone()).await?;
let hash2 = hash.clone();
b!(self.hash_aliases, {
for alias in aliases {
@ -369,7 +400,7 @@ where
let _ = hash_aliases.remove(key);
}
Ok(()) as Result<(), Error>
Ok(()) as Result<(), sled::Error>
});
let variant_keys = b!(self.hash_variant_identifiers, {
@ -379,25 +410,25 @@ where
.filter_map(Result::ok)
.collect::<Vec<_>>();
Ok(v) as Result<Vec<_>, Error>
Ok(v) as Result<Vec<_>, sled::Error>
});
b!(self.hash_variant_identifiers, {
for key in variant_keys {
let _ = hash_variant_identifiers.remove(key);
}
Ok(()) as Result<(), Error>
Ok(()) as Result<(), sled::Error>
});
Ok(())
}
}
#[async_trait::async_trait]
#[async_trait::async_trait(?Send)]
impl AliasRepo for SledRepo {
type Bytes = sled::IVec;
type Error = Error;
async fn create(&self, alias: Alias) -> Result<Result<(), AlreadyExists>, Self::Error> {
async fn create(&self, alias: &Alias) -> Result<Result<(), AlreadyExists>, Self::Error> {
let bytes = alias.to_bytes();
let bytes2 = bytes.clone();
@ -411,8 +442,8 @@ impl AliasRepo for SledRepo {
async fn relate_delete_token(
&self,
alias: Alias,
delete_token: DeleteToken,
alias: &Alias,
delete_token: &DeleteToken,
) -> Result<Result<(), AlreadyExists>, Self::Error> {
let key = alias.to_bytes();
let token = delete_token.to_bytes();
@ -425,7 +456,7 @@ impl AliasRepo for SledRepo {
Ok(res.map_err(|_| AlreadyExists))
}
async fn delete_token(&self, alias: Alias) -> Result<DeleteToken, Self::Error> {
async fn delete_token(&self, alias: &Alias) -> Result<DeleteToken, Self::Error> {
let key = alias.to_bytes();
let opt = b!(self.alias_delete_tokens, alias_delete_tokens.get(key));
@ -434,7 +465,7 @@ impl AliasRepo for SledRepo {
.ok_or(Error::Missing)
}
async fn relate_hash(&self, alias: Alias, hash: Self::Bytes) -> Result<(), Self::Error> {
async fn relate_hash(&self, alias: &Alias, hash: Self::Bytes) -> Result<(), Self::Error> {
let key = alias.to_bytes();
b!(self.alias_hashes, alias_hashes.insert(key, hash));
@ -442,7 +473,7 @@ impl AliasRepo for SledRepo {
Ok(())
}
async fn hash(&self, alias: Alias) -> Result<Self::Bytes, Self::Error> {
async fn hash(&self, alias: &Alias) -> Result<Self::Bytes, Self::Error> {
let key = alias.to_bytes();
let opt = b!(self.alias_hashes, alias_hashes.get(key));
@ -450,7 +481,7 @@ impl AliasRepo for SledRepo {
opt.ok_or(Error::Missing)
}
async fn cleanup(&self, alias: Alias) -> Result<(), Self::Error> {
async fn cleanup(&self, alias: &Alias) -> Result<(), Self::Error> {
let key = alias.to_bytes();
let key2 = key.clone();

View File

@ -8,7 +8,7 @@ pub(crate) mod file_store;
pub(crate) mod object_store;
pub(crate) trait Identifier: Send + Sync + Clone + Debug {
type Error: std::error::Error;
type Error: std::error::Error + Send + Sync + 'static;
fn to_bytes(&self) -> Result<Vec<u8>, Self::Error>;
@ -19,23 +19,18 @@ pub(crate) trait Identifier: Send + Sync + Clone + Debug {
#[async_trait::async_trait(?Send)]
pub(crate) trait Store: Send + Sync + Clone + Debug + 'static {
type Error: std::error::Error;
type Error: std::error::Error + Send + Sync + 'static;
type Identifier: Identifier<Error = Self::Error>;
type Stream: Stream<Item = std::io::Result<Bytes>>;
async fn save_async_read<Reader>(
&self,
reader: &mut Reader,
filename: &str,
) -> Result<Self::Identifier, Self::Error>
where
Reader: AsyncRead + Unpin;
async fn save_bytes(
&self,
bytes: Bytes,
filename: &str,
) -> Result<Self::Identifier, Self::Error>;
async fn save_bytes(&self, bytes: Bytes) -> Result<Self::Identifier, Self::Error>;
async fn to_stream(
&self,

View File

@ -1,4 +1,8 @@
use crate::{file::File, store::Store};
use crate::{
file::File,
repo::{Repo, SettingsRepo},
store::Store,
};
use actix_web::web::Bytes;
use futures_util::stream::Stream;
use std::{
@ -10,24 +14,22 @@ use tokio::io::{AsyncRead, AsyncWrite};
use tracing::{debug, error, instrument};
mod file_id;
mod restructure;
pub(crate) use file_id::FileId;
// - Settings Tree
// - last-path -> last generated path
// - fs-restructure-01-complete -> bool
const GENERATOR_KEY: &[u8] = b"last-path";
#[derive(Debug, thiserror::Error)]
pub(crate) enum FileError {
#[error(transparent)]
Sled(#[from] sled::Error),
#[error("Failed to interact with sled db")]
Sled(#[from] crate::repo::sled::Error),
#[error(transparent)]
#[error("Failed to read or write file")]
Io(#[from] std::io::Error),
#[error(transparent)]
#[error("Failed to generate path")]
PathGenerator(#[from] storage_path_generator::PathError),
#[error("Error formatting file store identifier")]
@ -44,7 +46,7 @@ pub(crate) enum FileError {
pub(crate) struct FileStore {
path_gen: Generator,
root_dir: PathBuf,
settings_tree: sled::Tree,
repo: Repo,
}
#[async_trait::async_trait(?Send)]
@ -57,12 +59,11 @@ impl Store for FileStore {
async fn save_async_read<Reader>(
&self,
reader: &mut Reader,
filename: &str,
) -> Result<Self::Identifier, Self::Error>
where
Reader: AsyncRead + Unpin,
{
let path = self.next_file(filename)?;
let path = self.next_file().await?;
if let Err(e) = self.safe_save_reader(&path, reader).await {
self.safe_remove_file(&path).await?;
@ -73,12 +74,8 @@ impl Store for FileStore {
}
#[tracing::instrument(skip(bytes))]
async fn save_bytes(
&self,
bytes: Bytes,
filename: &str,
) -> Result<Self::Identifier, Self::Error> {
let path = self.next_file(filename)?;
async fn save_bytes(&self, bytes: Bytes) -> Result<Self::Identifier, Self::Error> {
let path = self.next_file().await?;
if let Err(e) = self.safe_save_bytes(&path, bytes).await {
self.safe_remove_file(&path).await?;
@ -141,23 +138,26 @@ impl Store for FileStore {
}
impl FileStore {
pub fn build(root_dir: PathBuf, db: &sled::Db) -> Result<Self, FileError> {
let settings_tree = db.open_tree("settings")?;
let path_gen = init_generator(&settings_tree)?;
pub(crate) async fn build(root_dir: PathBuf, repo: Repo) -> Result<Self, FileError> {
let path_gen = init_generator(&repo).await?;
Ok(FileStore {
root_dir,
path_gen,
settings_tree,
repo,
})
}
fn next_directory(&self) -> Result<PathBuf, FileError> {
async fn next_directory(&self) -> Result<PathBuf, FileError> {
let path = self.path_gen.next();
self.settings_tree
.insert(GENERATOR_KEY, path.to_be_bytes())?;
match self.repo {
Repo::Sled(ref sled_repo) => {
sled_repo
.set(GENERATOR_KEY, path.to_be_bytes().into())
.await?;
}
}
let mut target_path = self.root_dir.clone();
for dir in path.to_strings() {
@ -167,8 +167,9 @@ impl FileStore {
Ok(target_path)
}
fn next_file(&self, filename: &str) -> Result<PathBuf, FileError> {
let target_path = self.next_directory()?;
async fn next_file(&self) -> Result<PathBuf, FileError> {
let target_path = self.next_directory().await?;
let filename = uuid::Uuid::new_v4().to_string();
Ok(target_path.join(filename))
}
@ -289,8 +290,10 @@ pub(crate) async fn safe_create_parent<P: AsRef<Path>>(path: P) -> Result<(), Fi
Ok(())
}
fn init_generator(settings: &sled::Tree) -> Result<Generator, FileError> {
if let Some(ivec) = settings.get(GENERATOR_KEY)? {
async fn init_generator(repo: &Repo) -> Result<Generator, FileError> {
match repo {
Repo::Sled(sled_repo) => {
if let Some(ivec) = sled_repo.get(GENERATOR_KEY).await? {
Ok(Generator::from_existing(
storage_path_generator::Path::from_be_bytes(ivec.to_vec())?,
))
@ -298,6 +301,8 @@ fn init_generator(settings: &sled::Tree) -> Result<Generator, FileError> {
Ok(Generator::new())
}
}
}
}
impl std::fmt::Debug for FileStore {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {

View File

@ -1,118 +0,0 @@
use crate::{
error::{Error, UploadError},
store::file_store::FileStore,
upload_manager::UploadManager,
};
use std::path::{Path, PathBuf};
const RESTRUCTURE_COMPLETE: &[u8] = b"fs-restructure-01-complete";
const DETAILS: &[u8] = b"details";
impl UploadManager {
#[tracing::instrument(skip(self))]
pub(crate) async fn restructure(&self, store: &FileStore) -> Result<(), Error> {
if self.restructure_complete(store)? {
return Ok(());
}
for res in self.inner().filename_tree.iter() {
let (filename, hash) = res?;
let filename = String::from_utf8(filename.to_vec())?;
tracing::info!("Migrating {}", filename);
let file_path = store.root_dir.join("files").join(&filename);
if tokio::fs::metadata(&file_path).await.is_ok() {
let target_path = store.next_directory()?.join(&filename);
let target_path_bytes = self
.generalize_path(store, &target_path)?
.to_str()
.ok_or(UploadError::Path)?
.as_bytes()
.to_vec();
self.inner()
.identifier_tree
.insert(filename.as_bytes(), target_path_bytes)?;
store.safe_move_file(file_path, target_path).await?;
}
let (start, end) = variant_key_bounds(&hash);
for res in self.inner().main_tree.range(start..end) {
let (hash_variant_key, variant_path_or_details) = res?;
if !hash_variant_key.ends_with(DETAILS) {
let variant_path =
PathBuf::from(String::from_utf8(variant_path_or_details.to_vec())?);
if tokio::fs::metadata(&variant_path).await.is_ok() {
let target_path = store.next_directory()?.join(&filename);
let relative_target_path_bytes = self
.generalize_path(store, &target_path)?
.to_str()
.ok_or(UploadError::Path)?
.as_bytes()
.to_vec();
let variant_key =
self.migrate_variant_key(store, &variant_path, &filename)?;
self.inner()
.identifier_tree
.insert(variant_key, relative_target_path_bytes)?;
store
.safe_move_file(variant_path.clone(), target_path)
.await?;
store.try_remove_parents(&variant_path).await;
}
}
self.inner().main_tree.remove(hash_variant_key)?;
}
}
self.mark_restructure_complete(store)?;
Ok(())
}
fn restructure_complete(&self, store: &FileStore) -> Result<bool, Error> {
Ok(store.settings_tree.get(RESTRUCTURE_COMPLETE)?.is_some())
}
fn mark_restructure_complete(&self, store: &FileStore) -> Result<(), Error> {
store.settings_tree.insert(RESTRUCTURE_COMPLETE, b"true")?;
Ok(())
}
fn generalize_path<'a>(&self, store: &FileStore, path: &'a Path) -> Result<&'a Path, Error> {
Ok(path.strip_prefix(&store.root_dir)?)
}
fn migrate_variant_key(
&self,
store: &FileStore,
variant_process_path: &Path,
filename: &str,
) -> Result<Vec<u8>, Error> {
let path = self
.generalize_path(store, variant_process_path)?
.strip_prefix("files")?;
self.variant_key(path, filename)
}
}
/// Build the `[start, end)` byte-range that covers every variant key for
/// `hash` in the main tree (variant keys are `hash ++ [2] ++ ...`; the
/// end bound uses marker byte 3, the first value sorting after all of them).
pub(crate) fn variant_key_bounds(hash: &[u8]) -> (Vec<u8>, Vec<u8>) {
    // Helper: hash followed by a single marker byte.
    let bound = |marker: u8| {
        let mut key = Vec::with_capacity(hash.len() + 1);
        key.extend_from_slice(hash);
        key.push(marker);
        key
    };

    (bound(2), bound(3))
}

View File

@ -1,4 +1,7 @@
use crate::store::Store;
use crate::{
repo::{Repo, SettingsRepo},
store::Store,
};
use actix_web::web::Bytes;
use futures_util::stream::Stream;
use s3::{
@ -22,26 +25,26 @@ const GENERATOR_KEY: &[u8] = b"last-path";
#[derive(Debug, thiserror::Error)]
pub(crate) enum ObjectError {
#[error(transparent)]
#[error("Failed to generate path")]
PathGenerator(#[from] storage_path_generator::PathError),
#[error(transparent)]
Sled(#[from] sled::Error),
#[error("Failed to interact with sled repo")]
Sled(#[from] crate::repo::sled::Error),
#[error(transparent)]
#[error("Failed to parse string")]
Utf8(#[from] FromUtf8Error),
#[error("Invalid length")]
Length,
#[error("Storage error: {0}")]
#[error("Storage error")]
Anyhow(#[from] anyhow::Error),
}
#[derive(Clone)]
pub(crate) struct ObjectStore {
path_gen: Generator,
settings_tree: sled::Tree,
repo: Repo,
bucket: Bucket,
client: reqwest::Client,
}
@ -63,12 +66,11 @@ impl Store for ObjectStore {
async fn save_async_read<Reader>(
&self,
reader: &mut Reader,
filename: &str,
) -> Result<Self::Identifier, Self::Error>
where
Reader: AsyncRead + Unpin,
{
let path = self.next_file(filename)?;
let path = self.next_file().await?;
self.bucket
.put_object_stream(&self.client, reader, &path)
@ -78,12 +80,8 @@ impl Store for ObjectStore {
}
#[tracing::instrument(skip(bytes))]
async fn save_bytes(
&self,
bytes: Bytes,
filename: &str,
) -> Result<Self::Identifier, Self::Error> {
let path = self.next_file(filename)?;
async fn save_bytes(&self, bytes: Bytes) -> Result<Self::Identifier, Self::Error> {
let path = self.next_file().await?;
self.bucket.put_object(&self.client, &path, &bytes).await?;
@ -154,23 +152,21 @@ impl Store for ObjectStore {
impl ObjectStore {
#[allow(clippy::too_many_arguments)]
pub(crate) fn build(
pub(crate) async fn build(
bucket_name: &str,
region: Region,
access_key: Option<String>,
secret_key: Option<String>,
security_token: Option<String>,
session_token: Option<String>,
db: &sled::Db,
repo: Repo,
client: reqwest::Client,
) -> Result<ObjectStore, ObjectError> {
let settings_tree = db.open_tree("settings")?;
let path_gen = init_generator(&settings_tree)?;
let path_gen = init_generator(&repo).await?;
Ok(ObjectStore {
path_gen,
settings_tree,
repo,
bucket: Bucket::new_with_path_style(
bucket_name,
match region {
@ -191,24 +187,32 @@ impl ObjectStore {
})
}
fn next_directory(&self) -> Result<Path, ObjectError> {
async fn next_directory(&self) -> Result<Path, ObjectError> {
let path = self.path_gen.next();
self.settings_tree
.insert(GENERATOR_KEY, path.to_be_bytes())?;
match self.repo {
Repo::Sled(ref sled_repo) => {
sled_repo
.set(GENERATOR_KEY, path.to_be_bytes().into())
.await?;
}
}
Ok(path)
}
fn next_file(&self, filename: &str) -> Result<String, ObjectError> {
let path = self.next_directory()?.to_strings().join("/");
async fn next_file(&self) -> Result<String, ObjectError> {
let path = self.next_directory().await?.to_strings().join("/");
let filename = uuid::Uuid::new_v4().to_string();
Ok(format!("{}/{}", path, filename))
}
}
fn init_generator(settings: &sled::Tree) -> Result<Generator, ObjectError> {
if let Some(ivec) = settings.get(GENERATOR_KEY)? {
async fn init_generator(repo: &Repo) -> Result<Generator, ObjectError> {
match repo {
Repo::Sled(sled_repo) => {
if let Some(ivec) = sled_repo.get(GENERATOR_KEY).await? {
Ok(Generator::from_existing(
storage_path_generator::Path::from_be_bytes(ivec.to_vec())?,
))
@ -216,6 +220,8 @@ fn init_generator(settings: &sled::Tree) -> Result<Generator, ObjectError> {
Ok(Generator::new())
}
}
}
}
fn io_error<S, T, E>(stream: S) -> impl Stream<Item = std::io::Result<T>>
where

View File

@ -4,13 +4,15 @@ use crate::{
error::{Error, UploadError},
ffmpeg::{InputFormat, ThumbnailFormat},
magick::details_hint,
migrate::{alias_id_key, alias_key, alias_key_bounds},
repo::{
sled::SledRepo, Alias, AliasRepo, DeleteToken, HashRepo, IdentifierRepo, Repo, SettingsRepo,
},
store::{Identifier, Store},
};
use actix_web::web;
use futures_util::StreamExt;
use sha2::Digest;
use std::{string::FromUtf8Error, sync::Arc};
use tracing::{debug, error, info, instrument, warn, Span};
use std::sync::Arc;
use tracing::{debug, error, instrument, Span};
use tracing_futures::Instrument;
mod hasher;
@ -18,28 +20,6 @@ mod session;
pub(super) use session::UploadManagerSession;
// TREE STRUCTURE
// - Alias Tree
// - alias -> hash
// - alias / id -> u64(id)
// - alias / delete -> delete token
// - Main Tree
// - hash -> filename
// - hash 0 u64(id) -> alias
// - DEPRECATED:
// - hash 2 variant path -> variant path
// - hash 2 vairant path details -> details
// - Filename Tree
// - filename -> hash
// - Details Tree
// - filename / S::Identifier -> details
// - Identifier Tree
// - filename -> S::Identifier
// - filename / variant path -> S::Identifier
// - filename / motion -> S::Identifier
// - Settings Tree
// - store-migration-progress -> Path Tree Key
const STORE_MIGRATION_PROGRESS: &[u8] = b"store-migration-progress";
#[derive(Clone)]
@ -50,33 +30,17 @@ pub(crate) struct UploadManager {
pub(crate) struct UploadManagerInner {
format: Option<Format>,
hasher: sha2::Sha256,
pub(crate) alias_tree: sled::Tree,
pub(crate) filename_tree: sled::Tree,
pub(crate) main_tree: sled::Tree,
details_tree: sled::Tree,
settings_tree: sled::Tree,
pub(crate) identifier_tree: sled::Tree,
db: sled::Db,
}
struct FilenameIVec {
inner: sled::IVec,
repo: Repo,
}
impl UploadManager {
/// Create a new UploadManager
pub(crate) async fn new(db: sled::Db, format: Option<Format>) -> Result<Self, Error> {
pub(crate) async fn new(repo: Repo, format: Option<Format>) -> Result<Self, Error> {
let manager = UploadManager {
inner: Arc::new(UploadManagerInner {
format,
hasher: sha2::Sha256::new(),
alias_tree: db.open_tree("alias")?,
filename_tree: db.open_tree("filename")?,
main_tree: db.open_tree("main")?,
details_tree: db.open_tree("details")?,
settings_tree: db.open_tree("settings")?,
identifier_tree: db.open_tree("path")?,
db,
repo,
}),
};
@ -89,84 +53,24 @@ impl UploadManager {
S2: Store,
Error: From<S1::Error> + From<S2::Error>,
{
let iter =
if let Some(starting_line) = self.inner.settings_tree.get(STORE_MIGRATION_PROGRESS)? {
self.inner.identifier_tree.range(starting_line..)
} else {
self.inner.identifier_tree.iter()
};
for res in iter {
let (key, identifier) = res?;
let identifier = S1::Identifier::from_bytes(identifier.to_vec())?;
let filename =
if let Some((filename, _)) = String::from_utf8_lossy(&key).split_once('/') {
filename.to_string()
} else {
String::from_utf8_lossy(&key).to_string()
};
let stream = from.to_stream(&identifier, None, None).await?;
futures_util::pin_mut!(stream);
let mut reader = tokio_util::io::StreamReader::new(stream);
let new_identifier = to.save_async_read(&mut reader, &filename).await?;
let details_key = self.details_key(&identifier, &filename)?;
if let Some(details) = self.inner.details_tree.get(details_key.clone())? {
let new_details_key = self.details_key(&new_identifier, &filename)?;
self.inner.details_tree.insert(new_details_key, details)?;
match self.inner.repo {
Repo::Sled(ref sled_repo) => do_migrate_store(sled_repo, from, to).await,
}
}
self.inner
.identifier_tree
.insert(key.clone(), new_identifier.to_bytes()?)?;
self.inner.details_tree.remove(details_key)?;
self.inner
.settings_tree
.insert(STORE_MIGRATION_PROGRESS, key)?;
let (ident, detail, settings) = futures_util::future::join3(
self.inner.identifier_tree.flush_async(),
self.inner.details_tree.flush_async(),
self.inner.settings_tree.flush_async(),
)
.await;
ident?;
detail?;
settings?;
}
// clean up the migration key to avoid interfering with future migrations
self.inner.settings_tree.remove(STORE_MIGRATION_PROGRESS)?;
self.inner.settings_tree.flush_async().await?;
Ok(())
}
pub(crate) fn inner(&self) -> &UploadManagerInner {
&self.inner
}
pub(crate) async fn still_identifier_from_filename<S: Store + Clone>(
pub(crate) async fn still_identifier_from_alias<S: Store + Clone>(
&self,
store: S,
filename: String,
alias: &Alias,
) -> Result<S::Identifier, Error>
where
Error: From<S::Error>,
{
let identifier = self.identifier_from_filename::<S>(filename.clone()).await?;
let details =
if let Some(details) = self.variant_details(&identifier, filename.clone()).await? {
let identifier = self.identifier_from_alias::<S>(alias).await?;
let details = if let Some(details) = self.details(&identifier).await? {
details
} else {
let hint = details_hint(&filename);
let hint = details_hint(alias);
Details::from_store(store.clone(), identifier.clone(), hint).await?
};
@ -174,7 +78,7 @@ impl UploadManager {
return Ok(identifier);
}
if let Some(motion_identifier) = self.motion_identifier::<S>(&filename).await? {
if let Some(motion_identifier) = self.motion_identifier::<S>(alias).await? {
return Ok(motion_identifier);
}
@ -186,101 +90,93 @@ impl UploadManager {
ThumbnailFormat::Jpeg,
)
.await?;
let motion_identifier = store.save_async_read(&mut reader, &filename).await?;
let motion_identifier = store.save_async_read(&mut reader).await?;
drop(permit);
self.store_motion_path(&filename, &motion_identifier)
self.store_motion_identifier(alias, &motion_identifier)
.await?;
Ok(motion_identifier)
}
async fn motion_identifier<S: Store>(
&self,
filename: &str,
) -> Result<Option<S::Identifier>, Error>
where
Error: From<S::Error>,
{
let identifier_tree = self.inner.identifier_tree.clone();
let motion_key = format!("{}/motion", filename);
let opt = web::block(move || identifier_tree.get(motion_key.as_bytes())).await??;
if let Some(ivec) = opt {
return Ok(Some(S::Identifier::from_bytes(ivec.to_vec())?));
alias: &Alias,
) -> Result<Option<S::Identifier>, Error> {
match self.inner.repo {
Repo::Sled(ref sled_repo) => {
let hash = sled_repo.hash(alias).await?;
Ok(sled_repo.motion_identifier(hash).await?)
}
}
}
Ok(None)
}
async fn store_motion_path<I: Identifier>(
async fn store_motion_identifier<I: Identifier + 'static>(
&self,
filename: &str,
alias: &Alias,
identifier: &I,
) -> Result<(), Error>
where
Error: From<I::Error>,
{
let identifier_bytes = identifier.to_bytes()?;
let motion_key = format!("{}/motion", filename);
let identifier_tree = self.inner.identifier_tree.clone();
web::block(move || identifier_tree.insert(motion_key.as_bytes(), identifier_bytes))
.await??;
Ok(())
) -> Result<(), Error> {
match self.inner.repo {
Repo::Sled(ref sled_repo) => {
let hash = sled_repo.hash(alias).await?;
Ok(sled_repo.relate_motion_identifier(hash, identifier).await?)
}
}
}
#[instrument(skip(self))]
pub(crate) async fn identifier_from_filename<S: Store>(
pub(crate) async fn identifier_from_alias<S: Store>(
&self,
filename: String,
) -> Result<S::Identifier, Error>
where
Error: From<S::Error>,
{
let identifier_tree = self.inner.identifier_tree.clone();
let path_ivec = web::block(move || identifier_tree.get(filename.as_bytes()))
.await??
.ok_or(UploadError::MissingFile)?;
let identifier = S::Identifier::from_bytes(path_ivec.to_vec())?;
Ok(identifier)
alias: &Alias,
) -> Result<S::Identifier, Error> {
match self.inner.repo {
Repo::Sled(ref sled_repo) => {
let hash = sled_repo.hash(alias).await?;
Ok(sled_repo.identifier(hash).await?)
}
}
}
#[instrument(skip(self))]
async fn store_identifier<I: Identifier>(
&self,
filename: String,
hash: Vec<u8>,
identifier: &I,
) -> Result<(), Error>
where
Error: From<I::Error>,
{
let identifier_bytes = identifier.to_bytes()?;
let identifier_tree = self.inner.identifier_tree.clone();
web::block(move || identifier_tree.insert(filename.as_bytes(), identifier_bytes)).await??;
Ok(())
) -> Result<(), Error> {
match self.inner.repo {
Repo::Sled(ref sled_repo) => {
Ok(sled_repo.relate_identifier(hash.into(), identifier).await?)
}
}
}
#[instrument(skip(self))]
pub(crate) async fn variant_identifier<S: Store>(
&self,
alias: &Alias,
process_path: &std::path::Path,
filename: &str,
) -> Result<Option<S::Identifier>, Error>
where
Error: From<S::Error>,
{
let key = self.variant_key(process_path, filename)?;
let identifier_tree = self.inner.identifier_tree.clone();
let path_opt = web::block(move || identifier_tree.get(key)).await??;
) -> Result<Option<S::Identifier>, Error> {
let variant = process_path.to_string_lossy().to_string();
if let Some(ivec) = path_opt {
let identifier = S::Identifier::from_bytes(ivec.to_vec())?;
Ok(Some(identifier))
} else {
Ok(None)
match self.inner.repo {
Repo::Sled(ref sled_repo) => {
let hash = sled_repo.hash(alias).await?;
Ok(sled_repo.variant_identifier(hash, variant).await?)
}
}
}
/// Associate the full-resolution file identifier with the hash behind `alias`.
#[instrument(skip(self))]
pub(crate) async fn store_full_res<I: Identifier>(
    &self,
    alias: &Alias,
    identifier: &I,
) -> Result<(), Error> {
    match self.inner.repo {
        Repo::Sled(ref repo) => {
            // Resolve the alias to its content hash, then link the identifier to it.
            let hash = repo.hash(alias).await?;
            repo.relate_identifier(hash, identifier).await?;
            Ok(())
        }
    }
}
@ -288,139 +184,71 @@ impl UploadManager {
#[instrument(skip(self))]
pub(crate) async fn store_variant<I: Identifier>(
&self,
variant_process_path: Option<&std::path::Path>,
alias: &Alias,
variant_process_path: &std::path::Path,
identifier: &I,
filename: &str,
) -> Result<(), Error>
where
Error: From<I::Error>,
{
let key = if let Some(path) = variant_process_path {
self.variant_key(path, filename)?
} else {
let mut vec = filename.as_bytes().to_vec();
vec.extend(b"/");
vec.extend(&identifier.to_bytes()?);
vec
};
let identifier_tree = self.inner.identifier_tree.clone();
let identifier_bytes = identifier.to_bytes()?;
) -> Result<(), Error> {
let variant = variant_process_path.to_string_lossy().to_string();
debug!("Storing variant");
web::block(move || identifier_tree.insert(key, identifier_bytes)).await??;
debug!("Stored variant");
Ok(())
match self.inner.repo {
Repo::Sled(ref sled_repo) => {
let hash = sled_repo.hash(alias).await?;
Ok(sled_repo
.relate_variant_identifier(hash, variant, identifier)
.await?)
}
}
}
/// Get the image details for a given variant
#[instrument(skip(self))]
pub(crate) async fn variant_details<I: Identifier>(
pub(crate) async fn details<I: Identifier>(
&self,
identifier: &I,
filename: String,
) -> Result<Option<Details>, Error>
where
Error: From<I::Error>,
{
let key = self.details_key(identifier, &filename)?;
let details_tree = self.inner.details_tree.clone();
debug!("Getting details");
let opt = match web::block(move || details_tree.get(key)).await?? {
Some(ivec) => match serde_json::from_slice(&ivec) {
Ok(details) => Some(details),
Err(_) => None,
},
None => None,
};
debug!("Got details");
Ok(opt)
match self.inner.repo {
Repo::Sled(ref sled_repo) => Ok(sled_repo.details(identifier).await?),
}
}
#[instrument(skip(self))]
pub(crate) async fn store_variant_details<I: Identifier>(
pub(crate) async fn store_details<I: Identifier>(
&self,
identifier: &I,
filename: String,
details: &Details,
) -> Result<(), Error>
where
Error: From<I::Error>,
{
let key = self.details_key(identifier, &filename)?;
let details_tree = self.inner.details_tree.clone();
let details_value = serde_json::to_vec(details)?;
debug!("Storing details");
web::block(move || details_tree.insert(key, details_value)).await??;
debug!("Stored details");
Ok(())
) -> Result<(), Error> {
match self.inner.repo {
Repo::Sled(ref sled_repo) => Ok(sled_repo.relate_details(identifier, details).await?),
}
/// Get a list of aliases for a given file
pub(crate) async fn aliases_by_filename(&self, filename: String) -> Result<Vec<String>, Error> {
let fname_tree = self.inner.filename_tree.clone();
let hash = web::block(move || fname_tree.get(filename.as_bytes()))
.await??
.ok_or(UploadError::MissingAlias)?;
self.aliases_by_hash(&hash).await
}
/// Get a list of aliases for a given alias
pub(crate) async fn aliases_by_alias(&self, alias: String) -> Result<Vec<String>, Error> {
let alias_tree = self.inner.alias_tree.clone();
let hash = web::block(move || alias_tree.get(alias.as_bytes()))
.await??
.ok_or(UploadError::MissingFilename)?;
self.aliases_by_hash(&hash).await
pub(crate) async fn aliases_by_alias(&self, alias: &Alias) -> Result<Vec<Alias>, Error> {
match self.inner.repo {
Repo::Sled(ref sled_repo) => {
let hash = sled_repo.hash(alias).await?;
Ok(sled_repo.aliases(hash).await?)
}
async fn aliases_by_hash(&self, hash: &sled::IVec) -> Result<Vec<String>, Error> {
let (start, end) = alias_key_bounds(hash);
let main_tree = self.inner.main_tree.clone();
let aliases = web::block(move || {
main_tree
.range(start..end)
.values()
.collect::<Result<Vec<_>, _>>()
})
.await??;
debug!("Got {} aliases for hash", aliases.len());
let aliases = aliases
.into_iter()
.filter_map(|s| String::from_utf8(s.to_vec()).ok())
.collect::<Vec<_>>();
for alias in aliases.iter() {
debug!("{}", alias);
}
Ok(aliases)
}
/// Delete an alias without a delete token
pub(crate) async fn delete_without_token<S: Store + 'static>(
&self,
store: S,
alias: String,
alias: Alias,
) -> Result<(), Error>
where
Error: From<S::Error>,
{
let token_key = delete_key(&alias);
let alias_tree = self.inner.alias_tree.clone();
let token = web::block(move || alias_tree.get(token_key.as_bytes()))
.await??
.ok_or(UploadError::MissingAlias)?;
let token = match self.inner.repo {
Repo::Sled(ref sled_repo) => sled_repo.delete_token(&alias).await?,
};
self.delete(store, alias, String::from_utf8(token.to_vec())?)
.await
self.delete(store, alias, token).await
}
/// Delete the alias, and the file & variants if no more aliases exist
@ -428,57 +256,24 @@ impl UploadManager {
pub(crate) async fn delete<S: Store + 'static>(
&self,
store: S,
alias: String,
token: String,
alias: Alias,
token: DeleteToken,
) -> Result<(), Error>
where
Error: From<S::Error>,
{
use sled::Transactional;
let main_tree = self.inner.main_tree.clone();
let alias_tree = self.inner.alias_tree.clone();
let span = Span::current();
let alias2 = alias.clone();
let hash = web::block(move || {
[&main_tree, &alias_tree].transaction(|v| {
let entered = span.enter();
let main_tree = &v[0];
let alias_tree = &v[1];
// -- GET TOKEN --
debug!("Deleting alias -> delete-token mapping");
let existing_token = alias_tree
.remove(delete_key(&alias2).as_bytes())?
.ok_or_else(|| trans_upload_error(UploadError::MissingAlias))?;
// Bail if invalid token
if existing_token != token {
warn!("Invalid delete token");
return Err(trans_upload_error(UploadError::InvalidToken));
let hash = match self.inner.repo {
Repo::Sled(ref sled_repo) => {
let saved_delete_token = sled_repo.delete_token(&alias).await?;
if saved_delete_token != token {
return Err(UploadError::InvalidToken.into());
}
// -- GET ID FOR HASH TREE CLEANUP --
debug!("Deleting alias -> id mapping");
let id = alias_tree
.remove(alias_id_key(&alias2).as_bytes())?
.ok_or_else(|| trans_upload_error(UploadError::MissingAlias))?;
let id = String::from_utf8(id.to_vec()).map_err(trans_utf8_error)?;
// -- GET HASH FOR HASH TREE CLEANUP --
debug!("Deleting alias -> hash mapping");
let hash = alias_tree
.remove(alias2.as_bytes())?
.ok_or_else(|| trans_upload_error(UploadError::MissingAlias))?;
// -- REMOVE HASH TREE ELEMENT --
debug!("Deleting hash -> alias mapping");
main_tree.remove(alias_key(&hash, &id))?;
drop(entered);
Ok(hash)
})
})
.await??;
let hash = sled_repo.hash(&alias).await?;
AliasRepo::cleanup(sled_repo, &alias).await?;
sled_repo.remove_alias(hash.clone(), &alias).await?;
hash.to_vec()
}
};
self.check_delete_files(store, hash).await
}
@ -486,80 +281,73 @@ impl UploadManager {
async fn check_delete_files<S: Store + 'static>(
&self,
store: S,
hash: sled::IVec,
hash: Vec<u8>,
) -> Result<(), Error>
where
Error: From<S::Error>,
{
// -- CHECK IF ANY OTHER ALIASES EXIST --
let main_tree = self.inner.main_tree.clone();
let (start, end) = alias_key_bounds(&hash);
debug!("Checking for additional aliases referencing hash");
let any_aliases = web::block(move || {
Ok(main_tree.range(start..end).next().is_some()) as Result<bool, Error>
})
.await??;
match self.inner.repo {
Repo::Sled(ref sled_repo) => {
let hash: <SledRepo as HashRepo>::Bytes = hash.into();
// Bail if there are existing aliases
if any_aliases {
debug!("Other aliases reference file, not removing from disk");
let aliases = sled_repo.aliases(hash.clone()).await?;
if !aliases.is_empty() {
return Ok(());
}
// -- DELETE HASH ENTRY --
let main_tree = self.inner.main_tree.clone();
let hash2 = hash.clone();
debug!("Deleting hash -> filename mapping");
let filename = web::block(move || main_tree.remove(&hash2))
.await??
.ok_or(UploadError::MissingFile)?;
let variant_idents = sled_repo
.variants::<S::Identifier>(hash.clone())
.await?
.into_iter()
.map(|(_, v)| v)
.collect::<Vec<_>>();
let main_ident = sled_repo.identifier(hash.clone()).await?;
let motion_ident = sled_repo.motion_identifier(hash.clone()).await?;
// -- DELETE FILES --
let this = self.clone();
let cleanup_span = tracing::info_span!(
parent: None,
"Cleanup",
filename = &tracing::field::display(String::from_utf8_lossy(&filename)),
);
let repo = sled_repo.clone();
HashRepo::cleanup(sled_repo, hash).await?;
let cleanup_span = tracing::info_span!("Cleaning files");
cleanup_span.follows_from(Span::current());
debug!("Spawning cleanup task");
actix_rt::spawn(
async move {
if let Err(e) = this
.cleanup_files(store, FilenameIVec::new(filename.clone()))
.await
let mut errors = Vec::new();
for identifier in variant_idents
.iter()
.chain(&[main_ident])
.chain(motion_ident.iter())
{
error!("Error removing files from fs, {}", e);
debug!("Deleting {:?}", identifier);
if let Err(e) = store.remove(identifier).await {
let e: Error = e.into();
errors.push(e);
}
if let Err(e) = IdentifierRepo::cleanup(&repo, identifier).await {
let e: Error = e.into();
errors.push(e);
}
}
if !errors.is_empty() {
let span = tracing::error_span!("Error deleting files");
span.in_scope(|| {
for error in errors {
error!("{}", error);
}
});
}
info!(
"Files deleted for {:?}",
String::from_utf8(filename.to_vec())
);
}
.instrument(cleanup_span),
);
Ok(())
}
}
/// Fetch the real on-disk filename given an alias
#[instrument(skip(self))]
pub(crate) async fn from_alias(&self, alias: String) -> Result<String, Error> {
let tree = self.inner.alias_tree.clone();
debug!("Getting hash from alias");
let hash = web::block(move || tree.get(alias.as_bytes()))
.await??
.ok_or(UploadError::MissingAlias)?;
let main_tree = self.inner.main_tree.clone();
debug!("Getting filename from hash");
let filename = web::block(move || main_tree.get(hash))
.await??
.ok_or(UploadError::MissingFile)?;
let filename = String::from_utf8(filename.to_vec())?;
Ok(filename)
Ok(())
}
pub(crate) fn session<S: Store + Clone + 'static>(&self, store: S) -> UploadManagerSession<S>
@ -568,124 +356,88 @@ impl UploadManager {
{
UploadManagerSession::new(self.clone(), store)
}
}
// Find image variants and remove them from the DB and the disk
#[instrument(skip(self))]
async fn cleanup_files<S: Store>(&self, store: S, filename: FilenameIVec) -> Result<(), Error>
async fn migrate_file<S1, S2>(
from: &S1,
to: &S2,
identifier: &S1::Identifier,
) -> Result<S2::Identifier, Error>
where
Error: From<S::Error>,
S1: Store,
S2: Store,
Error: From<S1::Error> + From<S2::Error>,
{
let filename = filename.inner;
let stream = from.to_stream(identifier, None, None).await?;
futures_util::pin_mut!(stream);
let mut reader = tokio_util::io::StreamReader::new(stream);
let filename2 = filename.clone();
let identifier_tree = self.inner.identifier_tree.clone();
let identifier = web::block(move || identifier_tree.remove(filename2)).await??;
let new_identifier = to.save_async_read(&mut reader).await?;
let mut errors = Vec::new();
if let Some(identifier) = identifier {
let identifier = S::Identifier::from_bytes(identifier.to_vec())?;
debug!("Deleting {:?}", identifier);
if let Err(e) = store.remove(&identifier).await {
errors.push(e);
}
Ok(new_identifier)
}
let filename2 = filename.clone();
let fname_tree = self.inner.filename_tree.clone();
debug!("Deleting filename -> hash mapping");
web::block(move || fname_tree.remove(filename2)).await??;
let path_prefix = filename.clone();
let identifier_tree = self.inner.identifier_tree.clone();
debug!("Fetching file variants");
let identifiers = web::block(move || {
identifier_tree
.scan_prefix(path_prefix)
.values()
.collect::<Result<Vec<sled::IVec>, sled::Error>>()
})
.await??;
debug!("{} files prepared for deletion", identifiers.len());
for id in identifiers {
let identifier = S::Identifier::from_bytes(id.to_vec())?;
debug!("Deleting {:?}", identifier);
if let Err(e) = store.remove(&identifier).await {
errors.push(e);
}
async fn migrate_details<R, I1, I2>(repo: &R, from: I1, to: &I2) -> Result<(), Error>
where
R: IdentifierRepo,
I1: Identifier,
I2: Identifier,
Error: From<<R as IdentifierRepo>::Error>,
{
if let Some(details) = repo.details(&from).await? {
repo.relate_details(to, &details).await?;
repo.cleanup(&from).await?;
}
let path_prefix = filename.clone();
let identifier_tree = self.inner.identifier_tree.clone();
debug!("Deleting path info");
web::block(move || {
for res in identifier_tree.scan_prefix(path_prefix).keys() {
let key = res?;
identifier_tree.remove(key)?;
}
Ok(()) as Result<(), Error>
})
.await??;
for error in errors {
error!("Error deleting files, {}", error);
}
Ok(())
}
pub(crate) fn variant_key(
&self,
variant_process_path: &std::path::Path,
filename: &str,
) -> Result<Vec<u8>, Error> {
let path_string = variant_process_path
.to_str()
.ok_or(UploadError::Path)?
.to_string();
let vec = format!("{}/{}", filename, path_string).as_bytes().to_vec();
Ok(vec)
}
fn details_key<I: Identifier>(&self, identifier: &I, filename: &str) -> Result<Vec<u8>, Error>
async fn do_migrate_store<R, S1, S2>(repo: &R, from: S1, to: S2) -> Result<(), Error>
where
Error: From<I::Error>,
S1: Store,
S2: Store,
Error: From<S1::Error> + From<S2::Error>,
R: IdentifierRepo + HashRepo + SettingsRepo,
Error: From<<R as IdentifierRepo>::Error>,
Error: From<<R as HashRepo>::Error>,
Error: From<<R as SettingsRepo>::Error>,
{
let mut vec = filename.as_bytes().to_vec();
vec.extend(b"/");
vec.extend(&identifier.to_bytes()?);
let stream = repo.hashes().await;
let mut stream = Box::pin(stream);
Ok(vec)
}
}
impl FilenameIVec {
    /// Wrap a raw sled byte buffer that holds a filename.
    fn new(inner: sled::IVec) -> Self {
        Self { inner }
    }
}
fn trans_upload_error(
upload_error: UploadError,
) -> sled::transaction::ConflictableTransactionError<Error> {
trans_err(upload_error)
}
/// Convert a UTF-8 decoding failure into a sled transaction abort.
fn trans_utf8_error(e: FromUtf8Error) -> sled::transaction::ConflictableTransactionError<Error> {
    sled::transaction::ConflictableTransactionError::Abort(e.into())
}
fn trans_err<E>(e: E) -> sled::transaction::ConflictableTransactionError<Error>
where
Error: From<E>,
while let Some(hash) = stream.next().await {
let hash = hash?;
if let Some(identifier) = repo
.motion_identifier(hash.as_ref().to_vec().into())
.await?
{
sled::transaction::ConflictableTransactionError::Abort(e.into())
let new_identifier = migrate_file(&from, &to, &identifier).await?;
migrate_details(repo, identifier, &new_identifier).await?;
repo.relate_motion_identifier(hash.as_ref().to_vec().into(), &new_identifier)
.await?;
}
fn delete_key(alias: &str) -> String {
format!("{}/delete", alias)
for (variant, identifier) in repo.variants(hash.as_ref().to_vec().into()).await? {
let new_identifier = migrate_file(&from, &to, &identifier).await?;
migrate_details(repo, identifier, &new_identifier).await?;
repo.relate_variant_identifier(hash.as_ref().to_vec().into(), variant, &new_identifier)
.await?;
}
let identifier = repo.identifier(hash.as_ref().to_vec().into()).await?;
let new_identifier = migrate_file(&from, &to, &identifier).await?;
migrate_details(repo, identifier, &new_identifier).await?;
repo.relate_identifier(hash.as_ref().to_vec().into(), &new_identifier)
.await?;
repo.set(STORE_MIGRATION_PROGRESS, hash.as_ref().to_vec().into())
.await?;
}
// clean up the migration key to avoid interfering with future migrations
repo.remove(STORE_MIGRATION_PROGRESS).await?;
Ok(())
}
impl std::fmt::Debug for UploadManager {
@ -693,9 +445,3 @@ impl std::fmt::Debug for UploadManager {
f.debug_struct("UploadManager").finish()
}
}
impl std::fmt::Debug for FilenameIVec {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        // Decode the bytes as UTF-8; printing the `Result` keeps invalid
        // UTF-8 visible (`Err(..)`) instead of panicking or losing data.
        let decoded = String::from_utf8(self.inner.to_vec());
        write!(f, "{:?}", decoded)
    }
}

View File

@ -1,19 +1,17 @@
use crate::{
error::{Error, UploadError},
magick::ValidInputType,
migrate::{alias_id_key, alias_key},
repo::{Alias, AliasRepo, AlreadyExists, DeleteToken, HashRepo, IdentifierRepo, Repo},
store::Store,
upload_manager::{
delete_key,
hasher::{Hash, Hasher},
UploadManager,
},
};
use actix_web::web;
use futures_util::stream::{Stream, StreamExt};
use tracing::{debug, instrument, warn, Span};
use tracing::{debug, instrument, Span};
use tracing_futures::Instrument;
use uuid::Uuid;
pub(crate) struct UploadManagerSession<S: Store + Clone + 'static>
where
@ -21,7 +19,7 @@ where
{
store: S,
manager: UploadManager,
alias: Option<String>,
alias: Option<Alias>,
finished: bool,
}
@ -42,19 +40,8 @@ where
self.finished = true;
}
pub(crate) fn alias(&self) -> Option<&str> {
self.alias.as_deref()
}
}
/// Outcome of a duplicate check: whether an upload with the same content
/// hash already exists in the database.
enum Dup {
    Exists,
    New,
}
impl Dup {
fn exists(&self) -> bool {
matches!(self, Dup::Exists)
pub(crate) fn alias(&self) -> Option<&Alias> {
self.alias.as_ref()
}
}
@ -79,20 +66,23 @@ where
actix_rt::spawn(
async move {
// undo alias -> hash mapping
debug!("Remove alias -> hash mapping");
if let Ok(Some(hash)) = manager.inner.alias_tree.remove(&alias) {
// undo alias -> id mapping
debug!("Remove alias -> id mapping");
let key = alias_id_key(&alias);
if let Ok(Some(id)) = manager.inner.alias_tree.remove(&key) {
// undo hash/id -> alias mapping
debug!("Remove hash/id -> alias mapping");
let id = String::from_utf8_lossy(&id);
let key = alias_key(&hash, &id);
let _ = manager.inner.main_tree.remove(&key);
}
match manager.inner.repo {
Repo::Sled(ref sled_repo) => {
if let Ok(hash) = sled_repo.hash(&alias).await {
debug!("Clean alias repo");
let _ = AliasRepo::cleanup(sled_repo, &alias).await;
let _ = manager.check_delete_files(store, hash).await;
if let Ok(identifier) = sled_repo.identifier(hash.clone()).await {
debug!("Clean identifier repo");
let _ = IdentifierRepo::cleanup(sled_repo, &identifier).await;
debug!("Remove stored files");
let _ = store.remove(&identifier).await;
}
debug!("Clean hash repo");
let _ = HashRepo::cleanup(sled_repo, hash).await;
}
}
}
}
.instrument(cleanup_span),
@ -107,42 +97,30 @@ where
{
/// Generate a delete token for an alias
#[instrument(skip(self))]
pub(crate) async fn delete_token(&self) -> Result<String, Error> {
pub(crate) async fn delete_token(&self) -> Result<DeleteToken, Error> {
let alias = self.alias.clone().ok_or(UploadError::MissingAlias)?;
debug!("Generating delete token");
let s: String = Uuid::new_v4().to_string();
let delete_token = s.clone();
let delete_token = DeleteToken::generate();
debug!("Saving delete token");
let alias_tree = self.manager.inner.alias_tree.clone();
let key = delete_key(&alias);
let res = web::block(move || {
alias_tree.compare_and_swap(
key.as_bytes(),
None as Option<sled::IVec>,
Some(s.as_bytes()),
)
match self.manager.inner.repo {
Repo::Sled(ref sled_repo) => {
let res = sled_repo.relate_delete_token(&alias, &delete_token).await?;
Ok(if res.is_err() {
let delete_token = sled_repo.delete_token(&alias).await?;
debug!("Returning existing delete token, {:?}", delete_token);
delete_token
} else {
debug!("Returning new delete token, {:?}", delete_token);
delete_token
})
.await??;
if let Err(sled::CompareAndSwapError {
current: Some(ivec),
..
}) = res
{
let s = String::from_utf8(ivec.to_vec())?;
debug!("Returning existing delete token, {}", s);
return Ok(s);
}
}
}
debug!("Returning new delete token, {}", delete_token);
Ok(delete_token)
}
/// Upload the file while preserving the filename, optionally validating the uploaded image
#[instrument(skip(self, stream))]
/// Import the file, discarding bytes if it's already present, or saving if it's new
pub(crate) async fn import(
mut self,
alias: String,
@ -158,7 +136,7 @@ where
}
debug!("Validating bytes");
let (content_type, validated_reader) = crate::validate::validate_image_bytes(
let (_, validated_reader) = crate::validate::validate_image_bytes(
bytes_mut.freeze(),
self.manager.inner.format,
validate,
@ -167,20 +145,14 @@ where
let mut hasher_reader = Hasher::new(validated_reader, self.manager.inner.hasher.clone());
let filename = self.next_file(content_type).await?;
let identifier = self
.store
.save_async_read(&mut hasher_reader, &filename)
.await?;
let identifier = self.store.save_async_read(&mut hasher_reader).await?;
let hash = hasher_reader.finalize_reset().await?;
debug!("Storing alias");
self.alias = Some(alias.clone());
self.add_existing_alias(&hash, &alias).await?;
debug!("Adding alias");
self.add_existing_alias(&hash, alias).await?;
debug!("Saving file");
self.save_upload(&identifier, hash, filename).await?;
self.save_upload(&identifier, hash).await?;
// Return alias to file
Ok(self)
@ -210,106 +182,65 @@ where
let mut hasher_reader = Hasher::new(validated_reader, self.manager.inner.hasher.clone());
let filename = self.next_file(input_type).await?;
let identifier = self
.store
.save_async_read(&mut hasher_reader, &filename)
.await?;
let identifier = self.store.save_async_read(&mut hasher_reader).await?;
let hash = hasher_reader.finalize_reset().await?;
debug!("Adding alias");
self.add_alias(&hash, input_type).await?;
debug!("Saving file");
self.save_upload(&identifier, hash, filename).await?;
self.save_upload(&identifier, hash).await?;
// Return alias to file
Ok(self)
}
// check duplicates & store image if new
async fn save_upload(
&self,
identifier: &S::Identifier,
hash: Hash,
filename: String,
) -> Result<(), Error> {
let dup = self.check_duplicate(hash, filename.clone()).await?;
#[instrument(skip(self, hash))]
async fn save_upload(&self, identifier: &S::Identifier, hash: Hash) -> Result<(), Error> {
let res = self.check_duplicate(&hash).await?;
// bail early with alias to existing file if this is a duplicate
if dup.exists() {
if res.is_err() {
debug!("Duplicate exists, removing file");
self.store.remove(identifier).await?;
return Ok(());
}
self.manager.store_identifier(filename, identifier).await?;
self.manager
.store_identifier(hash.into_inner(), identifier)
.await?;
Ok(())
}
// check for an already-uploaded image with this hash, returning the path to the target file
#[instrument(skip(self, hash))]
async fn check_duplicate(&self, hash: Hash, filename: String) -> Result<Dup, Error> {
let main_tree = self.manager.inner.main_tree.clone();
async fn check_duplicate(&self, hash: &Hash) -> Result<Result<(), AlreadyExists>, Error> {
let hash = hash.as_slice().to_vec();
let filename2 = filename.clone();
let hash2 = hash.as_slice().to_vec();
debug!("Inserting filename for hash");
let res = web::block(move || {
main_tree.compare_and_swap(
hash2,
None as Option<sled::IVec>,
Some(filename2.as_bytes()),
)
})
.await??;
if let Err(sled::CompareAndSwapError {
current: Some(ivec),
..
}) = res
{
let name = String::from_utf8(ivec.to_vec())?;
debug!("Filename exists for hash, {}", name);
return Ok(Dup::Exists);
}
let fname_tree = self.manager.inner.filename_tree.clone();
debug!("Saving filename -> hash relation");
web::block(move || fname_tree.insert(filename, hash.into_inner())).await??;
Ok(Dup::New)
}
// generate a short filename that isn't already in-use
#[instrument(skip(self, input_type))]
async fn next_file(&self, input_type: ValidInputType) -> Result<String, Error> {
loop {
debug!("Filename generation loop");
let filename = file_name(Uuid::new_v4(), input_type);
let identifier_tree = self.manager.inner.identifier_tree.clone();
let filename2 = filename.clone();
let filename_exists = web::block(move || identifier_tree.get(filename2.as_bytes()))
.await??
.is_some();
if !filename_exists {
return Ok(filename);
}
debug!("Filename exists, trying again");
match self.manager.inner.repo {
Repo::Sled(ref sled_repo) => Ok(HashRepo::create(sled_repo, hash.into()).await?),
}
}
#[instrument(skip(self, hash, alias))]
async fn add_existing_alias(&self, hash: &Hash, alias: &str) -> Result<(), Error> {
self.save_alias_hash_mapping(hash, alias).await??;
// Add an alias from an existing filename
async fn add_existing_alias(&mut self, hash: &Hash, filename: String) -> Result<(), Error> {
let alias = Alias::from_existing(&filename);
self.store_hash_id_alias_mapping(hash, alias).await?;
match self.manager.inner.repo {
Repo::Sled(ref sled_repo) => {
AliasRepo::create(sled_repo, &alias)
.await?
.map_err(|_| UploadError::DuplicateAlias)?;
self.alias = Some(alias.clone());
let hash = hash.as_slice().to_vec();
sled_repo.relate_hash(&alias, hash.clone().into()).await?;
sled_repo.relate_alias(hash.into(), &alias).await?;
}
}
Ok(())
}
@ -319,96 +250,25 @@ where
// Generate a fresh alias for `hash` and persist the hash/id -> alias mapping.
// This will help if multiple 'users' upload the same file, and one of them wants to delete it
#[instrument(skip(self, hash, input_type))]
async fn add_alias(&mut self, hash: &Hash, input_type: ValidInputType) -> Result<(), Error> {
    let new_alias = self.next_alias(hash, input_type).await?;
    self.store_hash_id_alias_mapping(hash, &new_alias).await?;
    Ok(())
}
// Add a pre-defined alias to an existing file
//
// DANGER: this can cause BAD BAD BAD conflicts if the same alias is used for multiple files
#[instrument(skip(self, hash))]
async fn store_hash_id_alias_mapping(&self, hash: &Hash, alias: &str) -> Result<(), Error> {
    let alias = alias.to_string();
    loop {
        debug!("hash -> alias save loop");
        // Ask sled for a fresh numeric id; we retry with a new id below if
        // the (hash, id) slot turns out to be taken.
        let db = self.manager.inner.db.clone();
        let id = web::block(move || db.generate_id()).await??.to_string();
        // alias -> id mapping (unconditional insert)
        let alias_tree = self.manager.inner.alias_tree.clone();
        let key = alias_id_key(&alias);
        let id2 = id.clone();
        debug!("Saving alias -> id mapping");
        web::block(move || alias_tree.insert(key.as_bytes(), id2.as_bytes())).await??;
        // hash/id -> alias mapping, inserted only if the slot is still empty
        let key = alias_key(hash.as_slice(), &id);
        let main_tree = self.manager.inner.main_tree.clone();
        let alias2 = alias.clone();
        debug!("Saving hash/id -> alias mapping");
        let res = web::block(move || {
            main_tree.compare_and_swap(key, None as Option<sled::IVec>, Some(alias2.as_bytes()))
        })
        .await??;
        // compare_and_swap yields Ok(()) when our insert won; otherwise the
        // generated id collided and we loop to try another one.
        if res.is_ok() {
            break;
        }
        debug!("Id exists, trying again");
    }
    Ok(())
}
// Generate an alias to the file
#[instrument(skip(self, hash, input_type))]
async fn next_alias(
&mut self,
hash: &Hash,
input_type: ValidInputType,
) -> Result<String, Error> {
loop {
debug!("Alias gen loop");
let alias = file_name(Uuid::new_v4(), input_type);
self.alias = Some(alias.clone());
let alias = Alias::generate(input_type.as_ext().to_string());
let res = self.save_alias_hash_mapping(hash, &alias).await?;
match self.manager.inner.repo {
Repo::Sled(ref sled_repo) => {
let res = AliasRepo::create(sled_repo, &alias).await?;
if res.is_ok() {
return Ok(alias);
self.alias = Some(alias.clone());
let hash = hash.as_slice().to_vec();
sled_repo.relate_hash(&alias, hash.clone().into()).await?;
sled_repo.relate_alias(hash.into(), &alias).await?;
return Ok(());
}
}
};
debug!("Alias exists, regenning");
}
}
// Save an alias -> hash mapping in the database.
//
// The outer `Result` carries infrastructure failures (blocking pool, sled);
// the inner `Result` reports a duplicate alias so the caller can regenerate
// a new alias instead of bailing out entirely.
#[instrument(skip(self, hash))]
async fn save_alias_hash_mapping(
    &self,
    hash: &Hash,
    alias: &str,
) -> Result<Result<(), Error>, Error> {
    let tree = self.manager.inner.alias_tree.clone();
    let vec = hash.as_slice().to_vec();
    let alias = alias.to_string();
    debug!("Saving alias -> hash mapping");
    // Insert only when no hash is recorded for this alias yet.
    let res = web::block(move || {
        tree.compare_and_swap(alias.as_bytes(), None as Option<sled::IVec>, Some(vec))
    })
    .await??;
    if res.is_err() {
        // Another file already owns this alias.
        warn!("Duplicate alias");
        return Ok(Err(UploadError::DuplicateAlias.into()));
    }
    Ok(Ok(()))
}
}
/// Build a random filename: the uuid followed by the extension for the
/// detected input type.
fn file_name(name: Uuid, input_type: ValidInputType) -> String {
    let mut out = name.to_string();
    out.push_str(input_type.as_ext());
    out
}