17#ifndef __mrtrix_ordered_thread_queue_h__
18#define __mrtrix_ordered_thread_queue_h__
32 class __Ordered {
MEMALIGN(__Ordered<Item>)
34 __Ordered () =
default;
// Comparison call operator: given two __Ordered<Item> objects by pointer,
// returns true when a's `index` is smaller than b's, i.e. it orders items by
// ascending index.
// NOTE(review): the enclosing type is not visible in this excerpt; presumably
// this is the CompareItems comparator used by the std::set buffers in the
// sink stages - confirm.
43 bool operator() (
const __Ordered<Item>* a,
const __Ordered<Item>* b)
const {
return a->index < b->index; }
// Type trait (primary template): any JobType is reported as single-threaded
// by default, since the trait inherits from std::true_type.
49 template <
class JobType>
struct job_is_single_threaded : std::true_type {
NOMEMALIGN };
// Partial specialisation of the trait: a job wrapped in __Multi<JobType> is
// reported as NOT single-threaded (inherits from std::false_type).
// NOTE(review): __Multi is declared outside this excerpt; presumably it marks
// a job that is to be run across multiple threads - confirm.
50 template <
class JobType>
struct job_is_single_threaded< __Multi<JobType>> : std::false_type {
NOMEMALIGN };
// Specialisation of __batch_size for an ordered, batched item type
// (__Ordered<__Batch<Item>>): records the batch size carried by the item and
// exposes it through a conversion to size_t.
// NOTE(review): the declaration of member `n` and the closing brace of this
// struct are not visible in this excerpt.
53 template <
class Item>
struct __batch_size <__Ordered<__Batch<Item>>> {
NOMEMALIGN
// Constructor: stores item.item.num (the `num` field of the wrapped batch
// object) in `n`.
54 __batch_size (
const __Ordered<__Batch<Item>>&
item) : n (
item.
item.
num) { }
// Conversion operator: yields the stored value `n`.
55 operator size_t ()
const {
return n; }
// Specialisation of Type for ordered (non-batched) items: bundles the queue
// type and its associated reader/writer types under short alias names.
// NOTE(review): the closing brace of this struct is not visible in this
// excerpt.
67 template <
class Item>
struct Type<__Ordered<Item>> {
NOMEMALIGN
// The queue stores __Ordered<Item> objects directly.
69 using queue = Queue<__Ordered<Item>>;
// Reader and writer types nested in the queue.
70 using reader =
typename queue::Reader;
71 using writer =
typename queue::Writer;
// Item handle types nested in the reader and writer respectively.
72 using read_item =
typename reader::Item;
73 using write_item =
typename writer::Item;
// Source stage specialisation for ordered (non-batched) items.
// NOTE(review): this excerpt is missing lines (member declarations, part of
// the constructor's initialiser list, the execute loop header and closing
// braces); the comments below describe only the visible statements.
77 template <
class Item,
class Functor>
78 struct __Source<__Ordered<Item>,Functor> {
79 MEMALIGN(__Source<__Ordered<Item>,Functor>)
// Short names for the queued item type, its queue and writer, and the
// functor member type obtained from __job<Functor>.
81 using queued_t = __Ordered<Item>;
82 using queue_t = typename Type<queued_t>::queue;
83 using writer_t = typename Type<queued_t>::writer;
84 using functor_t = typename __job<Functor>::member_type;
// Constructor: batch_size is initialised from __batch_size<queued_t>(item);
// the remaining initialisers are not visible here.
90 __Source (queue_t& queue, Functor&
functor, const queued_t&
item) :
93 batch_size (__batch_size<queued_t> (
item)) { }
// Obtain a write handle on the output queue.
97 auto out = writer.placeholder();
// Ask the functor to fill the next item; the action taken when it returns
// false is not visible in this excerpt (presumably the loop ends).
99 if (!func (out->item))
// Stamp the item with the next value of the running counter, which the
// sink later uses to restore ordering.
101 out->index = count++;
102 }
// Keep producing for as long as the write to the queue succeeds.
while (out.write());
// Pipe stage specialisation for ordered (non-batched) items: reads
// __Ordered<Item1> objects and writes __Ordered<Item2> objects.
// NOTE(review): this excerpt is missing lines (other members, part of the
// constructor's initialiser list, the processing loop and closing braces);
// the comments below describe only the visible statements.
112 template <
class Item1,
class Functor,
class Item2>
113 struct __Pipe<__Ordered<Item1>,Functor,__Ordered<Item2>> {
114 MEMALIGN(__Pipe<__Ordered<Item1>,Functor,__Ordered<Item2>>)
// Short names for the input/output item types, their queues, the reader on
// the input side and the writer on the output side.
116 using queued1_t = __Ordered<Item1>;
117 using queued2_t = __Ordered<Item2>;
118 using queue1_t = typename Type<queued1_t>::queue;
119 using queue2_t = typename Type<queued2_t>::queue;
120 using reader_t = typename Type<queued1_t>::reader;
121 using writer_t = typename Type<queued2_t>::writer;
122 using functor_t = typename __job<Functor>::member_type;
127 const
size_t batch_size;
// Constructor: batch_size is taken from the output item type via
// __batch_size<queued2_t>(item2).
129 __Pipe (queue1_t& queue_in, Functor&
functor, queue2_t& queue_out, const queued2_t& item2) :
133 batch_size (__batch_size<queued2_t> (item2)) { }
// Handles on the input and output queues.
136 auto in = reader.placeholder();
137 auto out = writer.placeholder();
// Apply the functor to the input item, producing the output item; the
// action taken when it returns false is not visible in this excerpt.
139 if (!func (in->item, out->item))
// Carry the input's sequence index over to the output so ordering
// information survives the pipe stage.
141 out->index = in->index;
// Sink stage specialisation for ordered (non-batched) items.
// NOTE(review): this excerpt is missing lines (members, the constructor's
// initialiser list, the read loop, the updates of `expected`, and closing
// braces); the comments below describe only the visible statements.
153 template <
class Item,
class Functor>
154 struct __Sink<__Ordered<Item>,Functor> {
155 MEMALIGN(__Sink<__Ordered<Item>,Functor>)
// Short names for the queued item type, its queue and reader, and the
// functor member type obtained from __job<Functor>.
157 using queued_t = __Ordered<Item>;
158 using queue_t = typename Type<queued_t>::queue;
159 using reader_t = typename Type<queued_t>::reader;
160 using functor_t = typename __job<Functor>::member_type;
165 __Sink (queue_t& queue, Functor&
functor) :
// Read handle on the input queue, plus a set of pointers to items that
// have been set aside, ordered by the CompareItems comparator.
171 auto in = reader.placeholder();
172 std::set<queued_t*,CompareItems> buffer;
// An item whose index is larger than the expected one is stashed in the
// buffer rather than being passed to the functor now.
174 if (in->index > expected) {
175 buffer.emplace (in.stash());
// Otherwise the item is handed to the functor; the action taken when the
// functor returns false is not visible in this excerpt.
178 if (!func (in->item))
// Drain buffered items from the front of the set while their index does
// not exceed the expected one: pass each to the functor, hand its storage
// back via recycle(), and remove it from the buffer.
181 while (!buffer.empty() && (*buffer.begin())->index <= expected) {
182 if (!func ((*buffer.begin())->item))
184 in.recycle (*buffer.begin());
185 buffer.erase (buffer.begin());
// Specialisation of Type for ordered, batched items: bundles the queue type
// and its associated reader/writer types under short alias names.
// NOTE(review): the closing brace of this struct is not visible in this
// excerpt.
203 template <
class Item>
struct Type<__Ordered<__Batch<Item>>> {
NOMEMALIGN
// For batched items the queue stores __Ordered<vector<Item>>, i.e. each
// queued object carries a vector of items.
205 using queue = Queue<__Ordered<vector<Item>>>;
// Reader and writer types nested in the queue.
206 using reader =
typename queue::Reader;
207 using writer =
typename queue::Writer;
// Item handle types nested in the reader and writer respectively.
208 using read_item =
typename reader::Item;
209 using write_item =
typename writer::Item;
// Source stage specialisation for ordered, batched items.
// NOTE(review): this excerpt is missing lines (member declarations, part of
// the constructor's initialiser list, the body of the inner `if`, and closing
// braces); the comments below describe only the visible statements.
214 template <
class Item,
class Functor>
215 struct __Source<__Ordered<__Batch<Item>>,Functor> {
216 MEMALIGN(__Source<__Ordered<__Batch<Item>>,Functor>)
// queued_t is what travels through the queue (a vector of items); passed_t
// is the type of the item argument given to the constructor.
218 using queued_t = __Ordered<vector<Item>>;
219 using passed_t = __Ordered<__Batch<Item>>;
220 using queue_t = typename Type<queued_t>::queue;
221 using writer_t = typename Type<queued_t>::writer;
222 using functor_t = typename __job<Functor>::member_type;
// Constructor: batch_size is initialised from __batch_size<passed_t>(item);
// the remaining initialisers are not visible here.
228 __Source (queue_t& queue, Functor&
functor, const passed_t&
item) :
231 batch_size (__batch_size<passed_t> (
item)) { }
// Obtain a write handle on the output queue.
235 auto out = writer.placeholder();
// Size the outgoing vector to a full batch, then ask the functor to fill
// each slot in turn.
238 out->item.resize (batch_size);
239 for (
size_t n = 0; n < batch_size; ++n) {
// What happens when the functor returns false is not visible in this
// excerpt (presumably the batch is truncated and `stop` is set - confirm).
240 if (!func (out->item[n])) {
// Stamp the batch with the next value of the running counter.
246 out->index = count++;
247 }
// Keep producing while writes succeed and `stop` has not been raised.
while (out.write() && !stop);
// Pipe stage specialisation for ordered, batched items on both sides.
// NOTE(review): this excerpt is missing lines (other members, part of the
// constructor's initialiser list, the outer loop, the declaration and update
// of `k`, and closing braces); the comments below describe only the visible
// statements.
257 template <
class Item1,
class Functor,
class Item2>
258 struct __Pipe<__Ordered<__Batch<Item1>>,Functor,__Ordered<__Batch<Item2>>> {
259 MEMALIGN(__Pipe<__Ordered<__Batch<Item1>>,Functor,__Ordered<__Batch<Item2>>>)
// queued1_t / queued2_t are the vector-carrying types that travel through
// the input and output queues; passed2_t is the type of the constructor's
// item2 argument.
261 using queued1_t = __Ordered<vector<Item1>>;
262 using queued2_t = __Ordered<vector<Item2>>;
263 using passed2_t = __Ordered<__Batch<Item2>>;
264 using queue1_t = typename Type<queued1_t>::queue;
265 using queue2_t = typename Type<queued2_t>::queue;
266 using reader_t = typename Type<queued1_t>::reader;
267 using writer_t = typename Type<queued2_t>::writer;
268 using functor_t = typename __job<Functor>::member_type;
273 const
size_t batch_size;
// Constructor: batch_size is taken from __batch_size<passed2_t>(item2).
275 __Pipe (queue1_t& queue_in, Functor&
functor, queue2_t& queue_out, const passed2_t& item2) :
279 batch_size (__batch_size<passed2_t> (item2)) { }
// Handles on the input and output queues.
282 auto in = reader.placeholder();
283 auto out = writer.placeholder();
// Make room in the output batch for as many entries as the input batch has.
285 out->item.resize (in->item.size());
// Apply the functor to each input entry, writing into output slot k.
// NOTE(review): the statement executed when the functor returns true is
// not visible here; presumably it advances k - confirm.
287 for (
size_t n = 0; n < in->item.size(); ++n) {
288 if (func (in->item[n], out->item[k]))
// Resize the output batch to k entries and carry the input batch's
// sequence index over to the output.
291 out->item.resize (k);
292 out->index = in->index;
// Sink stage specialisation for ordered, batched items.
// NOTE(review): this excerpt is missing lines (members, the constructor's
// initialiser list, the read loop, the updates of `expected`, and closing
// braces); the comments below describe only the visible statements.
305 template <
class Item,
class Functor>
306 struct __Sink<__Ordered<__Batch<Item>>,Functor> {
307 MEMALIGN(__Sink<__Ordered<__Batch<Item>>,Functor>)
// Short names for the queued (vector-carrying) type, its queue and reader,
// and the functor member type obtained from __job<Functor>.
309 using queued_t = __Ordered<vector<Item>>;
310 using queue_t = typename Type<queued_t>::queue;
311 using reader_t = typename Type<queued_t>::reader;
312 using functor_t = typename __job<Functor>::member_type;
317 __Sink (queue_t& queue, Functor&
functor) :
// Read handle on the input queue, plus a set of pointers to batches that
// have been set aside, ordered by the CompareItems comparator.
323 auto in = reader.placeholder();
324 std::set<queued_t*,CompareItems> buffer;
// A batch whose index is larger than the expected one is stashed in the
// buffer rather than being processed now.
326 if (in->index > expected) {
327 buffer.emplace (in.stash());
// Otherwise every entry of the batch is passed to the functor in order;
// the action taken when the functor returns false is not visible here.
330 for (
size_t n = 0; n < in->item.size(); ++n)
331 if (!func (in->item[n]))
// Drain buffered batches from the front of the set while their index does
// not exceed the expected one: pass each entry to the functor, hand the
// batch's storage back via recycle(), and remove it from the buffer.
334 while (!buffer.empty() && (*buffer.begin())->index <= expected) {
335 for (
size_t n = 0; n < (*buffer.begin())->item.size(); ++n)
336 if (!func ((*buffer.begin())->item[n]))
338 in.recycle (*buffer.begin());
339 buffer.erase (buffer.begin());
// Fragments of the source -> pipe -> sink overload of run_ordered_queue.
// NOTE(review): the function signature, the condition being asserted, the
// queue/thread set-up and the closing braces are not visible in this excerpt.
355 template <
class Source,
class Item1,
class Pipe,
class Item2,
class Sink>
// Message of a check (its condition is not visible here) stating that the
// source and sink must be single-threaded.
365 "run_ordered_queue can only run with single-threaded source & sink");
// The two item types must report the same batch size; otherwise an
// Exception is thrown.
367 if (__batch_size<Item1>(item1) != __batch_size<Item2>(item2))
368 throw Exception (
"Thread::run_ordered_queue must be run with matching batch sizes across all stages");
// Each item is wrapped in __Ordered before being handed on (the receiving
// call is not visible here).
372 __Ordered<Item1>(item1),
374 __Ordered<Item2>(item2),
// Fragments of the source -> pipe1 -> pipe2 -> sink overload of
// run_ordered_queue.
// NOTE(review): the function signature, the condition being asserted, the
// queue/thread set-up and the closing braces are not visible in this excerpt.
382 template <
class Source,
class Item1,
class Pipe1,
class Item2,
class Pipe2,
class Item3,
class Sink>
// Message of a check (its condition is not visible here) stating that the
// source and sink must be single-threaded.
394 "run_ordered_queue can only run with single-threaded source & sink");
// All three item types must report the same batch size as item1; otherwise
// an Exception is thrown.
396 if (__batch_size<Item1>(item1) != __batch_size<Item2>(item2) ||
397 __batch_size<Item1>(item1) != __batch_size<Item3>(item3))
398 throw Exception (
"Thread::run_ordered_queue must be run with matching batch sizes across all stages");
// Each item is wrapped in __Ordered before being handed on (the receiving
// call is not visible here).
402 __Ordered<Item1>(item1),
404 __Ordered<Item2>(item2),
406 __Ordered<Item3>(item3),
VectorType::Scalar value(const VectorType &coefs, typename VectorType::Scalar cos_elevation, typename VectorType::Scalar cos_azimuth, typename VectorType::Scalar sin_azimuth, int lmax)
void run_queue(Source &&source, const Item &item, Sink &&sink, size_t capacity=128)
convenience function to set up and run a 2-stage multi-threaded pipeline.
void run_ordered_queue(Source &&source, const Item1 &item1, Pipe &&pipe, const Item2 &item2, Sink &&sink, size_t capacity=128)
std::remove_reference< Functor >::type & functor
#define MRTRIX_QUEUE_DEFAULT_CAPACITY