OpenFPM_pdata  4.1.0
Project that contains the implementation of distributed structures
agent_segment_fixup.cuh
/******************************************************************************
 * Copyright (c) 2011, Duane Merrill. All rights reserved.
 * Copyright (c) 2011-2018, NVIDIA CORPORATION. All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in the
 *       documentation and/or other materials provided with the distribution.
 *     * Neither the name of the NVIDIA CORPORATION nor the
 *       names of its contributors may be used to endorse or promote products
 *       derived from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL NVIDIA CORPORATION BE LIABLE FOR ANY
 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 ******************************************************************************/

/**
 * \file
 * cub::AgentSegmentFixup implements a stateful abstraction of CUDA thread blocks for participating in device-wide reduce-value-by-key.
 */

#pragma once

#include <iterator>

#include "single_pass_scan_operators.cuh"
#include "../block/block_load.cuh"
#include "../block/block_store.cuh"
#include "../block/block_scan.cuh"
#include "../block/block_discontinuity.cuh"
#include "../iterator/cache_modified_input_iterator.cuh"
#include "../iterator/constant_input_iterator.cuh"
#include "../util_namespace.cuh"

/// Optional outer namespace(s)
CUB_NS_PREFIX

/// CUB namespace
namespace cub {


/******************************************************************************
 * Tuning policy types
 ******************************************************************************/

/**
 * Parameterizable tuning policy type for AgentSegmentFixup
 */
template <
    int                 _BLOCK_THREADS,         ///< Threads per thread block
    int                 _ITEMS_PER_THREAD,      ///< Items per thread (per tile of input)
    BlockLoadAlgorithm  _LOAD_ALGORITHM,        ///< The BlockLoad algorithm to use
    CacheLoadModifier   _LOAD_MODIFIER,         ///< Cache load modifier for reading input elements
    BlockScanAlgorithm  _SCAN_ALGORITHM>        ///< The BlockScan algorithm to use
struct AgentSegmentFixupPolicy
{
    enum
    {
        BLOCK_THREADS       = _BLOCK_THREADS,       ///< Threads per thread block
        ITEMS_PER_THREAD    = _ITEMS_PER_THREAD,    ///< Items per thread (per tile of input)
    };

    static const BlockLoadAlgorithm LOAD_ALGORITHM  = _LOAD_ALGORITHM;  ///< The BlockLoad algorithm to use
    static const CacheLoadModifier  LOAD_MODIFIER   = _LOAD_MODIFIER;   ///< Cache load modifier for reading input elements
    static const BlockScanAlgorithm SCAN_ALGORITHM  = _SCAN_ALGORITHM;  ///< The BlockScan algorithm to use
};
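
// Illustrative note (added commentary, not part of the original file): the dispatch
// layer supplies the concrete tuning values.  A hypothetical instantiation could be
//
//     typedef AgentSegmentFixupPolicy<
//         128,                      // _BLOCK_THREADS
//         9,                        // _ITEMS_PER_THREAD
//         BLOCK_LOAD_DIRECT,        // _LOAD_ALGORITHM
//         LOAD_LDG,                 // _LOAD_MODIFIER
//         BLOCK_SCAN_WARP_SCANS>    // _SCAN_ALGORITHM
//         ExampleSegmentFixupPolicyT;
//
// where ExampleSegmentFixupPolicyT is a placeholder name, not a type defined by CUB.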


/******************************************************************************
 * Thread block abstractions
 ******************************************************************************/

/**
 * \brief AgentSegmentFixup implements a stateful abstraction of CUDA thread blocks for participating in device-wide reduce-value-by-key
 */
template <
    typename    AgentSegmentFixupPolicyT,       ///< Parameterized AgentSegmentFixupPolicy tuning policy type
    typename    PairsInputIteratorT,            ///< Random-access input iterator type for keys
    typename    AggregatesOutputIteratorT,      ///< Random-access output iterator type for values
    typename    EqualityOpT,                    ///< KeyT equality operator type
    typename    ReductionOpT,                   ///< ValueT reduction operator type
    typename    OffsetT>                        ///< Signed integer type for global offsets
struct AgentSegmentFixup
{
    //---------------------------------------------------------------------
    // Types and constants
    //---------------------------------------------------------------------

    // Data type of key-value input iterator
    typedef typename std::iterator_traits<PairsInputIteratorT>::value_type KeyValuePairT;

    // Value type
    typedef typename KeyValuePairT::Value ValueT;

    // Tile status descriptor interface type
    typedef ReduceByKeyScanTileState<ValueT, OffsetT> ScanTileStateT;

    // Constants
    enum
    {
        BLOCK_THREADS       = AgentSegmentFixupPolicyT::BLOCK_THREADS,
        ITEMS_PER_THREAD    = AgentSegmentFixupPolicyT::ITEMS_PER_THREAD,
        TILE_ITEMS          = BLOCK_THREADS * ITEMS_PER_THREAD,

        // Whether or not do fixup using RLE + global atomics
        USE_ATOMIC_FIXUP    = (CUB_PTX_ARCH >= 350) &&
                                (Equals<ValueT, float>::VALUE ||
                                 Equals<ValueT, int>::VALUE ||
                                 Equals<ValueT, unsigned int>::VALUE ||
                                 Equals<ValueT, unsigned long long>::VALUE),

        // Whether or not the scan operation has a zero-valued identity value (true if we're performing addition on a primitive type)
        HAS_IDENTITY_ZERO   = (Equals<ReductionOpT, cub::Sum>::VALUE) && (Traits<ValueT>::PRIMITIVE),
    };

    // Cache-modified Input iterator wrapper type (for applying cache modifier) for keys
    typedef typename If<IsPointer<PairsInputIteratorT>::VALUE,
            CacheModifiedInputIterator<AgentSegmentFixupPolicyT::LOAD_MODIFIER, KeyValuePairT, OffsetT>,   // Wrap the native input pointer with CacheModifiedValuesInputIterator
            PairsInputIteratorT>::Type                                                                     // Directly use the supplied input iterator type
        WrappedPairsInputIteratorT;

    // Cache-modified Input iterator wrapper type (for applying cache modifier) for fixup values
    typedef typename If<IsPointer<AggregatesOutputIteratorT>::VALUE,
            CacheModifiedInputIterator<AgentSegmentFixupPolicyT::LOAD_MODIFIER, ValueT, OffsetT>,          // Wrap the native input pointer with CacheModifiedValuesInputIterator
            AggregatesOutputIteratorT>::Type                                                               // Directly use the supplied input iterator type
        WrappedFixupInputIteratorT;

    // Reduce-value-by-segment scan operator
    typedef ReduceByKeyOp<cub::Sum> ReduceBySegmentOpT;

    // Parameterized BlockLoad type for pairs
    typedef BlockLoad<
            KeyValuePairT,
            BLOCK_THREADS,
            ITEMS_PER_THREAD,
            AgentSegmentFixupPolicyT::LOAD_ALGORITHM>
        BlockLoadPairs;

    // Parameterized BlockScan type
    typedef BlockScan<
            KeyValuePairT,
            BLOCK_THREADS,
            AgentSegmentFixupPolicyT::SCAN_ALGORITHM>
        BlockScanT;

    // Callback type for obtaining tile prefix during block scan
    typedef TilePrefixCallbackOp<
            KeyValuePairT,
            ReduceBySegmentOpT,
            ScanTileStateT>
        TilePrefixCallbackOpT;

    // Shared memory type for this thread block
    union _TempStorage
    {
        struct
        {
            typename BlockScanT::TempStorage            scan;       // Smem needed for tile scanning
            typename TilePrefixCallbackOpT::TempStorage prefix;     // Smem needed for cooperative prefix callback
        };

        // Smem needed for loading keys
        typename BlockLoadPairs::TempStorage load_pairs;
    };

    // Alias wrapper allowing storage to be unioned
    struct TempStorage : Uninitialized<_TempStorage> {};
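
    // Note (added commentary, not in the original source): the anonymous struct holding
    // the scan and prefix-callback storage shares memory with the BlockLoad storage,
    // since loading and scanning never run at the same time.  Uninitialized<> allows this
    // union of non-trivially-constructible types to be placed in __shared__ memory.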


    //---------------------------------------------------------------------
    // Per-thread fields
    //---------------------------------------------------------------------

    _TempStorage&                   temp_storage;       ///< Reference to temp_storage
    WrappedPairsInputIteratorT      d_pairs_in;         ///< Input keys
    AggregatesOutputIteratorT       d_aggregates_out;   ///< Output value aggregates
    WrappedFixupInputIteratorT      d_fixup_in;         ///< Fixup input values
    InequalityWrapper<EqualityOpT>  inequality_op;      ///< KeyT inequality operator
    ReductionOpT                    reduction_op;       ///< Reduction operator
    ReduceBySegmentOpT              scan_op;            ///< Reduce-by-segment scan operator


    //---------------------------------------------------------------------
    // Constructor
    //---------------------------------------------------------------------

    // Constructor
    __device__ __forceinline__
    AgentSegmentFixup(
        TempStorage&                temp_storage,       ///< Reference to temp_storage
        PairsInputIteratorT         d_pairs_in,         ///< Input keys
        AggregatesOutputIteratorT   d_aggregates_out,   ///< Output value aggregates
        EqualityOpT                 equality_op,        ///< KeyT equality operator
        ReductionOpT                reduction_op)       ///< ValueT reduction operator
    :
        temp_storage(temp_storage.Alias()),
        d_pairs_in(d_pairs_in),
        d_aggregates_out(d_aggregates_out),
        d_fixup_in(d_aggregates_out),
        inequality_op(equality_op),
        reduction_op(reduction_op),
        scan_op(reduction_op)
    {}


    //---------------------------------------------------------------------
    // Cooperatively scan a device-wide sequence of tiles with other CTAs
    //---------------------------------------------------------------------


    /**
     * Process input tile.  Specialized for atomic-fixup
     */
    template <bool IS_LAST_TILE>
    __device__ __forceinline__ void ConsumeTile(
        OffsetT             num_remaining,      ///< Number of global input items remaining (including this tile)
        int                 tile_idx,           ///< Tile index
        OffsetT             tile_offset,        ///< Tile offset
        ScanTileStateT&     tile_state,         ///< Global tile state descriptor
        Int2Type<true>      use_atomic_fixup)   ///< Marker whether to use atomicAdd (instead of reduce-by-key)
    {
        KeyValuePairT pairs[ITEMS_PER_THREAD];

        // Load pairs
        KeyValuePairT oob_pair;
        oob_pair.key = -1;

        if (IS_LAST_TILE)
            BlockLoadPairs(temp_storage.load_pairs).Load(d_pairs_in + tile_offset, pairs, num_remaining, oob_pair);
        else
            BlockLoadPairs(temp_storage.load_pairs).Load(d_pairs_in + tile_offset, pairs);

        // RLE
        #pragma unroll
        for (int ITEM = 1; ITEM < ITEMS_PER_THREAD; ++ITEM)
        {
            ValueT* d_scatter = d_aggregates_out + pairs[ITEM - 1].key;
            if (pairs[ITEM].key != pairs[ITEM - 1].key)
                atomicAdd(d_scatter, pairs[ITEM - 1].value);
            else
                pairs[ITEM].value = reduction_op(pairs[ITEM - 1].value, pairs[ITEM].value);
        }

        // Flush last item if valid
        ValueT* d_scatter = d_aggregates_out + pairs[ITEMS_PER_THREAD - 1].key;
        if ((!IS_LAST_TILE) || (pairs[ITEMS_PER_THREAD - 1].key >= 0))
            atomicAdd(d_scatter, pairs[ITEMS_PER_THREAD - 1].value);
    }
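
    // Added commentary (not in the original source): each thread walks its
    // ITEMS_PER_THREAD pairs in order, folding values of equal consecutive keys
    // together and emitting an atomicAdd whenever the key changes.  Assuming the
    // reduction behaves like a sum (which the atomicAdd path implies), a thread
    // holding (key,value) pairs (2,1) (2,3) (5,2) folds 1+3=4, atomically adds 4
    // to d_aggregates_out[2] when the key switches from 2 to 5, and flushes the
    // trailing (5,2) with the final atomicAdd (skipped for the out-of-bounds
    // key -1 in a partial last tile).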


    /**
     * Process input tile.  Specialized for reduce-by-key fixup
     */
    template <bool IS_LAST_TILE>
    __device__ __forceinline__ void ConsumeTile(
        OffsetT             num_remaining,      ///< Number of global input items remaining (including this tile)
        int                 tile_idx,           ///< Tile index
        OffsetT             tile_offset,        ///< Tile offset
        ScanTileStateT&     tile_state,         ///< Global tile state descriptor
        Int2Type<false>     use_atomic_fixup)   ///< Marker whether to use atomicAdd (instead of reduce-by-key)
    {
        KeyValuePairT pairs[ITEMS_PER_THREAD];
        KeyValuePairT scatter_pairs[ITEMS_PER_THREAD];

        // Load pairs
        KeyValuePairT oob_pair;
        oob_pair.key = -1;

        if (IS_LAST_TILE)
            BlockLoadPairs(temp_storage.load_pairs).Load(d_pairs_in + tile_offset, pairs, num_remaining, oob_pair);
        else
            BlockLoadPairs(temp_storage.load_pairs).Load(d_pairs_in + tile_offset, pairs);

        CTA_SYNC();

        KeyValuePairT tile_aggregate;
        if (tile_idx == 0)
        {
            // Exclusive scan of values and segment_flags
            BlockScanT(temp_storage.scan).ExclusiveScan(pairs, scatter_pairs, scan_op, tile_aggregate);

            // Update tile status if this is not the last tile
            if (threadIdx.x == 0)
            {
                // Set first segment id to not trigger a flush (invalid from exclusive scan)
                scatter_pairs[0].key = pairs[0].key;

                if (!IS_LAST_TILE)
                    tile_state.SetInclusive(0, tile_aggregate);

            }
        }
        else
        {
            // Exclusive scan of values and segment_flags
            TilePrefixCallbackOpT prefix_op(tile_state, temp_storage.prefix, scan_op, tile_idx);
            BlockScanT(temp_storage.scan).ExclusiveScan(pairs, scatter_pairs, scan_op, prefix_op);
            tile_aggregate = prefix_op.GetBlockAggregate();
        }

        // Scatter updated values
        #pragma unroll
        for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ++ITEM)
        {
            if (scatter_pairs[ITEM].key != pairs[ITEM].key)
            {
                // Update the value at the key location
                ValueT value    = d_fixup_in[scatter_pairs[ITEM].key];
                value           = reduction_op(value, scatter_pairs[ITEM].value);

                d_aggregates_out[scatter_pairs[ITEM].key] = value;
            }
        }

        // Finalize the last item
        if (IS_LAST_TILE)
        {
            // Last thread will output final count and last item, if necessary
            if (threadIdx.x == BLOCK_THREADS - 1)
            {
                // If the last tile is a whole tile, the inclusive prefix contains accumulated value reduction for the last segment
                if (num_remaining == TILE_ITEMS)
                {
                    // Update the value at the key location
                    OffsetT last_key = pairs[ITEMS_PER_THREAD - 1].key;
                    d_aggregates_out[last_key] = reduction_op(tile_aggregate.value, d_fixup_in[last_key]);
                }
            }
        }
    }
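
    // Added commentary (not in the original source): the exclusive reduce-by-key scan
    // leaves in scatter_pairs[ITEM] the running (key, value) reduction for the segment
    // that precedes item ITEM, including the prefix carried in from earlier tiles.
    // When scatter_pairs[ITEM].key differs from pairs[ITEM].key, item ITEM starts a new
    // segment, so the scanned value is the complete partial total for the previous key
    // and is combined with the aggregate already stored at that key.  The IS_LAST_TILE
    // branch finalizes the last segment, whose total exists only in the block-wide
    // aggregate when the last tile is full.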


    /**
     * Scan tiles of items as part of a dynamic chained scan
     */
    __device__ __forceinline__ void ConsumeRange(
        int                 num_items,          ///< Total number of input items
        int                 num_tiles,          ///< Total number of input tiles
        ScanTileStateT&     tile_state)         ///< Global tile state descriptor
    {
        // Blocks are launched in increasing order, so just assign one tile per block
        int     tile_idx        = (blockIdx.x * gridDim.y) + blockIdx.y;   // Current tile index
        OffsetT tile_offset     = tile_idx * TILE_ITEMS;                   // Global offset for the current tile
        OffsetT num_remaining   = num_items - tile_offset;                 // Remaining items (including this tile)

        if (num_remaining > TILE_ITEMS)
        {
            // Not the last tile (full)
            ConsumeTile<false>(num_remaining, tile_idx, tile_offset, tile_state, Int2Type<USE_ATOMIC_FIXUP>());
        }
        else if (num_remaining > 0)
        {
            // The last tile (possibly partially-full)
            ConsumeTile<true>(num_remaining, tile_idx, tile_offset, tile_state, Int2Type<USE_ATOMIC_FIXUP>());
        }
    }
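
    // Added commentary (not in the original source): tiles are mapped to thread blocks
    // through a (possibly 2D) grid.  For example, with gridDim.y == 4 the block at
    // blockIdx = (3, 2) processes tile_idx = 3 * 4 + 2 = 14, which starts at input
    // offset 14 * TILE_ITEMS.  Only the last tile (0 < num_remaining <= TILE_ITEMS)
    // takes the guarded, possibly partial path.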

};


}               // CUB namespace
CUB_NS_POSTFIX  // Optional outer namespace(s)

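The listing above is only the thread-block agent; a kernel supplies the shared memory and drives ConsumeRange. The following sketch shows how such a kernel could look. It is illustrative only: ExampleSegmentFixupKernel is a hypothetical name (the real kernels live in CUB's dispatch layer), cub::Equality and cub::Sum are assumed as the equality and reduction operators, and the tile_state argument is expected to be the agent's own ScanTileStateT type.

// Illustrative sketch (not part of this file): a minimal kernel wrapping AgentSegmentFixup.
template <
    typename AgentSegmentFixupPolicyT,
    typename PairsInputIteratorT,
    typename AggregatesOutputIteratorT,
    typename OffsetT,
    typename ScanTileStateT>
__global__ void ExampleSegmentFixupKernel(
    PairsInputIteratorT         d_pairs_in,         // (key, partial value) pairs to fold into the output
    AggregatesOutputIteratorT   d_aggregates_out,   // value aggregates, indexed by key
    OffsetT                     num_items,          // total number of pairs
    int                         num_tiles,          // total number of tiles
    ScanTileStateT              tile_state)         // tile status descriptors for the chained scan
{
    // Thread-block abstraction type for the fixup pass
    typedef cub::AgentSegmentFixup<
            AgentSegmentFixupPolicyT,
            PairsInputIteratorT,
            AggregatesOutputIteratorT,
            cub::Equality,
            cub::Sum,
            OffsetT>
        AgentSegmentFixupT;

    // Shared memory required by the agent
    __shared__ typename AgentSegmentFixupT::TempStorage temp_storage;

    // Each thread block consumes one tile of input
    AgentSegmentFixupT(temp_storage, d_pairs_in, d_aggregates_out, cub::Equality(), cub::Sum())
        .ConsumeRange(num_items, num_tiles, tile_state);
}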