12 #include "VCluster_base.hpp"
13 #include "VCluster_meta_function.hpp"
// NOTE(review): the stray leading numerals on many lines of this file
// (e.g. the "15" below) look like original line numbers fused into the
// text by an extraction/merge step; they must be stripped before this
// translation unit can compile. They are preserved here untouched.
//
// Forward declaration of the backtrace-printing signal handler
// (three-argument SA_SIGINFO form). It appears to be registered for
// SIGSEGV via sigaction() inside openfpm_init() further down this file.
15 void bt_sighandler(
int sig, siginfo_t * info,
void * ctx);
// --- Fragments from the interior of prepare_send_buffer(...) ---
// The enclosing signature and the code between these fragments are not
// visible in this chunk; comments below are hedged accordingly.
//
// Sanity check: one send buffer per destination processor is required.
89 std::cerr << __FILE__ <<
":" << __LINE__ <<
" Error, the number of processor involved \"prc.size()\" must match the number of sending buffers \"send.size()\" " << std::endl;
// Iterate over every outgoing buffer (body not visible here).
99 for (
size_t i = 0; i < send.
size() ; i++)
// Second pass over the outgoing buffers (body not visible here).
113 for (
size_t i = 0; i < send.
size() ; i++)
// RECEIVE_KNOWN path: the caller already knows which processors send to
// us and how much, so the NBX size-discovery step can be skipped.
126 if (opt & RECEIVE_KNOWN)
// Sizes were given as element counts (or bytes) rather than raw bytes.
129 if (opt & KNOWN_ELEMENT_OR_BYTE)
134 for (
size_t i = 0 ; i < sz_recv.
size() ; i++)
// Convert element counts into byte counts for the transport layer.
135 sz_recv_byte.get(i) = sz_recv.get(i) *
sizeof(
typename T::value_type);
// Otherwise the option combination is unsupported for this type.
138 {std::cout << __FILE__ <<
":" << __LINE__ <<
" Error " << demangle(
typeid(T).name()) <<
" the type does not work with the option or NO_CHANGE_ELEMENTS" << std::endl;}
// Trailing arguments of (presumably) a sendrecvMultipleMessagesNBX(...)
// call: known receiver list, per-message byte sizes, the msg_alloc_known
// allocation callback, and &bi as its opaque user argument — the start
// of the call is not visible in this chunk.
141 prc_recv.
size(),(
size_t *)prc_recv.getPointer(),(
size_t *)sz_recv_byte.getPointer(),
msg_alloc_known,(
void *)&bi);
147 sz_recv_byte = sz_recv_tmp;
// Member-initializer list of the base_info constructor: stores the
// receive-buffer pointer, the receiving-processor list and the
// per-message size vector (see the doxygen notes at the end of file).
194 :recv_buf(recv_buf),prc(prc),sz(sz)
// Call-back used by the NBX exchange to allocate room for an incoming
// message. Parameters (per the documented signature): msg_i = size of
// this message in bytes, total_msg/total_p = totals, i = sending
// processor, ri = request/slot index, ptr = opaque user argument.
// NOTE(review): "rinfo" is presumably a base_info& recovered from ptr —
// the cast and the surrounding braces are not visible in this chunk.
211 static void *
msg_alloc(
size_t msg_i ,
size_t total_msg,
size_t total_p,
size_t i,
size_t ri,
void * ptr)
// Defensive check: this rank did not expect to receive anything.
217 std::cerr << __FILE__ <<
":" << __LINE__ <<
" Internal error this processor is not suppose to receive\n";
// Grow receive-buffer slot ri to the size of the incoming message...
223 rinfo.
recv_buf->get(ri).resize(msg_i);
// ...and hand MPI the pointer of the last buffer to write into.
230 return rinfo.
recv_buf->last().getPointer();
// Variant of the allocation callback used when the receive sizes are
// already known (RECEIVE_KNOWN path) — same parameter meanings.
247 static void *
msg_alloc_known(
size_t msg_i ,
size_t total_msg,
size_t total_p,
size_t i,
size_t ri,
void * ptr)
253 std::cerr << __FILE__ <<
":" << __LINE__ <<
" Internal error this processor is not suppose to receive\n";
259 rinfo.
recv_buf->get(ri).resize(msg_i);
262 return rinfo.
recv_buf->last().getPointer();
// Template header of (per the doxygen notes) process_receive_buffer_with_prp:
// op = merge operation, T = sent element type, S = receiving structure,
// layout_base = memory layout policy, prp... = properties to unpack.
278 template<
typename op,
typename T,
typename S,
template <
typename>
class layout_base ,
unsigned int ... prp >
// Dispatch to the compile-time-selected unpacker: the first bool picks
// the simple-copy vs. complex-serialization path for T.
287 pack_unpack_cond_with_prp<has_max_prop<T, has_value_type<T>::value>::value,op, T, S, layout_base, prp... >::unpacking(recv,
recv_buf, sz, sz_byte, op_param);
// Convenience SGather overload: delegate to the full overload, letting
// it fill the (locally created, not visible here) prc/sz vectors.
335 return SGather(send,recv,prc,sz,root);
// Compile-time index constant (enclosing struct not visible here).
340 enum { value = index };
// SGather must not alias its input and output objects.
382 if (&send == (T *)&recv)
383 {std::cerr <<
"Error: " << __FILE__ <<
":" << __LINE__ <<
" using SGather in general the sending object and the receiving object must be different" << std::endl;}
// Build at compile time the ordered list of property indexes to pack.
403 typedef typename ::generate_indexes<int, has_max_prop<T, has_value_type<T>::value>::number,
MetaFuncOrd>::result ind_prop_to_pack;
// --- SScatter interior: slice the send object into per-processor
// chunks (element counts come from sz) ---
494 sz_byte.resize(sz.
size());
498 for (
size_t i = 0; i < sz.
size() ; i++)
// Chunk i starts ptr elements into the send buffer (ptr is advanced
// in code not visible in this chunk).
500 send_buf.add((
char *)send.getPointer() +
sizeof(
typename T::value_type)*ptr );
// Convert the element count for chunk i into bytes.
501 sz_byte.get(i) = sz.get(i) *
sizeof(
typename T::value_type);
// Same compile-time property-index list, repeated in two more methods.
512 typedef typename ::generate_indexes<int, has_max_prop<T, has_value_type<T>::value>::number,
MetaFuncOrd>::result ind_prop_to_pack;
531 typedef typename ::generate_indexes<int, has_max_prop<T, has_value_type<T>::value>::number,
MetaFuncOrd>::result ind_prop_to_pack;
// Helper record used to sort received buffers by source processor; its
// data members (proc, pos) are declared on lines not visible here.
551 struct recv_buff_reorder
// Order records by source-processor id (ascending).
565 bool operator<(
const recv_buff_reorder & rd)
const
567 return proc < rd.proc;
// --- reorder_buffer(...) interior: tag each record with its source
// processor, then permute buffers/ids/sizes into processor order ---
575 for (
size_t i = 0 ; i < rcv.
size() ; i++)
577 rcv.get(i).proc = prc.get(i);
585 recv_ord.resize(rcv.
size());
588 prc_ord.resize(rcv.
size());
591 sz_recv_ord.resize(rcv.
size());
594 for (
size_t i = 0 ; i < rcv.
size() ; i++)
// swap() moves buffer contents without copying the payload bytes.
596 recv_ord.get(i).swap(
recv_buf.get(rcv.get(i).pos));
597 prc_ord.get(i) = rcv.get(i).proc;
598 sz_recv_ord.get(i) = sz_recv.get(rcv.get(i).pos);
// Publish the reordered size vector back to the caller's vector.
604 sz_recv.swap(sz_recv_ord);
// --- SSendRecv interior: pack and exchange, using the additive merge
// operation op_ssend_recv_add<void> ---
646 prepare_send_buffer<op_ssend_recv_add<void>,T,S,layout_base>(
send,
recv,prc_send,prc_recv,sz_recv,opt);
649 typedef typename ::generate_indexes<int, has_max_prop<T, has_value_type<T>::value>::number,
MetaFuncOrd>::result ind_prop_to_pack;
// --- SSendRecvP (with byte sizes) interior: exchange, then unpack the
// requested properties, reporting both element and byte sizes ---
695 prepare_send_buffer<op_ssend_recv_add<void>,T,S,layout_base>(
send,
recv,prc_send,prc_recv,sz_recv,opt);
701 process_receive_buffer_with_prp<op_ssend_recv_add<void>,T,S,layout_base,prp...>(
recv,&sz_recv,&sz_recv_byte,opa);
// Template header of the SSendRecvP overload without byte sizes.
734 template<
typename T,
typename S,
template <
typename>
class layout_base,
int ... prp>
742 prepare_send_buffer<op_ssend_recv_add<void>,T,S,layout_base>(
send,
recv,prc_send,prc_recv,sz_recv,opt);
// NULL: this overload does not report per-message byte sizes.
748 process_receive_buffer_with_prp<op_ssend_recv_add<void>,T,S,layout_base,prp...>(
recv,&sz_recv,NULL,opa);
// --- SSendRecvP_op: same exchange, but with a caller-supplied merge
// operation op instead of the additive default ---
789 template<
typename op,
792 template <
typename>
class layout_base,
802 prepare_send_buffer<op,T,S,layout_base>(
send,
recv,prc_send,prc_recv,recv_sz,opt);
// Process-wide singleton Vcluster instance, defined in another TU.
816 extern Vcluster * global_v_cluster_private;
// Lazily create the global Vcluster (idempotent: only the first call
// with a NULL singleton allocates).
824 static inline void init_global_v_cluster_private(
int *argc,
char ***argv)
826 if (global_v_cluster_private == NULL)
827 global_v_cluster_private =
new Vcluster(argc,argv);
// Destroy the global Vcluster.
// NOTE(review): the pointer is not reset to NULL after delete, so a
// later init_global_v_cluster_private()/create_vcluster() would see a
// dangling non-NULL pointer — confirm whether re-init is supported.
830 static inline void delete_global_v_cluster_private()
832 delete global_v_cluster_private;
// Accessor for the global Vcluster; openfpm_init must run first.
// NOTE(review): on error it only prints and still dereferences the
// NULL pointer below — undefined behavior if init was skipped.
835 static inline Vcluster & create_vcluster()
839 if (global_v_cluster_private == NULL)
840 std::cerr << __FILE__ <<
":" << __LINE__ <<
" Error you must call openfpm_init before using any distributed data structures";
844 return *global_v_cluster_private;
// True once openfpm_init has completed.
854 static inline bool is_openfpm_init()
856 return ofp_initialized;
// Library entry point: initialize PETSc (presumably under a compile
// guard not visible here), create the global Vcluster, warn about
// active debug levels, install the SIGSEGV backtrace handler, record
// the program name and mark the library initialized.
864 static inline void openfpm_init(
int *argc,
char ***argv)
868 PetscInitialize(argc,argv,NULL,NULL);
872 init_global_v_cluster_private(argc,argv);
875 std::cout <<
"OpenFPM is compiled with debug mode LEVEL:1. Remember to remove SE_CLASS1 when you go in production" << std::endl;
879 std::cout <<
"OpenFPM is compiled with debug mode LEVEL:2. Remember to remove SE_CLASS2 when you go in production" << std::endl;
883 std::cout <<
"OpenFPM is compiled with debug mode LEVEL:3. Remember to remove SE_CLASS3 when you go in production" << std::endl;
// Route segmentation faults through bt_sighandler; SA_RESTART makes
// interrupted syscalls resume instead of failing with EINTR.
890 sa.sa_sigaction = bt_sighandler;
891 sigemptyset(&sa.sa_mask);
892 sa.sa_flags = SA_RESTART;
894 sigaction(SIGSEGV, &sa, NULL);
897 program_name = std::string(*argv[0]);
899 ofp_initialized =
true;
// Library shutdown: tear down the global Vcluster and clear the flag
// (intervening lines, e.g. PETSc/MPI finalization, are not visible).
908 static inline void openfpm_finalize()
916 delete_global_v_cluster_private();
917 ofp_initialized =
false;
void process_receive_buffer_with_prp(S &recv, openfpm::vector< size_t > *sz, openfpm::vector< size_t > *sz_byte, op &op_param)
Process the receive buffer.
Transform the boost::fusion::vector into memory specification (memory_traits)
base_info(openfpm::vector< BHeapMemory > *recv_buf, openfpm::vector< size_t > &prc, openfpm::vector< size_t > &sz)
constructor
size_t getProcessUnitID()
Get the process unit id.
bool SSendRecvP_op(openfpm::vector< T > &send, S &recv, openfpm::vector< size_t > &prc_send, op &op_param, openfpm::vector< size_t > &prc_recv, openfpm::vector< size_t > &recv_sz, size_t opt=NONE)
Semantic Send and receive, send the data to processors and receive from the other processors...
Trait that detects whether the type defines a max_prop member internally.
void reset_recv_buf()
Reset the receive buffer.
void sendrecvMultipleMessagesNBX(openfpm::vector< size_t > &prc, openfpm::vector< T > &data, openfpm::vector< size_t > prc_recv, openfpm::vector< size_t > &recv_sz, void *(*msg_alloc)(size_t, size_t, size_t, size_t, size_t, void *), void *ptr_arg, long int opt=NONE)
Send and receive multiple messages.
static void * msg_alloc(size_t msg_i, size_t total_msg, size_t total_p, size_t i, size_t ri, void *ptr)
Call-back to allocate buffer to receive data.
void prepare_send_buffer(openfpm::vector< T > &send, S &recv, openfpm::vector< size_t > &prc_send, openfpm::vector< size_t > &prc_recv, openfpm::vector< size_t > &sz_recv, size_t opt)
Prepare the send buffer and send the message to other processors.
This class allocates and destroys CPU memory.
bool send(size_t proc, size_t tag, const void *mem, size_t sz)
Send data to a processor.
static void * msg_alloc_known(size_t msg_i, size_t total_msg, size_t total_p, size_t i, size_t ri, void *ptr)
Call-back to allocate buffer to receive data.
bool SSendRecvP(openfpm::vector< T > &send, S &recv, openfpm::vector< size_t > &prc_send, openfpm::vector< size_t > &prc_recv, openfpm::vector< size_t > &sz_recv, openfpm::vector< size_t > &sz_recv_byte, size_t opt=NONE)
Semantic Send and receive, send the data to processors and receive from the other processors...
Implementation of VCluster class.
This class virtualize the cluster of PC as a set of processes that communicate.
bool SSendRecv(openfpm::vector< T > &send, S &recv, openfpm::vector< size_t > &prc_send, openfpm::vector< size_t > &prc_recv, openfpm::vector< size_t > &sz_recv, size_t opt=NONE)
Semantic Send and receive, send the data to processors and receive from the other processors...
openfpm::vector< BHeapMemory > * recv_buf
Receive buffer.
bool SSendRecvP(openfpm::vector< T > &send, S &recv, openfpm::vector< size_t > &prc_send, openfpm::vector< size_t > &prc_recv, openfpm::vector< size_t > &sz_recv, size_t opt=NONE)
Semantic Send and receive, send the data to processors and receive from the other processors...
bool recv(size_t proc, size_t tag, void *v, size_t sz)
Recv data from a processor.
virtual void incRef()
Increment the reference counter.
bool SGather(T &send, S &recv, size_t root)
Semantic Gather, gather the data from all processors into one node.
static void process_recv(Vcluster &vcl, S &recv, openfpm::vector< size_t > *sz_recv, openfpm::vector< size_t > *sz_recv_byte, op &op_param)
Process the receive buffer.
These set of classes generate an array definition at compile-time.
void reorder_buffer(openfpm::vector< size_t > &prc, openfpm::vector< size_t > &sz_recv)
Reorder the receiving buffer.
It returns true if the object T requires complex serialization.
openfpm::vector< size_t > & sz
size of each message
bool SGather(T &send, S &recv, openfpm::vector< size_t > &prc, openfpm::vector< size_t > &sz, size_t root)
Semantic Gather, gather the data from all processors into one node.
virtual void decRef()
Decrement the reference counter.
Helper class to add data.
openfpm::vector< size_t > & prc
receiving processor list
bool SScatter(T &send, S &recv, openfpm::vector< size_t > &prc, openfpm::vector< size_t > &sz, size_t root)
Semantic Scatter, scatter the data from one processor to the other nodes.
Vcluster(int *argc, char ***argv)
Constructor.
openfpm::vector< MPI_Request > req
vector of MPI requests
openfpm::vector< BHeapMemory > recv_buf
Receive buffers.