agent_rle.cuh
/******************************************************************************
 * Copyright (c) 2011, Duane Merrill.  All rights reserved.
 * Copyright (c) 2011-2018, NVIDIA CORPORATION.  All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in the
 *       documentation and/or other materials provided with the distribution.
 *     * Neither the name of the NVIDIA CORPORATION nor the
 *       names of its contributors may be used to endorse or promote products
 *       derived from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL NVIDIA CORPORATION BE LIABLE FOR ANY
 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 ******************************************************************************/

/**
 * \file
 * cub::AgentRle implements a stateful abstraction of CUDA thread blocks for participating in device-wide run-length-encode.
 */

#pragma once

#include <iterator>

#include "single_pass_scan_operators.cuh"
#include "../block/block_load.cuh"
#include "../block/block_store.cuh"
#include "../block/block_scan.cuh"
#include "../block/block_exchange.cuh"
#include "../block/block_discontinuity.cuh"
#include "../grid/grid_queue.cuh"
#include "../iterator/cache_modified_input_iterator.cuh"
#include "../iterator/constant_input_iterator.cuh"
#include "../util_namespace.cuh"

/// Optional outer namespace(s)
CUB_NS_PREFIX

/// CUB namespace
namespace cub {


/******************************************************************************
 * Tuning policy types
 ******************************************************************************/

/**
 * Parameterizable tuning policy type for AgentRle
 */
template <
    int                     _BLOCK_THREADS,             ///< Threads per thread block
    int                     _ITEMS_PER_THREAD,          ///< Items per thread (per tile of input)
    BlockLoadAlgorithm      _LOAD_ALGORITHM,            ///< The BlockLoad algorithm to use
    CacheLoadModifier       _LOAD_MODIFIER,             ///< Cache load modifier for reading input elements
    bool                    _STORE_WARP_TIME_SLICING,   ///< Whether or not only one warp's worth of shared memory should be allocated and time-sliced among block-warps during any store-related data transpositions (versus each warp having its own storage)
    BlockScanAlgorithm      _SCAN_ALGORITHM>            ///< The BlockScan algorithm to use
struct AgentRlePolicy
{
    enum
    {
        BLOCK_THREADS           = _BLOCK_THREADS,           ///< Threads per thread block
        ITEMS_PER_THREAD        = _ITEMS_PER_THREAD,        ///< Items per thread (per tile of input)
        STORE_WARP_TIME_SLICING = _STORE_WARP_TIME_SLICING, ///< Whether or not warp time-slicing is used for store-related data transpositions
    };

    static const BlockLoadAlgorithm     LOAD_ALGORITHM  = _LOAD_ALGORITHM;      ///< The BlockLoad algorithm to use
    static const CacheLoadModifier      LOAD_MODIFIER   = _LOAD_MODIFIER;      ///< Cache load modifier for reading input elements
    static const BlockScanAlgorithm     SCAN_ALGORITHM  = _SCAN_ALGORITHM;      ///< The BlockScan algorithm to use
};
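
// Illustrative instantiation (hypothetical values, not one of the tuned
// configurations shipped with CUB): a policy for 128-thread blocks, each
// thread handling 8 input items, loaded in a warp-transposed layout through
// LDG loads and scanned with the warp-scans variant.
//
//   typedef AgentRlePolicy<
//           128,                        // _BLOCK_THREADS
//           8,                          // _ITEMS_PER_THREAD
//           BLOCK_LOAD_WARP_TRANSPOSE,  // _LOAD_ALGORITHM
//           LOAD_LDG,                   // _LOAD_MODIFIER
//           true,                       // _STORE_WARP_TIME_SLICING
//           BLOCK_SCAN_WARP_SCANS>      // _SCAN_ALGORITHM
//       ExampleRlePolicyT;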


/******************************************************************************
 * Thread block abstractions
 ******************************************************************************/

/**
 * \brief AgentRle implements a stateful abstraction of CUDA thread blocks for participating in device-wide run-length-encode
 */
template <
    typename    AgentRlePolicyT,        ///< Parameterized AgentRlePolicyT tuning policy type
    typename    InputIteratorT,         ///< Random-access input iterator type for data
    typename    OffsetsOutputIteratorT, ///< Random-access output iterator type for offset values
    typename    LengthsOutputIteratorT, ///< Random-access output iterator type for length values
    typename    EqualityOpT,            ///< T equality operator type
    typename    OffsetT>                ///< Signed integer type for global offsets
struct AgentRle
{
    //---------------------------------------------------------------------
    // Types and constants
    //---------------------------------------------------------------------

    /// The input value type
    typedef typename std::iterator_traits<InputIteratorT>::value_type T;

    /// The lengths output value type
    typedef typename If<(Equals<typename std::iterator_traits<LengthsOutputIteratorT>::value_type, void>::VALUE),   // LengthT = (if output iterator's value type is void) ?
        OffsetT,                                                                                                    // ... then the OffsetT type,
        typename std::iterator_traits<LengthsOutputIteratorT>::value_type>::Type LengthT;                           // ... else the output iterator's value type

    /// Tuple type for scanning (pairs run-length and run-index)
    typedef KeyValuePair<OffsetT, LengthT> LengthOffsetPair;

    /// Tile status descriptor interface type
    typedef ReduceByKeyScanTileState<LengthT, OffsetT> ScanTileStateT;

    // Constants
    enum
    {
        WARP_THREADS            = CUB_WARP_THREADS(PTX_ARCH),
        BLOCK_THREADS           = AgentRlePolicyT::BLOCK_THREADS,
        ITEMS_PER_THREAD        = AgentRlePolicyT::ITEMS_PER_THREAD,
        WARP_ITEMS              = WARP_THREADS * ITEMS_PER_THREAD,
        TILE_ITEMS              = BLOCK_THREADS * ITEMS_PER_THREAD,
        WARPS                   = (BLOCK_THREADS + WARP_THREADS - 1) / WARP_THREADS,

        /// Whether or not to sync after loading data
        SYNC_AFTER_LOAD         = (AgentRlePolicyT::LOAD_ALGORITHM != BLOCK_LOAD_DIRECT),

        /// Whether or not only one warp's worth of shared memory should be allocated and time-sliced among block-warps during store-related data transpositions
        STORE_WARP_TIME_SLICING = AgentRlePolicyT::STORE_WARP_TIME_SLICING,
        ACTIVE_EXCHANGE_WARPS   = (STORE_WARP_TIME_SLICING) ? 1 : WARPS,
    };

    /**
     * Special inequality functor: for the last tile, comparisons involving
     * out-of-bounds items are always reported as discontinuities
     */
    template <bool LAST_TILE>
    struct OobInequalityOp
    {
        OffsetT         num_remaining;
        EqualityOpT     equality_op;

        __device__ __forceinline__ OobInequalityOp(
            OffsetT     num_remaining,
            EqualityOpT equality_op)
        :
            num_remaining(num_remaining),
            equality_op(equality_op)
        {}

        template <typename Index>
        __host__ __device__ __forceinline__ bool operator()(T first, T second, Index idx)
        {
            if (!LAST_TILE || (idx < num_remaining))
                return !equality_op(first, second);
            else
                return true;
        }
    };
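
    // Note on the functor above: for the last (partial) tile, any comparison
    // that touches an out-of-bounds slot (idx >= num_remaining) reports a
    // discontinuity unconditionally. This guarantees that the padding used to
    // fill out the tile can never be mistaken for a continuation of the final
    // genuine run.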

    // Cache-modified input iterator wrapper type (for applying cache modifier) for data
    typedef typename If<IsPointer<InputIteratorT>::VALUE,
            CacheModifiedInputIterator<AgentRlePolicyT::LOAD_MODIFIER, T, OffsetT>,     // Wrap the native input pointer with CacheModifiedInputIterator
            InputIteratorT>::Type                                                       // Directly use the supplied input iterator type
        WrappedInputIteratorT;

    // Parameterized BlockLoad type for data
    typedef BlockLoad<
            T,
            AgentRlePolicyT::BLOCK_THREADS,
            AgentRlePolicyT::ITEMS_PER_THREAD,
            AgentRlePolicyT::LOAD_ALGORITHM>
        BlockLoadT;

    // Parameterized BlockDiscontinuity type for data
    typedef BlockDiscontinuity<T, BLOCK_THREADS> BlockDiscontinuityT;

    // Parameterized WarpScan type
    typedef WarpScan<LengthOffsetPair> WarpScanPairs;

    // Reduce-length-by-run scan operator
    typedef ReduceBySegmentOp<cub::Sum> ReduceBySegmentOpT;

    // Callback type for obtaining tile prefix during block scan
    typedef TilePrefixCallbackOp<
            LengthOffsetPair,
            ReduceBySegmentOpT,
            ScanTileStateT>
        TilePrefixCallbackOpT;

    // Warp exchange types
    typedef WarpExchange<LengthOffsetPair, ITEMS_PER_THREAD>    WarpExchangePairs;

    typedef typename If<STORE_WARP_TIME_SLICING, typename WarpExchangePairs::TempStorage, NullType>::Type WarpExchangePairsStorage;

    typedef WarpExchange<OffsetT, ITEMS_PER_THREAD>             WarpExchangeOffsets;
    typedef WarpExchange<LengthT, ITEMS_PER_THREAD>             WarpExchangeLengths;

    typedef LengthOffsetPair WarpAggregates[WARPS];

    // Shared memory type for this thread block
    struct _TempStorage
    {
        // Aliasable storage layout
        union Aliasable
        {
            struct
            {
                typename BlockDiscontinuityT::TempStorage       discontinuity;              // Smem needed for discontinuity detection
                typename WarpScanPairs::TempStorage             warp_scan[WARPS];           // Smem needed for warp-synchronous scans
                Uninitialized<LengthOffsetPair[WARPS]>          warp_aggregates;            // Smem needed for sharing warp-wide aggregates
                typename TilePrefixCallbackOpT::TempStorage     prefix;                     // Smem needed for cooperative prefix callback
            };

            // Smem needed for input loading
            typename BlockLoadT::TempStorage                    load;

            // Aliasable layout needed for two-phase scatter
            struct ScatterAliasable
            {
                unsigned long long                              align;
                WarpExchangePairsStorage                        exchange_pairs[ACTIVE_EXCHANGE_WARPS];
                typename WarpExchangeOffsets::TempStorage       exchange_offsets[ACTIVE_EXCHANGE_WARPS];
                typename WarpExchangeLengths::TempStorage       exchange_lengths[ACTIVE_EXCHANGE_WARPS];

            } scatter_aliasable;

        } aliasable;

        OffsetT             tile_idx;           // Shared tile index
        LengthOffsetPair    tile_inclusive;     // Inclusive tile prefix
        LengthOffsetPair    tile_exclusive;     // Exclusive tile prefix
    };

    // Alias wrapper allowing storage to be unioned
    struct TempStorage : Uninitialized<_TempStorage> {};
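
    // Note: the union above aliases the load storage, the
    // discontinuity/scan/prefix storage, and the two-phase scatter storage
    // onto the same shared memory. The phases that use them are ordered and
    // synchronized, so only one layout needs to be live at a time and the
    // block's overall shared-memory footprint shrinks accordingly.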


    //---------------------------------------------------------------------
    // Per-thread fields
    //---------------------------------------------------------------------

    _TempStorage&               temp_storage;       ///< Reference to temp_storage

    WrappedInputIteratorT       d_in;               ///< Pointer to input sequence of data items
    OffsetsOutputIteratorT      d_offsets_out;      ///< Output run offsets
    LengthsOutputIteratorT      d_lengths_out;      ///< Output run lengths

    EqualityOpT                 equality_op;        ///< T equality operator
    ReduceBySegmentOpT          scan_op;            ///< Reduce-length-by-flag scan operator
    OffsetT                     num_items;          ///< Total number of input items


    //---------------------------------------------------------------------
    // Constructor
    //---------------------------------------------------------------------

    // Constructor
    __device__ __forceinline__
    AgentRle(
        TempStorage             &temp_storage,      ///< [in] Reference to temp_storage
        InputIteratorT          d_in,               ///< [in] Pointer to input sequence of data items
        OffsetsOutputIteratorT  d_offsets_out,      ///< [out] Pointer to output sequence of run offsets
        LengthsOutputIteratorT  d_lengths_out,      ///< [out] Pointer to output sequence of run lengths
        EqualityOpT             equality_op,        ///< [in] T equality operator
        OffsetT                 num_items)          ///< [in] Total number of input items
    :
        temp_storage(temp_storage.Alias()),
        d_in(d_in),
        d_offsets_out(d_offsets_out),
        d_lengths_out(d_lengths_out),
        equality_op(equality_op),
        scan_op(cub::Sum()),
        num_items(num_items)
    {}


    //---------------------------------------------------------------------
    // Utility methods for initializing the selections
    //---------------------------------------------------------------------

    template <bool FIRST_TILE, bool LAST_TILE>
    __device__ __forceinline__ void InitializeSelections(
        OffsetT             tile_offset,
        OffsetT             num_remaining,
        T                   (&items)[ITEMS_PER_THREAD],
        LengthOffsetPair    (&lengths_and_num_runs)[ITEMS_PER_THREAD])
    {
        bool head_flags[ITEMS_PER_THREAD];
        bool tail_flags[ITEMS_PER_THREAD];

        OobInequalityOp<LAST_TILE> inequality_op(num_remaining, equality_op);

        if (FIRST_TILE && LAST_TILE)
        {
            // First-and-last-tile always head-flags the first item and tail-flags the last item
            BlockDiscontinuityT(temp_storage.aliasable.discontinuity).FlagHeadsAndTails(
                head_flags, tail_flags, items, inequality_op);
        }
        else if (FIRST_TILE)
        {
            // First-tile always head-flags the first item

            // Get the first item from the next tile
            T tile_successor_item;
            if (threadIdx.x == BLOCK_THREADS - 1)
                tile_successor_item = d_in[tile_offset + TILE_ITEMS];

            BlockDiscontinuityT(temp_storage.aliasable.discontinuity).FlagHeadsAndTails(
                head_flags, tail_flags, tile_successor_item, items, inequality_op);
        }
        else if (LAST_TILE)
        {
            // Last-tile always tail-flags the last item

            // Get the last item from the previous tile
            T tile_predecessor_item;
            if (threadIdx.x == 0)
                tile_predecessor_item = d_in[tile_offset - 1];

            BlockDiscontinuityT(temp_storage.aliasable.discontinuity).FlagHeadsAndTails(
                head_flags, tile_predecessor_item, tail_flags, items, inequality_op);
        }
        else
        {
            // Get the first item from the next tile
            T tile_successor_item;
            if (threadIdx.x == BLOCK_THREADS - 1)
                tile_successor_item = d_in[tile_offset + TILE_ITEMS];

            // Get the last item from the previous tile
            T tile_predecessor_item;
            if (threadIdx.x == 0)
                tile_predecessor_item = d_in[tile_offset - 1];

            BlockDiscontinuityT(temp_storage.aliasable.discontinuity).FlagHeadsAndTails(
                head_flags, tile_predecessor_item, tail_flags, tile_successor_item, items, inequality_op);
        }

        // Zip counts and runs
        #pragma unroll
        for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ++ITEM)
        {
            lengths_and_num_runs[ITEM].key      = head_flags[ITEM] && (!tail_flags[ITEM]);
            lengths_and_num_runs[ITEM].value    = ((!head_flags[ITEM]) || (!tail_flags[ITEM]));
        }
    }
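
    // Worked example of the zip encoding above, for a hypothetical 6-item
    // tile [3 3 3 5 7 7] that is both first and last:
    //
    //   item:        3  3  3  5  7  7
    //   head_flag:   1  0  0  1  1  0
    //   tail_flag:   0  0  1  1  0  1
    //   .key:        1  0  0  0  1  0   (starts of non-trivial runs)
    //   .value:      1  1  1  0  1  1   (membership in a non-trivial run)
    //
    // A reduce-length-by-run scan of these pairs then yields, at each run
    // head, the index of that run (the count of runs started before it) and,
    // accumulated across its items, the run length; the trivial
    // single-item run of 5 contributes nothing to either output.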

    //---------------------------------------------------------------------
    // Scan utility methods
    //---------------------------------------------------------------------

    /**
     * Scan of allocations
     */
    __device__ __forceinline__ void WarpScanAllocations(
        LengthOffsetPair    &tile_aggregate,
        LengthOffsetPair    &warp_aggregate,
        LengthOffsetPair    &warp_exclusive_in_tile,
        LengthOffsetPair    &thread_exclusive_in_warp,
        LengthOffsetPair    (&lengths_and_num_runs)[ITEMS_PER_THREAD])
    {
        // Perform warpscans
        unsigned int warp_id = ((WARPS == 1) ? 0 : threadIdx.x / WARP_THREADS);
        int lane_id = LaneId();

        LengthOffsetPair identity;
        identity.key = 0;
        identity.value = 0;

        LengthOffsetPair thread_inclusive;
        LengthOffsetPair thread_aggregate = internal::ThreadReduce(lengths_and_num_runs, scan_op);
        WarpScanPairs(temp_storage.aliasable.warp_scan[warp_id]).Scan(
            thread_aggregate,
            thread_inclusive,
            thread_exclusive_in_warp,
            identity,
            scan_op);

        // Last lane in each warp shares its warp-aggregate
        if (lane_id == WARP_THREADS - 1)
            temp_storage.aliasable.warp_aggregates.Alias()[warp_id] = thread_inclusive;

        CTA_SYNC();

        // Accumulate total selected and the warp-wide prefix
        warp_exclusive_in_tile  = identity;
        warp_aggregate          = temp_storage.aliasable.warp_aggregates.Alias()[warp_id];
        tile_aggregate          = temp_storage.aliasable.warp_aggregates.Alias()[0];

        #pragma unroll
        for (int WARP = 1; WARP < WARPS; ++WARP)
        {
            if (warp_id == WARP)
                warp_exclusive_in_tile = tile_aggregate;

            tile_aggregate = scan_op(tile_aggregate, temp_storage.aliasable.warp_aggregates.Alias()[WARP]);
        }
    }
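
    // Note on the loop above: warp w's exclusive prefix is the combination of
    // the aggregates of warps 0..w-1, accumulated serially here because WARPS
    // is a small compile-time constant; after the loop, tile_aggregate holds
    // the reduction of the entire tile.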


    //---------------------------------------------------------------------
    // Utility methods for scattering selections
    //---------------------------------------------------------------------

    /**
     * Two-phase scatter, specialized for warp time-slicing
     */
    template <bool FIRST_TILE>
    __device__ __forceinline__ void ScatterTwoPhase(
        OffsetT             tile_num_runs_exclusive_in_global,
        OffsetT             warp_num_runs_aggregate,
        OffsetT             warp_num_runs_exclusive_in_tile,
        OffsetT             (&thread_num_runs_exclusive_in_warp)[ITEMS_PER_THREAD],
        LengthOffsetPair    (&lengths_and_offsets)[ITEMS_PER_THREAD],
        Int2Type<true>      is_warp_time_slice)
    {
        unsigned int warp_id = ((WARPS == 1) ? 0 : threadIdx.x / WARP_THREADS);
        int lane_id = LaneId();

        // Locally compact items within the warp (first warp)
        if (warp_id == 0)
        {
            WarpExchangePairs(temp_storage.aliasable.scatter_aliasable.exchange_pairs[0]).ScatterToStriped(
                lengths_and_offsets, thread_num_runs_exclusive_in_warp);
        }

        // Locally compact items within the warp (remaining warps)
        #pragma unroll
        for (int SLICE = 1; SLICE < WARPS; ++SLICE)
        {
            CTA_SYNC();

            if (warp_id == SLICE)
            {
                WarpExchangePairs(temp_storage.aliasable.scatter_aliasable.exchange_pairs[0]).ScatterToStriped(
                    lengths_and_offsets, thread_num_runs_exclusive_in_warp);
            }
        }

        // Global scatter
        #pragma unroll
        for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++)
        {
            if ((ITEM * WARP_THREADS) < warp_num_runs_aggregate - lane_id)
            {
                OffsetT item_offset =
                    tile_num_runs_exclusive_in_global +
                    warp_num_runs_exclusive_in_tile +
                    (ITEM * WARP_THREADS) + lane_id;

                // Scatter offset
                d_offsets_out[item_offset] = lengths_and_offsets[ITEM].key;

                // Scatter length if not the first (global) length
                if ((!FIRST_TILE) || (ITEM != 0) || (threadIdx.x > 0))
                {
                    d_lengths_out[item_offset - 1] = lengths_and_offsets[ITEM].value;
                }
            }
        }
    }


    /**
     * Two-phase scatter
     */
    template <bool FIRST_TILE>
    __device__ __forceinline__ void ScatterTwoPhase(
        OffsetT             tile_num_runs_exclusive_in_global,
        OffsetT             warp_num_runs_aggregate,
        OffsetT             warp_num_runs_exclusive_in_tile,
        OffsetT             (&thread_num_runs_exclusive_in_warp)[ITEMS_PER_THREAD],
        LengthOffsetPair    (&lengths_and_offsets)[ITEMS_PER_THREAD],
        Int2Type<false>     is_warp_time_slice)
    {
        unsigned int warp_id = ((WARPS == 1) ? 0 : threadIdx.x / WARP_THREADS);
        int lane_id = LaneId();

        // Unzip
        OffsetT run_offsets[ITEMS_PER_THREAD];
        LengthT run_lengths[ITEMS_PER_THREAD];

        #pragma unroll
        for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++)
        {
            run_offsets[ITEM] = lengths_and_offsets[ITEM].key;
            run_lengths[ITEM] = lengths_and_offsets[ITEM].value;
        }

        WarpExchangeOffsets(temp_storage.aliasable.scatter_aliasable.exchange_offsets[warp_id]).ScatterToStriped(
            run_offsets, thread_num_runs_exclusive_in_warp);

        WARP_SYNC(0xffffffff);

        WarpExchangeLengths(temp_storage.aliasable.scatter_aliasable.exchange_lengths[warp_id]).ScatterToStriped(
            run_lengths, thread_num_runs_exclusive_in_warp);

        // Global scatter
        #pragma unroll
        for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++)
        {
            if ((ITEM * WARP_THREADS) + lane_id < warp_num_runs_aggregate)
            {
                OffsetT item_offset =
                    tile_num_runs_exclusive_in_global +
                    warp_num_runs_exclusive_in_tile +
                    (ITEM * WARP_THREADS) + lane_id;

                // Scatter offset
                d_offsets_out[item_offset] = run_offsets[ITEM];

                // Scatter length if not the first (global) length
                if ((!FIRST_TILE) || (ITEM != 0) || (threadIdx.x > 0))
                {
                    d_lengths_out[item_offset - 1] = run_lengths[ITEM];
                }
            }
        }
    }


    /**
     * Direct scatter
     */
    template <bool FIRST_TILE>
    __device__ __forceinline__ void ScatterDirect(
        OffsetT             tile_num_runs_exclusive_in_global,
        OffsetT             warp_num_runs_aggregate,
        OffsetT             warp_num_runs_exclusive_in_tile,
        OffsetT             (&thread_num_runs_exclusive_in_warp)[ITEMS_PER_THREAD],
        LengthOffsetPair    (&lengths_and_offsets)[ITEMS_PER_THREAD])
    {
        #pragma unroll
        for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ++ITEM)
        {
            if (thread_num_runs_exclusive_in_warp[ITEM] < warp_num_runs_aggregate)
            {
                OffsetT item_offset =
                    tile_num_runs_exclusive_in_global +
                    warp_num_runs_exclusive_in_tile +
                    thread_num_runs_exclusive_in_warp[ITEM];

                // Scatter offset
                d_offsets_out[item_offset] = lengths_and_offsets[ITEM].key;

                // Scatter length if not the first (global) length
                if (item_offset >= 1)
                {
                    d_lengths_out[item_offset - 1] = lengths_and_offsets[ITEM].value;
                }
            }
        }
    }


    /**
     * Scatter
     */
    template <bool FIRST_TILE>
    __device__ __forceinline__ void Scatter(
        OffsetT             tile_num_runs_aggregate,
        OffsetT             tile_num_runs_exclusive_in_global,
        OffsetT             warp_num_runs_aggregate,
        OffsetT             warp_num_runs_exclusive_in_tile,
        OffsetT             (&thread_num_runs_exclusive_in_warp)[ITEMS_PER_THREAD],
        LengthOffsetPair    (&lengths_and_offsets)[ITEMS_PER_THREAD])
    {
        if ((ITEMS_PER_THREAD == 1) || (tile_num_runs_aggregate < BLOCK_THREADS))
        {
            // Direct scatter if the warp has any items
            if (warp_num_runs_aggregate)
            {
                ScatterDirect<FIRST_TILE>(
                    tile_num_runs_exclusive_in_global,
                    warp_num_runs_aggregate,
                    warp_num_runs_exclusive_in_tile,
                    thread_num_runs_exclusive_in_warp,
                    lengths_and_offsets);
            }
        }
        else
        {
            // Scatter two phase
            ScatterTwoPhase<FIRST_TILE>(
                tile_num_runs_exclusive_in_global,
                warp_num_runs_aggregate,
                warp_num_runs_exclusive_in_tile,
                thread_num_runs_exclusive_in_warp,
                lengths_and_offsets,
                Int2Type<STORE_WARP_TIME_SLICING>());
        }
    }
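
    // Note on the dispatch above: when runs are sparse (fewer runs in the
    // tile than threads), scattering directly to global memory wins because a
    // compaction step would mostly shuffle inactive slots. When runs are
    // dense, the two-phase path first compacts runs into a striped
    // arrangement in shared memory so the subsequent global writes coalesce.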



    //---------------------------------------------------------------------
    // Cooperatively scan a device-wide sequence of tiles with other CTAs
    //---------------------------------------------------------------------

    /**
     * Process a tile of input (dynamic chained scan)
     */
    template <
        bool                LAST_TILE>
    __device__ __forceinline__ LengthOffsetPair ConsumeTile(
        OffsetT             num_items,          ///< Total number of global input items
        OffsetT             num_remaining,      ///< Number of global input items remaining (including this tile)
        int                 tile_idx,           ///< Tile index
        OffsetT             tile_offset,        ///< Tile offset
        ScanTileStateT      &tile_status)       ///< Global list of tile status
    {
        if (tile_idx == 0)
        {
            // First tile

            // Load items
            T items[ITEMS_PER_THREAD];
            if (LAST_TILE)
                BlockLoadT(temp_storage.aliasable.load).Load(d_in + tile_offset, items, num_remaining, T());
            else
                BlockLoadT(temp_storage.aliasable.load).Load(d_in + tile_offset, items);

            if (SYNC_AFTER_LOAD)
                CTA_SYNC();

            // Set flags
            LengthOffsetPair lengths_and_num_runs[ITEMS_PER_THREAD];

            InitializeSelections<true, LAST_TILE>(
                tile_offset,
                num_remaining,
                items,
                lengths_and_num_runs);

            // Exclusive scan of lengths and runs
            LengthOffsetPair tile_aggregate;
            LengthOffsetPair warp_aggregate;
            LengthOffsetPair warp_exclusive_in_tile;
            LengthOffsetPair thread_exclusive_in_warp;

            WarpScanAllocations(
                tile_aggregate,
                warp_aggregate,
                warp_exclusive_in_tile,
                thread_exclusive_in_warp,
                lengths_and_num_runs);

            // Update tile status if this is not the last tile
            if (!LAST_TILE && (threadIdx.x == 0))
                tile_status.SetInclusive(0, tile_aggregate);

            // Update thread_exclusive_in_warp to fold in warp run-length
            if (thread_exclusive_in_warp.key == 0)
                thread_exclusive_in_warp.value += warp_exclusive_in_tile.value;

            LengthOffsetPair lengths_and_offsets[ITEMS_PER_THREAD];
            OffsetT thread_num_runs_exclusive_in_warp[ITEMS_PER_THREAD];
            LengthOffsetPair lengths_and_num_runs2[ITEMS_PER_THREAD];

            // Downsweep scan through lengths_and_num_runs
            internal::ThreadScanExclusive(lengths_and_num_runs, lengths_and_num_runs2, scan_op, thread_exclusive_in_warp);

            // Zip
            #pragma unroll
            for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++)
            {
                lengths_and_offsets[ITEM].value         = lengths_and_num_runs2[ITEM].value;
                lengths_and_offsets[ITEM].key           = tile_offset + (threadIdx.x * ITEMS_PER_THREAD) + ITEM;
                thread_num_runs_exclusive_in_warp[ITEM] = (lengths_and_num_runs[ITEM].key) ?
                                                              lengths_and_num_runs2[ITEM].key :     // keep
                                                              WARP_THREADS * ITEMS_PER_THREAD;      // discard
            }

            OffsetT tile_num_runs_aggregate             = tile_aggregate.key;
            OffsetT tile_num_runs_exclusive_in_global   = 0;
            OffsetT warp_num_runs_aggregate             = warp_aggregate.key;
            OffsetT warp_num_runs_exclusive_in_tile     = warp_exclusive_in_tile.key;

            // Scatter
            Scatter<true>(
                tile_num_runs_aggregate,
                tile_num_runs_exclusive_in_global,
                warp_num_runs_aggregate,
                warp_num_runs_exclusive_in_tile,
                thread_num_runs_exclusive_in_warp,
                lengths_and_offsets);

            // Return running total (inclusive of this tile)
            return tile_aggregate;
        }
        else
        {
            // Not first tile

            // Load items
            T items[ITEMS_PER_THREAD];
            if (LAST_TILE)
                BlockLoadT(temp_storage.aliasable.load).Load(d_in + tile_offset, items, num_remaining, T());
            else
                BlockLoadT(temp_storage.aliasable.load).Load(d_in + tile_offset, items);

            if (SYNC_AFTER_LOAD)
                CTA_SYNC();

            // Set flags
            LengthOffsetPair lengths_and_num_runs[ITEMS_PER_THREAD];

            InitializeSelections<false, LAST_TILE>(
                tile_offset,
                num_remaining,
                items,
                lengths_and_num_runs);

            // Exclusive scan of lengths and runs
            LengthOffsetPair tile_aggregate;
            LengthOffsetPair warp_aggregate;
            LengthOffsetPair warp_exclusive_in_tile;
            LengthOffsetPair thread_exclusive_in_warp;

            WarpScanAllocations(
                tile_aggregate,
                warp_aggregate,
                warp_exclusive_in_tile,
                thread_exclusive_in_warp,
                lengths_and_num_runs);

            // First warp computes tile prefix in lane 0
            TilePrefixCallbackOpT prefix_op(tile_status, temp_storage.aliasable.prefix, Sum(), tile_idx);
            unsigned int warp_id = ((WARPS == 1) ? 0 : threadIdx.x / WARP_THREADS);
            if (warp_id == 0)
            {
                prefix_op(tile_aggregate);
                if (threadIdx.x == 0)
                    temp_storage.tile_exclusive = prefix_op.exclusive_prefix;
            }

            CTA_SYNC();

            LengthOffsetPair tile_exclusive_in_global = temp_storage.tile_exclusive;

            // Update thread_exclusive_in_warp to fold in warp and tile run-lengths
            LengthOffsetPair thread_exclusive = scan_op(tile_exclusive_in_global, warp_exclusive_in_tile);
            if (thread_exclusive_in_warp.key == 0)
                thread_exclusive_in_warp.value += thread_exclusive.value;

            // Downsweep scan through lengths_and_num_runs
            LengthOffsetPair lengths_and_num_runs2[ITEMS_PER_THREAD];
            LengthOffsetPair lengths_and_offsets[ITEMS_PER_THREAD];
            OffsetT thread_num_runs_exclusive_in_warp[ITEMS_PER_THREAD];

            internal::ThreadScanExclusive(lengths_and_num_runs, lengths_and_num_runs2, scan_op, thread_exclusive_in_warp);

            // Zip
            #pragma unroll
            for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ITEM++)
            {
                lengths_and_offsets[ITEM].value         = lengths_and_num_runs2[ITEM].value;
                lengths_and_offsets[ITEM].key           = tile_offset + (threadIdx.x * ITEMS_PER_THREAD) + ITEM;
                thread_num_runs_exclusive_in_warp[ITEM] = (lengths_and_num_runs[ITEM].key) ?
                                                              lengths_and_num_runs2[ITEM].key :     // keep
                                                              WARP_THREADS * ITEMS_PER_THREAD;      // discard
            }

            OffsetT tile_num_runs_aggregate             = tile_aggregate.key;
            OffsetT tile_num_runs_exclusive_in_global   = tile_exclusive_in_global.key;
            OffsetT warp_num_runs_aggregate             = warp_aggregate.key;
            OffsetT warp_num_runs_exclusive_in_tile     = warp_exclusive_in_tile.key;

            // Scatter
            Scatter<false>(
                tile_num_runs_aggregate,
                tile_num_runs_exclusive_in_global,
                warp_num_runs_aggregate,
                warp_num_runs_exclusive_in_tile,
                thread_num_runs_exclusive_in_warp,
                lengths_and_offsets);

            // Return running total (inclusive of this tile)
            return prefix_op.inclusive_prefix;
        }
    }
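
    // Note: the prefix_op path above is CUB's single-pass "decoupled
    // look-back" pattern: the first warp polls the status of preceding tiles
    // through tile_status, folds their inclusive prefixes into this tile's
    // exclusive prefix, and publishes this tile's inclusive total for its
    // successors, letting the device-wide scan finish in one kernel pass.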


    /**
     * Scan tiles of items as part of a dynamic chained scan
     */
    template <typename NumRunsIteratorT>        ///< Output iterator type for recording number of items selected
    __device__ __forceinline__ void ConsumeRange(
        int                 num_tiles,          ///< Total number of input tiles
        ScanTileStateT      &tile_status,       ///< Global list of tile status
        NumRunsIteratorT    d_num_runs_out)     ///< Output pointer for total number of runs identified
    {
        // Blocks are launched in increasing order, so just assign one tile per block
        int     tile_idx        = (blockIdx.x * gridDim.y) + blockIdx.y;    // Current tile index
        OffsetT tile_offset     = tile_idx * TILE_ITEMS;                    // Global offset for the current tile
        OffsetT num_remaining   = num_items - tile_offset;                  // Remaining items (including this tile)

        if (tile_idx < num_tiles - 1)
        {
            // Not the last tile (full)
            ConsumeTile<false>(num_items, num_remaining, tile_idx, tile_offset, tile_status);
        }
        else if (num_remaining > 0)
        {
            // The last tile (possibly partially-full)
            LengthOffsetPair running_total = ConsumeTile<true>(num_items, num_remaining, tile_idx, tile_offset, tile_status);

            if (threadIdx.x == 0)
            {
                // Output the total number of items selected
                *d_num_runs_out = running_total.key;

                // The inclusive prefix contains accumulated length reduction for the last run
                if (running_total.key > 0)
                    d_lengths_out[running_total.key - 1] = running_total.value;
            }
        }
    }
};


}               // CUB namespace
CUB_NS_POSTFIX  // Optional outer namespace(s)

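// ----------------------------------------------------------------------------
// Usage sketch (illustrative, not part of this header): AgentRle is an
// internal building block; user code normally reaches it through
// cub::DeviceRunLengthEncode::NonTrivialRuns, which records the offset and
// length of every run of two or more equal items. A minimal host-side sketch,
// assuming device buffers d_in (n ints), d_offsets_out, d_lengths_out, and
// d_num_runs_out have already been allocated:
//
//   void   *d_temp     = NULL;
//   size_t  temp_bytes = 0;
//   // First call only sizes the temporary storage...
//   cub::DeviceRunLengthEncode::NonTrivialRuns(d_temp, temp_bytes,
//       d_in, d_offsets_out, d_lengths_out, d_num_runs_out, n);
//   cudaMalloc(&d_temp, temp_bytes);
//   // ...the second call performs the encode.
//   cub::DeviceRunLengthEncode::NonTrivialRuns(d_temp, temp_bytes,
//       d_in, d_offsets_out, d_lengths_out, d_num_runs_out, n);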