2
0
Fork 0
mirror of https://git.asonix.dog/asonix/pict-rs synced 2024-12-22 11:21:24 +00:00

Build out migration path

This commit is contained in:
Aode (lion) 2022-03-25 18:47:50 -05:00
parent 750ce4782e
commit 323016f994
13 changed files with 755 additions and 258 deletions

20
Cargo.lock generated
View file

@ -1064,9 +1064,9 @@ checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
[[package]]
name = "hyper"
version = "0.14.17"
version = "0.14.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "043f0e083e9901b6cc658a77d1eb86f4fc650bbb977a4337dd63192826aa85dd"
checksum = "b26ae0a80afebe130861d90abf98e3814a4f28a4c6ffeb5ab8ebb2be311e0ef2"
dependencies = [
"bytes",
"futures-channel",
@ -1245,9 +1245,9 @@ dependencies = [
[[package]]
name = "log"
version = "0.4.14"
version = "0.4.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51b9bbe6c47d51fc3e1a9b945965946b4c44142ab8792c50835a980d362c2710"
checksum = "6389c490849ff5bc16be905ae24bc913a9c8892e19b2341dbc175e14c341c2b8"
dependencies = [
"cfg-if",
]
@ -1820,9 +1820,9 @@ dependencies = [
[[package]]
name = "redox_syscall"
version = "0.2.11"
version = "0.2.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8380fe0152551244f0747b1bf41737e0f8a74f97a14ccefd1148187271634f3c"
checksum = "8ae183fc1b06c149f0c1793e1eb447c8b04bfe46d48e9e48bfb8d2d7ed64ecf0"
dependencies = [
"bitflags",
]
@ -2317,9 +2317,9 @@ dependencies = [
[[package]]
name = "time"
version = "0.3.7"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "004cbc98f30fa233c61a38bc77e96a9106e65c88f2d3bef182ae952027e5753d"
checksum = "c2702e08a7a860f005826c6815dcac101b19b5eb330c27fe4a5928fec1d20ddd"
dependencies = [
"itoa",
"libc",
@ -2330,9 +2330,9 @@ dependencies = [
[[package]]
name = "time-macros"
version = "0.2.3"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "25eb0ca3468fc0acc11828786797f6ef9aa1555e4a211a60d64cc8e4d1be47d6"
checksum = "42657b1a6f4d817cda8e7a0ace261fe0cc946cf3a80314390b22cc61ae080792"
[[package]]
name = "tinyvec"

View file

@ -10,8 +10,7 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features]
default = ["object-storage"]
object-storage = ["reqwest", "rust-s3"]
default = []
io-uring = [
"actix-rt/io-uring",
"actix-server/io-uring",
@ -42,11 +41,11 @@ pin-project-lite = "0.2.7"
reqwest = { version = "0.11.5", default-features = false, features = [
"rustls-tls",
"stream",
], optional = true }
] }
rust-s3 = { version = "0.29.0", default-features = false, features = [
"fail-on-err",
"with-reqwest",
], optional = true, git = "https://github.com/asonix/rust-s3", branch = "asonix/generic-client" }
], git = "https://github.com/asonix/rust-s3", branch = "asonix/generic-client" }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
sha2 = "0.10.0"

View file

@ -76,13 +76,6 @@ pub(crate) struct Overrides {
#[serde(skip_serializing_if = "Option::is_none")]
max_image_area: Option<usize>,
#[clap(
long,
help = "Specify the number of bytes sled is allowed to use for it's cache"
)]
#[serde(skip_serializing_if = "Option::is_none")]
sled_cache_capacity: Option<u64>,
#[clap(
long,
help = "Specify the number of events the console subscriber is allowed to buffer"
@ -106,12 +99,22 @@ pub(crate) struct Overrides {
opentelemetry_url: Option<Url>,
#[serde(skip_serializing_if = "Option::is_none")]
#[clap(
short = 'R',
long,
help = "Set the database implementation. Available options are 'sled'. Default is 'sled'"
)]
repo: Option<Repo>,
#[clap(flatten)]
sled_repo: SledRepo,
sled: Sled,
#[serde(skip_serializing_if = "Option::is_none")]
#[clap(
short = 'S',
long,
help = "Set the image store. Available options are 'object-storage' or 'filesystem'. Default is 'filesystem'"
)]
store: Option<Store>,
#[clap(flatten)]
@ -125,19 +128,20 @@ impl ObjectStorage {
pub(crate) fn required(&self) -> Result<RequiredObjectStorage, RequiredError> {
Ok(RequiredObjectStorage {
bucket_name: self
.s3_store_bucket_name
.object_store_bucket_name
.as_ref()
.cloned()
.ok_or(RequiredError)?,
.ok_or(RequiredError("object-store-bucket-name"))?,
region: self
.s3_store_region
.object_store_region
.as_ref()
.cloned()
.map(Serde::into_inner)
.ok_or(RequiredError)?,
access_key: self.s3_store_access_key.as_ref().cloned(),
security_token: self.s3_store_security_token.as_ref().cloned(),
session_token: self.s3_store_session_token.as_ref().cloned(),
.ok_or(RequiredError("object-store-region"))?,
access_key: self.object_store_access_key.as_ref().cloned(),
secret_key: self.object_store_secret_key.as_ref().cloned(),
security_token: self.object_store_security_token.as_ref().cloned(),
session_token: self.object_store_session_token.as_ref().cloned(),
})
}
}
@ -153,7 +157,6 @@ impl Overrides {
&& self.max_image_width.is_none()
&& self.max_image_height.is_none()
&& self.max_image_area.is_none()
&& self.sled_cache_capacity.is_none()
&& self.console_buffer_capacity.is_none()
&& self.api_key.is_none()
&& self.opentelemetry_url.is_none()
@ -171,37 +174,47 @@ pub(crate) enum Command {
MigrateRepo { to: Repo },
}
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize, ArgEnum)]
pub(crate) enum CommandConfig {
Run,
MigrateStore {
to: Storage,
},
MigrateRepo {
#[allow(dead_code)]
to: Repository,
},
}
#[derive(Clone, Copy, Debug, serde::Deserialize, serde::Serialize, ArgEnum)]
#[serde(rename_all = "snake_case")]
pub(crate) enum Repo {
Sled,
}
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize, Parser)]
#[derive(Clone, Debug, Default, serde::Deserialize, serde::Serialize, Parser)]
#[serde(rename_all = "snake_case")]
pub(crate) struct SledRepo {
pub(crate) struct Sled {
// defaults to {config.path}
#[clap(long, help = "Path in which pict-rs will create it's 'repo' directory")]
#[serde(skip_serializing_if = "Option::is_none")]
sled_repo_path: Option<PathBuf>,
pub(crate) sled_path: Option<PathBuf>,
#[clap(
long,
help = "The number of bytes sled is allowed to use for it's in-memory cache"
)]
#[serde(skip_serializing_if = "Option::is_none")]
sled_repo_cache_capacity: Option<u64>,
pub(crate) sled_cache_capacity: Option<u64>,
}
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize, ArgEnum)]
#[derive(Clone, Copy, Debug, serde::Deserialize, serde::Serialize, ArgEnum)]
#[serde(rename_all = "snake_case")]
pub(crate) enum Store {
Filesystem,
#[cfg(feature = "object-storage")]
ObjectStorage,
}
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize, Parser)]
#[derive(Clone, Debug, Default, serde::Deserialize, serde::Serialize, Parser)]
#[serde(rename_all = "snake_case")]
pub(crate) struct FilesystemStorage {
// defaults to {config.path}
@ -210,51 +223,71 @@ pub(crate) struct FilesystemStorage {
help = "Path in which pict-rs will create it's 'files' directory"
)]
#[serde(skip_serializing_if = "Option::is_none")]
filesystem_storage_path: Option<PathBuf>,
pub(crate) filesystem_storage_path: Option<PathBuf>,
}
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize, Parser)]
#[derive(Clone, Debug, Default, serde::Deserialize, serde::Serialize, Parser)]
#[serde(rename_all = "snake_case")]
pub(crate) struct ObjectStorage {
#[serde(skip_serializing_if = "Option::is_none")]
#[clap(long, help = "Name of the bucket in which pict-rs will store images")]
s3_store_bucket_name: Option<String>,
object_store_bucket_name: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
#[clap(
long,
help = "Region in which the bucket exists, can be an http endpoint"
)]
s3_store_region: Option<Serde<s3::Region>>,
object_store_region: Option<Serde<s3::Region>>,
#[serde(skip_serializing_if = "Option::is_none")]
#[clap(long)]
s3_store_access_key: Option<String>,
object_store_access_key: Option<String>,
#[clap(long)]
#[serde(skip_serializing_if = "Option::is_none")]
s3_store_secret_key: Option<String>,
object_store_secret_key: Option<String>,
#[clap(long)]
#[serde(skip_serializing_if = "Option::is_none")]
s3_store_security_token: Option<String>,
object_store_security_token: Option<String>,
#[clap(long)]
#[serde(skip_serializing_if = "Option::is_none")]
s3_store_session_token: Option<String>,
object_store_session_token: Option<String>,
}
pub(crate) struct RequiredSledRepo {
pub(crate) path: PathBuf,
pub(crate) cache_capacity: u64,
}
pub(crate) struct RequiredObjectStorage {
pub(crate) bucket_name: String,
pub(crate) region: s3::Region,
pub(crate) access_key: Option<String>,
pub(crate) secret_key: Option<String>,
pub(crate) security_token: Option<String>,
pub(crate) session_token: Option<String>,
}
pub(crate) struct RequiredFilesystemStorage {
pub(crate) path: PathBuf,
}
pub(crate) enum Storage {
ObjectStorage(RequiredObjectStorage),
Filesystem(RequiredFilesystemStorage),
}
pub(crate) enum Repository {
Sled(RequiredSledRepo),
}
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "snake_case")]
pub(crate) struct Config {
command: Command,
skip_validate_imports: bool,
addr: SocketAddr,
path: PathBuf,
@ -264,59 +297,52 @@ pub(crate) struct Config {
max_image_width: usize,
max_image_height: usize,
max_image_area: usize,
sled_cache_capacity: u64,
console_buffer_capacity: Option<usize>,
api_key: Option<String>,
opentelemetry_url: Option<Url>,
repo: Repo,
sled_repo: SledRepo,
sled: Option<Sled>,
store: Store,
filesystem_storage: FilesystemStorage,
object_storage: ObjectStorage,
filesystem_storage: Option<FilesystemStorage>,
object_storage: Option<ObjectStorage>,
}
#[derive(serde::Serialize)]
#[serde(rename_all = "snake_case")]
pub(crate) struct Defaults {
command: Command,
skip_validate_imports: bool,
addr: SocketAddr,
max_file_size: usize,
max_image_width: usize,
max_image_height: usize,
max_image_area: usize,
sled_cache_capacity: u64,
repo: Repo,
sled_repo: SledRepoDefaults,
sled: SledDefaults,
store: Store,
filesystem_store: FilesystemDefaults,
}
#[derive(serde::Serialize)]
#[serde(rename_all = "snake_case")]
struct SledRepoDefaults {
sled_repo_cache_capacity: usize,
struct SledDefaults {
sled_cache_capacity: usize,
}
#[derive(serde::Serialize)]
#[serde(rename_all = "snake_case")]
struct FilesystemDefaults {}
impl Defaults {
fn new() -> Self {
Defaults {
command: Command::Run,
skip_validate_imports: false,
addr: ([0, 0, 0, 0], 8080).into(),
max_file_size: 40,
max_image_width: 10_000,
max_image_height: 10_000,
max_image_area: 40_000_000,
sled_cache_capacity: 1024 * 1024 * 64, // 16 times smaller than sled's default of 1GB
repo: Repo::Sled,
sled_repo: SledRepoDefaults {
sled_repo_cache_capacity: 1024 * 1024 * 64,
sled: SledDefaults {
sled_cache_capacity: 1024 * 1024 * 64,
},
store: Store::Filesystem,
filesystem_store: FilesystemDefaults {},
}
}
}
@ -332,8 +358,6 @@ impl Config {
base_config = base_config.add_source(config::File::from(path));
};
// TODO: Command parsing
if !args.overrides.is_default() {
let merging = config::Config::try_from(&args.overrides)?;
@ -348,20 +372,88 @@ impl Config {
Ok(config)
}
pub(crate) fn store(&self) -> &Store {
&self.store
pub(crate) fn command(&self) -> anyhow::Result<CommandConfig> {
Ok(match &self.command {
Command::Run => CommandConfig::Run,
Command::MigrateStore { to } => CommandConfig::MigrateStore {
to: match to {
Store::ObjectStorage => Storage::ObjectStorage(
self.object_storage
.as_ref()
.cloned()
.unwrap_or_default()
.required()?,
),
Store::Filesystem => Storage::Filesystem(RequiredFilesystemStorage {
path: self
.filesystem_storage
.as_ref()
.and_then(|f| f.filesystem_storage_path.clone())
.unwrap_or_else(|| {
let mut path = self.path.clone();
path.push("files");
path
}),
}),
},
},
Command::MigrateRepo { to } => CommandConfig::MigrateRepo {
to: match to {
Repo::Sled => {
let sled = self.sled.as_ref().cloned().unwrap_or_default();
Repository::Sled(RequiredSledRepo {
path: sled.sled_path.unwrap_or_else(|| {
let mut path = self.path.clone();
path.push("sled-repo");
path
}),
cache_capacity: sled.sled_cache_capacity.unwrap_or(1024 * 1024 * 64),
})
}
},
},
})
}
pub(crate) fn repo(&self) -> &Repo {
&self.repo
pub(crate) fn store(&self) -> anyhow::Result<Storage> {
Ok(match self.store {
Store::Filesystem => Storage::Filesystem(RequiredFilesystemStorage {
path: self
.filesystem_storage
.as_ref()
.and_then(|f| f.filesystem_storage_path.clone())
.unwrap_or_else(|| {
let mut path = self.path.clone();
path.push("files");
path
}),
}),
Store::ObjectStorage => Storage::ObjectStorage(
self.object_storage
.as_ref()
.cloned()
.unwrap_or_default()
.required()?,
),
})
}
pub(crate) fn object_storage(&self) -> Result<RequiredObjectStorage, RequiredError> {
self.object_storage.required()
}
pub(crate) fn repo(&self) -> Repository {
match self.repo {
Repo::Sled => {
let sled = self.sled.as_ref().cloned().unwrap_or_default();
pub(crate) fn filesystem_storage_path(&self) -> Option<&PathBuf> {
self.filesystem_storage.filesystem_storage_path.as_ref()
Repository::Sled(RequiredSledRepo {
path: sled.sled_path.unwrap_or_else(|| {
let mut path = self.path.clone();
path.push("sled-repo");
path
}),
cache_capacity: sled.sled_cache_capacity.unwrap_or(1024 * 1024 * 64),
})
}
}
}
pub(crate) fn bind_address(&self) -> SocketAddr {
@ -372,10 +464,6 @@ impl Config {
self.path.clone()
}
pub(crate) fn sled_cache_capacity(&self) -> u64 {
self.sled_cache_capacity
}
pub(crate) fn console_buffer_capacity(&self) -> Option<usize> {
self.console_buffer_capacity
}
@ -430,10 +518,10 @@ pub(crate) struct StoreError(String);
pub(crate) struct RepoError(String);
#[derive(Debug, thiserror::Error)]
#[error("Missing required fields")]
pub(crate) struct RequiredError;
#[error("Missing required {0} field")]
pub(crate) struct RequiredError(&'static str);
#[derive(Copy, Clone, Debug, serde::Deserialize, serde::Serialize, ArgEnum)]
#[derive(Clone, Copy, Debug, serde::Deserialize, serde::Serialize, ArgEnum)]
#[serde(rename_all = "snake_case")]
pub(crate) enum Format {
Jpeg,

View file

@ -69,7 +69,6 @@ pub(crate) enum UploadError {
#[error(transparent)]
FileStore(#[from] crate::store::file_store::FileError),
#[cfg(feature = "object-storage")]
#[error(transparent)]
ObjectStore(#[from] crate::store::object_store::ObjectError),

View file

@ -9,7 +9,7 @@ use futures_util::{
stream::{empty, once},
Stream,
};
use once_cell::sync::{Lazy, OnceCell};
use once_cell::sync::Lazy;
use std::{
collections::HashSet,
future::ready,
@ -47,18 +47,17 @@ mod tmp_file;
mod upload_manager;
mod validate;
use crate::{magick::details_hint, store::file_store::FileStore};
use self::{
concurrent_processor::CancelSafeProcessor,
config::{Config, Format, Migrate},
config::{CommandConfig, Config, Format, RequiredFilesystemStorage, RequiredObjectStorage},
details::Details,
either::Either,
error::{Error, UploadError},
init_tracing::init_tracing,
magick::details_hint,
middleware::{Deadline, Internal},
migrate::LatestDb,
store::Store,
store::{file_store::FileStore, object_store::ObjectStore, Store},
upload_manager::{UploadManager, UploadManagerSession},
};
@ -67,7 +66,6 @@ const MINUTES: u32 = 60;
const HOURS: u32 = 60 * MINUTES;
const DAYS: u32 = 24 * HOURS;
static MIGRATE: OnceCell<Migrate> = OnceCell::new();
static CONFIG: Lazy<Config> = Lazy::new(|| Config::build().unwrap());
static PROCESS_SEMAPHORE: Lazy<Semaphore> =
Lazy::new(|| Semaphore::new(num_cpus::get().saturating_sub(1).max(1)));
@ -694,7 +692,6 @@ fn build_client() -> awc::Client {
.finish()
}
#[cfg(feature = "object-storage")]
fn build_reqwest_client() -> reqwest::Result<reqwest::Client> {
reqwest::Client::builder()
.user_agent("pict-rs v0.3.0-main")
@ -839,35 +836,30 @@ async fn migrate_inner<S1>(
manager: &UploadManager,
db: &sled::Db,
from: S1,
to: &config::Store,
to: &config::Storage,
) -> anyhow::Result<()>
where
S1: Store,
Error: From<S1::Error>,
{
match to {
config::Store::FileStore { path } => {
let path = path.to_owned().unwrap_or_else(|| CONFIG.data_dir());
let to = FileStore::build(path, db)?;
config::Storage::Filesystem(RequiredFilesystemStorage { path }) => {
let to = FileStore::build(path.clone(), db)?;
manager.restructure(&to).await?;
manager.migrate_store::<S1, FileStore>(from, to).await?;
}
#[cfg(feature = "object-storage")]
config::Store::S3Store {
config::Storage::ObjectStorage(RequiredObjectStorage {
bucket_name,
region,
access_key,
secret_key,
security_token,
session_token,
} => {
use store::object_store::ObjectStore;
}) => {
let to = ObjectStore::build(
bucket_name,
(**region).clone(),
region.clone(),
access_key.clone(),
secret_key.clone(),
security_token.clone(),
@ -891,75 +883,78 @@ async fn main() -> anyhow::Result<()> {
CONFIG.console_buffer_capacity(),
)?;
let db = LatestDb::exists(CONFIG.data_dir(), CONFIG.sled_cache_capacity()).migrate()?;
let db = LatestDb::exists(CONFIG.data_dir()).migrate()?;
let repo = self::repo::Repo::open(CONFIG.repo())?;
repo.from_db(db).await?;
let manager = UploadManager::new(db.clone(), CONFIG.format()).await?;
if let Some(m) = MIGRATE.get() {
let from = m.from();
let to = m.to();
match CONFIG.command()? {
CommandConfig::Run => (),
CommandConfig::MigrateRepo { to: _ } => {
unimplemented!("Repo migrations are currently unsupported")
}
CommandConfig::MigrateStore { to } => {
let from = CONFIG.store()?;
match from {
config::Store::FileStore { path } => {
let path = path.to_owned().unwrap_or_else(|| CONFIG.data_dir());
let from = FileStore::build(path, &db)?;
config::Storage::Filesystem(RequiredFilesystemStorage { path }) => {
let from = FileStore::build(path.clone(), &db)?;
manager.restructure(&from).await?;
migrate_inner(&manager, &db, from, to).await?;
migrate_inner(&manager, &db, from, &to).await?;
}
#[cfg(feature = "object-storage")]
config::Store::S3Store {
config::Storage::ObjectStorage(RequiredObjectStorage {
bucket_name,
region,
access_key,
secret_key,
security_token,
session_token,
} => {
let from = crate::store::object_store::ObjectStore::build(
bucket_name,
(**region).clone(),
access_key.clone(),
secret_key.clone(),
security_token.clone(),
session_token.clone(),
}) => {
let from = ObjectStore::build(
&bucket_name,
region,
access_key,
secret_key,
security_token,
session_token,
&db,
build_reqwest_client()?,
)?;
migrate_inner(&manager, &db, from, to).await?;
migrate_inner(&manager, &db, from, &to).await?;
}
}
return Ok(());
}
}
match CONFIG.store() {
config::Store::FileStore { path } => {
let path = path.to_owned().unwrap_or_else(|| CONFIG.data_dir());
let store = FileStore::build(path.clone(), &db)?;
match CONFIG.store()? {
config::Storage::Filesystem(RequiredFilesystemStorage { path }) => {
let store = FileStore::build(path, &db)?;
manager.restructure(&store).await?;
launch(manager, store).await
}
#[cfg(feature = "object-storage")]
config::Store::S3Store {
config::Storage::ObjectStorage(RequiredObjectStorage {
bucket_name,
region,
access_key,
secret_key,
security_token,
session_token,
} => {
let store = crate::store::object_store::ObjectStore::build(
bucket_name,
(**region).clone(),
access_key.clone(),
secret_key.clone(),
security_token.clone(),
session_token.clone(),
}) => {
let store = ObjectStore::build(
&bucket_name,
region,
access_key,
secret_key,
security_token,
session_token,
&db,
build_reqwest_client()?,
)?;

View file

@ -51,30 +51,21 @@ trait SledTree {
pub(crate) struct LatestDb {
root_dir: PathBuf,
version: DbVersion,
cache_capacity: u64,
}
impl LatestDb {
pub(crate) fn exists(root_dir: PathBuf, cache_capacity: u64) -> Self {
let version = DbVersion::exists(root_dir.clone(), cache_capacity);
pub(crate) fn exists(root_dir: PathBuf) -> Self {
let version = DbVersion::exists(root_dir.clone());
LatestDb {
root_dir,
version,
cache_capacity,
}
LatestDb { root_dir, version }
}
pub(crate) fn migrate(self) -> Result<sled::Db, UploadError> {
let LatestDb {
root_dir,
version,
cache_capacity,
} = self;
let LatestDb { root_dir, version } = self;
loop {
let root_dir2 = root_dir.clone();
let res = std::panic::catch_unwind(move || version.migrate(root_dir2, cache_capacity));
let res = std::panic::catch_unwind(move || version.migrate(root_dir2));
if let Ok(res) = res {
return res;
@ -90,17 +81,17 @@ enum DbVersion {
}
impl DbVersion {
fn exists(root: PathBuf, cache_capacity: u64) -> Self {
if s034::exists(root.clone()) && !s034::migrating(root, cache_capacity) {
fn exists(root: PathBuf) -> Self {
if s034::exists(root.clone()) && !s034::migrating(root) {
return DbVersion::Sled034;
}
DbVersion::Fresh
}
fn migrate(self, root: PathBuf, cache_capacity: u64) -> Result<sled::Db, UploadError> {
fn migrate(self, root: PathBuf) -> Result<sled::Db, UploadError> {
match self {
DbVersion::Sled034 | DbVersion::Fresh => s034::open(root, cache_capacity),
DbVersion::Sled034 | DbVersion::Fresh => s034::open(root),
}
}
}

View file

View file

@ -14,8 +14,8 @@ pub(crate) fn exists(mut base: PathBuf) -> bool {
std::fs::metadata(base).is_ok()
}
pub(crate) fn migrating(base: PathBuf, cache_capacity: u64) -> bool {
if let Ok(db) = open(base, cache_capacity) {
pub(crate) fn migrating(base: PathBuf) -> bool {
if let Ok(db) = open(base) {
if let Ok(tree) = db.open_tree("migrate") {
if let Ok(Some(_)) = tree.get("done") {
return false;
@ -26,12 +26,12 @@ pub(crate) fn migrating(base: PathBuf, cache_capacity: u64) -> bool {
true
}
pub(crate) fn open(mut base: PathBuf, cache_capacity: u64) -> Result<sled034::Db, UploadError> {
pub(crate) fn open(mut base: PathBuf) -> Result<sled034::Db, UploadError> {
base.push("sled");
base.push(SLED_034);
let db = sled034::Config::default()
.cache_capacity(cache_capacity)
.cache_capacity(1024 * 1024 * 64)
.path(base)
.open()?;

View file

@ -1,9 +1,17 @@
use crate::{details::Details, store::Identifier};
use crate::config::RequiredSledRepo;
use crate::{config::Repository, details::Details, store::Identifier};
use futures_util::Stream;
use uuid::Uuid;
mod old;
pub(crate) mod sled;
#[derive(Debug)]
pub(crate) enum Repo {
Sled(self::sled::SledRepo),
}
#[derive(Clone, Debug)]
pub(crate) struct Alias {
id: Uuid,
extension: String,
@ -15,6 +23,201 @@ pub(crate) struct DeleteToken {
pub(crate) struct AlreadyExists;
#[async_trait::async_trait]
pub(crate) trait SettingsRepo {
type Bytes: AsRef<[u8]> + From<Vec<u8>>;
type Error: std::error::Error;
async fn set(&self, key: &'static [u8], value: Self::Bytes) -> Result<(), Self::Error>;
async fn get(&self, key: &'static [u8]) -> Result<Option<Self::Bytes>, Self::Error>;
async fn remove(&self, key: &'static [u8]) -> Result<(), Self::Error>;
}
#[async_trait::async_trait]
pub(crate) trait IdentifierRepo<I: Identifier> {
type Error: std::error::Error;
async fn relate_details(&self, identifier: I, details: Details) -> Result<(), Self::Error>;
async fn details(&self, identifier: I) -> Result<Option<Details>, Self::Error>;
async fn cleanup(&self, identifier: I) -> Result<(), Self::Error>;
}
#[async_trait::async_trait]
pub(crate) trait HashRepo<I: Identifier> {
type Bytes: AsRef<[u8]> + From<Vec<u8>>;
type Error: std::error::Error;
type Stream: Stream<Item = Result<Self::Bytes, Self::Error>>;
async fn hashes(&self) -> Self::Stream;
async fn create(&self, hash: Self::Bytes) -> Result<Result<(), AlreadyExists>, Self::Error>;
async fn relate_alias(&self, hash: Self::Bytes, alias: Alias) -> Result<(), Self::Error>;
async fn remove_alias(&self, hash: Self::Bytes, alias: Alias) -> Result<(), Self::Error>;
async fn aliases(&self, hash: Self::Bytes) -> Result<Vec<Alias>, Self::Error>;
async fn relate_identifier(&self, hash: Self::Bytes, identifier: I) -> Result<(), Self::Error>;
async fn identifier(&self, hash: Self::Bytes) -> Result<I, Self::Error>;
async fn relate_variant_identifier(
&self,
hash: Self::Bytes,
variant: String,
identifier: I,
) -> Result<(), Self::Error>;
async fn variant_identifier(
&self,
hash: Self::Bytes,
variant: String,
) -> Result<Option<I>, Self::Error>;
async fn relate_motion_identifier(
&self,
hash: Self::Bytes,
identifier: I,
) -> Result<(), Self::Error>;
async fn motion_identifier(&self, hash: Self::Bytes) -> Result<Option<I>, Self::Error>;
async fn cleanup(&self, hash: Self::Bytes) -> Result<(), Self::Error>;
}
#[async_trait::async_trait]
pub(crate) trait AliasRepo {
type Bytes: AsRef<[u8]> + From<Vec<u8>>;
type Error: std::error::Error;
async fn create(&self, alias: Alias) -> Result<Result<(), AlreadyExists>, Self::Error>;
async fn relate_delete_token(
&self,
alias: Alias,
delete_token: DeleteToken,
) -> Result<Result<(), AlreadyExists>, Self::Error>;
async fn delete_token(&self, alias: Alias) -> Result<DeleteToken, Self::Error>;
async fn relate_hash(&self, alias: Alias, hash: Self::Bytes) -> Result<(), Self::Error>;
async fn hash(&self, alias: Alias) -> Result<Self::Bytes, Self::Error>;
async fn cleanup(&self, alias: Alias) -> Result<(), Self::Error>;
}
impl Repo {
pub(crate) fn open(config: Repository) -> anyhow::Result<Self> {
match config {
Repository::Sled(RequiredSledRepo {
mut path,
cache_capacity,
}) => {
path.push("v0.4.0-alpha.1");
let db = ::sled::Config::new()
.cache_capacity(cache_capacity)
.path(path)
.open()?;
Ok(Self::Sled(self::sled::SledRepo::new(db)?))
}
}
}
#[tracing::instrument(skip_all)]
pub(crate) async fn from_db(&self, db: ::sled::Db) -> anyhow::Result<()> {
if self.has_migrated().await? {
return Ok(());
}
let old = self::old::Old::open(db)?;
for hash in old.hashes() {
match self {
Self::Sled(repo) => {
if let Err(e) = migrate_hash(repo, &old, hash).await {
tracing::error!("Failed to migrate hash: {}", e);
}
}
}
}
self.mark_migrated().await?;
Ok(())
}
async fn has_migrated(&self) -> anyhow::Result<bool> {
match self {
Self::Sled(repo) => Ok(repo.get(REPO_MIGRATION_O1).await?.is_some()),
}
}
async fn mark_migrated(&self) -> anyhow::Result<()> {
match self {
Self::Sled(repo) => {
repo.set(REPO_MIGRATION_O1, b"1".to_vec().into()).await?;
}
}
Ok(())
}
}
const REPO_MIGRATION_O1: &[u8] = b"repo-migration-01";
const STORE_MIGRATION_PROGRESS: &[u8] = b"store-migration-progress";
const GENERATOR_KEY: &[u8] = b"last-path";
async fn migrate_hash<T>(repo: &T, old: &old::Old, hash: ::sled::IVec) -> anyhow::Result<()>
where
T: IdentifierRepo<::sled::IVec>,
<T as IdentifierRepo<::sled::IVec>>::Error: Send + Sync + 'static,
T: HashRepo<::sled::IVec>,
<T as HashRepo<::sled::IVec>>::Error: Send + Sync + 'static,
T: AliasRepo,
<T as AliasRepo>::Error: Send + Sync + 'static,
T: SettingsRepo,
<T as SettingsRepo>::Error: Send + Sync + 'static,
{
HashRepo::create(repo, hash.to_vec().into()).await?;
let main_ident = old.main_identifier(&hash)?;
HashRepo::relate_identifier(repo, hash.to_vec().into(), main_ident.clone()).await?;
for alias in old.aliases(&hash) {
if let Ok(Ok(())) = AliasRepo::create(repo, alias.clone()).await {
let _ = HashRepo::relate_alias(repo, hash.to_vec().into(), alias.clone()).await;
let _ = AliasRepo::relate_hash(repo, alias.clone(), hash.to_vec().into()).await;
if let Ok(Some(delete_token)) = old.delete_token(&alias) {
let _ = AliasRepo::relate_delete_token(repo, alias, delete_token).await;
}
}
}
if let Ok(Some(identifier)) = old.motion_identifier(&hash) {
HashRepo::relate_motion_identifier(repo, hash.to_vec().into(), identifier).await;
}
for (variant, identifier) in old.variants(&hash)? {
let _ =
HashRepo::relate_variant_identifier(repo, hash.to_vec().into(), variant, identifier)
.await;
}
for (identifier, details) in old.details(&hash)? {
let _ = IdentifierRepo::relate_details(repo, identifier, details).await;
}
if let Ok(Some(value)) = old.setting(STORE_MIGRATION_PROGRESS) {
SettingsRepo::set(repo, STORE_MIGRATION_PROGRESS, value.to_vec().into()).await?;
}
if let Ok(Some(value)) = old.setting(GENERATOR_KEY) {
SettingsRepo::set(repo, GENERATOR_KEY, value.to_vec().into()).await?;
}
Ok(())
}
impl Alias {
fn to_bytes(&self) -> Vec<u8> {
let mut v = self.id.as_bytes().to_vec();
@ -48,66 +251,38 @@ impl DeleteToken {
}
}
#[async_trait::async_trait]
pub(crate) trait SettingsRepo {
type Bytes: AsRef<[u8]> + From<Vec<u8>>;
type Error: std::error::Error;
async fn set(&self, key: &'static [u8], value: Self::Bytes) -> Result<(), Self::Error>;
async fn get(&self, key: &'static [u8]) -> Result<Option<Self::Bytes>, Self::Error>;
async fn remove(&self, key: &'static [u8]) -> Result<(), Self::Error>;
impl std::fmt::Display for Alias {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}{}", self.id, self.extension)
}
}
#[async_trait::async_trait]
pub(crate) trait IdentifierRepo<I: Identifier> {
type Bytes: AsRef<[u8]> + From<Vec<u8>>;
type Error: std::error::Error;
impl Identifier for Vec<u8> {
type Error = std::convert::Infallible;
async fn relate_details(&self, identifier: I, details: Details) -> Result<(), Self::Error>;
async fn details(&self, identifier: I) -> Result<Option<Details>, Self::Error>;
async fn relate_hash(&self, identifier: I, hash: Self::Bytes) -> Result<(), Self::Error>;
async fn hash(&self, identifier: I) -> Result<Self::Bytes, Self::Error>;
async fn cleanup(&self, identifier: I) -> Result<(), Self::Error>;
fn from_bytes(bytes: Vec<u8>) -> Result<Self, Self::Error>
where
Self: Sized,
{
Ok(bytes)
}
#[async_trait::async_trait]
pub(crate) trait HashRepo<I: Identifier> {
type Bytes: AsRef<[u8]> + From<Vec<u8>>;
type Error: std::error::Error;
type Stream: Stream<Item = Result<Self::Bytes, Self::Error>>;
async fn hashes(&self) -> Self::Stream;
async fn create(&self, hash: Self::Bytes) -> Result<Result<(), AlreadyExists>, Self::Error>;
async fn relate_alias(&self, hash: Self::Bytes, alias: Alias) -> Result<(), Self::Error>;
async fn remove_alias(&self, hash: Self::Bytes, alias: Alias) -> Result<(), Self::Error>;
async fn aliases(&self, hash: Self::Bytes) -> Result<Vec<Alias>, Self::Error>;
async fn relate_identifier(&self, hash: Self::Bytes, identifier: I) -> Result<(), Self::Error>;
async fn identifier(&self, hash: Self::Bytes) -> Result<I, Self::Error>;
async fn cleanup(&self, hash: Self::Bytes) -> Result<(), Self::Error>;
fn to_bytes(&self) -> Result<Vec<u8>, Self::Error> {
Ok(self.clone())
}
}
#[async_trait::async_trait]
pub(crate) trait AliasRepo {
type Bytes: AsRef<[u8]> + From<Vec<u8>>;
type Error: std::error::Error;
impl Identifier for ::sled::IVec {
type Error = std::convert::Infallible;
async fn create(&self, alias: Alias) -> Result<Result<(), AlreadyExists>, Self::Error>;
async fn relate_delete_token(
&self,
alias: Alias,
delete_token: DeleteToken,
) -> Result<Result<(), AlreadyExists>, Self::Error>;
async fn delete_token(&self, alias: Alias) -> Result<DeleteToken, Self::Error>;
async fn relate_hash(&self, alias: Alias, hash: Self::Bytes) -> Result<(), Self::Error>;
async fn hash(&self, alias: Alias) -> Result<Self::Bytes, Self::Error>;
async fn cleanup(&self, alias: Alias) -> Result<(), Self::Error>;
fn from_bytes(bytes: Vec<u8>) -> Result<Self, Self::Error>
where
Self: Sized,
{
Ok(bytes.into())
}
fn to_bytes(&self) -> Result<Vec<u8>, Self::Error> {
Ok(self.to_vec())
}
}

171
src/repo/old.rs Normal file
View file

@ -0,0 +1,171 @@
// TREE STRUCTURE
// - Alias Tree
// - alias -> hash
// - alias / id -> u64(id)
// - alias / delete -> delete token
// - Main Tree
// - hash -> filename
// - hash 0 u64(id) -> alias
// - Filename Tree
// - filename -> hash
// - Details Tree
// - filename / S::Identifier -> details
// - Identifier Tree
// - filename -> S::Identifier
// - filename / variant path -> S::Identifier
// - filename / motion -> S::Identifier
// - Settings Tree
// - store-migration-progress -> Path Tree Key
use super::{Alias, DeleteToken, Details};
use uuid::Uuid;
pub(super) struct Old {
alias_tree: ::sled::Tree,
filename_tree: ::sled::Tree,
main_tree: ::sled::Tree,
details_tree: ::sled::Tree,
settings_tree: ::sled::Tree,
identifier_tree: ::sled::Tree,
_db: ::sled::Db,
}
impl Old {
pub(super) fn open(db: sled::Db) -> anyhow::Result<Self> {
Ok(Self {
alias_tree: db.open_tree("alias")?,
filename_tree: db.open_tree("filename")?,
main_tree: db.open_tree("main")?,
details_tree: db.open_tree("details")?,
settings_tree: db.open_tree("settings")?,
identifier_tree: db.open_tree("path")?,
_db: db,
})
}
pub(super) fn setting(&self, key: &[u8]) -> anyhow::Result<Option<sled::IVec>> {
Ok(self.settings_tree.get(key)?)
}
pub(super) fn hashes(&self) -> impl std::iter::Iterator<Item = sled::IVec> {
self.filename_tree
.iter()
.values()
.filter_map(|res| res.ok())
}
pub(super) fn details(&self, hash: &sled::IVec) -> anyhow::Result<Vec<(sled::IVec, Details)>> {
let filename = self
.main_tree
.get(hash)?
.ok_or_else(|| anyhow::anyhow!("missing filename"))?;
let filename = String::from_utf8_lossy(&filename);
Ok(self
.identifier_tree
.scan_prefix(filename.as_bytes())
.values()
.filter_map(Result::ok)
.filter_map(|identifier| {
let mut key = filename.as_bytes().to_vec();
key.push(b'/');
key.extend_from_slice(&identifier);
let details = self.details_tree.get(key).ok()??;
let details = serde_json::from_slice(&details).ok()?;
Some((identifier, details))
})
.collect())
}
pub(super) fn main_identifier(&self, hash: &sled::IVec) -> anyhow::Result<sled::IVec> {
let filename = self
.main_tree
.get(hash)?
.ok_or_else(|| anyhow::anyhow!("Missing filename"))?;
self.filename_tree
.get(filename)?
.ok_or_else(|| anyhow::anyhow!("Missing identifier"))
}
pub(super) fn variants(&self, hash: &sled::IVec) -> anyhow::Result<Vec<(String, sled::IVec)>> {
let filename = self
.main_tree
.get(hash)?
.ok_or_else(|| anyhow::anyhow!("Missing filename"))?;
let filename_string = String::from_utf8_lossy(&filename);
let variant_prefix = format!("{}/", filename_string);
Ok(self
.identifier_tree
.scan_prefix(&variant_prefix)
.filter_map(|res| res.ok())
.filter_map(|(key, value)| {
let key_str = String::from_utf8_lossy(&key);
let variant_path = key_str.trim_start_matches(&variant_prefix);
if variant_path == "motion" {
return None;
}
Some((variant_path.to_string(), value))
})
.collect())
}
pub(super) fn motion_identifier(
&self,
hash: &sled::IVec,
) -> anyhow::Result<Option<sled::IVec>> {
let filename = self
.main_tree
.get(hash)?
.ok_or_else(|| anyhow::anyhow!("Missing filename"))?;
let filename_string = String::from_utf8_lossy(&filename);
let motion_key = format!("{}/motion", filename_string);
Ok(self.filename_tree.get(motion_key)?)
}
pub(super) fn aliases(&self, hash: &sled::IVec) -> Vec<Alias> {
let mut key = hash.to_vec();
key.push(0);
self.main_tree
.scan_prefix(key)
.values()
.filter_map(|res| res.ok())
.filter_map(|alias| {
let alias_str = String::from_utf8_lossy(&alias);
let (uuid, ext) = alias_str.split_once('.')?;
let uuid = uuid.parse::<Uuid>().ok()?;
Some(Alias {
id: uuid,
extension: ext.to_string(),
})
})
.collect()
}
pub(super) fn delete_token(&self, alias: &Alias) -> anyhow::Result<Option<DeleteToken>> {
let key = format!("{}{}/delete", alias.id, alias.extension);
if let Some(ivec) = self.alias_tree.get(key)? {
let token_str = String::from_utf8_lossy(&ivec);
if let Ok(uuid) = token_str.parse::<Uuid>() {
return Ok(Some(DeleteToken { id: uuid }));
}
}
Ok(None)
}
}

View file

@ -18,7 +18,7 @@ pub(crate) enum Error {
Sled(#[from] sled::Error),
#[error("Invalid identifier")]
Identifier(#[source] Box<dyn std::error::Error + Send>),
Identifier(#[source] Box<dyn std::error::Error + Send + Sync>),
#[error("Invalid details json")]
Details(#[from] serde_json::Error),
@ -32,11 +32,12 @@ pub(crate) enum Error {
pub(crate) struct SledRepo {
settings: Tree,
identifier_hashes: Tree,
identifier_details: Tree,
hashes: Tree,
hash_aliases: Tree,
hash_identifiers: Tree,
hash_variant_identifiers: Tree,
hash_motion_identifiers: Tree,
aliases: Tree,
alias_hashes: Tree,
alias_delete_tokens: Tree,
@ -47,11 +48,12 @@ impl SledRepo {
pub(crate) fn new(db: Db) -> Result<Self, Error> {
Ok(SledRepo {
settings: db.open_tree("pict-rs-settings-tree")?,
identifier_hashes: db.open_tree("pict-rs-identifier-hashes-tree")?,
identifier_details: db.open_tree("pict-rs-identifier-details-tree")?,
hashes: db.open_tree("pict-rs-hashes-tree")?,
hash_aliases: db.open_tree("pict-rs-hash-aliases-tree")?,
hash_identifiers: db.open_tree("pict-rs-hash-identifiers-tree")?,
hash_variant_identifiers: db.open_tree("pict-rs-hash-variant-identifiers-tree")?,
hash_motion_identifiers: db.open_tree("pict-rs-hash-motion-identifiers-tree")?,
aliases: db.open_tree("pict-rs-aliases-tree")?,
alias_hashes: db.open_tree("pict-rs-alias-hashes-tree")?,
alias_delete_tokens: db.open_tree("pict-rs-alias-delete-tokens-tree")?,
@ -87,20 +89,26 @@ impl SettingsRepo for SledRepo {
fn identifier_bytes<I>(identifier: &I) -> Result<Vec<u8>, Error>
where
I: Identifier,
I::Error: Send + 'static,
I::Error: Send + Sync + 'static,
{
identifier
.to_bytes()
.map_err(|e| Error::Identifier(Box::new(e)))
}
fn variant_key(hash: &[u8], variant: &str) -> Result<Vec<u8>, Error> {
let mut bytes = hash.to_vec();
bytes.push(b'/');
bytes.extend_from_slice(variant.as_bytes());
Ok(bytes)
}
#[async_trait::async_trait]
impl<I> IdentifierRepo<I> for SledRepo
where
I: Identifier + 'static,
I::Error: Send + 'static,
I::Error: Send + Sync + 'static,
{
type Bytes = IVec;
type Error = Error;
async fn relate_details(&self, identifier: I, details: Details) -> Result<(), Self::Error> {
@ -128,27 +136,9 @@ where
}
}
async fn relate_hash(&self, identifier: I, hash: Self::Bytes) -> Result<(), Self::Error> {
let key = identifier_bytes(&identifier)?;
b!(self.identifier_hashes, identifier_hashes.insert(key, hash));
Ok(())
}
async fn hash(&self, identifier: I) -> Result<Self::Bytes, Self::Error> {
let key = identifier_bytes(&identifier)?;
let opt = b!(self.identifier_hashes, identifier_hashes.get(key));
opt.ok_or(Error::Missing)
}
async fn cleanup(&self, identifier: I) -> Result<(), Self::Error> {
let key = identifier_bytes(&identifier)?;
let key2 = key.clone();
b!(self.identifier_hashes, identifier_hashes.remove(key2));
b!(self.identifier_details, identifier_details.remove(key));
Ok(())
@ -218,7 +208,7 @@ fn hash_alias_key(hash: &IVec, alias: &Alias) -> Vec<u8> {
impl<I> HashRepo<I> for SledRepo
where
I: Identifier + 'static,
I::Error: Send + 'static,
I::Error: Send + Sync + 'static,
{
type Bytes = IVec;
type Error = Error;
@ -290,6 +280,74 @@ where
})
}
async fn relate_variant_identifier(
&self,
hash: Self::Bytes,
variant: String,
identifier: I,
) -> Result<(), Self::Error> {
let key = variant_key(&hash, &variant)?;
let value = identifier_bytes(&identifier)?;
b!(
self.hash_variant_identifiers,
hash_variant_identifiers.insert(key, value)
);
Ok(())
}
async fn variant_identifier(
&self,
hash: Self::Bytes,
variant: String,
) -> Result<Option<I>, Self::Error> {
let key = variant_key(&hash, &variant)?;
let opt = b!(
self.hash_variant_identifiers,
hash_variant_identifiers.get(key)
);
if let Some(ivec) = opt {
Ok(Some(
I::from_bytes(ivec.to_vec()).map_err(|e| Error::Identifier(Box::new(e)))?,
))
} else {
Ok(None)
}
}
async fn relate_motion_identifier(
&self,
hash: Self::Bytes,
identifier: I,
) -> Result<(), Self::Error> {
let bytes = identifier_bytes(&identifier)?;
b!(
self.hash_motion_identifiers,
hash_motion_identifiers.insert(hash, bytes)
);
Ok(())
}
async fn motion_identifier(&self, hash: Self::Bytes) -> Result<Option<I>, Self::Error> {
let opt = b!(
self.hash_motion_identifiers,
hash_motion_identifiers.get(hash)
);
if let Some(ivec) = opt {
Ok(Some(
I::from_bytes(ivec.to_vec()).map_err(|e| Error::Identifier(Box::new(e)))?,
))
} else {
Ok(None)
}
}
async fn cleanup(&self, hash: Self::Bytes) -> Result<(), Self::Error> {
let hash2 = hash.clone();
b!(self.hashes, hashes.remove(hash2));
@ -297,17 +355,39 @@ where
let hash2 = hash.clone();
b!(self.hash_identifiers, hash_identifiers.remove(hash2));
let aliases = HashRepo::<I>::aliases(self, hash.clone()).await?;
let hash2 = hash.clone();
b!(
self.hash_motion_identifiers,
hash_motion_identifiers.remove(hash2)
);
let aliases = HashRepo::<I>::aliases(self, hash.clone()).await?;
let hash2 = hash.clone();
b!(self.hash_aliases, {
for alias in aliases {
let key = hash_alias_key(&hash, &alias);
let key = hash_alias_key(&hash2, &alias);
let _ = hash_aliases.remove(key);
}
Ok(()) as Result<(), Error>
});
let variant_keys = b!(self.hash_variant_identifiers, {
let v = hash_variant_identifiers
.scan_prefix(hash)
.keys()
.filter_map(Result::ok)
.collect::<Vec<_>>();
Ok(v) as Result<Vec<_>, Error>
});
b!(self.hash_variant_identifiers, {
for key in variant_keys {
let _ = hash_variant_identifiers.remove(key);
}
Ok(()) as Result<(), Error>
});
Ok(())
}
}

View file

@ -5,7 +5,6 @@ use futures_util::stream::Stream;
use tokio::io::{AsyncRead, AsyncWrite};
pub(crate) mod file_store;
#[cfg(feature = "object-storage")]
pub(crate) mod object_store;
pub(crate) trait Identifier: Send + Sync + Clone + Debug {

View file

@ -159,7 +159,7 @@ impl FileStore {
self.settings_tree
.insert(GENERATOR_KEY, path.to_be_bytes())?;
let mut target_path = self.root_dir.join("files");
let mut target_path = self.root_dir.clone();
for dir in path.to_strings() {
target_path.push(dir)
}