OpenFPM_pdata  4.1.0
Project that contains the implementation of distributed structures
 
agent_segment_fixup.cuh
/******************************************************************************
 * Copyright (c) 2011, Duane Merrill. All rights reserved.
 * Copyright (c) 2011-2018, NVIDIA CORPORATION. All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in the
 *       documentation and/or other materials provided with the distribution.
 *     * Neither the name of the NVIDIA CORPORATION nor the
 *       names of its contributors may be used to endorse or promote products
 *       derived from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL NVIDIA CORPORATION BE LIABLE FOR ANY
 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 ******************************************************************************/
/**
 * \file
 * cub::AgentSegmentFixup implements a stateful abstraction of CUDA thread blocks
 * for participating in device-wide reduce-value-by-key.
 */

#pragma once

#include <iterator>

#include "single_pass_scan_operators.cuh"
#include "../block/block_load.cuh"
#include "../block/block_store.cuh"
#include "../block/block_scan.cuh"
#include "../block/block_discontinuity.cuh"
#include "../iterator/cache_modified_input_iterator.cuh"
#include "../iterator/constant_input_iterator.cuh"
#include "../util_namespace.cuh"

/// Optional outer namespace(s)
CUB_NS_PREFIX

/// CUB namespace
namespace cub {

/******************************************************************************
 * Tuning policy types
 ******************************************************************************/
/**
 * Parameterized tuning policy type for AgentSegmentFixup
 */
template <
    int                         _BLOCK_THREADS,                 ///< Threads per thread block
    int                         _ITEMS_PER_THREAD,              ///< Items per thread (per tile of input)
    BlockLoadAlgorithm          _LOAD_ALGORITHM,                ///< The BlockLoad algorithm to use
    CacheLoadModifier           _LOAD_MODIFIER,                 ///< Cache load modifier for reading input elements
    BlockScanAlgorithm          _SCAN_ALGORITHM>                ///< The BlockScan algorithm to use
struct AgentSegmentFixupPolicy
{
    enum
    {
        BLOCK_THREADS           = _BLOCK_THREADS,               ///< Threads per thread block
        ITEMS_PER_THREAD        = _ITEMS_PER_THREAD,            ///< Items per thread (per tile of input)
    };

    static const BlockLoadAlgorithm     LOAD_ALGORITHM  = _LOAD_ALGORITHM;      ///< The BlockLoad algorithm to use
    static const CacheLoadModifier      LOAD_MODIFIER   = _LOAD_MODIFIER;       ///< Cache load modifier for reading input elements
    static const BlockScanAlgorithm     SCAN_ALGORITHM  = _SCAN_ALGORITHM;      ///< The BlockScan algorithm to use
};
/******************************************************************************
 * Thread block abstractions
 ******************************************************************************/
/**
 * \brief AgentSegmentFixup implements a stateful abstraction of CUDA thread blocks for participating
 * in device-wide reduce-value-by-key
 */
template <
    typename    AgentSegmentFixupPolicyT,       ///< Parameterized AgentSegmentFixupPolicy tuning policy type
    typename    PairsInputIteratorT,            ///< Random-access input iterator type for keys
    typename    AggregatesOutputIteratorT,      ///< Random-access output iterator type for values
    typename    EqualityOpT,                    ///< KeyT equality operator type
    typename    ReductionOpT,                   ///< ValueT reduction operator type
    typename    OffsetT>                        ///< Signed integer type for global offsets
struct AgentSegmentFixup
{
    //---------------------------------------------------------------------
    // Types and constants
    //---------------------------------------------------------------------

    // Data type of key-value input iterator
    typedef typename std::iterator_traits<PairsInputIteratorT>::value_type KeyValuePairT;

    // Value type
    typedef typename KeyValuePairT::Value ValueT;

    // Tile status descriptor interface type
    typedef ReduceByKeyScanTileState<ValueT, OffsetT> ScanTileStateT;

    // Constants
    enum
    {
        BLOCK_THREADS       = AgentSegmentFixupPolicyT::BLOCK_THREADS,
        ITEMS_PER_THREAD    = AgentSegmentFixupPolicyT::ITEMS_PER_THREAD,
        TILE_ITEMS          = BLOCK_THREADS * ITEMS_PER_THREAD,

        // Whether or not do fixup using RLE + global atomics
        USE_ATOMIC_FIXUP    = (CUB_PTX_ARCH >= 350) &&
                              (Equals<ValueT, float>::VALUE ||
                               Equals<ValueT, int>::VALUE ||
                               Equals<ValueT, unsigned int>::VALUE ||
                               Equals<ValueT, unsigned long long>::VALUE),

        // Whether or not the scan operation has a zero-valued identity value (true if we're performing addition on a primitive type)
        HAS_IDENTITY_ZERO   = (Equals<ReductionOpT, cub::Sum>::VALUE) && (Traits<ValueT>::PRIMITIVE),
    };

    // Cache-modified Input iterator wrapper type (for applying cache modifier) for keys
    typedef typename If<IsPointer<PairsInputIteratorT>::VALUE,
            CacheModifiedInputIterator<AgentSegmentFixupPolicyT::LOAD_MODIFIER, KeyValuePairT, OffsetT>,    // Wrap the native input pointer with CacheModifiedValuesInputIterator
            PairsInputIteratorT>::Type                                                                      // Directly use the supplied input iterator type
        WrappedPairsInputIteratorT;

    // Cache-modified Input iterator wrapper type (for applying cache modifier) for fixup values
    typedef typename If<IsPointer<AggregatesOutputIteratorT>::VALUE,
            CacheModifiedInputIterator<AgentSegmentFixupPolicyT::LOAD_MODIFIER, ValueT, OffsetT>,           // Wrap the native input pointer with CacheModifiedValuesInputIterator
            AggregatesOutputIteratorT>::Type                                                                // Directly use the supplied input iterator type
        WrappedFixupInputIteratorT;

    // Reduce-value-by-segment scan operator
    typedef ReduceByKeyOp<ReductionOpT> ReduceBySegmentOpT;

    // Parameterized BlockLoad type for pairs
    typedef BlockLoad<
            KeyValuePairT,
            BLOCK_THREADS,
            ITEMS_PER_THREAD,
            AgentSegmentFixupPolicyT::LOAD_ALGORITHM>
        BlockLoadPairs;

    // Parameterized BlockScan type
    typedef BlockScan<
            KeyValuePairT,
            BLOCK_THREADS,
            AgentSegmentFixupPolicyT::SCAN_ALGORITHM>
        BlockScanT;

    // Callback type for obtaining tile prefix during block scan
    typedef TilePrefixCallbackOp<
            KeyValuePairT,
            ReduceBySegmentOpT,
            ScanTileStateT>
        TilePrefixCallbackOpT;

    // Shared memory type for this thread block
    union _TempStorage
    {
        struct
        {
            typename BlockScanT::TempStorage                scan;       // Smem needed for tile scanning
            typename TilePrefixCallbackOpT::TempStorage     prefix;     // Smem needed for cooperative prefix callback
        };

        // Smem needed for loading keys
        typename BlockLoadPairs::TempStorage load_pairs;
    };

    // Alias wrapper allowing storage to be unioned
    struct TempStorage : Uninitialized<_TempStorage> {};
    //---------------------------------------------------------------------
    // Per-thread fields
    //---------------------------------------------------------------------

    _TempStorage&                   temp_storage;       ///< Reference to temp_storage
    WrappedPairsInputIteratorT      d_pairs_in;         ///< Input keys
    AggregatesOutputIteratorT       d_aggregates_out;   ///< Output value aggregates
    WrappedFixupInputIteratorT      d_fixup_in;         ///< Fixup input values
    InequalityWrapper<EqualityOpT>  inequality_op;      ///< KeyT inequality operator
    ReductionOpT                    reduction_op;       ///< Reduction operator
    ReduceBySegmentOpT              scan_op;            ///< Reduce-by-segment scan operator


    //---------------------------------------------------------------------
    // Constructor
    //---------------------------------------------------------------------

    // Constructor
    __device__ __forceinline__
    AgentSegmentFixup(
        TempStorage&                temp_storage,       ///< Reference to temp_storage
        PairsInputIteratorT         d_pairs_in,         ///< Input keys
        AggregatesOutputIteratorT   d_aggregates_out,   ///< Output value aggregates
        EqualityOpT                 equality_op,        ///< KeyT equality operator
        ReductionOpT                reduction_op)       ///< ValueT reduction operator
    :
        temp_storage(temp_storage.Alias()),
        d_pairs_in(d_pairs_in),
        d_aggregates_out(d_aggregates_out),
        d_fixup_in(d_aggregates_out),
        inequality_op(equality_op),
        reduction_op(reduction_op),
        scan_op(reduction_op)
    {}
    //---------------------------------------------------------------------
    // Cooperatively scan a device-wide sequence of tiles with other CTAs
    //---------------------------------------------------------------------


    /**
     * Process input tile. Specialized for atomic-fixup
     */
    template <bool IS_LAST_TILE>
    __device__ __forceinline__ void ConsumeTile(
        OffsetT             num_remaining,      ///< Number of global input items remaining (including this tile)
        int                 tile_idx,           ///< Tile index
        OffsetT             tile_offset,        ///< Tile offset
        ScanTileStateT&     tile_state,         ///< Global tile state descriptor
        Int2Type<true>      use_atomic_fixup)   ///< Marker whether to use atomicAdd (instead of reduce-by-key)
    {
        KeyValuePairT pairs[ITEMS_PER_THREAD];

        // Load pairs
        KeyValuePairT oob_pair;
        oob_pair.key = -1;

        if (IS_LAST_TILE)
            BlockLoadPairs(temp_storage.load_pairs).Load(d_pairs_in + tile_offset, pairs, num_remaining, oob_pair);
        else
            BlockLoadPairs(temp_storage.load_pairs).Load(d_pairs_in + tile_offset, pairs);

        // RLE
        #pragma unroll
        for (int ITEM = 1; ITEM < ITEMS_PER_THREAD; ++ITEM)
        {
            ValueT* d_scatter = d_aggregates_out + pairs[ITEM - 1].key;
            if (pairs[ITEM].key != pairs[ITEM - 1].key)
                atomicAdd(d_scatter, pairs[ITEM - 1].value);
            else
                pairs[ITEM].value = reduction_op(pairs[ITEM - 1].value, pairs[ITEM].value);
        }

        // Flush last item if valid
        ValueT* d_scatter = d_aggregates_out + pairs[ITEMS_PER_THREAD - 1].key;
        if ((!IS_LAST_TILE) || (pairs[ITEMS_PER_THREAD - 1].key >= 0))
            atomicAdd(d_scatter, pairs[ITEMS_PER_THREAD - 1].value);
    }
    /**
     * Process input tile. Specialized for reduce-by-key fixup
     */
    template <bool IS_LAST_TILE>
    __device__ __forceinline__ void ConsumeTile(
        OffsetT             num_remaining,      ///< Number of global input items remaining (including this tile)
        int                 tile_idx,           ///< Tile index
        OffsetT             tile_offset,        ///< Tile offset
        ScanTileStateT&     tile_state,         ///< Global tile state descriptor
        Int2Type<false>     use_atomic_fixup)   ///< Marker whether to use atomicAdd (instead of reduce-by-key)
    {
        KeyValuePairT pairs[ITEMS_PER_THREAD];
        KeyValuePairT scatter_pairs[ITEMS_PER_THREAD];

        // Load pairs
        KeyValuePairT oob_pair;
        oob_pair.key = -1;

        if (IS_LAST_TILE)
            BlockLoadPairs(temp_storage.load_pairs).Load(d_pairs_in + tile_offset, pairs, num_remaining, oob_pair);
        else
            BlockLoadPairs(temp_storage.load_pairs).Load(d_pairs_in + tile_offset, pairs);

        CTA_SYNC();

        KeyValuePairT tile_aggregate;
        if (tile_idx == 0)
        {
            // Exclusive scan of values and segment_flags
            BlockScanT(temp_storage.scan).ExclusiveScan(pairs, scatter_pairs, scan_op, tile_aggregate);

            // Update tile status if this is not the last tile
            if (threadIdx.x == 0)
            {
                // Set first segment id to not trigger a flush (invalid from exclusive scan)
                scatter_pairs[0].key = pairs[0].key;

                if (!IS_LAST_TILE)
                    tile_state.SetInclusive(0, tile_aggregate);
            }
        }
        else
        {
            // Exclusive scan of values and segment_flags
            TilePrefixCallbackOpT prefix_op(tile_state, temp_storage.prefix, scan_op, tile_idx);
            BlockScanT(temp_storage.scan).ExclusiveScan(pairs, scatter_pairs, scan_op, prefix_op);
            tile_aggregate = prefix_op.GetBlockAggregate();
        }

        // Scatter updated values
        #pragma unroll
        for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ++ITEM)
        {
            if (scatter_pairs[ITEM].key != pairs[ITEM].key)
            {
                // Update the value at the key location
                ValueT value = d_fixup_in[scatter_pairs[ITEM].key];
                value = reduction_op(value, scatter_pairs[ITEM].value);

                d_aggregates_out[scatter_pairs[ITEM].key] = value;
            }
        }

        // Finalize the last item
        if (IS_LAST_TILE)
        {
            // Last thread will output final count and last item, if necessary
            if (threadIdx.x == BLOCK_THREADS - 1)
            {
                // If the last tile is a whole tile, the inclusive prefix contains accumulated value reduction for the last segment
                if (num_remaining == TILE_ITEMS)
                {
                    // Update the value at the key location
                    OffsetT last_key = pairs[ITEMS_PER_THREAD - 1].key;
                    d_aggregates_out[last_key] = reduction_op(tile_aggregate.value, d_fixup_in[last_key]);
                }
            }
        }
    }
    /**
     * Scan tiles of items as part of a dynamic chained scan
     */
    __device__ __forceinline__ void ConsumeRange(
        int                 num_items,          ///< Total number of input items
        int                 num_tiles,          ///< Total number of input tiles
        ScanTileStateT&     tile_state)         ///< Global tile state descriptor
    {
        // Blocks are launched in increasing order, so just assign one tile per block
        int     tile_idx        = (blockIdx.x * gridDim.y) + blockIdx.y;    // Current tile index
        OffsetT tile_offset     = tile_idx * TILE_ITEMS;                    // Global offset for the current tile
        OffsetT num_remaining   = num_items - tile_offset;                  // Remaining items (including this tile)

        if (num_remaining > TILE_ITEMS)
        {
            // Not the last tile (full)
            ConsumeTile<false>(num_remaining, tile_idx, tile_offset, tile_state, Int2Type<USE_ATOMIC_FIXUP>());
        }
        else if (num_remaining > 0)
        {
            // The last tile (possibly partially-full)
            ConsumeTile<true>(num_remaining, tile_idx, tile_offset, tile_state, Int2Type<USE_ATOMIC_FIXUP>());
        }
    }

};


}               // CUB namespace
CUB_NS_POSTFIX  // Optional outer namespace(s)
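
The listing above is consumed by CUB's device-level dispatch code rather than called directly. For orientation, here is a minimal sketch of how a fixup kernel can drive this agent: each thread block builds an AgentSegmentFixup over one tile of (key, partial-value) pairs and calls ConsumeRange. The kernel name ExampleSegmentFixupKernel, the ExampleFixupPolicy tuning values, the cub::Sum / cub::Equality operators, and the include path are assumptions chosen for illustration; they are not taken from this file or from CUB's actual dispatch layer.

// Illustrative sketch only: a hypothetical kernel wrapping AgentSegmentFixup.
// Policy values, names, and operators are assumptions, not CUB's dispatch code.
#include <cub/cub.cuh>   // adjust to the CUB copy vendored with this project

// Hypothetical tuning policy: 128 threads per block, 3 pairs per thread
typedef cub::AgentSegmentFixupPolicy<
        128, 3,
        cub::BLOCK_LOAD_DIRECT,
        cub::LOAD_DEFAULT,
        cub::BLOCK_SCAN_WARP_SCANS>
    ExampleFixupPolicy;

template <typename OffsetT, typename ValueT>
__global__ void ExampleSegmentFixupKernel(
    cub::KeyValuePair<OffsetT, ValueT>             *d_pairs_in,        // (key, partial value) pairs to fold in
    ValueT                                         *d_aggregates_out,  // dense output array being fixed up
    int                                             num_items,         // total number of pairs
    int                                             num_tiles,         // number of tiles covering num_items
    cub::ReduceByKeyScanTileState<ValueT, OffsetT>  tile_state)        // decoupled look-back state (pre-initialized)
{
    typedef cub::AgentSegmentFixup<
            ExampleFixupPolicy,
            cub::KeyValuePair<OffsetT, ValueT>*,
            ValueT*,
            cub::Equality,
            cub::Sum,
            OffsetT>
        AgentSegmentFixupT;

    // Shared memory backing for the agent
    __shared__ typename AgentSegmentFixupT::TempStorage temp_storage;

    // Each thread block consumes one tile of pairs
    AgentSegmentFixupT(temp_storage, d_pairs_in, d_aggregates_out, cub::Equality(), cub::Sum())
        .ConsumeRange(num_items, num_tiles, tile_state);
}

In this sketch the kernel would be launched on a one-dimensional grid of num_tiles blocks (so gridDim.y == 1 and tile_idx reduces to blockIdx.x), with tile_state allocated and initialized beforehand, as CUB's dispatch does with a separate initialization kernel. With float or int values and CUB_PTX_ARCH >= 350 the atomic-fixup overload of ConsumeTile is selected; otherwise the prefix-scan overload with decoupled look-back runs.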