OpenFPM_pdata  4.1.0
Project that contains the implementation of distributed structures
 
agent_rle.cuh
1/******************************************************************************
2 * Copyright (c) 2011, Duane Merrill. All rights reserved.
3 * Copyright (c) 2011-2018, NVIDIA CORPORATION. All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 * * Redistributions of source code must retain the above copyright
8 * notice, this list of conditions and the following disclaimer.
9 * * Redistributions in binary form must reproduce the above copyright
10 * notice, this list of conditions and the following disclaimer in the
11 * documentation and/or other materials provided with the distribution.
12 * * Neither the name of the NVIDIA CORPORATION nor the
13 * names of its contributors may be used to endorse or promote products
14 * derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
17 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
19 * DISCLAIMED. IN NO EVENT SHALL NVIDIA CORPORATION BE LIABLE FOR ANY
20 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
21 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
22 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
23 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 *
27 ******************************************************************************/
28
29/**
30 * \file
31 * cub::AgentRle implements a stateful abstraction of CUDA thread blocks for participating in device-wide run-length-encode.
32 */
33
34#pragma once
35
36#include <iterator>
37
38#include "single_pass_scan_operators.cuh"
39#include "../block/block_load.cuh"
40#include "../block/block_store.cuh"
41#include "../block/block_scan.cuh"
42#include "../block/block_exchange.cuh"
43#include "../block/block_discontinuity.cuh"
44#include "../grid/grid_queue.cuh"
45#include "../iterator/cache_modified_input_iterator.cuh"
46#include "../iterator/constant_input_iterator.cuh"
47#include "../util_namespace.cuh"
48
50CUB_NS_PREFIX
51
53namespace cub {
54
55
56/******************************************************************************
57 * Tuning policy types
58 ******************************************************************************/
59
60/**
61 * Parameterizable tuning policy type for AgentRle
62 */
63template <
64 int _BLOCK_THREADS,
65 int _ITEMS_PER_THREAD,
66 BlockLoadAlgorithm _LOAD_ALGORITHM,
67 CacheLoadModifier _LOAD_MODIFIER,
68 bool _STORE_WARP_TIME_SLICING,
69 BlockScanAlgorithm _SCAN_ALGORITHM>
70struct AgentRlePolicy
71{
72 enum
73 {
74 BLOCK_THREADS = _BLOCK_THREADS,
75 ITEMS_PER_THREAD = _ITEMS_PER_THREAD,
76 STORE_WARP_TIME_SLICING = _STORE_WARP_TIME_SLICING,
77 };
78
79 static const BlockLoadAlgorithm LOAD_ALGORITHM = _LOAD_ALGORITHM;
80 static const CacheLoadModifier LOAD_MODIFIER = _LOAD_MODIFIER;
81 static const BlockScanAlgorithm SCAN_ALGORITHM = _SCAN_ALGORITHM;
82};
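For orientation, a dispatch layer selects concrete values for these parameters per target architecture. The instantiation below is only an illustrative sketch (the constants and the alias name RlePolicy256 are hypothetical, not a tuning shipped by the library); the enumerators are the standard cub::BlockLoadAlgorithm, cub::CacheLoadModifier and cub::BlockScanAlgorithm values.

// Hypothetical tuning: 256 threads per block, 9 items per thread, warp-transposed
// loads through the LDG cache, warp-time-sliced scatter storage, warp-scans block scan.
typedef cub::AgentRlePolicy<
        256,                                 // _BLOCK_THREADS
        9,                                   // _ITEMS_PER_THREAD
        cub::BLOCK_LOAD_WARP_TRANSPOSE,      // _LOAD_ALGORITHM
        cub::LOAD_LDG,                       // _LOAD_MODIFIER
        true,                                // _STORE_WARP_TIME_SLICING
        cub::BLOCK_SCAN_WARP_SCANS>          // _SCAN_ALGORITHM
    RlePolicy256;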
83
84
85
86
87
88/******************************************************************************
89 * Thread block abstractions
90 ******************************************************************************/
91
92/**
93 * \brief AgentRle implements a stateful abstraction of CUDA thread blocks for participating in device-wide run-length-encode
94 */
95template <
96 typename AgentRlePolicyT,
97 typename InputIteratorT,
98 typename OffsetsOutputIteratorT,
99 typename LengthsOutputIteratorT,
100 typename EqualityOpT,
101 typename OffsetT>
102struct AgentRle
103{
104 //---------------------------------------------------------------------
105 // Types and constants
106 //---------------------------------------------------------------------
107
109 typedef typename std::iterator_traits<InputIteratorT>::value_type T;
110
112 typedef typename If<(Equals<typename std::iterator_traits<LengthsOutputIteratorT>::value_type, void>::VALUE), // LengthT = (if output iterator's value type is void) ?
113 OffsetT, // ... then the OffsetT type,
114 typename std::iterator_traits<LengthsOutputIteratorT>::value_type>::Type LengthT; // ... else the output iterator's value type
115
116 /// Tuple type for scanning (pairs run-length and run-index)
117 typedef KeyValuePair<OffsetT, LengthT> LengthOffsetPair;
118
119 /// Tile status descriptor interface type
120 typedef ReduceByKeyScanTileState<LengthT, OffsetT> ScanTileStateT;
121
122 // Constants
123 enum
124 {
125 WARP_THREADS = CUB_WARP_THREADS(PTX_ARCH),
126 BLOCK_THREADS = AgentRlePolicyT::BLOCK_THREADS,
127 ITEMS_PER_THREAD = AgentRlePolicyT::ITEMS_PER_THREAD,
128 WARP_ITEMS = WARP_THREADS * ITEMS_PER_THREAD,
129 TILE_ITEMS = BLOCK_THREADS * ITEMS_PER_THREAD,
130 WARPS = (BLOCK_THREADS + WARP_THREADS - 1) / WARP_THREADS,
131
133 SYNC_AFTER_LOAD = (AgentRlePolicyT::LOAD_ALGORITHM != BLOCK_LOAD_DIRECT),
134
136 STORE_WARP_TIME_SLICING = AgentRlePolicyT::STORE_WARP_TIME_SLICING,
137 ACTIVE_EXCHANGE_WARPS = (STORE_WARP_TIME_SLICING) ? 1 : WARPS,
138 };
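To make the derived constants concrete, here is a small worked example using the hypothetical 256-thread, 9-item tuning from the sketch above (32-thread warps assumed):

constexpr int BLOCK_THREADS    = 256;
constexpr int ITEMS_PER_THREAD = 9;
constexpr int WARP_THREADS     = 32;
constexpr int WARP_ITEMS = WARP_THREADS  * ITEMS_PER_THREAD;                   // 288 items staged per warp
constexpr int TILE_ITEMS = BLOCK_THREADS * ITEMS_PER_THREAD;                   // 2304 items per block-wide tile
constexpr int WARPS      = (BLOCK_THREADS + WARP_THREADS - 1) / WARP_THREADS;  // 8 warps per block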
139
140
146 template <bool LAST_TILE>
147 struct OobInequalityOp
148 {
149 OffsetT num_remaining;
150 EqualityOpT equality_op;
151
152 __device__ __forceinline__ OobInequalityOp(
153 OffsetT num_remaining,
154 EqualityOpT equality_op)
155 :
156 num_remaining(num_remaining),
157 equality_op(equality_op)
158 {}
159
160 template <typename Index>
161 __host__ __device__ __forceinline__ bool operator()(T first, T second, Index idx)
162 {
163 if (!LAST_TILE || (idx < num_remaining))
164 return !equality_op(first, second);
165 else
166 return true;
167 }
168 };
169
170
171 // Cache-modified Input iterator wrapper type (for applying cache modifier) for data
172 typedef typename If<IsPointer<InputIteratorT>::VALUE,
173 CacheModifiedInputIterator<AgentRlePolicyT::LOAD_MODIFIER, T, OffsetT>, // Wrap the native input pointer with CacheModifiedInputIterator
174 InputIteratorT>::Type // Directly use the supplied input iterator type
175 WrappedInputIteratorT;
176
177 // Parameterized BlockLoad type for data
178 typedef BlockLoad<
179 T,
180 AgentRlePolicyT::BLOCK_THREADS,
181 AgentRlePolicyT::ITEMS_PER_THREAD,
182 AgentRlePolicyT::LOAD_ALGORITHM>
183 BlockLoadT;
184
185 // Parameterized BlockDiscontinuity type for data
186 typedef BlockDiscontinuity<T, BLOCK_THREADS> BlockDiscontinuityT;
187
188 // Parameterized WarpScan type
189 typedef WarpScan<LengthOffsetPair> WarpScanPairs;
190
191 // Reduce-length-by-run scan operator
192 typedef ReduceBySegmentOp<cub::Sum> ReduceBySegmentOpT;
193
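The reduce-length-by-run scan operator combines (run-count, run-length) pairs so that run counts always accumulate while run lengths restart at run boundaries. A minimal host-side sketch of that combine rule, assuming the semantics of cub::ReduceBySegmentOp over cub::Sum (the names Pair and combine are illustrative):

struct Pair { int key; int value; };  // key: completed runs so far, value: length of the current run

Pair combine(Pair a, Pair b)
{
    Pair r;
    r.key   = a.key + b.key;                               // run counts always add
    r.value = (b.key != 0) ? b.value : a.value + b.value;  // length restarts when b starts a new run
    return r;
}

// combine({1,1}, {0,1}) == {1,2}   same run continues, length grows
// combine({1,3}, {1,1}) == {2,1}   a new run starts, length restarts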
194 // Callback type for obtaining tile prefix during block scan
195 typedef TilePrefixCallbackOp<
196 LengthOffsetPair,
197 ReduceBySegmentOpT,
198 ScanTileStateT>
199 TilePrefixCallbackOpT;
200
201 // Warp exchange types
202 typedef WarpExchange<LengthOffsetPair, ITEMS_PER_THREAD> WarpExchangePairs;
203
204 typedef typename If<STORE_WARP_TIME_SLICING, typename WarpExchangePairs::TempStorage, NullType>::Type WarpExchangePairsStorage;
205
206 typedef WarpExchange<OffsetT, ITEMS_PER_THREAD> WarpExchangeOffsets;
207 typedef WarpExchange<LengthT, ITEMS_PER_THREAD> WarpExchangeLengths;
208
209 typedef LengthOffsetPair WarpAggregates[WARPS];
210
211 // Shared memory type for this thread block
212 struct _TempStorage
213 {
214 // Aliasable storage layout
215 union Aliasable
216 {
217 struct
218 {
219 typename BlockDiscontinuityT::TempStorage discontinuity; // Smem needed for discontinuity detection
220 typename WarpScanPairs::TempStorage warp_scan[WARPS]; // Smem needed for warp-synchronous scans
221 Uninitialized<LengthOffsetPair[WARPS]> warp_aggregates; // Smem needed for sharing warp-wide aggregates
222 typename TilePrefixCallbackOpT::TempStorage prefix; // Smem needed for cooperative prefix callback
223 };
224
225 // Smem needed for input loading
226 typename BlockLoadT::TempStorage load;
227
228 // Aliasable layout needed for two-phase scatter
229 union ScatterAliasable
230 {
231 unsigned long long align;
232 WarpExchangePairsStorage exchange_pairs[ACTIVE_EXCHANGE_WARPS];
233 typename WarpExchangeOffsets::TempStorage exchange_offsets[ACTIVE_EXCHANGE_WARPS];
234 typename WarpExchangeLengths::TempStorage exchange_lengths[ACTIVE_EXCHANGE_WARPS];
235
236 } scatter_aliasable;
237
238 } aliasable;
239
240 OffsetT tile_idx; // Shared tile index
241 LengthOffsetPair tile_inclusive; // Inclusive tile prefix
242 LengthOffsetPair tile_exclusive; // Exclusive tile prefix
243 };
244
245 // Alias wrapper allowing storage to be unioned
246 struct TempStorage : Uninitialized<_TempStorage> {};
247
248
249 //---------------------------------------------------------------------
250 // Per-thread fields
251 //---------------------------------------------------------------------
252
253 _TempStorage& temp_storage; ///< Reference to temp_storage
254
255 WrappedInputIteratorT d_in; ///< Pointer to input sequence of data items
256 OffsetsOutputIteratorT d_offsets_out; ///< Output run offsets
257 LengthsOutputIteratorT d_lengths_out; ///< Output run lengths
258
259 EqualityOpT equality_op; ///< T equality operator
260 ReduceBySegmentOpT scan_op; ///< Reduce-length-by-flag scan operator
261 OffsetT num_items; ///< Total number of input items
262
263
264 //---------------------------------------------------------------------
265 // Constructor
266 //---------------------------------------------------------------------
267
268 // Constructor
269 __device__ __forceinline__
270 AgentRle(
271 TempStorage &temp_storage, ///< [in] Reference to temp_storage
272 InputIteratorT d_in, ///< [in] Pointer to input sequence of data items
273 OffsetsOutputIteratorT d_offsets_out, ///< [out] Pointer to output sequence of run offsets
274 LengthsOutputIteratorT d_lengths_out, ///< [out] Pointer to output sequence of run lengths
275 EqualityOpT equality_op, ///< [in] T equality operator
276 OffsetT num_items) ///< [in] Total number of input items
277 :
278 temp_storage(temp_storage.Alias()),
279 d_in(d_in),
280 d_offsets_out(d_offsets_out),
281 d_lengths_out(d_lengths_out),
282 equality_op(equality_op),
283 scan_op(cub::Sum()),
284 num_items(num_items)
285 {}
286
287
288 //---------------------------------------------------------------------
289 // Utility methods for initializing the selections
290 //---------------------------------------------------------------------
291
292 template <bool FIRST_TILE, bool LAST_TILE>
293 __device__ __forceinline__ void InitializeSelections(
294 OffsetT tile_offset,
295 OffsetT num_remaining,
296 T (&items)[ITEMS_PER_THREAD],
297 LengthOffsetPair (&lengths_and_num_runs)[ITEMS_PER_THREAD])
298 {
299 bool head_flags[ITEMS_PER_THREAD];
300 bool tail_flags[ITEMS_PER_THREAD];
301
302 OobInequalityOp<LAST_TILE> inequality_op(num_remaining, equality_op);
303
304 if (FIRST_TILE && LAST_TILE)
305 {
306 // First-and-last-tile always head-flags the first item and tail-flags the last item
307
308 BlockDiscontinuityT(temp_storage.aliasable.discontinuity).FlagHeadsAndTails(
309 head_flags, tail_flags, items, inequality_op);
310 }
311 else if (FIRST_TILE)
312 {
313 // First-tile always head-flags the first item
314
315 // Get the first item from the next tile
316 T tile_successor_item;
317 if (threadIdx.x == BLOCK_THREADS - 1)
318 tile_successor_item = d_in[tile_offset + TILE_ITEMS];
319
320 BlockDiscontinuityT(temp_storage.aliasable.discontinuity).FlagHeadsAndTails(
321 head_flags, tail_flags, tile_successor_item, items, inequality_op);
322 }
323 else if (LAST_TILE)
324 {
325 // Last-tile always flags the last item
326
327 // Get the last item from the previous tile
328 T tile_predecessor_item;
329 if (threadIdx.x == 0)
330 tile_predecessor_item = d_in[tile_offset - 1];
331
332 BlockDiscontinuityT(temp_storage.aliasable.discontinuity).FlagHeadsAndTails(
333 head_flags, tile_predecessor_item, tail_flags, items, inequality_op);
334 }
335 else
336 {
337 // Get the first item from the next tile
338 T tile_successor_item;
339 if (threadIdx.x == BLOCK_THREADS - 1)
340 tile_successor_item = d_in[tile_offset + TILE_ITEMS];
341
342 // Get the last item from the previous tile
343 T tile_predecessor_item;
344 if (threadIdx.x == 0)
345 tile_predecessor_item = d_in[tile_offset - 1];
346
347 BlockDiscontinuityT(temp_storage.aliasable.discontinuity).FlagHeadsAndTails(
348 head_flags, tile_predecessor_item, tail_flags, tile_successor_item, items, inequality_op);
349 }
350
351 // Zip counts and runs
352 #pragma unroll
353 for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ++ITEM)
354 {
355 lengths_and_num_runs[ITEM].key = head_flags[ITEM] && (!tail_flags[ITEM]);
356 lengths_and_num_runs[ITEM].value = ((!head_flags[ITEM]) || (!tail_flags[ITEM]));
357 }
358 }
359
360 //---------------------------------------------------------------------
361 // Scan utility methods
362 //---------------------------------------------------------------------
363
367 __device__ __forceinline__ void WarpScanAllocations(
368 LengthOffsetPair &tile_aggregate,
369 LengthOffsetPair &warp_aggregate,
370 LengthOffsetPair &warp_exclusive_in_tile,
371 LengthOffsetPair &thread_exclusive_in_warp,
372 LengthOffsetPair (&lengths_and_num_runs)[ITEMS_PER_THREAD])
373 {
374 // Perform warpscans
375 unsigned int warp_id = ((WARPS == 1) ? 0 : threadIdx.x / WARP_THREADS);
376 int lane_id = LaneId();
377
378 LengthOffsetPair identity;
379 identity.key = 0;
380 identity.value = 0;
381
382 LengthOffsetPair thread_inclusive;
383 LengthOffsetPair thread_aggregate = internal::ThreadReduce(lengths_and_num_runs, scan_op);
384 WarpScanPairs(temp_storage.aliasable.warp_scan[warp_id]).Scan(
385 thread_aggregate,
386 thread_inclusive,
387 thread_exclusive_in_warp,
388 identity,
389 scan_op);
390
391 // Last lane in each warp shares its warp-aggregate
392 if (lane_id == WARP_THREADS - 1)
393 temp_storage.aliasable.warp_aggregates.Alias()[warp_id] = thread_inclusive;
394
395 CTA_SYNC();
396
397 // Accumulate total selected and the warp-wide prefix
398 warp_exclusive_in_tile = identity;
399 warp_aggregate = temp_storage.aliasable.warp_aggregates.Alias()[warp_id];
400 tile_aggregate = temp_storage.aliasable.warp_aggregates.Alias()[0];
401
402 #pragma unroll
403 for (int WARP = 1; WARP < WARPS; ++WARP)
404 {
405 if (warp_id == WARP)
406 warp_exclusive_in_tile = tile_aggregate;
407
408 tile_aggregate = scan_op(tile_aggregate, temp_storage.aliasable.warp_aggregates.Alias()[WARP]);
409 }
410 }
411
412
413 //---------------------------------------------------------------------
414 // Utility methods for scattering selections
415 //---------------------------------------------------------------------
416
420 template <bool FIRST_TILE>
421 __device__ __forceinline__ void ScatterTwoPhase(
422 OffsetT tile_num_runs_exclusive_in_global,
423 OffsetT warp_num_runs_aggregate,
424 OffsetT warp_num_runs_exclusive_in_tile,
425 OffsetT (&thread_num_runs_exclusive_in_warp)[ITEMS_PER_THREAD],
426 LengthOffsetPair (&lengths_and_offsets)[ITEMS_PER_THREAD],
427 Int2Type<true> is_warp_time_slice)
428 {
429 unsigned int warp_id = ((WARPS == 1) ? 0 : threadIdx.x / WARP_THREADS);
430 int lane_id = LaneId();
431
432 // Locally compact items within the warp (first warp)
433 if (warp_id == 0)
434 {
435 WarpExchangePairs(temp_storage.aliasable.scatter_aliasable.exchange_pairs[0]).ScatterToStriped(
436 lengths_and_offsets, thread_num_runs_exclusive_in_warp);
437 }
438
439 // Locally compact items within the warp (remaining warps)
440 #pragma unroll
441 for (int SLICE = 1; SLICE < WARPS; ++SLICE)
442 {
443 CTA_SYNC();
444
445 if (warp_id == SLICE)
446 {
447 WarpExchangePairs(temp_storage.aliasable.scatter_aliasable.exchange_pairs[0]).ScatterToStriped(
448 lengths_and_offsets, thread_num_runs_exclusive_in_warp);
449 }
450 }
451
452 // Global scatter
453 #pragma unroll
454 for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++)
455 {
456 if ((ITEM * WARP_THREADS) < warp_num_runs_aggregate - lane_id)
457 {
458 OffsetT item_offset =
459 tile_num_runs_exclusive_in_global +
460 warp_num_runs_exclusive_in_tile +
461 (ITEM * WARP_THREADS) + lane_id;
462
463 // Scatter offset
464 d_offsets_out[item_offset] = lengths_and_offsets[ITEM].key;
465
466 // Scatter length if not the first (global) length
467 if ((!FIRST_TILE) || (ITEM != 0) || (threadIdx.x > 0))
468 {
469 d_lengths_out[item_offset - 1] = lengths_and_offsets[ITEM].value;
470 }
471 }
472 }
473 }
474
475
479 template <bool FIRST_TILE>
480 __device__ __forceinline__ void ScatterTwoPhase(
481 OffsetT tile_num_runs_exclusive_in_global,
482 OffsetT warp_num_runs_aggregate,
483 OffsetT warp_num_runs_exclusive_in_tile,
484 OffsetT (&thread_num_runs_exclusive_in_warp)[ITEMS_PER_THREAD],
485 LengthOffsetPair (&lengths_and_offsets)[ITEMS_PER_THREAD],
486 Int2Type<false> is_warp_time_slice)
487 {
488 unsigned int warp_id = ((WARPS == 1) ? 0 : threadIdx.x / WARP_THREADS);
489 int lane_id = LaneId();
490
491 // Unzip
492 OffsetT run_offsets[ITEMS_PER_THREAD];
493 LengthT run_lengths[ITEMS_PER_THREAD];
494
495 #pragma unroll
496 for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++)
497 {
498 run_offsets[ITEM] = lengths_and_offsets[ITEM].key;
499 run_lengths[ITEM] = lengths_and_offsets[ITEM].value;
500 }
501
502 WarpExchangeOffsets(temp_storage.aliasable.scatter_aliasable.exchange_offsets[warp_id]).ScatterToStriped(
503 run_offsets, thread_num_runs_exclusive_in_warp);
504
505 WARP_SYNC(0xffffffff);
506
507 WarpExchangeLengths(temp_storage.aliasable.scatter_aliasable.exchange_lengths[warp_id]).ScatterToStriped(
508 run_lengths, thread_num_runs_exclusive_in_warp);
509
510 // Global scatter
511 #pragma unroll
512 for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++)
513 {
514 if ((ITEM * WARP_THREADS) + lane_id < warp_num_runs_aggregate)
515 {
516 OffsetT item_offset =
517 tile_num_runs_exclusive_in_global +
518 warp_num_runs_exclusive_in_tile +
519 (ITEM * WARP_THREADS) + lane_id;
520
521 // Scatter offset
522 d_offsets_out[item_offset] = run_offsets[ITEM];
523
524 // Scatter length if not the first (global) length
525 if ((!FIRST_TILE) || (ITEM != 0) || (threadIdx.x > 0))
526 {
527 d_lengths_out[item_offset - 1] = run_lengths[ITEM];
528 }
529 }
530 }
531 }
532
533
537 template <bool FIRST_TILE>
538 __device__ __forceinline__ void ScatterDirect(
539 OffsetT tile_num_runs_exclusive_in_global,
540 OffsetT warp_num_runs_aggregate,
541 OffsetT warp_num_runs_exclusive_in_tile,
542 OffsetT (&thread_num_runs_exclusive_in_warp)[ITEMS_PER_THREAD],
543 LengthOffsetPair (&lengths_and_offsets)[ITEMS_PER_THREAD])
544 {
545 #pragma unroll
546 for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ++ITEM)
547 {
548 if (thread_num_runs_exclusive_in_warp[ITEM] < warp_num_runs_aggregate)
549 {
550 OffsetT item_offset =
551 tile_num_runs_exclusive_in_global +
552 warp_num_runs_exclusive_in_tile +
553 thread_num_runs_exclusive_in_warp[ITEM];
554
555 // Scatter offset
556 d_offsets_out[item_offset] = lengths_and_offsets[ITEM].key;
557
558 // Scatter length if not the first (global) length
559 if (item_offset >= 1)
560 {
561 d_lengths_out[item_offset - 1] = lengths_and_offsets[ITEM].value;
562 }
563 }
564 }
565 }
566
567
571 template <bool FIRST_TILE>
572 __device__ __forceinline__ void Scatter(
573 OffsetT tile_num_runs_aggregate,
574 OffsetT tile_num_runs_exclusive_in_global,
575 OffsetT warp_num_runs_aggregate,
576 OffsetT warp_num_runs_exclusive_in_tile,
577 OffsetT (&thread_num_runs_exclusive_in_warp)[ITEMS_PER_THREAD],
578 LengthOffsetPair (&lengths_and_offsets)[ITEMS_PER_THREAD])
579 {
580 if ((ITEMS_PER_THREAD == 1) || (tile_num_runs_aggregate < BLOCK_THREADS))
581 {
582 // Direct scatter if the warp has any items
583 if (warp_num_runs_aggregate)
584 {
585 ScatterDirect<FIRST_TILE>(
586 tile_num_runs_exclusive_in_global,
587 warp_num_runs_aggregate,
588 warp_num_runs_exclusive_in_tile,
589 thread_num_runs_exclusive_in_warp,
590 lengths_and_offsets);
591 }
592 }
593 else
594 {
595 // Scatter two phase
596 ScatterTwoPhase<FIRST_TILE>(
597 tile_num_runs_exclusive_in_global,
598 warp_num_runs_aggregate,
599 warp_num_runs_exclusive_in_tile,
600 thread_num_runs_exclusive_in_warp,
601 lengths_and_offsets,
602 Int2Type<STORE_WARP_TIME_SLICING>());
603 }
604 }
605
606
607
608 //---------------------------------------------------------------------
609 // Cooperatively scan a device-wide sequence of tiles with other CTAs
610 //---------------------------------------------------------------------
611
615 template <
616 bool LAST_TILE>
617 __device__ __forceinline__ LengthOffsetPair ConsumeTile(
618 OffsetT num_items, ///< Total number of global input items
619 OffsetT num_remaining,
620 int tile_idx,
621 OffsetT tile_offset,
622 ScanTileStateT &tile_status) ///< Global list of tile status
623 {
624 if (tile_idx == 0)
625 {
626 // First tile
627
628 // Load items
629 T items[ITEMS_PER_THREAD];
630 if (LAST_TILE)
631 BlockLoadT(temp_storage.aliasable.load).Load(d_in + tile_offset, items, num_remaining, T());
632 else
633 BlockLoadT(temp_storage.aliasable.load).Load(d_in + tile_offset, items);
634
635 if (SYNC_AFTER_LOAD)
636 CTA_SYNC();
637
638 // Set flags
639 LengthOffsetPair lengths_and_num_runs[ITEMS_PER_THREAD];
640
641 InitializeSelections<true, LAST_TILE>(
642 tile_offset,
643 num_remaining,
644 items,
645 lengths_and_num_runs);
646
647 // Exclusive scan of lengths and runs
648 LengthOffsetPair tile_aggregate;
649 LengthOffsetPair warp_aggregate;
650 LengthOffsetPair warp_exclusive_in_tile;
651 LengthOffsetPair thread_exclusive_in_warp;
652
653 WarpScanAllocations(
654 tile_aggregate,
655 warp_aggregate,
656 warp_exclusive_in_tile,
657 thread_exclusive_in_warp,
658 lengths_and_num_runs);
659
660 // Update tile status if this is not the last tile
661 if (!LAST_TILE && (threadIdx.x == 0))
662 tile_status.SetInclusive(0, tile_aggregate);
663
664 // Update thread_exclusive_in_warp to fold in warp run-length
665 if (thread_exclusive_in_warp.key == 0)
666 thread_exclusive_in_warp.value += warp_exclusive_in_tile.value;
667
668 LengthOffsetPair lengths_and_offsets[ITEMS_PER_THREAD];
669 OffsetT thread_num_runs_exclusive_in_warp[ITEMS_PER_THREAD];
670 LengthOffsetPair lengths_and_num_runs2[ITEMS_PER_THREAD];
671
672 // Downsweep scan through lengths_and_num_runs
673 internal::ThreadScanExclusive(lengths_and_num_runs, lengths_and_num_runs2, scan_op, thread_exclusive_in_warp);
674
675 // Zip
676
677 #pragma unroll
678 for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++)
679 {
680 lengths_and_offsets[ITEM].value = lengths_and_num_runs2[ITEM].value;
681 lengths_and_offsets[ITEM].key = tile_offset + (threadIdx.x * ITEMS_PER_THREAD) + ITEM;
682 thread_num_runs_exclusive_in_warp[ITEM] = (lengths_and_num_runs[ITEM].key) ?
683 lengths_and_num_runs2[ITEM].key : // keep
684 WARP_THREADS * ITEMS_PER_THREAD; // discard
685 }
686
687 OffsetT tile_num_runs_aggregate = tile_aggregate.key;
688 OffsetT tile_num_runs_exclusive_in_global = 0;
689 OffsetT warp_num_runs_aggregate = warp_aggregate.key;
690 OffsetT warp_num_runs_exclusive_in_tile = warp_exclusive_in_tile.key;
691
692 // Scatter
693 Scatter<true>(
694 tile_num_runs_aggregate,
695 tile_num_runs_exclusive_in_global,
696 warp_num_runs_aggregate,
697 warp_num_runs_exclusive_in_tile,
698 thread_num_runs_exclusive_in_warp,
699 lengths_and_offsets);
700
701 // Return running total (inclusive of this tile)
702 return tile_aggregate;
703 }
704 else
705 {
706 // Not first tile
707
708 // Load items
709 T items[ITEMS_PER_THREAD];
710 if (LAST_TILE)
711 BlockLoadT(temp_storage.aliasable.load).Load(d_in + tile_offset, items, num_remaining, T());
712 else
713 BlockLoadT(temp_storage.aliasable.load).Load(d_in + tile_offset, items);
714
715 if (SYNC_AFTER_LOAD)
716 CTA_SYNC();
717
718 // Set flags
719 LengthOffsetPair lengths_and_num_runs[ITEMS_PER_THREAD];
720
721 InitializeSelections<false, LAST_TILE>(
722 tile_offset,
723 num_remaining,
724 items,
725 lengths_and_num_runs);
726
727 // Exclusive scan of lengths and runs
728 LengthOffsetPair tile_aggregate;
729 LengthOffsetPair warp_aggregate;
730 LengthOffsetPair warp_exclusive_in_tile;
731 LengthOffsetPair thread_exclusive_in_warp;
732
733 WarpScanAllocations(
734 tile_aggregate,
735 warp_aggregate,
736 warp_exclusive_in_tile,
737 thread_exclusive_in_warp,
738 lengths_and_num_runs);
739
740 // First warp computes tile prefix in lane 0
741 TilePrefixCallbackOpT prefix_op(tile_status, temp_storage.aliasable.prefix, Sum(), tile_idx);
742 unsigned int warp_id = ((WARPS == 1) ? 0 : threadIdx.x / WARP_THREADS);
743 if (warp_id == 0)
744 {
745 prefix_op(tile_aggregate);
746 if (threadIdx.x == 0)
747 temp_storage.tile_exclusive = prefix_op.exclusive_prefix;
748 }
749
750 CTA_SYNC();
751
752 LengthOffsetPair tile_exclusive_in_global = temp_storage.tile_exclusive;
753
754 // Update thread_exclusive_in_warp to fold in warp and tile run-lengths
755 LengthOffsetPair thread_exclusive = scan_op(tile_exclusive_in_global, warp_exclusive_in_tile);
756 if (thread_exclusive_in_warp.key == 0)
757 thread_exclusive_in_warp.value += thread_exclusive.value;
758
759 // Downsweep scan through lengths_and_num_runs
760 LengthOffsetPair lengths_and_num_runs2[ITEMS_PER_THREAD];
761 LengthOffsetPair lengths_and_offsets[ITEMS_PER_THREAD];
762 OffsetT thread_num_runs_exclusive_in_warp[ITEMS_PER_THREAD];
763
764 internal::ThreadScanExclusive(lengths_and_num_runs, lengths_and_num_runs2, scan_op, thread_exclusive_in_warp);
765
766 // Zip
767 #pragma unroll
768 for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++)
769 {
770 lengths_and_offsets[ITEM].value = lengths_and_num_runs2[ITEM].value;
771 lengths_and_offsets[ITEM].key = tile_offset + (threadIdx.x * ITEMS_PER_THREAD) + ITEM;
772 thread_num_runs_exclusive_in_warp[ITEM] = (lengths_and_num_runs[ITEM].key) ?
773 lengths_and_num_runs2[ITEM].key : // keep
774 WARP_THREADS * ITEMS_PER_THREAD; // discard
775 }
776
777 OffsetT tile_num_runs_aggregate = tile_aggregate.key;
778 OffsetT tile_num_runs_exclusive_in_global = tile_exclusive_in_global.key;
779 OffsetT warp_num_runs_aggregate = warp_aggregate.key;
780 OffsetT warp_num_runs_exclusive_in_tile = warp_exclusive_in_tile.key;
781
782 // Scatter
783 Scatter<false>(
784 tile_num_runs_aggregate,
785 tile_num_runs_exclusive_in_global,
786 warp_num_runs_aggregate,
787 warp_num_runs_exclusive_in_tile,
788 thread_num_runs_exclusive_in_warp,
789 lengths_and_offsets);
790
791 // Return running total (inclusive of this tile)
792 return prefix_op.inclusive_prefix;
793 }
794 }
795
796
800 template <typename NumRunsIteratorT>
801 __device__ __forceinline__ void ConsumeRange(
802 int num_tiles,
803 ScanTileStateT &tile_status, ///< Global list of tile status
804 NumRunsIteratorT d_num_runs_out)
805 {
806 // Blocks are launched in increasing order, so just assign one tile per block
807 int tile_idx = (blockIdx.x * gridDim.y) + blockIdx.y; // Current tile index
808 OffsetT tile_offset = tile_idx * TILE_ITEMS; // Global offset for the current tile
809 OffsetT num_remaining = num_items - tile_offset; // Remaining items (including this tile)
810
811 if (tile_idx < num_tiles - 1)
812 {
813 // Not the last tile (full)
814 ConsumeTile<false>(num_items, num_remaining, tile_idx, tile_offset, tile_status);
815 }
816 else if (num_remaining > 0)
817 {
818 // The last tile (possibly partially-full)
819 LengthOffsetPair running_total = ConsumeTile<true>(num_items, num_remaining, tile_idx, tile_offset, tile_status);
820
821 if (threadIdx.x == 0)
822 {
823 // Output the total number of items selected
824 *d_num_runs_out = running_total.key;
825
826 // The inclusive prefix contains accumulated length reduction for the last run
827 if (running_total.key > 0)
828 d_lengths_out[running_total.key - 1] = running_total.value;
829 }
830 }
831 }
832};
833
834
835} // CUB namespace
836CUB_NS_POSTFIX // Optional outer namespace(s)
837
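AgentRle is not used directly by application code; a device-wide sweep kernel in the dispatch layer instantiates it once per thread block and hands it a tile range. The kernel below is a sketch of that pattern, modeled on the run-length-encode sweep kernel (the kernel name and parameter spellings are illustrative, not the library's exact dispatch code):

template <
    typename AgentRlePolicyT,
    typename InputIteratorT,
    typename OffsetsOutputIteratorT,
    typename LengthsOutputIteratorT,
    typename NumRunsOutputIteratorT,
    typename ScanTileStateT,
    typename EqualityOpT,
    typename OffsetT>
__launch_bounds__(int(AgentRlePolicyT::BLOCK_THREADS))
__global__ void ExampleRleSweepKernel(
    InputIteratorT              d_in,               // Input data
    OffsetsOutputIteratorT      d_offsets_out,      // Output run offsets
    LengthsOutputIteratorT      d_lengths_out,      // Output run lengths
    NumRunsOutputIteratorT      d_num_runs_out,     // Output total number of runs
    ScanTileStateT              tile_status,        // Tile status interface
    EqualityOpT                 equality_op,        // Equality operator
    OffsetT                     num_items,          // Total number of input items
    int                         num_tiles)          // Total number of tiles
{
    // Thread block abstraction type for run-length encoding
    typedef cub::AgentRle<
            AgentRlePolicyT, InputIteratorT, OffsetsOutputIteratorT,
            LengthsOutputIteratorT, EqualityOpT, OffsetT>
        AgentRleT;

    // Shared memory backing the block-wide collectives used by AgentRle
    __shared__ typename AgentRleT::TempStorage temp_storage;

    // Consume this block's tile(s) of input
    AgentRleT(temp_storage, d_in, d_offsets_out, d_lengths_out, equality_op, num_items)
        .ConsumeRange(num_tiles, tile_status, d_num_runs_out);
}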