17#ifndef __mrtrix_thread_queue_h__
18#define __mrtrix_thread_queue_h__
21#include <condition_variable>
27#define MRTRIX_QUEUE_DEFAULT_CAPACITY 128
28#define MRTRIX_QUEUE_DEFAULT_BATCH_SIZE 128
43 using type =
typename std::remove_reference<X>::type;
44 using member_type =
typename std::remove_reference<X>::type&;
45 static X&
functor (X& job) {
return job; }
47 template <
class SingleFunctor>
48 static SingleFunctor&
get (X& , SingleFunctor&
functor) {
56 using type =
typename std::remove_reference<X>::type;
57 using member_type =
typename std::remove_reference<X>::type;
58 static X&
functor (__Multi<X>& job) {
return job.functor; }
60 template <
class SingleFunctor>
61 static __Multi<SingleFunctor>
get (__Multi<X>& f, SingleFunctor&
functor) {
62 return __Multi<SingleFunctor> (
functor, f.num);
345 buffer (new T* [buffer_size]),
348 capacity (buffer_size),
352 assert (capacity > 0);
405 Item (
const Writer& writer) : Q (writer.Q), p (Q.get_item()) { }
408 Q.unregister_writer();
478 Q.unregister_reader();
517 std::lock_guard<std::mutex> lock (mutex);
518 std::cerr <<
"Thread::Queue \"" + name +
"\": "
519 << writer_count <<
" writer" << (writer_count > 1 ?
"s" :
"") <<
", "
520 << reader_count <<
" reader" << (reader_count > 1 ?
"s" :
"") <<
", items waiting: " << size() <<
"\n";
526 std::condition_variable more_data, more_space;
531 size_t writer_count, reader_count;
532 std::stack<T*,vector<T*> > item_stack;
536 void register_writer () {
537 std::lock_guard<std::mutex> lock (mutex);
540 void unregister_writer () {
541 std::lock_guard<std::mutex> lock (mutex);
542 assert (writer_count);
545 DEBUG (
"no writers left on queue \"" + name +
"\"");
546 more_data.notify_all();
549 void register_reader () {
550 std::lock_guard<std::mutex> lock (mutex);
553 void unregister_reader () {
554 std::lock_guard<std::mutex> lock (mutex);
555 assert (reader_count);
558 DEBUG (
"no readers left on queue \"" + name +
"\"");
559 more_space.notify_all();
564 return (front == back);
567 return (inc (back) == front);
570 return ( (back < front ? back+capacity : back) - front);
574 std::lock_guard<std::mutex> lock (mutex);
576 items.push_back (std::unique_ptr<T> (
item));
581 std::unique_lock<std::mutex> lock (mutex);
582 more_space.wait (lock, [
this]{
return !(full() && reader_count); });
583 if (!reader_count)
return false;
586 if (item_stack.empty()) {
588 items.push_back (std::unique_ptr<T> (
item));
591 item = item_stack.top();
594 more_data.notify_one();
599 std::unique_lock<std::mutex> lock (mutex);
601 item_stack.push (
item);
603 more_data.wait (lock, [
this]{
return !(empty() && writer_count); });
604 if (empty() && !writer_count)
608 more_space.notify_one();
613 std::unique_lock<std::mutex> lock (mutex);
615 item_stack.push (
item);
620 if (p >= buffer + capacity) p = buffer;
636 template <
class Item>
638 __Batch (
size_t number) :
num (number) { }
644 template <
class Item>
struct __batch_size {
NOMEMALIGN
645 __batch_size (
const Item&) { }
646 operator size_t ()
const {
return 0; }
648 template <
class Item>
struct __batch_size <__Batch<Item>> {
NOMEMALIGN
649 __batch_size (
const __Batch<Item>&
item) : n (
item.
num) { }
650 operator size_t ()
const {
return n; }
659 template <
class Item>
struct Type {
NOMEMALIGN
661 using queue = Queue<Item>;
662 using reader =
typename queue::Reader;
663 using writer =
typename queue::Writer;
664 using read_item =
typename reader::Item;
665 using write_item =
typename writer::Item;
668 template <
class Item>
struct Type<__Batch<Item>> {
NOMEMALIGN
670 using queue = Queue<vector<Item>>;
671 using reader =
typename queue::Reader;
672 using writer =
typename queue::Writer;
673 using read_item =
typename reader::Item;
674 using write_item =
typename writer::Item;
679 template <
class Item>
681 FetchItem (
typename Type<Item>::reader&
item) : in (
item.placeholder()) { }
682 bool read () {
return in.read(); }
683 Item&
value () {
return (*in); }
684 typename Type<Item>::read_item in;
687 template <
class Item>
689 FetchItem (
typename Type<__Batch<Item>>::reader& in) : in (in.placeholder()), n (0) { }
694 if (n >= in->size()) {
701 Item&
value () {
return (*in)[n]; }
702 typename Type<__Batch<Item>>::read_item in;
710 template <
class Item>
712 StoreItem (
size_t,
typename Type<Item>::writer&
item) : out (
item.placeholder()) { }
713 bool write () {
return out.write(); }
714 Item&
value () {
return (*out); }
//! Nothing to flush in the non-batched case: unconditionally reports success.
bool flush ()
{
  return true;
}
716 typename Type<Item>::write_item out;
719 template <
class Item>
721 StoreItem (
size_t batch_size,
typename Type<__Batch<Item>>::writer&
item) :
722 out (
item.placeholder()), batch_size (batch_size), n(0) { out->resize (batch_size); }
725 if (n >= batch_size) {
729 out->resize (batch_size);
733 Item&
value () {
return (*out)[n]; }
734 void flush () {
if (n) { out->resize (n); out.write(); } }
735 typename Type<__Batch<Item>>::write_item out;
736 const size_t batch_size;
743 template <
class Item,
class Functor>
744 struct __Source {
MEMALIGN(__Source<Item,Functor>)
746 using queue_t =
typename Type<Item>::queue;
747 using writer_t =
typename Type<Item>::writer;
748 using functor_t =
typename __job<Functor>::member_type;
754 __Source (queue_t& queue, Functor&
functor,
const Item&
item) :
757 batch_size (__batch_size<Item> (
item)) { }
760 auto out = StoreItem<Item> (batch_size, writer);
762 if (!func (out.value()))
764 }
while (out.write());
775 template <
class Item1,
class Functor,
class Item2>
776 struct __Pipe {
MEMALIGN(__Pipe<Item1,Functor,Item2>)
779 using queue1_t =
typename Type<Item1>::queue;
780 using queue2_t =
typename Type<Item2>::queue;
781 using reader_t =
typename Type<Item1>::reader;
782 using writer_t =
typename Type<Item2>::writer;
783 using functor_t =
typename __job<Functor>::member_type;
788 const size_t batch_size;
790 __Pipe (queue1_t& queue_in, Functor&
functor, queue2_t& queue_out,
const Item2& item2) :
794 batch_size (__batch_size<Item2> (item2)) { }
797 auto in = FetchItem<Item1> (reader);
798 auto out = StoreItem<Item2> (batch_size, writer);
800 if (func (in.value(), out.value())) {
815 template <
class Item,
class Functor>
816 struct __Sink {
MEMALIGN(__Sink<Item,Functor>)
818 using queue_t =
typename Type<Item>::queue;
819 using reader_t =
typename Type<Item>::reader;
820 using functor_t =
typename __job<Functor>::member_type;
825 __Sink (queue_t& queue, Functor&
functor) :
830 auto in = FetchItem<Item> (reader);
832 if (!func (in.value()))
857 template <
class Item>
860 return __Batch<Item> (number);
1049 template <
class Source,
class Item,
class Sink>
1064 typename Type<Item>::queue queue (
"source->sink", capacity);
1065 __Source<Item,Source> source_functor (queue, source,
item);
1066 __Sink<Item,Sink> sink_functor (queue, sink);
1123 template <
class Source,
class Item1,
class Pipe,
class Item2,
class Sink>
1144 typename Type<Item1>::queue queue1 (
"source->pipe", capacity);
1145 typename Type<Item2>::queue queue2 (
"pipe->sink", capacity);
1147 __Source<Item1,Source> source_functor (queue1, source, item1);
1148 __Pipe<Item1,Pipe,Item2> pipe_functor (queue1, pipe, queue2, item2);
1149 __Sink<Item2,Sink> sink_functor (queue2, sink);
1167 template <
class Source,
class Item1,
class Pipe1,
class Item2,
class Pipe2,
class Item3,
class Sink>
1192 typename Type<Item1>::queue queue1 (
"source->pipe", capacity);
1193 typename Type<Item2>::queue queue2 (
"pipe->pipe", capacity);
1194 typename Type<Item3>::queue queue3 (
"pipe->sink", capacity);
1196 __Source<Item1,Source> source_functor (queue1, source, item1);
1197 __Pipe<Item1,Pipe1,Item2> pipe1_functor (queue1, pipe1, queue2, item2);
1198 __Pipe<Item2,Pipe2,Item3> pipe2_functor (queue2, pipe2, queue3, item3);
1199 __Sink<Item3,Sink> sink_functor (queue3, sink);
This class is used to read items from the queue.
void recycle(T *item) const
Item(const Reader &reader)
Construct a Reader::Item object.
bool read()
Get next item from the queue.
~Item()
Unregister the parent Reader from the queue.
This class is used to register a reader with the queue.
Reader(Queue< T > &queue)
Register a Reader object with the queue.
Reader(const Reader &reader)
This class is used to write items to the queue.
bool write()
Push the item onto the queue.
Item(const Writer &writer)
Construct a Writer::Item object.
~Item()
Unregister the parent Writer from the queue.
This class is used to register a writer with the queue.
Writer(Queue< T > &queue)
Register a Writer object with the queue.
A first-in first-out thread-safe item queue.
void status()
Print out a status report for debugging purposes.
Queue & operator=(const Queue &)=delete
Queue(const std::string &description="unnamed", size_t buffer_size=128)
Construct a Queue of items of type T.
Queue(const Queue &)=delete
VectorType::Scalar value(const VectorType &coefs, typename VectorType::Scalar cos_elevation, typename VectorType::Scalar cos_azimuth, typename VectorType::Scalar sin_azimuth, int lmax)
__run< Functor >::type run(Functor &&functor, const std::string &name="unnamed")
Execute the functor's execute method in a separate thread.
size_t threads_to_execute()
void run_queue(Source &&source, const Item &item, Sink &&sink, size_t capacity=128)
convenience function to set up and run a 2-stage multi-threaded pipeline.
__Batch< Item > batch(const Item &, size_t number=128)
used to request batched processing of items
void write(const KeyValues &keyval, nlohmann::json &json)
KeyValues read(const nlohmann::json &json, const KeyValues &preexisting=KeyValues())
List get(const HeaderType &header)
return the strides of header as a vector<ssize_t>
void check_app_exit_code()
std::remove_reference< Functor >::type & functor
#define MRTRIX_QUEUE_DEFAULT_CAPACITY
#define MRTRIX_QUEUE_DEFAULT_BATCH_SIZE