From a9600f23b3ba85ab039f485816a1c304c7f4b713 Mon Sep 17 00:00:00 2001 From: Matthias Beyer Date: Tue, 23 Feb 2016 19:34:28 +0100 Subject: [PATCH] Threaded hook execution --- libimagstore/src/error.rs | 2 +- libimagstore/src/store.rs | 83 ++++++++++++++++++++++++++------------- 2 files changed, 56 insertions(+), 29 deletions(-) diff --git a/libimagstore/src/error.rs b/libimagstore/src/error.rs index 55d883a6..a445c2d3 100644 --- a/libimagstore/src/error.rs +++ b/libimagstore/src/error.rs @@ -28,7 +28,7 @@ pub enum StoreErrorKind { HeaderKeyNotFound, HeaderTypeFailure, HookRegisterError, - HookExecuteError, + HookExecutionError, PreHookExecuteError, PostHookExecuteError, // maybe more diff --git a/libimagstore/src/store.rs b/libimagstore/src/store.rs index dcefeb6f..ceaa3172 100644 --- a/libimagstore/src/store.rs +++ b/libimagstore/src/store.rs @@ -15,6 +15,8 @@ use std::ops::DerefMut; use toml::{Table, Value}; use regex::Regex; +use crossbeam; +use crossbeam::ScopedJoinHandle; use error::{ParserErrorKind, ParserError}; use error::{StoreError, StoreErrorKind}; @@ -448,43 +450,68 @@ impl Store { }) } - fn execute_hook_accessors<'a>(&self, accessors: Vec>, fle: FileLockEntry<'a>) + fn execute_hook_accessors<'a>(&self, + accessors: Vec>, + fle: FileLockEntry<'a>) -> Result> { - use hook::HookDataAccessor as HDA; + use std::thread; + use std::thread::JoinHandle; - let iter_par = accessors.iter().all(|accessor| { - if let &HookDataAccessor::NonMutableAccess(_) = accessor.deref() { - true - } else { - false - } + use hook::HookDataAccessor as HDA; + use error::StoreError as SE; + use error::StoreErrorKind as SEK; + + let iter_par = accessors.iter().all(|a| { + match a.deref() { &HDA::NonMutableAccess(_) => true, _ => false } }); debug!("Parallel execution of hooks = {}", iter_par); if iter_par { debug!("Parallel execution of hooks not implemented yet"); - } // else { + let threads : Vec> = accessors + .iter() + .map(|accessor| { + crossbeam::scope(|scope| { + scope.spawn(|| { + match accessor.deref() { + &HDA::NonMutableAccess(ref accessor) => accessor.access(&fle), + _ => panic!("There shouldn't be a MutableHookDataAcceessor but there is"), + } + .map_err(|e| ()) // TODO: We're losing the error cause here + }) + }) + }) + .map(|item| item.join().map_err(|_| SE::new(SEK::HookExecutionError, None))) + .collect(); - accessors.into_iter().fold(Ok(fle), move |acc, accessor| { - match accessor.deref() { - &HDA::MutableAccess(ref accessor) => { - match acc { - Ok(mut fle) => accessor.access_mut(&mut fle).and(Ok(fle)), - Err(e) => Err(e), - } - }, - &HDA::NonMutableAccess(ref accessor) => { - match acc { - Ok(mut fle) => accessor.access(&fle).and(Ok(fle)), - Err(e) => Err(e), - } - }, - } - }) - .map_err(|e| StoreError::new(StoreErrorKind::PostHookExecuteError, Some(Box::new(e)))) - - // } // else + threads + .into_iter() + .fold(Ok(fle), |acc, elem| { + acc.and_then(|a| { + elem.map(|_| a) + .map_err(|_| SE::new(SEK::HookExecutionError, None)) + }) + }) + } else { + accessors.into_iter().fold(Ok(fle), move |acc, accessor| { + match accessor.deref() { + &HDA::MutableAccess(ref accessor) => { + match acc { + Ok(mut fle) => accessor.access_mut(&mut fle).and(Ok(fle)), + Err(e) => Err(e), + } + }, + &HDA::NonMutableAccess(ref accessor) => { + match acc { + Ok(mut fle) => accessor.access(&fle).and(Ok(fle)), + Err(e) => Err(e), + } + }, + } + }) + .map_err(|e| SE::new(SEK::HookExecutionError, Some(Box::new(e)))) + } } pub fn execute_pre_create_hooks(&self, id: &StoreId) -> Result<()> {