Developer documentation
Version 3.0.3-105-gd3941f44
ordered_thread_queue.h
/* Copyright (c) 2008-2022 the MRtrix3 contributors.
 *
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
 *
 * Covered Software is provided under this License on an "as is"
 * basis, without warranty of any kind, either expressed, implied, or
 * statutory, including, without limitation, warranties that the
 * Covered Software is free of defects, merchantable, fit for a
 * particular purpose or non-infringing.
 * See the Mozilla Public License v. 2.0 for more details.
 *
 * For more details, see http://www.mrtrix.org/.
 */

#ifndef __mrtrix_ordered_thread_queue_h__
#define __mrtrix_ordered_thread_queue_h__

#include <set>
#include "thread_queue.h"

namespace MR {
  namespace Thread {




    namespace {

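      /* Wrapper pairing a queue item with the sequence index assigned to it
       * by the source stage; the sink uses this index to restore the original
       * ordering. CompareItems sorts pointers to wrapped items by that index,
       * so a std::set of stashed items acts as a reorder buffer with the
       * lowest outstanding index always at the front. */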
      template <class Item>
      class __Ordered { MEMALIGN(__Ordered<Item>)
        public:
          __Ordered () = default;
          __Ordered (const Item& item) : item (item) { }

          Item item;
          size_t index;
      };

      struct CompareItems { NOMEMALIGN
        template <class Item>
        bool operator() (const __Ordered<Item>* a, const __Ordered<Item>* b) const { return a->index < b->index; }
      };




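      /* job_is_single_threaded detects whether a stage functor has been
       * wrapped in __Multi (as produced by Thread::multi()); run_ordered_queue()
       * uses it to reject multi-threaded source or sink stages, since ordering
       * is only tracked through the intermediate queues. The __batch_size
       * specialisation below extracts the requested batch size from an
       * __Ordered<__Batch<Item>> prototype. */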
      template <class JobType> struct job_is_single_threaded : std::true_type { NOMEMALIGN };
      template <class JobType> struct job_is_single_threaded< __Multi<JobType>> : std::false_type { NOMEMALIGN };


      template <class Item> struct __batch_size <__Ordered<__Batch<Item>>> { NOMEMALIGN
        __batch_size (const __Ordered<__Batch<Item>>& item) : n (item.item.num) { }
        operator size_t () const { return n; }
        const size_t n;
      };




      /***********************************************************************
       *           Source/Pipe/Sink for UNBATCHED ordered queue              *
       ***********************************************************************/


      template <class Item> struct Type<__Ordered<Item>> { NOMEMALIGN
        using item = Item;
        using queue = Queue<__Ordered<Item>>;
        using reader = typename queue::Reader;
        using writer = typename queue::Writer;
        using read_item = typename reader::Item;
        using write_item = typename writer::Item;
      };


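      /* The unbatched source tags each item it produces with a monotonically
       * increasing index; the pipe below propagates the index of each input
       * item to the corresponding output item, so the ordering information
       * survives arbitrary interleaving across worker threads. */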
      template <class Item, class Functor>
      struct __Source<__Ordered<Item>,Functor> {
        MEMALIGN(__Source<__Ordered<Item>,Functor>)

        using queued_t = __Ordered<Item>;
        using queue_t = typename Type<queued_t>::queue;
        using writer_t = typename Type<queued_t>::writer;
        using functor_t = typename __job<Functor>::member_type;

        writer_t writer;
        functor_t func;
        size_t batch_size;

        __Source (queue_t& queue, Functor& functor, const queued_t& item) :
          writer (queue),
          func (__job<Functor>::functor (functor)),
          batch_size (__batch_size<queued_t> (item)) { }

        void execute () {
          size_t count = 0;
          auto out = writer.placeholder();
          do {
            if (!func (out->item))
              break;
            out->index = count++;
          } while (out.write());
        }
      };







      template <class Item1, class Functor, class Item2>
      struct __Pipe<__Ordered<Item1>,Functor,__Ordered<Item2>> {
        MEMALIGN(__Pipe<__Ordered<Item1>,Functor,__Ordered<Item2>>)

        using queued1_t = __Ordered<Item1>;
        using queued2_t = __Ordered<Item2>;
        using queue1_t = typename Type<queued1_t>::queue;
        using queue2_t = typename Type<queued2_t>::queue;
        using reader_t = typename Type<queued1_t>::reader;
        using writer_t = typename Type<queued2_t>::writer;
        using functor_t = typename __job<Functor>::member_type;

        reader_t reader;
        writer_t writer;
        functor_t func;
        const size_t batch_size;

        __Pipe (queue1_t& queue_in, Functor& functor, queue2_t& queue_out, const queued2_t& item2) :
          reader (queue_in),
          writer (queue_out),
          func (__job<Functor>::functor (functor)),
          batch_size (__batch_size<queued2_t> (item2)) { }

        void execute () {
          auto in = reader.placeholder();
          auto out = writer.placeholder();
          while (in.read()) {
            if (!func (in->item, out->item))
              break;
            out->index = in->index;
            out.write();
          }
        }

      };





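      /* The sink is where ordering is re-established: items are consumed in
       * index order, and any item that arrives ahead of the expected index is
       * stashed in a std::set (ordered by CompareItems) until the gap has been
       * filled, after which the buffered items are flushed and their slots
       * recycled back to the queue. */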
      template <class Item, class Functor>
      struct __Sink<__Ordered<Item>,Functor> {
        MEMALIGN(__Sink<__Ordered<Item>,Functor>)

        using queued_t = __Ordered<Item>;
        using queue_t = typename Type<queued_t>::queue;
        using reader_t = typename Type<queued_t>::reader;
        using functor_t = typename __job<Functor>::member_type;

        reader_t reader;
        functor_t func;

        __Sink (queue_t& queue, Functor& functor) :
          reader (queue),
          func (__job<Functor>::functor (functor)) { }

        void execute () {
          size_t expected = 0;
          auto in = reader.placeholder();
          std::set<queued_t*,CompareItems> buffer;
          while (in.read()) {
            if (in->index > expected) {
              buffer.emplace (in.stash());
              continue;
            }
            if (!func (in->item))
              return;
            ++expected;
            while (!buffer.empty() && (*buffer.begin())->index <= expected) {
              if (!func ((*buffer.begin())->item))
                return;
              in.recycle (*buffer.begin());
              buffer.erase (buffer.begin());
              ++expected;
            }
          }
        }
      };








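      /* The batched variants below are identical in structure, but each queue
       * slot carries a vector<Item> of up to batch_size items, and the
       * ordering index is assigned per batch rather than per item. */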
      /***********************************************************************
       *            Source/Pipe/Sink for BATCHED ordered queue               *
       ***********************************************************************/

      template <class Item> struct Type<__Ordered<__Batch<Item>>> { NOMEMALIGN
        using item = Item;
        using queue = Queue<__Ordered<vector<Item>>>;
        using reader = typename queue::Reader;
        using writer = typename queue::Writer;
        using read_item = typename reader::Item;
        using write_item = typename writer::Item;
      };


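      /* The batched source fills each outgoing vector with up to batch_size
       * items; when the functor signals completion mid-batch, the vector is
       * truncated to the items actually produced and is still written out, so
       * the final (possibly partial) batch is not lost. */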
      template <class Item, class Functor>
      struct __Source<__Ordered<__Batch<Item>>,Functor> {
        MEMALIGN(__Source<__Ordered<__Batch<Item>>,Functor>)

        using queued_t = __Ordered<vector<Item>>;
        using passed_t = __Ordered<__Batch<Item>>;
        using queue_t = typename Type<queued_t>::queue;
        using writer_t = typename Type<queued_t>::writer;
        using functor_t = typename __job<Functor>::member_type;

        writer_t writer;
        functor_t func;
        size_t batch_size;

        __Source (queue_t& queue, Functor& functor, const passed_t& item) :
          writer (queue),
          func (__job<Functor>::functor (functor)),
          batch_size (__batch_size<passed_t> (item)) { }

        void execute () {
          size_t count = 0;
          auto out = writer.placeholder();
          bool stop = false;
          do {
            out->item.resize (batch_size);
            for (size_t n = 0; n < batch_size; ++n) {
              if (!func (out->item[n])) {
                out->item.resize (n);
                stop = true;
                break;
              }
            }
            out->index = count++;
          } while (out.write() && !stop);
        }
      };







      template <class Item1, class Functor, class Item2>
      struct __Pipe<__Ordered<__Batch<Item1>>,Functor,__Ordered<__Batch<Item2>>> {
        MEMALIGN(__Pipe<__Ordered<__Batch<Item1>>,Functor,__Ordered<__Batch<Item2>>>)

        using queued1_t = __Ordered<vector<Item1>>;
        using queued2_t = __Ordered<vector<Item2>>;
        using passed2_t = __Ordered<__Batch<Item2>>;
        using queue1_t = typename Type<queued1_t>::queue;
        using queue2_t = typename Type<queued2_t>::queue;
        using reader_t = typename Type<queued1_t>::reader;
        using writer_t = typename Type<queued2_t>::writer;
        using functor_t = typename __job<Functor>::member_type;

        reader_t reader;
        writer_t writer;
        functor_t func;
        const size_t batch_size;

        __Pipe (queue1_t& queue_in, Functor& functor, queue2_t& queue_out, const passed2_t& item2) :
          reader (queue_in),
          writer (queue_out),
          func (__job<Functor>::functor (functor)),
          batch_size (__batch_size<passed2_t> (item2)) { }

        void execute () {
          auto in = reader.placeholder();
          auto out = writer.placeholder();
          while (in.read()) {
            out->item.resize (in->item.size());
            size_t k = 0;
            for (size_t n = 0; n < in->item.size(); ++n) {
              if (func (in->item[n], out->item[k]))
                ++k;
            }
            out->item.resize (k);
            out->index = in->index;
            if (!out.write())
              return;
          }
        }

      };





      template <class Item, class Functor>
      struct __Sink<__Ordered<__Batch<Item>>,Functor> {
        MEMALIGN(__Sink<__Ordered<__Batch<Item>>,Functor>)

        using queued_t = __Ordered<vector<Item>>;
        using queue_t = typename Type<queued_t>::queue;
        using reader_t = typename Type<queued_t>::reader;
        using functor_t = typename __job<Functor>::member_type;

        reader_t reader;
        functor_t func;

        __Sink (queue_t& queue, Functor& functor) :
          reader (queue),
          func (__job<Functor>::functor (functor)) { }

        void execute () {
          size_t expected = 0;
          auto in = reader.placeholder();
          std::set<queued_t*,CompareItems> buffer;
          while (in.read()) {
            if (in->index > expected) {
              buffer.emplace (in.stash());
              continue;
            }
            for (size_t n = 0; n < in->item.size(); ++n)
              if (!func (in->item[n]))
                return;
            ++expected;
            while (!buffer.empty() && (*buffer.begin())->index <= expected) {
              for (size_t n = 0; n < (*buffer.begin())->item.size(); ++n)
                if (!func ((*buffer.begin())->item[n]))
                  return;
              in.recycle (*buffer.begin());
              buffer.erase (buffer.begin());
              ++expected;
            }
          }
        }
      };




    }




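    /* Set up and run a multi-threaded pipeline in which items are delivered
     * to the sink in the same order as they were produced by the source.
     * The source and sink must be single-threaded (pipe stages may be wrapped
     * in Thread::multi()), and all stages must use the same batch size.
     * The heavy lifting is delegated to the run_queue() machinery declared in
     * thread_queue.h. */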
    template <class Source, class Item1, class Pipe, class Item2, class Sink>
    inline void run_ordered_queue (
        Source&& source,
        const Item1& item1,
        Pipe&& pipe,
        const Item2& item2,
        Sink&& sink,
        size_t capacity = MRTRIX_QUEUE_DEFAULT_CAPACITY)
    {
      static_assert (job_is_single_threaded<Source>::value && job_is_single_threaded<Sink>::value,
          "run_ordered_queue can only run with single-threaded source & sink");

      if (__batch_size<Item1>(item1) != __batch_size<Item2>(item2))
        throw Exception ("Thread::run_ordered_queue must be run with matching batch sizes across all stages");

      run_queue (
          std::move (source),
          __Ordered<Item1>(item1),
          std::move (pipe),
          __Ordered<Item2>(item2),
          std::move (sink),
          capacity);
    }
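    /* A minimal usage sketch (illustrative only: the Slice / Result types and
     * the Load / Process / Store functors are hypothetical, and the batching
     * and threading settings are just examples):
     *
     *   struct Load    { bool operator() (Slice& out);                   };  // single-threaded source
     *   struct Process { bool operator() (const Slice& in, Result& out); };  // parallel pipe
     *   struct Store   { bool operator() (const Result& in);             };  // single-threaded sink
     *
     *   Thread::run_ordered_queue (
     *       Load(),
     *       Thread::batch (Slice()),     // item1: batches of Slice objects
     *       Thread::multi (Process()),   // run the pipe across multiple threads
     *       Thread::batch (Result()),    // item2: matching batch size
     *       Store());
     *
     * Store() then receives results in the same order Load() produced the
     * corresponding slices, regardless of which thread processed each batch. */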



    template <class Source, class Item1, class Pipe1, class Item2, class Pipe2, class Item3, class Sink>
    inline void run_ordered_queue (
        Source&& source,
        const Item1& item1,
        Pipe1&& pipe1,
        const Item2& item2,
        Pipe2&& pipe2,
        const Item3& item3,
        Sink&& sink,
        size_t capacity = MRTRIX_QUEUE_DEFAULT_CAPACITY)
    {
      static_assert (job_is_single_threaded<Source>::value && job_is_single_threaded<Sink>::value,
          "run_ordered_queue can only run with single-threaded source & sink");

      if (__batch_size<Item1>(item1) != __batch_size<Item2>(item2) ||
          __batch_size<Item1>(item1) != __batch_size<Item3>(item3))
        throw Exception ("Thread::run_ordered_queue must be run with matching batch sizes across all stages");

      run_queue (
          std::move (source),
          __Ordered<Item1>(item1),
          std::move (pipe1),
          __Ordered<Item2>(item2),
          std::move (pipe2),
          __Ordered<Item3>(item3),
          std::move (sink),
          capacity);
    }


  }
}

#endif