Reimplement hook registration and execution

This commit is contained in:
Matthias Beyer 2016-03-04 20:33:08 +01:00
parent d6a581e69f
commit 679865464b

View file

@ -23,11 +23,15 @@ use error::{StoreError, StoreErrorKind};
use storeid::{StoreId, StoreIdIterator};
use lazyfile::LazyFile;
use hook::aspect::Aspect;
use hook::result::HookResult;
use hook::accessor::{ MutableHookDataAccessor,
NonMutableHookDataAccessor,
StoreIdAccessor,
HookDataAccessor,
HookDataAccessorProvider};
use hook::position::HookPosition;
use hook::Hook;
/// The Result Type returned by any interaction with the store that could fail
pub type Result<T> = RResult<T, StoreError>;
@ -105,16 +109,16 @@ pub struct Store {
* Registered hooks
*/
pre_read_hooks : Arc<Mutex<Vec<Box<PreReadHook>>>>,
post_read_hooks : Arc<Mutex<Vec<Box<PostReadHook>>>>,
pre_create_hooks : Arc<Mutex<Vec<Box<PreCreateHook>>>>,
post_create_hooks : Arc<Mutex<Vec<Box<PostCreateHook>>>>,
pre_retrieve_hooks : Arc<Mutex<Vec<Box<PreRetrieveHook>>>>,
post_retrieve_hooks : Arc<Mutex<Vec<Box<PostRetrieveHook>>>>,
pre_update_hooks : Arc<Mutex<Vec<Box<PreUpdateHook>>>>,
post_update_hooks : Arc<Mutex<Vec<Box<PostUpdateHook>>>>,
pre_delete_hooks : Arc<Mutex<Vec<Box<PreDeleteHook>>>>,
post_delete_hooks : Arc<Mutex<Vec<Box<PostDeleteHook>>>>,
pre_read_aspects : Arc<Mutex<Vec<Aspect>>>,
post_read_aspects : Arc<Mutex<Vec<Aspect>>>,
pre_create_aspects : Arc<Mutex<Vec<Aspect>>>,
post_create_aspects : Arc<Mutex<Vec<Aspect>>>,
pre_retrieve_aspects : Arc<Mutex<Vec<Aspect>>>,
post_retrieve_aspects : Arc<Mutex<Vec<Aspect>>>,
pre_update_aspects : Arc<Mutex<Vec<Aspect>>>,
post_update_aspects : Arc<Mutex<Vec<Aspect>>>,
pre_delete_aspects : Arc<Mutex<Vec<Aspect>>>,
post_delete_aspects : Arc<Mutex<Vec<Aspect>>>,
/**
* Internal Path->File cache map
@ -151,16 +155,16 @@ impl Store {
debug!("Store building succeeded");
Ok(Store {
location: location,
pre_read_hooks : Arc::new(Mutex::new(vec![])),
post_read_hooks : Arc::new(Mutex::new(vec![])),
pre_create_hooks : Arc::new(Mutex::new(vec![])),
post_create_hooks : Arc::new(Mutex::new(vec![])),
pre_retrieve_hooks : Arc::new(Mutex::new(vec![])),
post_retrieve_hooks : Arc::new(Mutex::new(vec![])),
pre_update_hooks : Arc::new(Mutex::new(vec![])),
post_update_hooks : Arc::new(Mutex::new(vec![])),
pre_delete_hooks : Arc::new(Mutex::new(vec![])),
post_delete_hooks : Arc::new(Mutex::new(vec![])),
pre_read_aspects : Arc::new(Mutex::new(vec![])),
post_read_aspects : Arc::new(Mutex::new(vec![])),
pre_create_aspects : Arc::new(Mutex::new(vec![])),
post_create_aspects : Arc::new(Mutex::new(vec![])),
pre_retrieve_aspects : Arc::new(Mutex::new(vec![])),
post_retrieve_aspects : Arc::new(Mutex::new(vec![])),
pre_update_aspects : Arc::new(Mutex::new(vec![])),
post_update_aspects : Arc::new(Mutex::new(vec![])),
pre_delete_aspects : Arc::new(Mutex::new(vec![])),
post_delete_aspects : Arc::new(Mutex::new(vec![])),
entries: Arc::new(RwLock::new(HashMap::new())),
})
}
@ -176,7 +180,7 @@ impl Store {
/// Creates the Entry at the given location (inside the entry)
pub fn create<'a>(&'a self, id: StoreId) -> Result<FileLockEntry<'a>> {
let id = self.storify_id(id);
if let Err(e) = self.execute_pre_create_hooks(&id) {
if let Err(e) = self.execute_hooks_for_id(self.pre_create_aspects.clone(), &id) {
return Err(e);
}
@ -194,14 +198,17 @@ impl Store {
se
});
self.execute_post_create_hooks(FileLockEntry::new(self, Entry::new(id.clone()), id))
let mut fle = FileLockEntry::new(self, Entry::new(id.clone()), id);
self.execute_hooks_for_mut_file(self.post_create_aspects.clone(), &mut fle)
.map_err(|e| StoreError::new(StoreErrorKind::PostHookExecuteError, Some(Box::new(e))))
.map(|_| fle)
}
/// Borrow a given Entry. When the `FileLockEntry` is either `update`d or
/// dropped, the new Entry is written to disk
pub fn retrieve<'a>(&'a self, id: StoreId) -> Result<FileLockEntry<'a>> {
let id = self.storify_id(id);
if let Err(e) = self.execute_pre_retrieve_hooks(&id) {
if let Err(e) = self.execute_hooks_for_id(self.pre_retrieve_aspects.clone(), &id) {
return Err(e);
}
@ -214,7 +221,15 @@ impl Store {
se.status = StoreEntryStatus::Borrowed;
entry
})
.and_then(|e| self.execute_post_retrieve_hooks(FileLockEntry::new(self, e, id)))
.map(|e| FileLockEntry::new(self, e, id))
.and_then(|mut fle| {
if let Err(e) = self.execute_hooks_for_mut_file(self.pre_retrieve_aspects.clone(), &mut fle) {
Err(StoreError::new(StoreErrorKind::HookExecutionError, Some(Box::new(e))))
} else {
Ok(fle)
}
})
}
/// Iterate over all StoreIds for one module name
@ -224,7 +239,7 @@ impl Store {
/// Return the `FileLockEntry` and write to disk
pub fn update<'a>(&'a self, mut entry: FileLockEntry<'a>) -> Result<()> {
if let Err(e) = self.execute_pre_update_hooks(&mut entry) {
if let Err(e) = self.execute_hooks_for_mut_file(self.pre_update_aspects.clone(), &mut entry) {
return Err(e);
}
@ -232,8 +247,7 @@ impl Store {
return Err(e);
}
self.execute_post_update_hooks(entry)
.map(|_| ())
self.execute_hooks_for_mut_file(self.post_update_aspects.clone(), &mut entry)
}
/// Internal method to write to the filesystem store.
@ -284,7 +298,7 @@ impl Store {
/// Delete an entry
pub fn delete(&self, id: StoreId) -> Result<()> {
let id = self.storify_id(id);
if let Err(e) = self.execute_pre_delete_hooks(&id) {
if let Err(e) = self.execute_hooks_for_id(self.pre_delete_aspects.clone(), &id) {
return Err(e);
}
@ -306,7 +320,7 @@ impl Store {
return Err(StoreError::new(StoreErrorKind::FileError, Some(Box::new(e))));
}
self.execute_post_delete_hooks(&id)
self.execute_hooks_for_id(self.post_delete_aspects.clone(), &id)
}
/// Gets the path where this store is on the disk
@ -314,327 +328,96 @@ impl Store {
&self.location
}
pub fn register_pre_read_hook(&self, h: Box<PreReadHook>) -> Result<()> {
debug!("Registering pre-read hook: {:?}", h);
self.pre_read_hooks
.deref()
.lock()
.map_err(|_| StoreError::new(StoreErrorKind::HookRegisterError, None))
// TODO: cause: Some(Box::new(e))
.map(|mut guard| guard.deref_mut().push(h))
}
pub fn register_post_read_hook(&self, h: Box<PostReadHook>) -> Result<()> {
debug!("Registering post-read hook: {:?}", h);
self.post_read_hooks
.deref()
.lock()
.map_err(|_| StoreError::new(StoreErrorKind::HookRegisterError, None))
// TODO: cause: Some(Box::new(e))
.map(|mut guard| guard.deref_mut().push(h))
}
pub fn register_pre_create_hook(&self, h: Box<PreCreateHook>) -> Result<()> {
debug!("Registering pre-create hook: {:?}", h);
self.pre_create_hooks
.deref()
.lock()
.map_err(|_| StoreError::new(StoreErrorKind::HookRegisterError, None))
// TODO: cause: Some(Box::new(e))
.map(|mut guard| guard.deref_mut().push(h))
}
pub fn register_post_create_hook(&self, h: Box<PostCreateHook>) -> Result<()> {
debug!("Registering post-create hook: {:?}", h);
self.post_create_hooks
.deref()
.lock()
.map_err(|_| StoreError::new(StoreErrorKind::HookRegisterError, None))
// TODO: cause: Some(Box::new(e))
.map(|mut guard| guard.deref_mut().push(h))
}
pub fn register_pre_retrieve_hook(&self, h: Box<PreRetrieveHook>) -> Result<()> {
debug!("Registering pre-retrieve hook: {:?}", h);
self.pre_retrieve_hooks
.deref()
.lock()
.map_err(|_| StoreError::new(StoreErrorKind::HookRegisterError, None))
// TODO: cause: Some(Box::new(e))
.map(|mut guard| guard.deref_mut().push(h))
}
pub fn register_post_retrieve_hook(&self, h: Box<PostRetrieveHook>) -> Result<()> {
debug!("Registering post-retrieve hook: {:?}", h);
self.post_retrieve_hooks
.deref()
.lock()
.map_err(|_| StoreError::new(StoreErrorKind::HookRegisterError, None))
// TODO: cause: Some(Box::new(e))
.map(|mut guard| guard.deref_mut().push(h))
}
pub fn register_pre_update_hook(&self, h: Box<PreUpdateHook>) -> Result<()> {
debug!("Registering pre-update hook: {:?}", h);
self.pre_update_hooks
.deref()
.lock()
.map_err(|_| StoreError::new(StoreErrorKind::HookRegisterError, None))
// TODO: cause: Some(Box::new(e))
.map(|mut guard| guard.deref_mut().push(h))
}
pub fn register_post_update_hook(&self, h: Box<PostUpdateHook>) -> Result<()> {
debug!("Registering post-update hook: {:?}", h);
self.post_update_hooks
.deref()
.lock()
.map_err(|_| StoreError::new(StoreErrorKind::HookRegisterError, None))
// TODO: cause: Some(Box::new(e))
.map(|mut guard| guard.deref_mut().push(h))
}
pub fn register_pre_delete_hook(&self, h: Box<PreDeleteHook>) -> Result<()> {
debug!("Registering pre-delete hook: {:?}", h);
self.pre_delete_hooks
.deref()
.lock()
.map_err(|_| StoreError::new(StoreErrorKind::HookRegisterError, None))
// TODO: cause: Some(Box::new(e))
.map(|mut guard| guard.deref_mut().push(h))
}
pub fn register_post_delete_hook(&self, h: Box<PostDeleteHook>) -> Result<()> {
debug!("Registering post-delete hook: {:?}", h);
self.post_delete_hooks
.deref()
.lock()
.map_err(|_| StoreError::new(StoreErrorKind::HookRegisterError, None))
// TODO: cause: Some(Box::new(e))
.map(|mut guard| guard.deref_mut().push(h))
}
pub fn execute_pre_read_hooks(&self, id: &StoreId) -> Result<()> {
debug!("Execute pre-read hooks: {:?}", self.pre_read_hooks);
let guard = self.pre_read_hooks.deref().lock();
if guard.is_err() { return Err(StoreError::new(StoreErrorKind::PreHookExecuteError, None)) }
guard.unwrap().deref().iter()
.fold(Ok(()), |acc, hook| {
debug!("[Hook][exec]: {:?}", hook);
acc.and_then(|_| hook.pre_read(id))
})
.map_err(|e| StoreError::new(StoreErrorKind::PreHookExecuteError, Some(Box::new(e))))
}
pub fn execute_post_read_hooks<'a>(&'a self, fle: FileLockEntry<'a>)
-> Result<FileLockEntry<'a>>
pub fn register_hook(&mut self,
position: HookPosition,
aspect_name: &String,
h: Box<Hook>)
-> Result<()>
{
debug!("Execute post-read hooks: {:?}", self.post_read_hooks);
self.post_read_hooks
debug!("Registering hook: {:?}", h);
debug!(" in position: {:?}", position);
debug!(" with aspect: {:?}", aspect_name);
let mut guard = match position {
HookPosition::PreRead => self.pre_read_aspects.clone(),
HookPosition::PostRead => self.post_read_aspects.clone(),
HookPosition::PreCreate => self.pre_create_aspects.clone(),
HookPosition::PostCreate => self.post_create_aspects.clone(),
HookPosition::PreRetrieve => self.pre_retrieve_aspects.clone(),
HookPosition::PostRetrieve => self.post_retrieve_aspects.clone(),
HookPosition::PreUpdate => self.pre_update_aspects.clone(),
HookPosition::PostUpdate => self.post_update_aspects.clone(),
HookPosition::PreDelete => self.pre_delete_aspects.clone(),
HookPosition::PostDelete => self.post_delete_aspects.clone(),
};
let mut guard = guard
.deref()
.lock()
.map_err(|e| StoreError::new(StoreErrorKind::PostHookExecuteError, None))
.and_then(|guard| {
let accessors = guard.deref().iter().map(|h| h.accessor()).collect();
self.execute_hook_accessors(accessors, fle)
.map_err(|e| {
StoreError::new(StoreErrorKind::PostHookExecuteError, Some(Box::new(e)))
})
})
}
.map_err(|_| StoreError::new(StoreErrorKind::HookRegisterError, None));
fn execute_hook_accessors<'a>(&self,
accessors: Vec<Box<HookDataAccessor>>,
fle: FileLockEntry<'a>)
-> Result<FileLockEntry<'a>>
{
use std::thread;
use std::thread::JoinHandle;
use hook::accessor::HookDataAccessor as HDA;
use error::StoreError as SE;
use error::StoreErrorKind as SEK;
let iter_par = accessors.iter().all(|a| {
match a.deref() { &HDA::NonMutableAccess(_) => true, _ => false }
});
debug!("Parallel execution of hooks = {}", iter_par);
if iter_par {
debug!("Parallel execution of hooks not implemented yet");
let threads : Vec<Result<()>> = accessors
.iter()
.map(|accessor| {
crossbeam::scope(|scope| {
scope.spawn(|| {
match accessor.deref() {
&HDA::NonMutableAccess(ref accessor) => accessor.access(&fle),
_ => panic!("There shouldn't be a MutableHookDataAcceessor but there is"),
}
.map_err(|e| ()) // TODO: We're losing the error cause here
})
})
})
.map(|item| item.join().map_err(|_| SE::new(SEK::HookExecutionError, None)))
.collect();
threads
.into_iter()
.fold(Ok(fle), |acc, elem| {
acc.and_then(|a| {
elem.map(|_| a)
.map_err(|_| SE::new(SEK::HookExecutionError, None))
})
})
} else {
accessors.into_iter().fold(Ok(fle), move |acc, accessor| {
match accessor.deref() {
&HDA::MutableAccess(ref accessor) => {
match acc {
Ok(mut fle) => accessor
.access_mut(&mut fle)
.and(Ok(fle))
.map_err(|e| SE::new(SEK::HookExecutionError, Some(Box::new(e)))),
Err(e) => Err(SE::new(SEK::HookExecutionError, Some(Box::new(e)))),
}
},
&HDA::NonMutableAccess(ref accessor) => {
match acc {
Ok(mut fle) => accessor
.access(&fle)
.and(Ok(fle))
.map_err(|e| SE::new(SEK::HookExecutionError, Some(Box::new(e)))),
Err(e) => Err(SE::new(SEK::HookExecutionError, Some(Box::new(e)))),
}
},
_ => Err(StoreError::new(StoreErrorKind::HookExecutionError, None)),
}
})
if guard.is_err() {
return Err(StoreError::new(StoreErrorKind::HookRegisterError,
Some(Box::new(guard.err().unwrap()))));
}
let mut guard = guard.unwrap();
for mut aspect in guard.deref_mut() {
if aspect.name().clone() == aspect_name.clone() {
aspect.register_hook(h);
return Ok(());
}
}
return Err(StoreError::new(StoreErrorKind::HookRegisterError, None));
}
pub fn execute_pre_create_hooks(&self, id: &StoreId) -> Result<()> {
debug!("Execute pre-create hooks: {:?}", self.pre_create_hooks);
let guard = self.pre_create_hooks.deref().lock();
if guard.is_err() { return Err(StoreError::new(StoreErrorKind::PreHookExecuteError, None)) }
guard.unwrap().deref().iter()
.fold(Ok(()), |acc, hook| {
debug!("[Hook][exec]: {:?}", hook);
acc.and_then(|_| hook.pre_create(id))
})
.map_err(|e| StoreError::new(StoreErrorKind::PreHookExecuteError, Some(Box::new(e))))
}
pub fn execute_post_create_hooks<'a>(&'a self, fle: FileLockEntry<'a>)
-> Result<FileLockEntry<'a>>
fn execute_hooks_for_id(&self,
aspects: Arc<Mutex<Vec<Aspect>>>,
id: &StoreId)
-> Result<()>
{
debug!("Execute post-create hooks: {:?}", self.post_create_hooks);
self.post_create_hooks
.deref()
.lock()
.map_err(|e| StoreError::new(StoreErrorKind::PostHookExecuteError, None))
.and_then(|guard| {
let accessors = guard.deref().iter().map(|h| h.accessor()).collect();
self.execute_hook_accessors(accessors, fle)
.map_err(|e| {
StoreError::new(StoreErrorKind::PostHookExecuteError, Some(Box::new(e)))
})
})
}
pub fn execute_pre_retrieve_hooks(&self, id: &StoreId) -> Result<()> {
debug!("Execute pre-retrieve hooks: {:?}", self.pre_retrieve_hooks);
let guard = self.pre_retrieve_hooks.deref().lock();
let guard = aspects.deref().lock();
if guard.is_err() { return Err(StoreError::new(StoreErrorKind::PreHookExecuteError, None)) }
guard.unwrap().deref().iter()
.fold(Ok(()), |acc, hook| {
debug!("[Hook][exec]: {:?}", hook);
acc.and_then(|_| hook.pre_retrieve(id))
.fold(Ok(()), |acc, aspect| {
debug!("[Aspect][exec]: {:?}", aspect);
acc.and_then(|_| (aspect as &StoreIdAccessor).access(id))
})
.map_err(|e| StoreError::new(StoreErrorKind::PreHookExecuteError, Some(Box::new(e))))
}
pub fn execute_post_retrieve_hooks<'a>(&'a self, fle: FileLockEntry<'a>)
-> Result<FileLockEntry<'a>>
fn execute_hooks_for_mut_file(&self,
aspects: Arc<Mutex<Vec<Aspect>>>,
fle: &mut FileLockEntry)
-> Result<()>
{
debug!("Execute post-retrieve hooks: {:?}", self.post_retrieve_hooks);
self.post_retrieve_hooks
.deref()
.lock()
.map_err(|e| StoreError::new(StoreErrorKind::PostHookExecuteError, None))
.and_then(|guard| {
let accessors = guard.deref().iter().map(|h| h.accessor()).collect();
self.execute_hook_accessors(accessors, fle)
.map_err(|e| {
StoreError::new(StoreErrorKind::PostHookExecuteError, Some(Box::new(e)))
})
})
}
pub fn execute_pre_update_hooks(&self, fle: &mut FileLockEntry) -> Result<()> {
debug!("Execute pre-update hooks: {:?}", self.pre_update_hooks);
let guard = self.pre_update_hooks.deref().lock();
let guard = aspects.deref().lock();
if guard.is_err() { return Err(StoreError::new(StoreErrorKind::PreHookExecuteError, None)) }
guard.unwrap().deref().iter()
.fold(Ok(()), |acc, hook| {
debug!("[Hook][exec]: {:?}", hook);
acc.and_then(|_| hook.pre_update(fle))
.fold(Ok(()), |acc, aspect| {
debug!("[Aspect][exec]: {:?}", aspect);
acc.and_then(|_| aspect.access_mut(fle))
})
.map_err(|e| StoreError::new(StoreErrorKind::PreHookExecuteError, Some(Box::new(e))))
}
pub fn execute_post_update_hooks<'a>(&self, fle: FileLockEntry<'a>) -> Result<FileLockEntry<'a>> {
debug!("Execute post-update hooks: {:?}", self.post_update_hooks);
self.post_update_hooks
.deref()
.lock()
.map_err(|e| StoreError::new(StoreErrorKind::PostHookExecuteError, None))
.and_then(|guard| {
let accessors = guard.deref().iter().map(|h| h.accessor()).collect();
self.execute_hook_accessors(accessors, fle)
.map_err(|e| {
StoreError::new(StoreErrorKind::PostHookExecuteError, Some(Box::new(e)))
})
})
}
pub fn execute_pre_delete_hooks(&self, id: &StoreId) -> Result<()> {
debug!("Execute pre-delete hooks: {:?}", self.pre_delete_hooks);
let guard = self.pre_delete_hooks.deref().lock();
fn execute_hooks_for_file(&self,
aspects: Arc<Mutex<Vec<Aspect>>>,
fle: &FileLockEntry)
-> Result<()>
{
let guard = aspects.deref().lock();
if guard.is_err() { return Err(StoreError::new(StoreErrorKind::PreHookExecuteError, None)) }
guard.unwrap().deref().iter()
.fold(Ok(()), |acc, hook| {
debug!("[Hook][exec]: {:?}", hook);
acc.and_then(|_| hook.pre_delete(id))
.fold(Ok(()), |acc, aspect| {
debug!("[Aspect][exec]: {:?}", aspect);
acc.and_then(|_| (aspect as &NonMutableHookDataAccessor).access(fle))
})
.map_err(|e| StoreError::new(StoreErrorKind::PreHookExecuteError, Some(Box::new(e))))
}
pub fn execute_post_delete_hooks(&self, id: &StoreId) -> Result<()> {
debug!("Execute post-delete hooks: {:?}", self.post_delete_hooks);
self.post_delete_hooks
.deref()
.lock()
.map_err(|e| StoreError::new(StoreErrorKind::PostHookExecuteError, None))
.and_then(|guard| {
guard.deref()
.iter()
.fold(Ok(()), move |res, hook| {
debug!("[Hook][exec]: {:?}", hook);
res.and_then(|_| hook.post_delete(id))
})
.map_err(|e| {
StoreError::new(StoreErrorKind::PostHookExecuteError, Some(Box::new(e)))
})
})
}
}
impl Drop for Store {