13 #ifndef ROOT_TProcessExecutor 14 #define ROOT_TProcessExecutor 31 #include <type_traits> 47 template<
class F,
class Cond = noReferenceCond<F>>
48 auto Map(
F func,
unsigned nTimes) -> std::vector<typename std::result_of<F()>::type>;
49 template<
class F,
class INTEGER,
class Cond = noReferenceCond<F, INTEGER>>
51 template<
class F,
class T,
class Cond = noReferenceCond<F, T>>
52 auto Map(
F func, std::vector<T> &args) -> std::vector<typename std::result_of<F(T)>::type>;
58 template<
class F,
class R,
class Cond = noReferenceCond<F>>
59 auto MapReduce(
F func,
unsigned nTimes,
R redfunc) ->
typename std::result_of<F()>::type;
60 template<
class F,
class T,
class R,
class Cond = noReferenceCond<F, T>>
61 auto MapReduce(
F func, std::vector<T> &args,
R redfunc) ->
typename std::result_of<F(T)>::type;
64 template<
class T,
class R>
T Reduce(
const std::vector<T> &objs,
R redfunc);
67 template<
class T>
void Collect(std::vector<T> &reslist);
80 enum class ETask : unsigned char {
99 template<
class F,
class Cond>
102 using retType = decltype(func());
108 unsigned oldNWorkers = GetNWorkers();
109 if (nTimes < oldNWorkers)
112 bool ok = Fork(worker);
113 SetNWorkers(oldNWorkers);
116 Error(
"TProcessExecutor::Map",
"[E][C] Could not fork. Aborting operation.");
117 return std::vector<retType>();
121 fNToProcess = nTimes;
122 std::vector<retType> reslist;
123 reslist.reserve(fNToProcess);
131 fTaskType = ETask::kNoTask;
141 template<
class F,
class T,
class Cond>
145 using retType = decltype(func(args.front()));
148 fTaskType = ETask::kMapWithArg;
152 unsigned oldNWorkers = GetNWorkers();
153 if (args.size() < oldNWorkers)
154 SetNWorkers(args.size());
156 bool ok = Fork(worker);
157 SetNWorkers(oldNWorkers);
160 Error(
"TProcessExecutor::Map",
"[E][C] Could not fork. Aborting operation.");
161 return std::vector<retType>();
165 fNToProcess = args.size();
166 std::vector<retType> reslist;
167 reslist.reserve(fNToProcess);
168 std::vector<unsigned> range(fNToProcess);
169 std::iota(range.begin(), range.end(), 0);
177 fTaskType = ETask::kNoTask;
185 template<
class F,
class INTEGER,
class Cond>
188 std::vector<INTEGER> vargs(args.size());
189 std::copy(args.begin(), args.end(), vargs.begin());
190 const auto &reslist = Map(func, vargs);
200 template<
class F,
class R,
class Cond>
203 using retType = decltype(func());
206 fTaskType= ETask::kMapRed;
209 unsigned oldNWorkers = GetNWorkers();
210 if (nTimes < oldNWorkers)
213 bool ok = Fork(worker);
214 SetNWorkers(oldNWorkers);
216 std::cerr <<
"[E][C] Could not fork. Aborting operation\n";
221 fNToProcess = nTimes;
222 std::vector<retType> reslist;
223 reslist.reserve(fNToProcess);
231 fTaskType= ETask::kNoTask;
232 return redfunc(reslist);
241 template<
class F,
class T,
class R,
class Cond>
245 using retType = decltype(func(args.front()));
248 fTaskType= ETask::kMapRedWithArg;
251 unsigned oldNWorkers = GetNWorkers();
252 if (args.size() < oldNWorkers)
253 SetNWorkers(args.size());
255 bool ok = Fork(worker);
256 SetNWorkers(oldNWorkers);
258 std::cerr <<
"[E][C] Could not fork. Aborting operation\n";
259 return decltype(func(args.front()))();
263 fNToProcess = args.size();
264 std::vector<retType> reslist;
265 reslist.reserve(fNToProcess);
266 std::vector<unsigned> range(fNToProcess);
267 std::iota(range.begin(), range.end(), 0);
274 fTaskType= ETask::kNoTask;
275 return Reduce(reslist, redfunc);
281 template<
class T,
class R>
285 static_assert(std::is_same<decltype(redfunc(objs)),
T>::value,
"redfunc does not have the correct signature");
286 return redfunc(objs);
294 unsigned code = msg.first;
296 reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
301 if(msg.second !=
nullptr)
302 reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
305 const char *str = ReadBuffer<const char*>(msg.second.get());
306 Error(
"TProcessExecutor::HandlePoolCode",
"[E][C] a worker encountered an error: %s\n" 307 "Continuing execution ignoring these entries.", str);
312 Error(
"TProcessExecutor::HandlePoolCode",
"[W][C] unknown code received from server. code=%d", code);
329 Error(
"TProcessExecutor::Collect",
"[E][C] Lost connection to a worker");
331 }
else if (msg.first < 1000)
TProcessExecutor(unsigned nWorkers=0)
Class constructor.
void SetNWorkers(unsigned n)
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
Namespace for new ROOT classes and functions.
This class works together with TProcessExecutor to allow the execution of functions in server processes.
unsigned GetNWorkers() const
TProcessExecutor & operator=(const TProcessExecutor &)=delete
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
Error while reading from the socket.
This class defines an interface to execute the same task multiple times in parallel, possibly with different arguments every time.
unsigned fNProcessed
number of arguments already passed to the workers
void Remove(TSocket *s)
Remove a certain socket from the monitor.
void HandlePoolCode(MPCodeBufPair &msg, TSocket *sender, std::vector< T > &reslist)
Handle message and reply to the worker.
void ReplyToFuncResult(TSocket *s)
Reply to a worker who just sent a result.
a Map method with arguments is being executed
void ReplyToIdle(TSocket *s)
Reply to a worker who is idle.
The message contains the result of the processing of a TTree.
void Reset()
Reset TProcessExecutor's state.
TSocket * Select()
Return pointer to socket for which an event is waiting.
a MapReduce method with arguments is being executed
The message contains the result of a function execution.
This class provides a simple interface to execute the same task multiple times in parallel, possibly with different arguments every time.
std::pair< unsigned, std::unique_ptr< TBufferFile > > MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
Execute function with the argument contained in the message.
void Reset(Detail::TBranchProxy *x)
a Map method with no arguments is being executed
ETask fTaskType
the kind of task that is being executed, if any
no task is being executed
Used by the client to tell servers to shutdown.
~TProcessExecutor()=default
unsigned fNToProcess
total number of arguments to pass to the workers
A pseudo container class which is a generator of indices.
Base class for multiprocess applications' clients.
static constexpr double s
void SetNWorkers(unsigned n)
Set the number of workers that will be spawned by the next call to Fork().
void HandleMPCode(MPCodeBufPair &msg, TSocket *sender)
Handle messages containing an EMPCode.
a MapReduce method with no arguments is being executed
auto Map(F func, unsigned nTimes) -> std::vector< typename std::result_of< F()>::type >
Execute func (with no arguments) nTimes in parallel.
auto MapReduce(F func, unsigned nTimes, R redfunc) -> typename std::result_of< F()>::type
This method behaves just like Map, but an additional redfunc function must be provided.
virtual void ActivateAll()
Activate all de-activated sockets.
Tell the client there was an error while processing.
ETask
A collection of the types of tasks that TProcessExecutor can execute.
We are ready for the next task.
unsigned GetNWorkers() const
MPCodeBufPair MPRecv(TSocket *s)
Receive message from a socket.
void Collect(std::vector< T > &reslist)
Listen for messages sent by the workers and call the appropriate handler function.
void Error(ErrorHandler_t func, int code, const char *va_(fmt),...)
Write error message and call a handler, if required.
Execute function without arguments.
T Reduce(const std::vector< T > &objs, R redfunc)
"Reduce" an std::vector into a single object by passing a function as the second argument defining the reduction operation.