#include "VCluster_base.hpp"
#include "VCluster_meta_function.hpp"
#include "util/math_util_complex.hpp"
#include "memory/mem_conf.hpp"
#include "util/cuda_util.hpp"
#ifndef MAX_NUMER_OF_PROPERTIES
#define MAX_NUMER_OF_PROPERTIES 20
#endif
extern CudaMemory exp_tmp2[MAX_NUMER_OF_PROPERTIES];

extern CudaMemory rem_tmp2[MAX_NUMER_OF_PROPERTIES];

extern size_t NBX_cnt;
void bt_sighandler(int sig, siginfo_t * info, void * ctx);
template<typename InternalMemory = HeapMemory>
class Vcluster: public Vcluster_base<InternalMemory>
unsigned int NBX_prc_scnt = 0;
unsigned int NBX_prc_pcnt = 0;
template<typename Memory>
struct base_info
{
	//! Receive buffer
	openfpm::vector_fr<BMemory<Memory>> * recv_buf;
base_info<InternalMemory> NBX_prc_bi[NQUEUE];
template<int ... prp>
template<typename op,

	op & op_param,
	size_t opt)

if (opt == MPI_GPU_DIRECT && !std::is_same<InternalMemory,CudaMemory>::value)
{
	std::cout << __FILE__ << ":" << __LINE__
	          << " error: in order to have MPI_GPU_DIRECT VCluster must use CudaMemory internally; the most probable"
	          << " cause of this problem is that you are using the MPI_GPU_DIRECT option with a non-GPU data structure" << std::endl;
}
template<typename op, typename T, typename S, template <typename> class layout_base>

sz_recv_byte[NBX_prc_scnt].resize(sz_recv.size());
std::cerr << __FILE__ << ":" << __LINE__
          << " Error: the number of processors involved \"prc.size()\" must match the number of sending buffers \"send.size()\"" << std::endl;
send_sz_byte.resize(0);
for (size_t i = 0; i < send.size() ; i++)
mem[NBX_prc_scnt]->incRef();
for (size_t i = 0; i < send.size() ; i++)
NBX_prc_bi[NBX_prc_scnt].set(&this->recv_buf[NBX_prc_scnt],prc_recv,sz_recv_byte[NBX_prc_scnt],this->tags[NBX_prc_scnt],opt);
if (opt & RECEIVE_KNOWN)

if (opt & KNOWN_ELEMENT_OR_BYTE)
for (size_t i = 0 ; i < sz_recv.size() ; i++)
{sz_recv_byte[NBX_prc_scnt].get(i) = sz_recv.get(i) * sizeof(typename T::value_type);}
#ifndef DISABLE_ALL_RTTI
std::cout << __FILE__ << ":" << __LINE__ << " Error " << demangle(typeid(T).name())
          << " the type does not work with the option or NO_CHANGE_ELEMENTS" << std::endl;
	prc_recv.size(),(size_t *)prc_recv.getPointer(),(size_t *)sz_recv_byte[NBX_prc_scnt].getPointer(),
	msg_alloc_known,(void *)&NBX_prc_bi);
sz_recv_byte[NBX_prc_scnt] = self_base::sz_recv_tmp;
static void * msg_alloc(size_t msg_i, size_t total_msg, size_t total_p, size_t i, size_t ri, size_t tag, void * ptr)
std::cerr << __FILE__ << ":" << __LINE__ << " Internal error: this processor is not supposed to receive\n";
rinfo.recv_buf->get(ri).resize(msg_i);

rinfo.sz->add(msg_i);
rinfo.tags->add(tag);
if (rinfo.opt & MPI_GPU_DIRECT)
{
#if defined(MPIX_CUDA_AWARE_SUPPORT) && MPIX_CUDA_AWARE_SUPPORT
	return rinfo.recv_buf->last().getDevicePointer();
#else
	return rinfo.recv_buf->last().getPointer();
#endif
}

return rinfo.recv_buf->last().getPointer();
static void * msg_alloc_known(size_t msg_i, size_t total_msg, size_t total_p, size_t i, size_t ri, size_t tag, void * ptr)
std::cerr << __FILE__ << ":" << __LINE__ << " Internal error: this processor is not supposed to receive\n";
rinfo.recv_buf->get(ri).resize(msg_i);

return rinfo.recv_buf->last().getPointer();
template<typename op, typename T, typename S, template <typename> class layout_base, unsigned int ... prp>

pack_unpack_cond_with_prp<has_max_prop<T, has_value_type_ofp<T>::value>::value,op, T, S, layout_base, prp... >::unpacking(recv, self_base::recv_buf[NBX_prc_pcnt], sz, sz_byte, op_param, opt);
return SGather<T,S,layout_base>(send,recv,prc,sz,root);
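// A minimal usage sketch of the SGather overloads behind this forwarder (an
// assumption about typical use, not code from this header); v_cl is assumed to
// come from create_vcluster() after openfpm_init().
//
// openfpm::vector<float> local;
// local.add(1.0f + v_cl.getProcessUnitID());   // some per-process data
//
// openfpm::vector<float> gathered;             // filled on the root only
// v_cl.SGather(local,gathered,0);              // short form: gather on processor 0
//
// openfpm::vector<size_t> prc;                 // which processor each chunk came from
// openfpm::vector<size_t> sz;                  // how many elements each processor sent
// v_cl.SGather(local,gathered,prc,sz,0);       // long form, the overload forwarded to above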
enum { value = index };
{std::cerr << "Error: " << __FILE__ << ":" << __LINE__ << " when using SGather the sending object and the receiving object must in general be different" << std::endl;}
typedef typename ::generate_indexes<int, has_max_prop<T, has_value_type_ofp<T>::value>::number, MetaFuncOrd>::result ind_prop_to_pack;
template<typename T, typename S, template <typename> class layout_base = memory_traits_lin>
sz_byte.resize(sz.size());

for (size_t i = 0; i < sz.size() ; i++)
{
	send_buf.add((char *)send.getPointer() + sizeof(typename T::value_type)*ptr);
	sz_byte.get(i) = sz.get(i) * sizeof(typename T::value_type);
typedef typename ::generate_indexes<int, has_max_prop<T, has_value_type_ofp<T>::value>::number, MetaFuncOrd>::result ind_prop_to_pack;

typedef typename ::generate_indexes<int, has_max_prop<T, has_value_type_ofp<T>::value>::number, MetaFuncOrd>::result ind_prop_to_pack;
struct recv_buff_reorder
{
	recv_buff_reorder()
	:proc(0),tag(0),pos(0)
	{}

	bool operator<(const recv_buff_reorder & rd) const
	{
		if (proc == rd.proc)
		{return tag < rd.tag;}

		return (proc < rd.proc);
	}
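// The comparator above yields a (processor, tag) lexicographic order. A small
// self-contained illustration (not part of the header; struct key and
// example_order are hypothetical):

#include <algorithm>
#include <cstddef>
#include <vector>

struct key { size_t proc, tag; };

static void example_order()
{
	std::vector<key> k = {{2,7},{0,3},{2,1},{0,1}};
	std::sort(k.begin(), k.end(),
	          [](const key & a, const key & b)
	          { return (a.proc == b.proc) ? a.tag < b.tag : a.proc < b.proc; });
	// k is now (0,1) (0,3) (2,1) (2,7): messages grouped by source processor and
	// tag-ordered within each group, which is what reorder_buffer relies on below.
}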
for (size_t i = 0 ; i < rcv.size() ; i++)
{
	rcv.get(i).proc = prc.get(i);

	if (i < tags.size())
	{rcv.get(i).tag = tags.get(i);}
	else
	{rcv.get(i).tag = (unsigned int)-1;}
openfpm::vector_fr<BMemory<InternalMemory>> recv_ord;
recv_ord.resize(rcv.size());

prc_ord.resize(rcv.size());

sz_recv_ord.resize(rcv.size());

for (size_t i = 0 ; i < rcv.size() ; i++)
{
	prc_ord.get(i) = rcv.get(i).proc;
	sz_recv_ord.get(i) = sz_recv.get(rcv.get(i).pos);

for (size_t i = 0 ; i < rcv.size() ; i++)

sz_recv.swap(sz_recv_ord);
prepare_send_buffer<op_ssend_recv_add<void>,T,S,layout_base>(send,recv,prc_send,prc_recv,sz_recv,opt);
mem[NBX_prc_scnt]->decRef();
delete mem[NBX_prc_scnt];
delete pmem[NBX_prc_scnt];
typedef typename ::generate_indexes<int, has_max_prop<T, has_value_type_ofp<T>::value>::number, MetaFuncOrd>::result ind_prop_to_pack;
prepare_send_buffer<op_ssend_recv_add<void>,T,S,layout_base>(send,recv,prc_send,prc_recv,sz_recv,opt);
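// A minimal usage sketch of the SSendRecv entry point whose body starts here
// (an assumption about typical use; the data layout follows the usual OpenFPM
// pattern of one send vector per destination processor). Element i of 'send'
// goes to processor prc_send.get(i); everything received is appended to 'recv',
// with prc_recv/sz_recv reporting source and element count.
//
// openfpm::vector<openfpm::vector<float>> send;
// openfpm::vector<size_t> prc_send;
//
// prc_send.add((v_cl.getProcessUnitID() + 1) % v_cl.size());   // next rank, illustrative
// send.add();
// send.last().add(3.14f);
//
// openfpm::vector<float> recv;
// openfpm::vector<size_t> prc_recv;
// openfpm::vector<size_t> sz_recv;
// v_cl.SSendRecv(send,recv,prc_send,prc_recv,sz_recv);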
template<typename T, typename S, template <typename> class layout_base, int ... prp>

prepare_send_buffer<op_ssend_recv_add<void>,T,S,layout_base>(send,recv,prc_send,prc_recv,sz_recv,opt);
mem[NBX_prc_scnt]->decRef();
delete mem[NBX_prc_scnt];
delete pmem[NBX_prc_scnt];

process_receive_buffer_with_prp<op_ssend_recv_add<void>,T,S,layout_base,prp...>(recv,&sz_recv,&sz_recv_byte_out,opa,opt);
template<typename T, typename S, template <typename> class layout_base, int ... prp>

prepare_send_buffer<op_ssend_recv_add<void>,T,S,layout_base>(send,recv,prc_send,prc_recv,sz_recv,opt);
template<typename T, typename S, template <typename> class layout_base, int ... prp>

prepare_send_buffer<op_ssend_recv_add<void>,T,S,layout_base>(send,recv,prc_send,prc_recv,sz_recv,opt);

mem[NBX_prc_scnt]->decRef();
delete mem[NBX_prc_scnt];
delete pmem[NBX_prc_scnt];

process_receive_buffer_with_prp<op_ssend_recv_add<void>,T,S,layout_base,prp...>(recv,&sz_recv,NULL,opa,opt);
template<typename T, typename S, template <typename> class layout_base, int ... prp>

prepare_send_buffer<op_ssend_recv_add<void>,T,S,layout_base>(send,recv,prc_send,prc_recv,sz_recv,opt);
template<typename op, typename T, typename S,
         template <typename> class layout_base,

prepare_send_buffer<op,T,S,layout_base>(send,recv,prc_send,prc_recv,recv_sz,opt);

mem[NBX_prc_scnt]->decRef();
delete mem[NBX_prc_scnt];
delete pmem[NBX_prc_scnt];
template<typename op, typename T, typename S,
         template <typename> class layout_base,

prepare_send_buffer<op,T,S,layout_base>(send,recv,prc_send,prc_recv,recv_sz,opt);
template<typename T,

mem[NBX_prc_pcnt]->decRef();
delete mem[NBX_prc_pcnt];
delete pmem[NBX_prc_pcnt];

typedef typename ::generate_indexes<int, has_max_prop<T, has_value_type_ofp<T>::value>::number, MetaFuncOrd>::result ind_prop_to_pack;

if (NBX_prc_scnt == NBX_prc_pcnt)
template<typename T, typename S, template <typename> class layout_base, int ... prp>

mem[NBX_prc_pcnt]->decRef();
delete mem[NBX_prc_pcnt];
delete pmem[NBX_prc_pcnt];

process_receive_buffer_with_prp<op_ssend_recv_add<void>,T,S,layout_base,prp...>(recv,&sz_recv,&sz_recv_byte_out,opa,opt);

if (NBX_prc_scnt == NBX_prc_pcnt)
template<typename T, typename S, template <typename> class layout_base, int ... prp>

mem[NBX_prc_pcnt]->decRef();
delete mem[NBX_prc_pcnt];
delete pmem[NBX_prc_pcnt];

process_receive_buffer_with_prp<op_ssend_recv_add<void>,T,S,layout_base,prp...>(recv,&sz_recv,NULL,opa,opt);

if (NBX_prc_scnt == NBX_prc_pcnt)
template<typename op, typename T, typename S,
         template <typename> class layout_base,

mem[NBX_prc_pcnt]->decRef();
delete mem[NBX_prc_pcnt];
delete pmem[NBX_prc_pcnt];

if (NBX_prc_scnt == NBX_prc_pcnt)
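// The *Async variants above only start the NBX exchange; the corresponding *Wait
// member completes it and fills the receive object. A hedged sketch of the intended
// split, reusing the names from the SSendRecv example earlier:
//
// v_cl.SSendRecvAsync(send,recv,prc_send,prc_recv,sz_recv);   // start the exchange
//
// // ... overlap with unrelated local computation ...
//
// v_cl.SSendRecvWait(send,recv,prc_send,prc_recv,sz_recv);    // 'recv' is valid after this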
extern Vcluster<> * global_v_cluster_private_heap;
static inline void init_global_v_cluster_private(int *argc, char ***argv, MPI_Comm ext_comm)
{
	if (global_v_cluster_private_heap == NULL)
	{global_v_cluster_private_heap = new Vcluster<>(argc,argv,ext_comm);}

	if (global_v_cluster_private_cuda == NULL)
static inline void delete_global_v_cluster_private()
{
	delete global_v_cluster_private_heap;
	delete global_v_cluster_private_cuda;
}
template<typename Memory>

return *global_v_cluster_private_heap;

return *global_v_cluster_private_cuda;
template<typename Memory = HeapMemory>

if (global_v_cluster_private_heap == NULL)
{std::cerr << __FILE__ << ":" << __LINE__ << " Error: you must call openfpm_init before using any distributed data structures";}
static inline bool is_openfpm_init()
{
	return ofp_initialized;
}
void openfpm_init_vcl(int *argc, char ***argv, MPI_Comm ext_comm);

size_t openfpm_vcluster_compilation_mask();

void openfpm_finalize();
static void openfpm_init(int *argc, char ***argv, MPI_Comm ext_comm = MPI_COMM_WORLD)
{
	if (ofp_initialized)
	{return;}

	openfpm_init_vcl(argc,argv,ext_comm);

	size_t compiler_mask = CUDA_ON_BACKEND;

	if (compiler_mask != openfpm_vcluster_compilation_mask() || compiler_mask != openfpm_ofpmmemory_compilation_mask())
	{
		std::cout << __FILE__ << ":" << __LINE__
		          << " Error: the program has been compiled with CUDA_ON_BACKEND: " << compiler_mask
		          << " but libvcluster has been compiled with CUDA_ON_BACKEND: " << openfpm_vcluster_compilation_mask()
		          << ", and libofpmmemory has been compiled with CUDA_ON_BACKEND: " << openfpm_ofpmmemory_compilation_mask()
		          << "; recompile the library with the right CUDA_ON_BACKEND" << std::endl;
	}