Threaded hook execution

This commit is contained in:
Matthias Beyer 2016-02-23 19:34:28 +01:00
parent 79ba3f5151
commit a9600f23b3
2 changed files with 56 additions and 29 deletions

View file

@ -28,7 +28,7 @@ pub enum StoreErrorKind {
HeaderKeyNotFound, HeaderKeyNotFound,
HeaderTypeFailure, HeaderTypeFailure,
HookRegisterError, HookRegisterError,
HookExecuteError, HookExecutionError,
PreHookExecuteError, PreHookExecuteError,
PostHookExecuteError, PostHookExecuteError,
// maybe more // maybe more

View file

@ -15,6 +15,8 @@ use std::ops::DerefMut;
use toml::{Table, Value}; use toml::{Table, Value};
use regex::Regex; use regex::Regex;
use crossbeam;
use crossbeam::ScopedJoinHandle;
use error::{ParserErrorKind, ParserError}; use error::{ParserErrorKind, ParserError};
use error::{StoreError, StoreErrorKind}; use error::{StoreError, StoreErrorKind};
@ -448,43 +450,68 @@ impl Store {
}) })
} }
fn execute_hook_accessors<'a>(&self, accessors: Vec<Box<HookDataAccessor>>, fle: FileLockEntry<'a>) fn execute_hook_accessors<'a>(&self,
accessors: Vec<Box<HookDataAccessor>>,
fle: FileLockEntry<'a>)
-> Result<FileLockEntry<'a>> -> Result<FileLockEntry<'a>>
{ {
use hook::HookDataAccessor as HDA; use std::thread;
use std::thread::JoinHandle;
let iter_par = accessors.iter().all(|accessor| { use hook::HookDataAccessor as HDA;
if let &HookDataAccessor::NonMutableAccess(_) = accessor.deref() { use error::StoreError as SE;
true use error::StoreErrorKind as SEK;
} else {
false let iter_par = accessors.iter().all(|a| {
} match a.deref() { &HDA::NonMutableAccess(_) => true, _ => false }
}); });
debug!("Parallel execution of hooks = {}", iter_par); debug!("Parallel execution of hooks = {}", iter_par);
if iter_par { if iter_par {
debug!("Parallel execution of hooks not implemented yet"); debug!("Parallel execution of hooks not implemented yet");
} // else { let threads : Vec<Result<()>> = accessors
.iter()
.map(|accessor| {
crossbeam::scope(|scope| {
scope.spawn(|| {
match accessor.deref() {
&HDA::NonMutableAccess(ref accessor) => accessor.access(&fle),
_ => panic!("There shouldn't be a MutableHookDataAcceessor but there is"),
}
.map_err(|e| ()) // TODO: We're losing the error cause here
})
})
})
.map(|item| item.join().map_err(|_| SE::new(SEK::HookExecutionError, None)))
.collect();
accessors.into_iter().fold(Ok(fle), move |acc, accessor| { threads
match accessor.deref() { .into_iter()
&HDA::MutableAccess(ref accessor) => { .fold(Ok(fle), |acc, elem| {
match acc { acc.and_then(|a| {
Ok(mut fle) => accessor.access_mut(&mut fle).and(Ok(fle)), elem.map(|_| a)
Err(e) => Err(e), .map_err(|_| SE::new(SEK::HookExecutionError, None))
} })
}, })
&HDA::NonMutableAccess(ref accessor) => { } else {
match acc { accessors.into_iter().fold(Ok(fle), move |acc, accessor| {
Ok(mut fle) => accessor.access(&fle).and(Ok(fle)), match accessor.deref() {
Err(e) => Err(e), &HDA::MutableAccess(ref accessor) => {
} match acc {
}, Ok(mut fle) => accessor.access_mut(&mut fle).and(Ok(fle)),
} Err(e) => Err(e),
}) }
.map_err(|e| StoreError::new(StoreErrorKind::PostHookExecuteError, Some(Box::new(e)))) },
&HDA::NonMutableAccess(ref accessor) => {
// } // else match acc {
Ok(mut fle) => accessor.access(&fle).and(Ok(fle)),
Err(e) => Err(e),
}
},
}
})
.map_err(|e| SE::new(SEK::HookExecutionError, Some(Box::new(e))))
}
} }
pub fn execute_pre_create_hooks(&self, id: &StoreId) -> Result<()> { pub fn execute_pre_create_hooks(&self, id: &StoreId) -> Result<()> {