Mirror of https://git.asonix.dog/asonix/pict-rs, synced 2024-12-22 19:31:35 +00:00
Split upload_manager files
parent 2fb8d3e39a
commit 0f3be6566e

5 changed files with 699 additions and 642 deletions
Cargo.lock (generated): 2 lines changed
@@ -1205,6 +1205,7 @@ dependencies = [
  "tracing-opentelemetry",
  "tracing-subscriber",
  "url",
+ "uuid",
 ]

 [[package]]

@@ -2303,6 +2304,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "bc5cf98d8186244414c848017f0e2676b3fcb46807f6668a97dfe67359a3c4b7"
 dependencies = [
  "getrandom",
+ "serde",
 ]

 [[package]]
Cargo.toml

@@ -47,6 +47,7 @@ tracing-log = "0.1.2"
 tracing-opentelemetry = "0.15"
 tracing-subscriber = { version = "0.2.5", features = ["fmt", "tracing-log"] }
 url = "2.2"
+uuid = { version = "0.8.2", features = ["v4", "serde"]}

 [dependencies.tracing-actix-web]
 version = "0.4.0-beta.14"
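The new uuid dependency is pulled in with the "v4" and "serde" features; the diff itself does not show where it is used. Purely as an illustration of what those two features provide (the struct, main function, and serde_json are assumptions for the demo, not part of this commit):

use serde::Serialize;
use uuid::Uuid;

// Illustration only: "v4" enables random UUID generation, "serde" lets Uuid
// serialize like any other derived field.
#[derive(Debug, Serialize)]
struct UploadRecord {
    id: Uuid,
}

fn main() {
    let record = UploadRecord { id: Uuid::new_v4() };
    println!("{}", serde_json::to_string(&record).unwrap());
}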
src/upload_manager/hasher.rs (new file, 132 lines)

use crate::error::Error;
use actix_web::web;
use sha2::Digest;
use std::{
    pin::Pin,
    task::{Context, Poll},
};
use tokio::io::{AsyncRead, ReadBuf};

pin_project_lite::pin_project! {
    pub(crate) struct Hasher<I, D> {
        #[pin]
        inner: I,

        hasher: D,
    }
}

pub(super) struct Hash {
    inner: Vec<u8>,
}

impl<I, D> Hasher<I, D>
where
    D: Digest + Send + 'static,
{
    pub(super) fn new(reader: I, digest: D) -> Self {
        Hasher {
            inner: reader,
            hasher: digest,
        }
    }

    pub(super) async fn finalize_reset(self) -> Result<Hash, Error> {
        let mut hasher = self.hasher;
        let hash = web::block(move || Hash::new(hasher.finalize_reset().to_vec())).await?;
        Ok(hash)
    }
}

impl Hash {
    fn new(inner: Vec<u8>) -> Self {
        Hash { inner }
    }

    pub(super) fn as_slice(&self) -> &[u8] {
        &self.inner
    }

    pub(super) fn into_inner(self) -> Vec<u8> {
        self.inner
    }
}

impl<I, D> AsyncRead for Hasher<I, D>
where
    I: AsyncRead,
    D: Digest,
{
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        let this = self.as_mut().project();

        let reader = this.inner;
        let hasher = this.hasher;

        let before_len = buf.filled().len();
        let poll_res = reader.poll_read(cx, buf);
        let after_len = buf.filled().len();
        if after_len > before_len {
            hasher.update(&buf.filled()[before_len..after_len]);
        }
        poll_res
    }
}

impl std::fmt::Debug for Hash {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "{}", base64::encode(&self.inner))
    }
}

#[cfg(test)]
mod test {
    use super::Hasher;
    use sha2::{Digest, Sha256};
    use std::io::Read;

    macro_rules! test_on_arbiter {
        ($fut:expr) => {
            actix_rt::System::new().block_on(async move {
                let arbiter = actix_rt::Arbiter::new();

                let (tx, rx) = tokio::sync::oneshot::channel();

                arbiter.spawn(async move {
                    let handle = actix_rt::spawn($fut);

                    let _ = tx.send(handle.await.unwrap());
                });

                rx.await.unwrap()
            })
        };
    }

    #[test]
    fn hasher_works() {
        let hash = test_on_arbiter!(async move {
            let file1 = tokio::fs::File::open("./client-examples/earth.gif").await?;

            let mut hasher = Hasher::new(file1, Sha256::new());

            tokio::io::copy(&mut hasher, &mut tokio::io::sink()).await?;

            hasher.finalize_reset().await
        })
        .unwrap();

        let mut file = std::fs::File::open("./client-examples/earth.gif").unwrap();
        let mut vec = Vec::new();
        file.read_to_end(&mut vec).unwrap();
        let mut hasher = Sha256::new();
        hasher.update(vec);
        let correct_hash = hasher.finalize_reset().to_vec();

        assert_eq!(hash.inner, correct_hash);
    }
}
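The hasher_works test above already shows the intended call pattern. As a standalone restatement (a sketch, not code from the commit; the function name is illustrative, it uses the crate's own Error type, and it would have to live inside the upload_manager module because Hasher::new, finalize_reset, and into_inner are pub(super)):

use sha2::Sha256;

// Sketch: stream a file through the wrapper; every byte read also updates the
// SHA-256 state, and finalize_reset() yields the digest afterwards.
async fn digest_of(path: &str) -> Result<Vec<u8>, Error> {
    let file = tokio::fs::File::open(path).await?;
    let mut reader = Hasher::new(file, Sha256::new());

    // Drain the reader; a real caller copies into a destination file instead of a sink.
    tokio::io::copy(&mut reader, &mut tokio::io::sink()).await?;

    Ok(reader.finalize_reset().await?.into_inner())
}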
src/upload_manager.rs

@@ -2,21 +2,18 @@ use crate::{
     config::Format,
     error::{Error, UploadError},
     migrate::{alias_id_key, alias_key, alias_key_bounds, variant_key_bounds, LatestDb},
-    to_ext,
 };
 use actix_web::web;
-use futures_util::stream::{LocalBoxStream, StreamExt};
 use sha2::Digest;
-use std::{
-    path::PathBuf,
-    pin::Pin,
-    sync::Arc,
-    task::{Context, Poll},
-};
-use tokio::io::{AsyncRead, ReadBuf};
+use std::{path::PathBuf, sync::Arc};
 use tracing::{debug, error, info, instrument, warn, Span};
 use tracing_futures::Instrument;
 
+mod hasher;
+mod session;
+
+pub(super) use session::UploadManagerSession;
+
 // TREE STRUCTURE
 // - Alias Tree
 //   - alias -> hash

@@ -28,110 +25,19 @@ use tracing_futures::Instrument;
 //   - hash 2 variant path -> variant path
 // - Filename Tree
 //   - filename -> hash
+// - Path Tree
+//   - filename -> relative path
+//   - filename / variant operation path -> relative path
+// - Settings Tree
+//   - last-path -> last generated path
+//   - fs-restructure-01-started -> bool
+//   - fs-restructure-01-complete -> bool
 
 #[derive(Clone)]
 pub struct UploadManager {
     inner: Arc<UploadManagerInner>,
 }
 
-[removed: the UploadManagerSession struct, its succeed()/alias() impl, and the Drop cleanup impl, now in src/upload_manager/session.rs]
-[removed: the Hasher<I, D> struct, its finalize_reset() impl, and its AsyncRead impl, now reworked in src/upload_manager/hasher.rs]
 struct UploadManagerInner {
     format: Option<Format>,
     hasher: sha2::Sha256,

@@ -139,59 +45,16 @@ struct UploadManagerInner {
     alias_tree: sled::Tree,
     filename_tree: sled::Tree,
     main_tree: sled::Tree,
+    path_tree: sled::Tree,
+    settings_tree: sled::Tree,
     db: sled::Db,
 }
 
-[removed: impl std::fmt::Debug for UploadManager, re-added near the end of this file]
-type UploadStream<E> = LocalBoxStream<'static, Result<web::Bytes, E>>;
-
 #[derive(Clone, Debug)]
 pub(crate) struct Serde<T> {
     inner: T,
 }
 
-[removed: the Serde<T> new()/Serialize/Deserialize impls, re-added near the end of this file]
 #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
 pub(crate) struct Details {
     width: usize,

@@ -200,93 +63,10 @@ pub(crate) struct Details {
     created_at: time::OffsetDateTime,
 }
 
-[removed: the Details impl (from_bytes, from_path, now, content_type, system_time), re-added further down in this file]
-[removed: the FilenameIVec impl and its Debug impl, re-added further down in this file]
-[removed: the Hash struct, its impl, and its Debug impl, now in src/upload_manager/hasher.rs]
-[removed: the Dup enum and its impl, now in src/upload_manager/session.rs]
 impl UploadManager {
     /// Get the image directory
     pub(crate) fn image_dir(&self) -> PathBuf {

@@ -304,7 +84,9 @@ impl UploadManager {
         // Ensure file dir exists
         tokio::fs::create_dir_all(&root_dir).await?;
 
-        Ok(UploadManager {
+        let settings_tree = db.open_tree("settings")?;
+
+        let manager = UploadManager {
             inner: Arc::new(UploadManagerInner {
                 format,
                 hasher: sha2::Sha256::new(),

@@ -312,9 +94,13 @@ impl UploadManager {
                 alias_tree: db.open_tree("alias")?,
                 filename_tree: db.open_tree("filename")?,
                 main_tree: db.open_tree("main")?,
+                path_tree: db.open_tree("path")?,
+                settings_tree,
                 db,
             }),
-        })
+        };
+
+        Ok(manager)
     }
 
     /// Store the path to a generated image variant so we can easily clean it up later

@@ -572,11 +358,7 @@ impl UploadManager {
     }
 
     pub(crate) fn session(&self) -> UploadManagerSession {
-        UploadManagerSession {
-            manager: self.clone(),
-            alias: None,
-            finished: false,
-        }
+        UploadManagerSession::new(self.clone())
     }
 
     // Find image variants and remove them from the DB and the disk

@@ -635,367 +417,60 @@ impl UploadManager {
     }
 }
 
-[removed: the old impl UploadManagerSession block (delete_token, import, upload, save_upload, check_duplicate, next_file, add_existing_alias, add_alias, store_hash_id_alias_mapping, next_alias, save_alias_hash_mapping) and the free function safe_save_reader, now in src/upload_manager/session.rs]
+impl<T> Serde<T> {
+    pub(crate) fn new(inner: T) -> Self {
+        Serde { inner }
+    }
+}
 
+impl Details {
+    #[tracing::instrument("Details from bytes", skip(input))]
+    pub(crate) async fn from_bytes(input: web::Bytes) -> Result<Self, Error> {
+        let details = crate::magick::details_bytes(input).await?;
+
+        Ok(Details::now(
+            details.width,
+            details.height,
+            details.mime_type,
+        ))
+    }
+
+    #[tracing::instrument("Details from path", fields(path = &tracing::field::debug(&path.as_ref())))]
+    pub(crate) async fn from_path<P>(path: P) -> Result<Self, Error>
+    where
+        P: AsRef<std::path::Path>,
+    {
+        let details = crate::magick::details(&path).await?;
+
+        Ok(Details::now(
+            details.width,
+            details.height,
+            details.mime_type,
+        ))
+    }
+
+    fn now(width: usize, height: usize, content_type: mime::Mime) -> Self {
+        Details {
+            width,
+            height,
+            content_type: Serde::new(content_type),
+            created_at: time::OffsetDateTime::now_utc(),
+        }
+    }
+
+    pub(crate) fn content_type(&self) -> mime::Mime {
+        self.content_type.inner.clone()
+    }
+
+    pub(crate) fn system_time(&self) -> std::time::SystemTime {
+        self.created_at.into()
+    }
+}
+
+impl FilenameIVec {
+    fn new(inner: sled::IVec) -> Self {
+        FilenameIVec { inner }
+    }
+}
 
 async fn remove_path(path: sled::IVec) -> Result<(), Error> {

@@ -1011,10 +486,6 @@ where
         sled::transaction::ConflictableTransactionError::Abort(e.into())
 }
 
-fn file_name(name: String, content_type: mime::Mime) -> Result<String, Error> {
-    Ok(format!("{}{}", name, to_ext(content_type)?))
-}
-
 fn delete_key(alias: &str) -> String {
     format!("{}/delete", alias)
 }

@@ -1034,50 +505,45 @@ fn variant_details_key(hash: &[u8], path: &str) -> Vec<u8> {
     key
 }
 
-[removed: the #[cfg(test)] mod test block with the hasher_works test, now in src/upload_manager/hasher.rs]
+impl std::fmt::Debug for UploadManager {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        f.debug_struct("UploadManager").finish()
+    }
+}
+
+impl<T> serde::Serialize for Serde<T>
+where
+    T: std::fmt::Display,
+{
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: serde::Serializer,
+    {
+        let s = self.inner.to_string();
+        serde::Serialize::serialize(s.as_str(), serializer)
+    }
+}
+
+impl<'de, T> serde::Deserialize<'de> for Serde<T>
+where
+    T: std::str::FromStr,
+    <T as std::str::FromStr>::Err: std::fmt::Display,
+{
+    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        let s: String = serde::Deserialize::deserialize(deserializer)?;
+        let inner = s
+            .parse::<T>()
+            .map_err(|e| serde::de::Error::custom(e.to_string()))?;
+
+        Ok(Serde { inner })
+    }
+}
+
+impl std::fmt::Debug for FilenameIVec {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        write!(f, "{:?}", String::from_utf8(self.inner.to_vec()))
+    }
+}
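Taken together, the module root now exposes a narrow surface: UploadManager::session() hands out an UploadManagerSession, and the session either completes with succeed() or unwinds its own bookkeeping when dropped. A sketch of how a request handler is expected to drive it (the handler name and error plumbing are illustrative, not from this commit):

use futures_util::stream::LocalBoxStream;

// Sketch: upload a stream of bytes and return the generated alias, if any.
async fn handle_upload<E>(
    manager: UploadManager,
    stream: LocalBoxStream<'static, Result<actix_web::web::Bytes, E>>,
) -> Result<Option<String>, Error>
where
    Error: From<E>,
{
    // If this future is dropped or errors out before succeed(), the session's
    // Drop impl spawns a cleanup task that removes the alias and hash mappings.
    let session = manager.session().upload(stream).await?;

    let alias = session.alias().map(String::from);

    // Mark the upload as finished so Drop skips the cleanup path.
    session.succeed();

    Ok(alias)
}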
src/upload_manager/session.rs (new file, 456 lines)

use crate::{
    error::{Error, UploadError},
    migrate::{alias_id_key, alias_key},
    to_ext,
    upload_manager::{
        delete_key,
        hasher::{Hash, Hasher},
        UploadManager,
    },
};
use actix_web::web;
use futures_util::stream::{LocalBoxStream, StreamExt};
use std::path::PathBuf;
use tokio::io::AsyncRead;
use tracing::{debug, instrument, warn, Span};
use tracing_futures::Instrument;

type UploadStream<E> = LocalBoxStream<'static, Result<web::Bytes, E>>;

pub(crate) struct UploadManagerSession {
    manager: UploadManager,
    alias: Option<String>,
    finished: bool,
}

impl UploadManagerSession {
    pub(super) fn new(manager: UploadManager) -> Self {
        UploadManagerSession {
            manager,
            alias: None,
            finished: false,
        }
    }

    pub(crate) fn succeed(mut self) {
        self.finished = true;
    }

    pub(crate) fn alias(&self) -> Option<&str> {
        self.alias.as_deref()
    }
}

enum Dup {
    Exists,
    New,
}

impl Dup {
    fn exists(&self) -> bool {
        matches!(self, Dup::Exists)
    }
}

impl Drop for UploadManagerSession {
    fn drop(&mut self) {
        if self.finished {
            return;
        }

        if let Some(alias) = self.alias.take() {
            let manager = self.manager.clone();
            let cleanup_span = tracing::info_span!(
                parent: None,
                "Upload cleanup",
                alias = &tracing::field::display(&alias),
            );
            cleanup_span.follows_from(Span::current());
            actix_rt::spawn(
                async move {
                    // undo alias -> hash mapping
                    debug!("Remove alias -> hash mapping");
                    if let Ok(Some(hash)) = manager.inner.alias_tree.remove(&alias) {
                        // undo alias -> id mapping
                        debug!("Remove alias -> id mapping");
                        let key = alias_id_key(&alias);
                        if let Ok(Some(id)) = manager.inner.alias_tree.remove(&key) {
                            // undo hash/id -> alias mapping
                            debug!("Remove hash/id -> alias mapping");
                            let id = String::from_utf8_lossy(&id);
                            let key = alias_key(&hash, &id);
                            let _ = manager.inner.main_tree.remove(&key);
                        }

                        let _ = manager.check_delete_files(hash).await;
                    }
                }
                .instrument(cleanup_span),
            );
        }
    }
}

impl UploadManagerSession {
    /// Generate a delete token for an alias
    #[instrument(skip(self))]
    pub(crate) async fn delete_token(&self) -> Result<String, Error> {
        let alias = self.alias.clone().ok_or(UploadError::MissingAlias)?;

        debug!("Generating delete token");
        use rand::distributions::{Alphanumeric, Distribution};
        let rng = rand::thread_rng();
        let s: String = Alphanumeric
            .sample_iter(rng)
            .take(10)
            .map(char::from)
            .collect();
        let delete_token = s.clone();

        debug!("Saving delete token");
        let alias_tree = self.manager.inner.alias_tree.clone();
        let key = delete_key(&alias);
        let res = web::block(move || {
            alias_tree.compare_and_swap(
                key.as_bytes(),
                None as Option<sled::IVec>,
                Some(s.as_bytes()),
            )
        })
        .await??;

        if let Err(sled::CompareAndSwapError {
            current: Some(ivec),
            ..
        }) = res
        {
            let s = String::from_utf8(ivec.to_vec())?;

            debug!("Returning existing delete token, {}", s);
            return Ok(s);
        }

        debug!("Returning new delete token, {}", delete_token);
        Ok(delete_token)
    }

    /// Upload the file while preserving the filename, optionally validating the uploaded image
    #[instrument(skip(self, stream))]
    pub(crate) async fn import<E>(
        mut self,
        alias: String,
        content_type: mime::Mime,
        validate: bool,
        mut stream: UploadStream<E>,
    ) -> Result<Self, Error>
    where
        Error: From<E>,
        E: Unpin + 'static,
    {
        let mut bytes_mut = actix_web::web::BytesMut::new();

        debug!("Reading stream to memory");
        while let Some(res) = stream.next().await {
            let bytes = res?;
            bytes_mut.extend_from_slice(&bytes);
        }

        debug!("Validating bytes");
        let (content_type, validated_reader) = crate::validate::validate_image_bytes(
            bytes_mut.freeze(),
            self.manager.inner.format.clone(),
            validate,
        )
        .await?;

        let mut hasher_reader = Hasher::new(validated_reader, self.manager.inner.hasher.clone());

        let tmpfile = crate::tmp_file();
        safe_save_reader(tmpfile.clone(), &mut hasher_reader).await?;
        let hash = hasher_reader.finalize_reset().await?;

        debug!("Storing alias");
        self.alias = Some(alias.clone());
        self.add_existing_alias(&hash, &alias).await?;

        debug!("Saving file");
        self.save_upload(tmpfile, hash, content_type).await?;

        // Return alias to file
        Ok(self)
    }

    /// Upload the file, discarding bytes if it's already present, or saving if it's new
    #[instrument(skip(self, stream))]
    pub(crate) async fn upload<E>(mut self, mut stream: UploadStream<E>) -> Result<Self, Error>
    where
        Error: From<E>,
    {
        let mut bytes_mut = actix_web::web::BytesMut::new();

        debug!("Reading stream to memory");
        while let Some(res) = stream.next().await {
            let bytes = res?;
            bytes_mut.extend_from_slice(&bytes);
        }

        debug!("Validating bytes");
        let (content_type, validated_reader) = crate::validate::validate_image_bytes(
            bytes_mut.freeze(),
            self.manager.inner.format.clone(),
            true,
        )
        .await?;

        let mut hasher_reader = Hasher::new(validated_reader, self.manager.inner.hasher.clone());

        let tmpfile = crate::tmp_file();
        safe_save_reader(tmpfile.clone(), &mut hasher_reader).await?;
        let hash = hasher_reader.finalize_reset().await?;

        debug!("Adding alias");
        self.add_alias(&hash, content_type.clone()).await?;

        debug!("Saving file");
        self.save_upload(tmpfile, hash, content_type).await?;

        // Return alias to file
        Ok(self)
    }

    // check duplicates & store image if new
    async fn save_upload(
        &self,
        tmpfile: PathBuf,
        hash: Hash,
        content_type: mime::Mime,
    ) -> Result<(), Error> {
        let (dup, name) = self.check_duplicate(hash, content_type).await?;

        // bail early with alias to existing file if this is a duplicate
        if dup.exists() {
            debug!("Duplicate exists, not saving file");
            return Ok(());
        }

        // -- WRITE NEW FILE --
        let mut real_path = self.manager.image_dir();
        real_path.push(name);

        crate::safe_move_file(tmpfile, real_path).await?;

        Ok(())
    }

    // check for an already-uploaded image with this hash, returning the path to the target file
    #[instrument(skip(self, hash, content_type))]
    async fn check_duplicate(
        &self,
        hash: Hash,
        content_type: mime::Mime,
    ) -> Result<(Dup, String), Error> {
        let main_tree = self.manager.inner.main_tree.clone();

        let filename = self.next_file(content_type).await?;
        let filename2 = filename.clone();
        let hash2 = hash.as_slice().to_vec();
        debug!("Inserting filename for hash");
        let res = web::block(move || {
            main_tree.compare_and_swap(
                hash2,
                None as Option<sled::IVec>,
                Some(filename2.as_bytes()),
            )
        })
        .await??;

        if let Err(sled::CompareAndSwapError {
            current: Some(ivec),
            ..
        }) = res
        {
            let name = String::from_utf8(ivec.to_vec())?;
            debug!("Filename exists for hash, {}", name);
            return Ok((Dup::Exists, name));
        }

        let fname_tree = self.manager.inner.filename_tree.clone();
        let filename2 = filename.clone();
        debug!("Saving filename -> hash relation");
        web::block(move || fname_tree.insert(filename2, hash.into_inner())).await??;

        Ok((Dup::New, filename))
    }

    // generate a short filename that isn't already in-use
    #[instrument(skip(self, content_type))]
    async fn next_file(&self, content_type: mime::Mime) -> Result<String, Error> {
        let image_dir = self.manager.image_dir();
        use rand::distributions::{Alphanumeric, Distribution};
        let mut limit: usize = 10;
        let mut rng = rand::thread_rng();
        loop {
            debug!("Filename generation loop");
            let mut path = image_dir.clone();
            let s: String = Alphanumeric
                .sample_iter(&mut rng)
                .take(limit)
                .map(char::from)
                .collect();

            let filename = file_name(s, content_type.clone())?;

            path.push(filename.clone());

            if let Err(e) = tokio::fs::metadata(path).await {
                if e.kind() == std::io::ErrorKind::NotFound {
                    debug!("Generated unused filename {}", filename);
                    return Ok(filename);
                }
                return Err(e.into());
            }

            debug!("Filename exists, trying again");

            limit += 1;
        }
    }

    #[instrument(skip(self, hash, alias))]
    async fn add_existing_alias(&self, hash: &Hash, alias: &str) -> Result<(), Error> {
        self.save_alias_hash_mapping(hash, alias).await??;

        self.store_hash_id_alias_mapping(hash, alias).await?;

        Ok(())
    }

    // Add an alias to an existing file
    //
    // This will help if multiple 'users' upload the same file, and one of them wants to delete it
    #[instrument(skip(self, hash, content_type))]
    async fn add_alias(&mut self, hash: &Hash, content_type: mime::Mime) -> Result<(), Error> {
        let alias = self.next_alias(hash, content_type).await?;

        self.store_hash_id_alias_mapping(hash, &alias).await?;

        Ok(())
    }

    // Add a pre-defined alias to an existin file
    //
    // DANGER: this can cause BAD BAD BAD conflicts if the same alias is used for multiple files
    #[instrument(skip(self, hash))]
    async fn store_hash_id_alias_mapping(&self, hash: &Hash, alias: &str) -> Result<(), Error> {
        let alias = alias.to_string();
        loop {
            debug!("hash -> alias save loop");
            let db = self.manager.inner.db.clone();
            let id = web::block(move || db.generate_id()).await??.to_string();

            let alias_tree = self.manager.inner.alias_tree.clone();
            let key = alias_id_key(&alias);
            let id2 = id.clone();
            debug!("Saving alias -> id mapping");
            web::block(move || alias_tree.insert(key.as_bytes(), id2.as_bytes())).await??;

            let key = alias_key(hash.as_slice(), &id);
            let main_tree = self.manager.inner.main_tree.clone();
            let alias2 = alias.clone();
            debug!("Saving hash/id -> alias mapping");
            let res = web::block(move || {
                main_tree.compare_and_swap(key, None as Option<sled::IVec>, Some(alias2.as_bytes()))
            })
            .await??;

            if res.is_ok() {
                break;
            }

            debug!("Id exists, trying again");
        }

        Ok(())
    }

    // Generate an alias to the file
    #[instrument(skip(self, hash, content_type))]
    async fn next_alias(&mut self, hash: &Hash, content_type: mime::Mime) -> Result<String, Error> {
        use rand::distributions::{Alphanumeric, Distribution};
        let mut limit: usize = 10;
        let mut rng = rand::thread_rng();
        loop {
            debug!("Alias gen loop");
            let s: String = Alphanumeric
                .sample_iter(&mut rng)
                .take(limit)
                .map(char::from)
                .collect();
            let alias = file_name(s, content_type.clone())?;
            self.alias = Some(alias.clone());

            let res = self.save_alias_hash_mapping(hash, &alias).await?;

            if res.is_ok() {
                return Ok(alias);
            }
            debug!("Alias exists, regenning");

            limit += 1;
        }
    }

    // Save an alias to the database
    #[instrument(skip(self, hash))]
    async fn save_alias_hash_mapping(
        &self,
        hash: &Hash,
        alias: &str,
    ) -> Result<Result<(), Error>, Error> {
        let tree = self.manager.inner.alias_tree.clone();
        let vec = hash.as_slice().to_vec();
        let alias = alias.to_string();

        debug!("Saving alias -> hash mapping");
        let res = web::block(move || {
            tree.compare_and_swap(alias.as_bytes(), None as Option<sled::IVec>, Some(vec))
        })
        .await??;

        if res.is_err() {
            warn!("Duplicate alias");
            return Ok(Err(UploadError::DuplicateAlias.into()));
        }

        Ok(Ok(()))
    }
}

fn file_name(name: String, content_type: mime::Mime) -> Result<String, Error> {
    Ok(format!("{}{}", name, to_ext(content_type)?))
}

#[instrument(skip(input))]
async fn safe_save_reader(to: PathBuf, input: &mut (impl AsyncRead + Unpin)) -> Result<(), Error> {
    if let Some(path) = to.parent() {
        debug!("Creating directory {:?}", path);
        tokio::fs::create_dir_all(path.to_owned()).await?;
    }

    debug!("Checking if {:?} already exists", to);
    if let Err(e) = tokio::fs::metadata(to.clone()).await {
        if e.kind() != std::io::ErrorKind::NotFound {
            return Err(e.into());
        }
    } else {
        return Err(UploadError::FileExists.into());
    }

    debug!("Writing stream to {:?}", to);

    let mut file = crate::file::File::create(to).await?;

    file.write_from_async_read(input).await?;

    Ok(())
}
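Nearly every write in this file follows the same shape: clone a sled::Tree handle, move it into web::block (sled calls are synchronous), and use compare_and_swap so a key is only claimed when it is currently unset. A condensed sketch of that claim-or-report pattern, shown without the web::block wrapper (the helper name is illustrative, not from the commit):

// Sketch: insert `value` under `key` only if the key is currently absent;
// if another writer got there first, return the existing value instead.
fn claim_key(tree: &sled::Tree, key: &str, value: &str) -> Result<Option<String>, Error> {
    let res = tree.compare_and_swap(
        key.as_bytes(),
        None as Option<sled::IVec>, // expected current value: nothing
        Some(value.as_bytes()),     // proposed new value
    )?;

    match res {
        // We won the race; the key now holds `value`.
        Ok(()) => Ok(None),
        // Someone else already claimed the key; surface the existing value.
        Err(sled::CompareAndSwapError {
            current: Some(ivec),
            ..
        }) => Ok(Some(String::from_utf8(ivec.to_vec())?)),
        // Swap failed with no current value: effectively unreachable when the
        // expected value is None, so treat it as a successful claim.
        Err(_) => Ok(None),
    }
}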