Functionality for thread-safe parallel processing of queued items.
Classes
class | MR::Thread::Queue< T >
A first-in first-out thread-safe item queue.
Functions
template<class Item >
__Batch< Item > | MR::Thread::batch (const Item &, size_t number=128)
used to request batched processing of items
template<class Source , class Item , class Sink >
void | MR::Thread::run_queue (Source &&source, const Item &item, Sink &&sink, size_t capacity=128)
convenience function to set up and run a 2-stage multi-threaded pipeline.
template<class Source , class Item1 , class Pipe , class Item2 , class Sink >
void | MR::Thread::run_queue (Source &&source, const Item1 &item1, Pipe &&pipe, const Item2 &item2, Sink &&sink, size_t capacity=128)
convenience function to set up and run a 3-stage multi-threaded pipeline.
template<class Source , class Item1 , class Pipe1 , class Item2 , class Pipe2 , class Item3 , class Sink >
void | MR::Thread::run_queue (Source &&source, const Item1 &item1, Pipe1 &&pipe1, const Item2 &item2, Pipe2 &&pipe2, const Item3 &item3, Sink &&sink, size_t capacity=128)
convenience function to set up and run a 4-stage multi-threaded pipeline.
Functionality for thread-safe parallel processing of queued items.
These functions and classes provide functionality for one or more source threads to feed items into a first-in first-out queue, and one or more sink threads to consume items. This pipeline can also extend over two queues, with one or more pipe threads consuming items of one type from the first queue, and feeding items of another type onto the second queue.
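As a graphical representation of the pipeline, the following workflows can be achieved:
[source thread(s)] -> queue -> [sink thread(s)]
or for a deeper pipeline:
[source thread(s)] -> queue 1 -> [pipe thread(s)] -> queue 2 -> [sink thread(s)]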
By default, items are pushed to and pulled from the queue one by one. In situations where the amount of processing per item is small, items can be sent in batches to reduce the overhead of thread management (mutex locking/unlocking, etc).
The simplest way to use this functionality is via the Thread::run_queue() and associated Thread::multi() and Thread::batch() functions. In complex situations, it may be necessary to use the Thread::Queue class directly, although that should very rarely (if ever) be needed.
template<class Item >
__Batch< Item > MR::Thread::batch (const Item &, size_t number=128) [inline]
used to request batched processing of items
This function is used in combination with Thread::run_queue to request that the items object be processed in batches of number items (defaults to MRTRIX_QUEUE_DEFAULT_BATCH_SIZE).
Definition at line 858 of file thread_queue.h.
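template<class Source , class Item , class Sink >
void MR::Thread::run_queue (Source &&source, const Item &item, Sink &&sink, size_t capacity=128) [inline]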
convenience function to set up and run a 2-stage multi-threaded pipeline.
This function (and its 3-stage equivalent Thread::run_queue(Source&&, const Item1&, Pipe&&, const Item2&, Sink&&, size_t)) simplifies the process of setting up a multi-threaded processing chain that should meet most users' needs.
The arguments to this function correspond to an instance of the Source, the Sink, and optionally the Pipe functors, in addition to an instance of the Items to be passed through each stage of the pipeline - these are provided purely to specify the type of object to pass through the queue(s).
The 3 types of functors each have a specific purpose, and corresponding requirements as described below:
Source (the input functor): its operator() method should return true if further items need to be processed, or false to signal that no further items are to be sent through the queue (at which point the corresponding thread(s) will exit).
Sink (the output functor): its operator() method should return true when ready to process further items, or false to signal the end of processing (at which point the corresponding thread(s) will exit).
Pipe (the intermediate functor, for pipelines of 3 or more stages): its operator() method should return true if the item processed is to be sent to the next stage in the pipeline, and false if it is to be discarded. Note that this is very different from the other functors, where returning false signals end of processing.
This is a simple demo application that generates a linear sequence of numbers and sums them up:
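The Source and Sink contracts described above can be sketched in a self-contained form. This is not the MRtrix implementation: `ToyQueue`, `toy_run_queue` and `sum_sequence` are hypothetical stand-ins built on the C++ standard library, and the sketch assumes that an item is queued only when the Source returns true. It shows only how the two return values drive a 2-stage pipeline with one source thread and one sink.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical bounded FIFO queue for this sketch; NOT MR::Thread::Queue.
template <class Item>
class ToyQueue {
  public:
    explicit ToyQueue (std::size_t max_items) : capacity (max_items) { assert (max_items > 0); }

    // Blocks while the queue is full; returns false if the queue was closed.
    bool push (const Item& item) {
      std::unique_lock<std::mutex> lock (mutex);
      not_full.wait (lock, [this] { return items.size() < capacity || closed; });
      if (closed) return false;
      items.push (item);
      not_empty.notify_one();
      return true;
    }

    // Blocks while the queue is empty; returns false once closed and drained.
    bool pop (Item& item) {
      std::unique_lock<std::mutex> lock (mutex);
      not_empty.wait (lock, [this] { return !items.empty() || closed; });
      if (items.empty()) return false;
      item = items.front();
      items.pop();
      not_full.notify_one();
      return true;
    }

    // Marks the end of processing and wakes any waiting thread.
    void close () {
      std::lock_guard<std::mutex> lock (mutex);
      closed = true;
      not_empty.notify_all();
      not_full.notify_all();
    }

  private:
    const std::size_t capacity;
    bool closed = false;
    std::queue<Item> items;
    std::mutex mutex;
    std::condition_variable not_full, not_empty;
};

// Source functor: prepares the next item; false means nothing more to send.
class Source {
  public:
    explicit Source (std::size_t limit) : count (0), max_count (limit) { }
    bool operator() (std::size_t& item) {
      if (count >= max_count) return false;
      item = count++;
      return true;
    }
  private:
    std::size_t count, max_count;
};

// Sink functor: consumes one item; false would signal the end of processing.
class Sink {
  public:
    explicit Sink (std::size_t& total) : total (total) { }
    bool operator() (const std::size_t& item) { total += item; return true; }
  private:
    std::size_t& total;
};

// Hypothetical 2-stage driver: one source thread, sink run in the caller.
template <class SourceT, class Item, class SinkT>
void toy_run_queue (SourceT& source, const Item& item_type, SinkT& sink, std::size_t capacity = 128) {
  ToyQueue<Item> queue (capacity);
  std::thread source_thread ([&] {
    Item item (item_type);
    while (source (item) && queue.push (item)) { }
    queue.close();
  });
  Item item (item_type);
  while (queue.pop (item))
    if (!sink (item)) { queue.close(); break; }
  source_thread.join();
}

// Sums the sequence 0, 1, ..., max_count-1 through the toy pipeline.
std::size_t sum_sequence (std::size_t max_count) {
  std::size_t total = 0;
  Source source (max_count);
  Sink sink (total);
  toy_run_queue (source, std::size_t(), sink);
  return total;
}
```

As in Thread::run_queue(), the item argument is used only to fix the type passed through the queue. With MRtrix itself, the equivalent call would presumably take the form Thread::run_queue (source, size_t(), sink).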
If a functor is to be run over multiple parallel threads of execution, it should be wrapped in a call to Thread::multi() before being passed to the Thread::run_queue() functions. The Thread::run_queue() functions will then create additional instances of the relevant functor using its copy constructor; care should therefore be taken to ensure that the functor's copy constructor behaves appropriately.
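For example, the Sink functor can be run over multiple threads with a call of the form Thread::run_queue (source, item, Thread::multi (sink)).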
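The warning about the copy constructor can be illustrated with a minimal sketch. It assumes a Sink that accumulates a per-instance partial sum and merges it into a shared total in its destructor; the names `sum_with_copies`, `merge_mutex` and `prototype` are hypothetical, and a shared atomic counter stands in for the queue. This is not how Thread::multi() is implemented; it only shows why each copy needs its own state.

```cpp
#include <atomic>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Sink functor written so that copies are safe to run in parallel.
class Sink {
  public:
    Sink (std::size_t& grand_total, std::mutex& merge_mutex) :
      grand_total (grand_total), merge_mutex (merge_mutex), total (0) { }
    // A copy shares the destination but starts its own partial sum at zero.
    Sink (const Sink& other) :
      grand_total (other.grand_total), merge_mutex (other.merge_mutex), total (0) { }
    // Each instance merges its partial sum when it is destroyed.
    ~Sink () {
      std::lock_guard<std::mutex> lock (merge_mutex);
      grand_total += total;
    }
    bool operator() (const std::size_t& item) { total += item; return true; }
  private:
    std::size_t& grand_total;
    std::mutex& merge_mutex;
    std::size_t total;
};

// Runs num_threads copies of the Sink over the items 0 .. max_count-1.
std::size_t sum_with_copies (std::size_t max_count, std::size_t num_threads) {
  std::size_t grand_total = 0;
  std::mutex merge_mutex;
  std::atomic<std::size_t> next (0);  // stand-in for the queue (assumption)
  {
    Sink prototype (grand_total, merge_mutex);
    std::vector<std::thread> threads;
    for (std::size_t n = 0; n < num_threads; ++n)
      threads.emplace_back ([&] {
        Sink sink (prototype);  // per-thread instance made by the copy constructor
        for (std::size_t item = next++; item < max_count; item = next++)
          if (!sink (item)) break;
      });
    for (auto& thread : threads) thread.join();
  }  // prototype is destroyed here and merges its (zero) partial sum
  return grand_total;
}
```

If the copy constructor had copied `total` instead of resetting it, any partial sum already held by the original would be counted once per copy, which is the kind of behaviour the note above warns about.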
For the functor that is being multi-threaded, the default number of threads instantiated will depend on the "NumberOfThreads" entry in the MRtrix configuration file, or can be set at the command-line using the -nthreads option. This number can also be set as an additional optional argument to Thread::multi().
Note that any functor can be parallelised in this way. For instance, the Source functor could be wrapped in Thread::multi() instead, if this is the behaviour required, with a call of the form Thread::run_queue (Thread::multi (source), item, sink).
In cases where the amount of processing per item is small, the overhead of managing the concurrent access to the various queues from all the threads may become prohibitive (see Writing multi-threaded applications for details). In this case, it is a good idea to process the items in batches, which drastically reduces the number of accesses to the queue. This can be done by wrapping the item in a call to Thread::batch(), as in Thread::run_queue (source, Thread::batch (item), sink).
By default, batches consist of MRTRIX_QUEUE_DEFAULT_BATCH_SIZE items (defined as 128). This can be set explicitly by providing the desired size as an additional argument, as in Thread::batch (item, number).
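To make the effect of batching concrete, the following sketch counts how often a mutex-protected queue is locked when items are pushed one by one compared with pushing them in batches. `CountingQueue`, `locks_unbatched` and `locks_batched` are hypothetical names introduced for illustration; the sketch is single-threaded and says nothing about how Thread::batch() is implemented internally.

```cpp
#include <cstddef>
#include <mutex>
#include <queue>
#include <utility>
#include <vector>

// Hypothetical queue that records how many times its mutex is acquired.
template <class T>
class CountingQueue {
  public:
    void push (T value) {
      std::lock_guard<std::mutex> lock (mutex);
      ++lock_count;
      items.push (std::move (value));
    }
    std::size_t locks () const { return lock_count; }
  private:
    std::mutex mutex;
    std::queue<T> items;
    std::size_t lock_count = 0;
};

// One queue access (and one lock) per item.
std::size_t locks_unbatched (std::size_t num_items) {
  CountingQueue<std::size_t> queue;
  for (std::size_t i = 0; i < num_items; ++i)
    queue.push (i);
  return queue.locks();
}

// One queue access per batch of up to batch_size items.
std::size_t locks_batched (std::size_t num_items, std::size_t batch_size = 128) {
  CountingQueue<std::vector<std::size_t>> queue;
  std::vector<std::size_t> batch;
  batch.reserve (batch_size);
  for (std::size_t i = 0; i < num_items; ++i) {
    batch.push_back (i);
    if (batch.size() == batch_size) {
      queue.push (batch);  // pushes a copy, so the buffer can be reused
      batch.clear();
    }
  }
  if (!batch.empty())
    queue.push (batch);    // final, partially filled batch
  return queue.locks();
}
```

For 1000 items, the unbatched version locks the queue 1000 times, whereas batches of 128 need only 8 accesses (7 full batches plus one of 104 items).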
Obviously, Thread::multi() and Thread::batch() can be used in any combination to perform the operations required.
Definition at line 1050 of file thread_queue.h.
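template<class Source , class Item1 , class Pipe , class Item2 , class Sink >
void MR::Thread::run_queue (Source &&source, const Item1 &item1, Pipe &&pipe, const Item2 &item2, Sink &&sink, size_t capacity=128) [inline]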
convenience function to set up and run a 3-stage multi-threaded pipeline.
This function extends the 2-stage Thread::run_queue() function to allow a 3-stage pipeline. For example, building on the example from Thread::run_queue(), an additional Pipe stage could be added to double the numbers as they come through, with a call of the form Thread::run_queue (source, item1, pipe, item2, sink).
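The different meaning of the Pipe functor's return value can be sketched as follows. This is a single-threaded emulation of the control flow only, with no queues or threads, and it is not the MRtrix implementation: `emulate_pipeline` and `sum_doubled_evens` are hypothetical names, and the rule of discarding odd inputs is an arbitrary choice made to exercise the discard path.

```cpp
#include <cstddef>

// Source: false means there are no more items (ends the whole pipeline).
class Source {
  public:
    explicit Source (std::size_t limit) : count (0), max_count (limit) { }
    bool operator() (std::size_t& item) {
      if (count >= max_count) return false;
      item = count++;
      return true;
    }
  private:
    std::size_t count, max_count;
};

// Pipe: false means "discard this item", NOT "stop processing".
class Pipe {
  public:
    bool operator() (const std::size_t& in, std::size_t& out) {
      if (in % 2) return false;  // arbitrary rule for illustration: drop odd inputs
      out = 2 * in;              // double the numbers as they come through
      return true;
    }
};

// Sink: false would signal the end of processing.
class Sink {
  public:
    explicit Sink (std::size_t& total) : total (total) { }
    bool operator() (const std::size_t& item) { total += item; return true; }
  private:
    std::size_t& total;
};

// Single-threaded emulation of the 3-stage control flow (assumption:
// the real pipeline runs each stage in its own thread(s)).
void emulate_pipeline (Source& source, Pipe& pipe, Sink& sink) {
  std::size_t item1 = 0, item2 = 0;
  while (source (item1)) {
    if (!pipe (item1, item2)) continue;  // dropped item: carry on with the next one
    if (!sink (item2)) break;            // sink has finished: stop everything
  }
}

// Doubles the even numbers below max_count and sums the results.
std::size_t sum_doubled_evens (std::size_t max_count) {
  std::size_t total = 0;
  Source source (max_count);
  Pipe pipe;
  Sink sink (total);
  emulate_pipeline (source, pipe, sink);
  return total;
}
```

For a sequence of 10 numbers, the even inputs 0, 2, 4, 6 and 8 are doubled and summed to 40, while the odd inputs are dropped without ending the run.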
Note that the return value of the Pipe functor's operator() method is used in this case to signal whether or not the corresponding item should be sent through to the next stage (true) or discarded (false). This differs from the Source & Sink functors where the corresponding return value is used to signal end of processing.
As with the 2-stage pipeline, any functor can be executed in parallel (i.e. wrapped in Thread::multi()), Items do not need to be of the same type, and can be batched independently with any desired size.
Definition at line 1124 of file thread_queue.h.
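template<class Source , class Item1 , class Pipe1 , class Item2 , class Pipe2 , class Item3 , class Sink >
void MR::Thread::run_queue (Source &&source, const Item1 &item1, Pipe1 &&pipe1, const Item2 &item2, Pipe2 &&pipe2, const Item3 &item3, Sink &&sink, size_t capacity=128) [inline]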
convenience function to set up and run a 4-stage multi-threaded pipeline.
This function extends the 2-stage Thread::run_queue() function to allow a 4-stage pipeline, with two Pipe functors and three item types passed between the stages.
Definition at line 1168 of file thread_queue.h.