#include "VCluster_base.hpp"
#include "VCluster_meta_function.hpp"
#include "util/math_util_complex.hpp"
#include "memory/mem_conf.hpp"
#include "util/cudify/cudify.hpp"
#ifndef MAX_NUMER_OF_PROPERTIES
#define MAX_NUMER_OF_PROPERTIES 20
#endif
extern CudaMemory exp_tmp2[MAX_NUMER_OF_PROPERTIES];
extern CudaMemory rem_tmp2[MAX_NUMER_OF_PROPERTIES];
void bt_sighandler(int sig, siginfo_t * info, void * ctx);
//! Implementation of the Vcluster class: a semantic communication layer built on top of Vcluster_base
template<typename InternalMemory = HeapMemory>
class Vcluster: public Vcluster_base<InternalMemory>
{
	//! Index (in the NBX queue) of the communication currently being prepared/sent
	unsigned int NBX_prc_scnt = 0;

	//! Index (in the NBX queue) of the communication currently being processed on the receive side
	unsigned int NBX_prc_pcnt = 0;
	//! Information handed to the NBX allocation call-backs
	template<typename Memory>
	struct base_info
	{
		//! Receive buffers
		openfpm::vector_fr<BMemory<Memory>> * recv_buf;

		// ... (receiving processor list, message sizes, tags and options elided)
	};

	//! Call-back information, one slot per queued NBX communication
	base_info<InternalMemory> NBX_prc_bi[NQUEUE];
	template<int ... prp>
	// ... (member elided)
	template<typename op, /* ... */>
	// ...
		// MPI_GPU_DIRECT requires the internal buffers to live in CUDA memory
		if (opt == MPI_GPU_DIRECT && !std::is_same<InternalMemory,CudaMemory>::value)
		{
			std::cout << __FILE__ << ":" << __LINE__
			          << " error: in order to use MPI_GPU_DIRECT, VCluster must use CudaMemory internally; the most probable"
			          << " cause of this problem is that you are using the MPI_GPU_DIRECT option with a non-GPU data-structure" << std::endl;
		}
	//! Prepare the send buffer and send the message to the other processors
	template<typename op, typename T, typename S, template<typename> class layout_base>
	void prepare_send_buffer(openfpm::vector<T> & send, S & recv,
	                         openfpm::vector<size_t> & prc_send, openfpm::vector<size_t> & prc_recv,
	                         openfpm::vector<size_t> & sz_recv, size_t opt)
	{
		sz_recv_byte[NBX_prc_scnt].resize(sz_recv.size());

		// the number of target processors must match the number of send buffers
		if (prc_send.size() != send.size())
		{
			std::cerr << __FILE__ << ":" << __LINE__
			          << " Error: the number of processors involved \"prc.size()\" must match the number of sending buffers \"send.size()\"" << std::endl;
		}
		send_sz_byte.resize(0);

		for (size_t i = 0 ; i < send.size() ; i++)
		{
			// ... (compute the size in bytes of each send buffer, elided)
		}
		mem[NBX_prc_scnt]->incRef();

		for (size_t i = 0 ; i < send.size() ; i++)
		{
			// ... (pack each send buffer, elided)
		}

		NBX_prc_bi[NBX_prc_scnt].set(&this->recv_buf[NBX_prc_scnt], prc_recv, sz_recv_byte[NBX_prc_scnt],
		                             this->tags[NBX_prc_scnt], opt);
		// the receive pattern is already known: no preliminary size-discovery phase is needed
		if (opt & RECEIVE_KNOWN)
		{
			if (opt & KNOWN_ELEMENT_OR_BYTE)
			{
				// the known sizes are expressed in elements: convert them into bytes
				for (size_t i = 0 ; i < sz_recv.size() ; i++)
				{sz_recv_byte[NBX_prc_scnt].get(i) = sz_recv.get(i) * sizeof(typename T::value_type);}
				// ... (branch taken when the element size cannot be deduced for this type)
#ifndef DISABLE_ALL_RTTI
				std::cout << __FILE__ << ":" << __LINE__ << " Error " << demangle(typeid(T).name())
				          << ": this type does not work with the given option or with NO_CHANGE_ELEMENTS" << std::endl;
#endif
			// ... (start the NBX exchange with the already-known receive sizes; leading arguments elided)
					prc_recv.size(), (size_t *)prc_recv.getPointer(),
					(size_t *)sz_recv_byte[NBX_prc_scnt].getPointer(),
					msg_alloc_known, (void *)&NBX_prc_bi);

		// ...

		// otherwise take the receive sizes computed by the base class during the exchange
		sz_recv_byte[NBX_prc_scnt] = self_base::sz_recv_tmp;
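	/*! \brief Illustrative sketch: exchanging data when the receive pattern is already known
	 *
	 * A hedged sketch (names hypothetical): if the caller already knows which processors will send
	 * and how many elements, passing RECEIVE_KNOWN together with KNOWN_ELEMENT_OR_BYTE skips the
	 * size-discovery phase handled above.
	 *
	 * \code
	 * openfpm::vector<size_t> prc_recv;   // pre-filled: ranks we will receive from
	 * openfpm::vector<size_t> sz_recv;    // pre-filled: number of elements from each rank
	 *
	 * vcl.SSendRecv(v_send, v_recv, prc_send, prc_recv, sz_recv,
	 *               RECEIVE_KNOWN | KNOWN_ELEMENT_OR_BYTE);
	 * \endcode
	 */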
	//! Call-back used by the NBX layer to allocate the buffer that will receive an incoming message
	static void * msg_alloc(size_t msg_i, size_t total_msg, size_t total_p, size_t i, size_t ri, size_t tag, void * ptr)
	{
		// ptr carries the base_info describing where the incoming data has to be stored
		base_info<InternalMemory> & rinfo = *(base_info<InternalMemory> *)ptr;
		if (rinfo.recv_buf == NULL)
		{std::cerr << __FILE__ << ":" << __LINE__ << " Internal error: this processor is not supposed to receive\n";}
		// make room for the incoming message and record its size and tag
		rinfo.recv_buf->get(ri).resize(msg_i);
		rinfo.sz->add(msg_i);
		rinfo.tags->add(tag);

		// with MPI_GPU_DIRECT the message is received directly into device memory, if CUDA-aware MPI is available
		if (rinfo.opt & MPI_GPU_DIRECT)
		{
#if defined(MPIX_CUDA_AWARE_SUPPORT) && MPIX_CUDA_AWARE_SUPPORT
			return rinfo.recv_buf->last().getDevicePointer();
#else
			return rinfo.recv_buf->last().getPointer();
#endif
		}

		return rinfo.recv_buf->last().getPointer();
	}
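	/*! \brief Illustrative sketch: a user-supplied allocation call-back for the low-level NBX calls
	 *
	 * Sketch only: a call-back with the same signature as msg_alloc above can be passed to
	 * sendrecvMultipleMessagesNBX; "my_buffers" is a hypothetical user structure handed through ptr.
	 *
	 * \code
	 * static void * my_alloc(size_t msg_i, size_t total_msg, size_t total_p,
	 *                        size_t i, size_t ri, size_t tag, void * ptr)
	 * {
	 *     openfpm::vector_fr<BMemory<HeapMemory>> & my_buffers = *(openfpm::vector_fr<BMemory<HeapMemory>> *)ptr;
	 *
	 *     my_buffers.get(ri).resize(msg_i);       // make room for the incoming message
	 *     return my_buffers.get(ri).getPointer(); // tell the NBX layer where to store it
	 * }
	 * \endcode
	 */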
	//! Call-back to allocate the receive buffers when the message sizes are already known
	static void * msg_alloc_known(size_t msg_i, size_t total_msg, size_t total_p, size_t i, size_t ri, size_t tag, void * ptr)
	{
		base_info<InternalMemory> & rinfo = *(base_info<InternalMemory> *)ptr;

		if (rinfo.recv_buf == NULL)
		{std::cerr << __FILE__ << ":" << __LINE__ << " Internal error: this processor is not supposed to receive\n";}

		rinfo.recv_buf->get(ri).resize(msg_i);

		return rinfo.recv_buf->last().getPointer();
	}
	//! Process the receive buffer, unpacking the selected properties into recv
	template<typename op, typename T, typename S, template<typename> class layout_base, unsigned int ... prp>
	void process_receive_buffer_with_prp(S & recv, openfpm::vector<size_t> * sz, openfpm::vector<size_t> * sz_byte, op & op_param, size_t opt)
	{
		pack_unpack_cond_with_prp<has_max_prop<T, has_value_type_ofp<T>::value>::value, op, T, S, layout_base, prp...>
			::unpacking(recv, self_base::recv_buf[NBX_prc_pcnt], sz, sz_byte, op_param, opt);
	}
		// gather without returning the decomposition: forward to the full SGather overload
		return SGather<T,S,layout_base>(send, recv, prc, sz, root);
	}

	// (inside the ordering metafunction used below with generate_indexes)
	enum { value = index };
		{std::cerr << "Error: " << __FILE__ << ":" << __LINE__
		           << " when using SGather, in general the sending object and the receiving object must be different" << std::endl;}
		typedef typename ::generate_indexes<int, has_max_prop<T, has_value_type_ofp<T>::value>::number, MetaFuncOrd>::result ind_prop_to_pack;
		MPI_Barrier(MPI_COMM_WORLD);
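	/*! \brief Illustrative sketch: gather a distributed vector on the root processor
	 *
	 * A minimal sketch (container names are hypothetical, error handling omitted).
	 *
	 * \code
	 * Vcluster<> & vcl = create_vcluster();
	 *
	 * openfpm::vector<float> v_send;
	 * v_send.add(1.0f * vcl.getProcessUnitID());   // one element contributed per processor
	 *
	 * openfpm::vector<float> v_recv;                // filled on the root only
	 * openfpm::vector<size_t> prc;                  // source processor of each received chunk
	 * openfpm::vector<size_t> sz;                   // number of elements in each chunk
	 *
	 * vcl.SGather(v_send, v_recv, prc, sz, 0);
	 * \endcode
	 */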
	//! Semantic Scatter: distribute the chunks of send from the root processor to all the others
	template<typename T, typename S, template<typename> class layout_base = memory_traits_lin>
	bool SScatter(T & send, S & recv, openfpm::vector<size_t> & prc, openfpm::vector<size_t> & sz, size_t root)
	{
		sz_byte.resize(sz.size());

		size_t ptr = 0;

		// split the send object into one chunk per destination processor
		for (size_t i = 0; i < sz.size() ; i++)
		{
			send_buf.add((char *)send.getPointer() + sizeof(typename T::value_type)*ptr);
			sz_byte.get(i) = sz.get(i) * sizeof(typename T::value_type);
			ptr += sz.get(i);
		}
		typedef typename ::generate_indexes<int, has_max_prop<T, has_value_type_ofp<T>::value>::number, MetaFuncOrd>::result ind_prop_to_pack;

		// ...

		typedef typename ::generate_indexes<int, has_max_prop<T, has_value_type_ofp<T>::value>::number, MetaFuncOrd>::result ind_prop_to_pack;
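	/*! \brief Illustrative sketch: scatter a vector from the root to all processors
	 *
	 * A minimal sketch (names hypothetical): on the root, prc and sz describe which chunk of
	 * v_send goes to which processor.
	 *
	 * \code
	 * Vcluster<> & vcl = create_vcluster();
	 *
	 * openfpm::vector<float> v_send;   // significant on the root only
	 * openfpm::vector<float> v_recv;
	 * openfpm::vector<size_t> prc;     // destination processor of each chunk
	 * openfpm::vector<size_t> sz;      // number of elements of each chunk
	 *
	 * if (vcl.getProcessUnitID() == 0)
	 * {
	 *     for (size_t p = 0 ; p < vcl.size() ; p++)
	 *     {v_send.add(1.0f * p); prc.add(p); sz.add(1);}
	 * }
	 *
	 * vcl.SScatter(v_send, v_recv, prc, sz, 0);
	 * \endcode
	 */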
	//! Sort key used to reorder the receive buffers: by source processor first, then by tag
	struct recv_buff_reorder
	{
		size_t proc; size_t tag; size_t pos;

		recv_buff_reorder() :proc(0),tag(0),pos(0) {}

		bool operator<(const recv_buff_reorder & rd) const
		{
			if (proc == rd.proc)
			{return tag < rd.tag;}

			return (proc < rd.proc);
		}
	};
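	/*! \brief Illustrative sketch: the ordering produced by the comparator above
	 *
	 * Sketch only (requires <algorithm>): sorting groups the entries by source processor and,
	 * within the same processor, by tag.
	 *
	 * \code
	 * recv_buff_reorder a, b, c;
	 * a.proc = 2; a.tag = 7;
	 * b.proc = 1; b.tag = 5;
	 * c.proc = 2; c.tag = 0;
	 *
	 * recv_buff_reorder keys[3] = {a, b, c};
	 * std::sort(keys, keys + 3);   // resulting order: b (1,5), c (2,0), a (2,7)
	 * \endcode
	 */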
	//! Reorder the receiving buffer so that messages are sorted by source processor (and tag)
	void reorder_buffer(openfpm::vector<size_t> & prc, const openfpm::vector<size_t> & tags, openfpm::vector<size_t> & sz_recv)
	{
		openfpm::vector<recv_buff_reorder> rcv;
		// ... (one entry per receive buffer, elided)

		for (size_t i = 0 ; i < rcv.size() ; i++)
		{
			rcv.get(i).proc = prc.get(i);

			// use the received tag when available, otherwise mark the entry as "no tag"
			if (i < tags.size())
			{rcv.get(i).tag = tags.get(i);}
			else
			{rcv.get(i).tag = (unsigned int)-1;}
		// containers for the reordered buffers, processor list and sizes
		openfpm::vector_fr<BMemory<InternalMemory>> recv_ord;
		recv_ord.resize(rcv.size());

		openfpm::vector<size_t> prc_ord;
		prc_ord.resize(rcv.size());

		openfpm::vector<size_t> sz_recv_ord;
		sz_recv_ord.resize(rcv.size());

		// fill the processor list and the sizes following the sorted order
		for (size_t i = 0 ; i < rcv.size() ; i++)
		{
			prc_ord.get(i) = rcv.get(i).proc;
			sz_recv_ord.get(i) = sz_recv.get(rcv.get(i).pos);
		}

		// move each receive buffer into its sorted position
		for (size_t i = 0 ; i < rcv.size() ; i++)
		{
			// ... (elided)
		}

		sz_recv.swap(sz_recv_ord);
	//! Semantic Send and receive: send the data to the listed processors and collect what they sent back
	template<typename T, typename S, template<typename> class layout_base = memory_traits_lin>
	bool SSendRecv(openfpm::vector<T> & send, S & recv,
	               openfpm::vector<size_t> & prc_send, openfpm::vector<size_t> & prc_recv,
	               openfpm::vector<size_t> & sz_recv, size_t opt = NONE)
	{
		prepare_send_buffer<op_ssend_recv_add<void>,T,S,layout_base>(send, recv, prc_send, prc_recv, sz_recv, opt);

		// ... (wait for the NBX communication to complete, elided)

		// release the temporary send memory
		mem[NBX_prc_scnt]->decRef();
		delete mem[NBX_prc_scnt];
		delete pmem[NBX_prc_scnt];

		typedef typename ::generate_indexes<int, has_max_prop<T, has_value_type_ofp<T>::value>::number, MetaFuncOrd>::result ind_prop_to_pack;
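	/*! \brief Illustrative sketch: semantic send/receive between processors
	 *
	 * A minimal sketch (names hypothetical): each inner vector of v_send goes to the
	 * corresponding entry of prc_send; everything received is appended to v_recv, while
	 * prc_recv and sz_recv report who sent each piece and how large it is.
	 *
	 * \code
	 * openfpm::vector<openfpm::vector<float>> v_send;   // one buffer per destination
	 * openfpm::vector<size_t> prc_send;                 // destination ranks (same size as v_send)
	 *
	 * openfpm::vector<float> v_recv;                    // all received data, concatenated
	 * openfpm::vector<size_t> prc_recv;                 // filled: source rank of each piece
	 * openfpm::vector<size_t> sz_recv;                  // filled: elements in each piece
	 *
	 * vcl.SSendRecv(v_send, v_recv, prc_send, prc_recv, sz_recv);
	 * \endcode
	 */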
	// ...

		prepare_send_buffer<op_ssend_recv_add<void>,T,S,layout_base>(send, recv, prc_send, prc_recv, sz_recv, opt);
	//! Semantic Send and receive restricted to the properties prp, returning also the received sizes in bytes
	template<typename T, typename S, template<typename> class layout_base, int ... prp>
	bool SSendRecvP(openfpm::vector<T> & send, S & recv,
	                openfpm::vector<size_t> & prc_send, openfpm::vector<size_t> & prc_recv,
	                openfpm::vector<size_t> & sz_recv, openfpm::vector<size_t> & sz_recv_byte_out, size_t opt = NONE)
	{
		prepare_send_buffer<op_ssend_recv_add<void>,T,S,layout_base>(send, recv, prc_send, prc_recv, sz_recv, opt);

		// ... (wait for completion, elided)

		mem[NBX_prc_scnt]->decRef();
		delete mem[NBX_prc_scnt];
		delete pmem[NBX_prc_scnt];

		// ...

		op_ssend_recv_add<void> opa;

		process_receive_buffer_with_prp<op_ssend_recv_add<void>,T,S,layout_base,prp...>(recv, &sz_recv, &sz_recv_byte_out, opa, opt);
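	/*! \brief Illustrative sketch: exchanging only the selected properties of an aggregate
	 *
	 * A hedged sketch: part_type is a hypothetical aggregate-based container; only the
	 * properties listed in the template pack are packed and sent.
	 *
	 * \code
	 * typedef openfpm::vector<aggregate<float,float[3]>> part_type;
	 *
	 * openfpm::vector<part_type> v_send;   // one container per destination processor
	 * part_type v_recv;
	 *
	 * // exchange only properties 0 and 1 of the aggregate
	 * vcl.SSendRecvP<part_type,part_type,memory_traits_lin,0,1>(
	 *         v_send, v_recv, prc_send, prc_recv, sz_recv);
	 * \endcode
	 */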
	template<typename T, typename S, template<typename> class layout_base, int ... prp>
	// ...
		prepare_send_buffer<op_ssend_recv_add<void>,T,S,layout_base>(send, recv, prc_send, prc_recv, sz_recv, opt);
	//! Semantic Send and receive restricted to the properties prp
	template<typename T, typename S, template<typename> class layout_base, int ... prp>
	bool SSendRecvP(openfpm::vector<T> & send, S & recv,
	                openfpm::vector<size_t> & prc_send, openfpm::vector<size_t> & prc_recv,
	                openfpm::vector<size_t> & sz_recv, size_t opt = NONE)
	{
		prepare_send_buffer<op_ssend_recv_add<void>,T,S,layout_base>(send, recv, prc_send, prc_recv, sz_recv, opt);

		// ... (wait for completion, elided)

		mem[NBX_prc_scnt]->decRef();
		delete mem[NBX_prc_scnt];
		delete pmem[NBX_prc_scnt];

		// ...

		op_ssend_recv_add<void> opa;

		process_receive_buffer_with_prp<op_ssend_recv_add<void>,T,S,layout_base,prp...>(recv, &sz_recv, NULL, opa, opt);
	template<typename T, typename S, template<typename> class layout_base, int ... prp>
	// ...
		prepare_send_buffer<op_ssend_recv_add<void>,T,S,layout_base>(send, recv, prc_send, prc_recv, sz_recv, opt);
	//! Semantic Send and receive applying the user operation op on the received data
	template<typename op, typename T, typename S, template<typename> class layout_base, int ... prp>
	bool SSendRecvP_op(openfpm::vector<T> & send, S & recv,
	                   openfpm::vector<size_t> & prc_send, op & op_param,
	                   openfpm::vector<size_t> & prc_recv, openfpm::vector<size_t> & recv_sz, size_t opt = NONE)
	{
		prepare_send_buffer<op,T,S,layout_base>(send, recv, prc_send, prc_recv, recv_sz, opt);

		// ... (wait for completion, elided)

		mem[NBX_prc_scnt]->decRef();
		delete mem[NBX_prc_scnt];
		delete pmem[NBX_prc_scnt];
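	/*! \brief Illustrative sketch: send/receive applying an operation on the received elements
	 *
	 * A hedged sketch reusing the hypothetical part_type above: instead of simply appending,
	 * the received chunks are handed to the op object (here the additive op already used
	 * internally, op_ssend_recv_add<void>); a custom op can merge the data differently.
	 *
	 * \code
	 * op_ssend_recv_add<void> opa;
	 *
	 * vcl.SSendRecvP_op<op_ssend_recv_add<void>,part_type,part_type,memory_traits_lin,0,1>(
	 *         v_send, v_recv, prc_send, opa, prc_recv, sz_recv);
	 * \endcode
	 */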
	template<typename op, typename T, typename S, template<typename> class layout_base, int ... prp>
	// ...
		prepare_send_buffer<op,T,S,layout_base>(send, recv, prc_send, prc_recv, recv_sz, opt);
	//! Synchronize with a previously started SSendRecvAsync
	template<typename T, typename S, template<typename> class layout_base>
	bool SSendRecvWait(openfpm::vector<T> & send, S & recv,
	                   openfpm::vector<size_t> & prc_send, openfpm::vector<size_t> & prc_recv,
	                   openfpm::vector<size_t> & sz_recv, size_t opt = NONE)
	{
		// ... (wait for the pending NBX communication, elided)

		mem[NBX_prc_pcnt]->decRef();
		delete mem[NBX_prc_pcnt];
		delete pmem[NBX_prc_pcnt];

		typedef typename ::generate_indexes<int, has_max_prop<T, has_value_type_ofp<T>::value>::number, MetaFuncOrd>::result ind_prop_to_pack;

		// ...

		if (NBX_prc_scnt == NBX_prc_pcnt)
		{ /* ... */ }
	}
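	/*! \brief Illustrative sketch: overlapping communication with computation
	 *
	 * A hedged sketch: the Async call only starts the NBX exchange, the matching Wait call
	 * (with the same arguments) completes it and unpacks the data; do_some_local_work() is
	 * a hypothetical user function.
	 *
	 * \code
	 * vcl.SSendRecvAsync(v_send, v_recv, prc_send, prc_recv, sz_recv);
	 *
	 * do_some_local_work();   // computation overlapped with the exchange
	 *
	 * vcl.SSendRecvWait(v_send, v_recv, prc_send, prc_recv, sz_recv);
	 * \endcode
	 */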
	//! Synchronize with a previously started SSendRecvPAsync (also returns the received sizes in bytes)
	template<typename T, typename S, template<typename> class layout_base, int ... prp>
	bool SSendRecvPWait(openfpm::vector<T> & send, S & recv,
	                    openfpm::vector<size_t> & prc_send, openfpm::vector<size_t> & prc_recv,
	                    openfpm::vector<size_t> & sz_recv, openfpm::vector<size_t> & sz_recv_byte_out, size_t opt = NONE)
	{
		// ...

		mem[NBX_prc_pcnt]->decRef();
		delete mem[NBX_prc_pcnt];
		delete pmem[NBX_prc_pcnt];

		// ...

		op_ssend_recv_add<void> opa;

		process_receive_buffer_with_prp<op_ssend_recv_add<void>,T,S,layout_base,prp...>(recv, &sz_recv, &sz_recv_byte_out, opa, opt);

		if (NBX_prc_scnt == NBX_prc_pcnt)
		{ /* ... */ }
	}
	//! Synchronize with a previously started SSendRecvPAsync (no byte-size output)
	template<typename T, typename S, template<typename> class layout_base, int ... prp>
	bool SSendRecvPWait(openfpm::vector<T> & send, S & recv,
	                    openfpm::vector<size_t> & prc_send, openfpm::vector<size_t> & prc_recv,
	                    openfpm::vector<size_t> & sz_recv, size_t opt = NONE)
	{
		// ...

		mem[NBX_prc_pcnt]->decRef();
		delete mem[NBX_prc_pcnt];
		delete pmem[NBX_prc_pcnt];

		// ...

		op_ssend_recv_add<void> opa;

		process_receive_buffer_with_prp<op_ssend_recv_add<void>,T,S,layout_base,prp...>(recv, &sz_recv, NULL, opa, opt);

		if (NBX_prc_scnt == NBX_prc_pcnt)
		{ /* ... */ }
	}
	//! Synchronize with a previously started SSendRecvP_opAsync
	template<typename op, typename T, typename S, template<typename> class layout_base, int ... prp>
	bool SSendRecvP_opWait(openfpm::vector<T> & send, S & recv,
	                       openfpm::vector<size_t> & prc_send, op & op_param,
	                       openfpm::vector<size_t> & prc_recv, openfpm::vector<size_t> & recv_sz, size_t opt = NONE)
	{
		// ...

		mem[NBX_prc_pcnt]->decRef();
		delete mem[NBX_prc_pcnt];
		delete pmem[NBX_prc_pcnt];

		// ...

		if (NBX_prc_scnt == NBX_prc_pcnt)
		{ /* ... */ }
	}
	// ...
};

//! Global Vcluster instances: one backed by heap memory, one by CUDA memory
extern Vcluster<> * global_v_cluster_private_heap;
extern Vcluster<CudaMemory> * global_v_cluster_private_cuda;
static inline void init_global_v_cluster_private(int *argc, char ***argv)
{
	if (global_v_cluster_private_heap == NULL)
	{global_v_cluster_private_heap = new Vcluster<>(argc,argv);}

	if (global_v_cluster_private_cuda == NULL)
	{global_v_cluster_private_cuda = new Vcluster<CudaMemory>(argc,argv);}
}
static inline void delete_global_v_cluster_private()
{
	delete global_v_cluster_private_heap;
	delete global_v_cluster_private_cuda;
}
// Return the global Vcluster matching the requested memory type:
// the generic case returns the heap-backed instance ...
template<typename Memory>
// ...
		return *global_v_cluster_private_heap;

// ... while the CudaMemory case returns the CUDA-backed one
// ...
		return *global_v_cluster_private_cuda;
//! Get a reference to the global Vcluster; openfpm_init must have been called first
template<typename Memory = HeapMemory>
static inline Vcluster<Memory> & create_vcluster()
{
	if (global_v_cluster_private_heap == NULL)
	{std::cerr << __FILE__ << ":" << __LINE__ << " Error: you must call openfpm_init before using any distributed data structure";}

	// ...
}
//! Return true if openfpm has been initialized
static inline bool is_openfpm_init()
{
	return ofp_initialized;
}
//! Initialize the VCluster library (called internally by openfpm_init)
void openfpm_init_vcl(int *argc, char ***argv);

//! Return the CUDA backend the vcluster library has been compiled with
size_t openfpm_vcluster_compilation_mask();

//! Finalize the library (to be called at the end of the program)
void openfpm_finalize();
//! Initialize the library
static void openfpm_init(int *argc, char ***argv)
{
	openfpm_init_vcl(argc,argv);

	size_t compiler_mask = CUDA_ON_BACKEND;

	// the application and the libraries must have been compiled for the same CUDA backend
	if (compiler_mask != openfpm_vcluster_compilation_mask() || compiler_mask != openfpm_ofpmmemory_compilation_mask())
	{
		std::cout << __FILE__ << ":" << __LINE__
		          << " Error: the program has been compiled with CUDA_ON_BACKEND: " << compiler_mask
		          << " but libvcluster has been compiled with CUDA_ON_BACKEND: " << openfpm_vcluster_compilation_mask()
		          << ", and libofpmmemory has been compiled with CUDA_ON_BACKEND: " << openfpm_ofpmmemory_compilation_mask()
		          << "; recompile the library with the right CUDA_ON_BACKEND" << std::endl;
	}

	// ...
}