From 577d358da1db611a966921846f25050baed85d38 Mon Sep 17 00:00:00 2001 From: asonix Date: Fri, 16 Oct 2020 15:27:54 -0500 Subject: [PATCH] Enhance migrations to be more resilient to panics --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/migrate/mod.rs | 175 +++++++++++++++++++++++++++++++++--------- src/migrate/s032.rs | 4 + src/migrate/s034.rs | 16 ++++ src/upload_manager.rs | 27 ++++--- 6 files changed, 175 insertions(+), 51 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 86aaff1..9f5ec67 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1446,7 +1446,7 @@ checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" [[package]] name = "pict-rs" -version = "0.2.2" +version = "0.2.3" dependencies = [ "actix-form-data", "actix-fs", diff --git a/Cargo.toml b/Cargo.toml index 86ab6cf..37fcf73 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "pict-rs" description = "A simple image hosting service" -version = "0.2.2" +version = "0.2.3" authors = ["asonix "] license = "AGPL-3.0" readme = "README.md" diff --git a/src/migrate/mod.rs b/src/migrate/mod.rs index ae9d74d..866f5f3 100644 --- a/src/migrate/mod.rs +++ b/src/migrate/mod.rs @@ -47,6 +47,8 @@ trait SledTree { where K: AsRef<[u8]>, R: std::ops::RangeBounds; + + fn flush(&self) -> Result<(), UploadError>; } pub(crate) struct LatestDb { @@ -64,10 +66,20 @@ impl LatestDb { pub(crate) fn migrate(self) -> Result { let LatestDb { root_dir, version } = self; - version.migrate(root_dir) + loop { + let root_dir2 = root_dir.clone(); + let res = std::panic::catch_unwind(move || { + version.migrate(root_dir2) + }); + + if let Ok(res) = res { + return res; + } + } } } +#[derive(Clone, Copy)] enum DbVersion { Sled0320Rc1, Sled032, @@ -77,7 +89,7 @@ enum DbVersion { impl DbVersion { fn exists(root: PathBuf) -> Self { - if s034::exists(root.clone()) { + if s034::exists(root.clone()) && !s034::migrating(root.clone()) { return DbVersion::Sled034; } @@ -127,20 +139,48 @@ where let old_alias_tree = old_db.open_tree("alias")?; let new_alias_tree = new_db.open_tree("alias")?; - let old_fname_tree = old_db.open_tree("filename")?; - let new_fname_tree = new_db.open_tree("filename")?; + let new_migrate_tree = new_db.open_tree("migrate")?; + if let Some(_) = new_migrate_tree.get("done")? { + return Ok(new_db); + } - for res in old_alias_tree.iter() { + let (iterator, mut counter) = if let Some(last_migrated) = new_migrate_tree.get("last_migrated")? { + let mut last_migrated = String::from_utf8_lossy(&last_migrated).to_string(); + info!("Previous migration failed after {}, attempting to skip", last_migrated); + if let Some(index) = last_migrated.find('.') { + last_migrated = last_migrated.split_at(index).0.to_owned(); + } + if last_migrated.len() > 3 { + last_migrated = last_migrated.split_at(3).0.to_owned(); + } + let last_migrated = increment_alphanumeric(&last_migrated).as_bytes().to_owned(); + new_migrate_tree.insert("last_migrated", last_migrated.clone())?; + new_migrate_tree.flush()?; + if let Some(count) = new_migrate_tree.get("counter")? { + (old_alias_tree.range(last_migrated..), String::from_utf8_lossy(&count).parse::().unwrap()) + } else { + (old_alias_tree.range(last_migrated..), 0) + } + } else { + (old_alias_tree.iter(), 0) + }; + + for res in iterator { let (k, _) = res?; if let Some(v) = old_alias_tree.get(&k)? { if !k.contains(&b"/"[0]) { - // k is an alias - migrate_main_tree(&k, &v, &old_db, &new_db)?; - debug!( - "Moving alias -> hash for alias {}", - String::from_utf8_lossy(k.as_ref()), - ); + if let Some(id) = old_alias_tree.get(alias_id_key(&String::from_utf8_lossy(&k)))? { + counter += 1; + info!("Migrating alias #{}", counter); + // k is an alias + migrate_main_tree(&k, &v, &old_db, &new_db, &String::from_utf8_lossy(&id))?; + debug!( + "Moving alias -> hash for alias {}", + String::from_utf8_lossy(k.as_ref()), + ); + new_migrate_tree.insert("counter", format!("{}", counter))?; + } } else { debug!( "Moving {}, {}", @@ -148,24 +188,16 @@ where String::from_utf8_lossy(v.as_ref()) ); } - new_alias_tree.insert(k, v)?; + new_alias_tree.insert(k.clone(), v)?; + new_migrate_tree.insert("last_migrated", k)?; + new_migrate_tree.flush()?; } else { warn!("MISSING {}", String::from_utf8_lossy(k.as_ref())); } } + info!("Moved {} unique aliases", counter); - for res in old_fname_tree.iter() { - let (k, _) = res?; - if let Some(v) = old_fname_tree.get(&k)? { - debug!( - "Moving file -> hash for file {}", - String::from_utf8_lossy(k.as_ref()), - ); - new_fname_tree.insert(&k, &v)?; - } else { - warn!("MISSING {}", String::from_utf8_lossy(k.as_ref())); - } - } + new_migrate_tree.insert("done", "true")?; Ok(new_db) } @@ -175,6 +207,7 @@ fn migrate_main_tree( hash: &[u8], old_db: Old, new_db: New, + id: &str, ) -> Result<(), UploadError> where Old: SledDb, @@ -182,28 +215,38 @@ where { let main_tree = new_db.open_tree("main")?; + let new_fname_tree = new_db.open_tree("filename")?; + debug!( "Migrating files for {}", String::from_utf8_lossy(alias.as_ref()) ); - if let Some(v) = old_db.self_tree().get(&hash)? { - main_tree.insert(&hash, v)?; + if let Some(filename) = old_db.self_tree().get(&hash)? { + main_tree.insert(&hash, filename.clone())?; + new_fname_tree.insert(filename, hash.clone())?; + } else { warn!("Missing filename"); } - let (start, end) = alias_key_bounds(&hash); - for res in old_db.self_tree().range(start..end) { - let (k, v) = res?; - debug!("Moving alias {}", String::from_utf8_lossy(v.as_ref())); - main_tree.insert(k, v)?; + let key = alias_key(&hash, id); + if let Some(v) = old_db.self_tree().get(&key)? { + main_tree.insert(key, v)?; + } else { + warn!("Not migrating alias {} id {}", String::from_utf8_lossy(&alias), id); + return Ok(()); } let (start, end) = variant_key_bounds(&hash); - for res in old_db.self_tree().range(start..end) { - let (k, v) = res?; - debug!("Moving variant {}", String::from_utf8_lossy(v.as_ref())); - main_tree.insert(k, v)?; + if main_tree.range(start.clone()..end.clone()).next().is_none() { + let mut counter = 0; + for res in old_db.self_tree().range(start.clone()..end.clone()) { + counter += 1; + let (k, v) = res?; + debug!("Moving variant #{} for {}", counter, String::from_utf8_lossy(v.as_ref())); + main_tree.insert(k, v)?; + } + info!("Moved {} variants for {}", counter, String::from_utf8_lossy(alias.as_ref())); } Ok(()) @@ -228,3 +271,65 @@ pub(crate) fn variant_key_bounds(hash: &[u8]) -> (Vec, Vec) { (start, end) } + +pub(crate) fn alias_id_key(alias: &str) -> String { + format!("{}/id", alias) +} + +pub(crate) fn alias_key(hash: &[u8], id: &str) -> Vec { + let mut key = hash.to_vec(); + // add a separator to the key between the hash and the ID + key.extend(&[0]); + key.extend(id.as_bytes()); + + key +} + +const VALID: &[char] = &[ + '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', + 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', + 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', +]; + +fn increment_alphanumeric(input: &str) -> String { + let (_, output) = input.chars().rev().fold((true, String::new()), |(incr_next, mut acc), item| { + if incr_next { + let mut index = None; + for (i, test) in VALID.iter().enumerate() { + if *test == item { + index = Some(i); + } + } + let index = index.unwrap_or(0); + let (set_incr_next, next_index) = if index == (VALID.len() - 1) { + (true, 0) + } else { + (false, index + 1) + }; + acc.extend(&[VALID[next_index]]); + (set_incr_next, acc) + } else { + acc.extend(&[item]); + (false, acc) + } + }); + + output.chars().rev().collect() +} + +#[cfg(test)] +mod tests { + use super::increment_alphanumeric; + + #[test] + fn increments() { + assert_eq!(increment_alphanumeric("hello"), "hellp"); + assert_eq!(increment_alphanumeric("0"), "1"); + assert_eq!(increment_alphanumeric("9"), "A"); + assert_eq!(increment_alphanumeric("Z"), "a"); + assert_eq!(increment_alphanumeric("z"), "0"); + assert_eq!(increment_alphanumeric("az"), "b0"); + assert_eq!(increment_alphanumeric("19"), "1A"); + assert_eq!(increment_alphanumeric("AZ"), "Aa"); + } +} diff --git a/src/migrate/s032.rs b/src/migrate/s032.rs index 84c8842..732b9dc 100644 --- a/src/migrate/s032.rs +++ b/src/migrate/s032.rs @@ -78,4 +78,8 @@ impl SledTree for sled032::Tree { .map_err(UploadError::from) })) } + + fn flush(&self) -> Result<(), UploadError> { + sled032::Tree::flush(self).map(|_| ()).map_err(UploadError::from) + } } diff --git a/src/migrate/s034.rs b/src/migrate/s034.rs index cbc1e54..a5f7793 100644 --- a/src/migrate/s034.rs +++ b/src/migrate/s034.rs @@ -14,6 +14,18 @@ pub(crate) fn exists(mut base: PathBuf) -> bool { std::fs::metadata(base).is_ok() } +pub(crate) fn migrating(base: PathBuf) -> bool { + if let Ok(db) = open(base) { + if let Ok(tree) = db.open_tree("migrate") { + if let Ok(Some(_)) = tree.get("done") { + return false; + } + } + } + + true +} + pub(crate) fn open(mut base: PathBuf) -> Result { base.push("sled"); base.push(SLED_034); @@ -66,4 +78,8 @@ impl SledTree for sled034::Tree { .map_err(UploadError::from) })) } + + fn flush(&self) -> Result<(), UploadError> { + sled034::Tree::flush(self).map(|_| ()).map_err(UploadError::from) + } } diff --git a/src/upload_manager.rs b/src/upload_manager.rs index d95c6e0..8eebf14 100644 --- a/src/upload_manager.rs +++ b/src/upload_manager.rs @@ -1,7 +1,7 @@ use crate::{ config::Format, error::UploadError, - migrate::{alias_key_bounds, variant_key_bounds, LatestDb}, + migrate::{alias_key_bounds, variant_key_bounds, LatestDb, alias_id_key, alias_key}, to_ext, validate::validate_image, }; @@ -11,6 +11,18 @@ use sha2::Digest; use std::{path::PathBuf, pin::Pin, sync::Arc}; use tracing::{debug, error, info, instrument, warn, Span}; +// TREE STRUCTURE +// - Alias Tree +// - alias -> hash +// - alias / id -> u64(id) +// - alias / delete -> delete token +// - Main Tree +// - hash -> filename +// - hash 0 u64(id) -> alias +// - hash 2 variant path -> variant path +// - Filename Tree +// - filename -> hash + #[derive(Clone)] pub struct UploadManager { inner: Arc, @@ -740,19 +752,6 @@ fn file_name(name: String, content_type: mime::Mime) -> Result Vec { - let mut key = hash.to_vec(); - // add a separator to the key between the hash and the ID - key.extend(&[0]); - key.extend(id.as_bytes()); - - key -} - -fn alias_id_key(alias: &str) -> String { - format!("{}/id", alias) -} - fn delete_key(alias: &str) -> String { format!("{}/delete", alias) }