#ifndef VCLUSTER_BASE_HPP_
#define VCLUSTER_BASE_HPP_

#include "MPI_wrapper/MPI_util.hpp"
#include "Vector/map_vector.hpp"
#include "MPI_wrapper/MPI_IallreduceW.hpp"
#include "MPI_wrapper/MPI_IrecvW.hpp"
#include "MPI_wrapper/MPI_IsendW.hpp"
#include "MPI_wrapper/MPI_IAllGather.hpp"
#include "MPI_wrapper/MPI_IBcastW.hpp"
#include "util/check_no_pointers.hpp"
#include "util/util_debug.hpp"
#include "util/Vcluster_log.hpp"
#include "memory/BHeapMemory.hpp"
#include "Packer_Unpacker/has_max_prop.hpp"
#include "data_type/aggregate.hpp"

#define MSG_LENGTH 1024
#define MSG_SEND_RECV 1025
#define SEND_SPARSE 4096
#define NEED_ALL_SIZE 2

#define SERIVCE_MESSAGE_TAG 16384
#define SEND_RECV_BASE 8192
#define GATHER_BASE 24576

#define RECEIVE_KNOWN 4
#define KNOWN_ELEMENT_OR_BYTE 8

extern size_t n_vcluster;
extern bool global_mpi_init;
extern bool ofp_initialized;
extern size_t tot_sent;
extern size_t tot_recv;
template<typename T> void assign(T * ptr1, T * ptr2)
// Finalize MPI only if it has not already been finalized
int already_finalised;
MPI_Finalized(&already_finalised);
if (!already_finalised)
{
    if (MPI_Finalize() != 0)
    {
        std::cerr << __FILE__ << ":" << __LINE__ << " MPI_Finalize FAILED \n";
    }
}
check_new(this,8,VCLUSTER_EVENT,PRJ_VCLUSTER);

int already_initialised;
MPI_Initialized(&already_initialised);

// MPI is initialized here only when it has not been initialized already
if (!already_initialised)

// Query the communicator size and the rank of this process
MPI_Comm_size(MPI_COMM_WORLD, &m_size);
MPI_Comm_rank(MPI_COMM_WORLD, &m_rank);
template<typename T> void checkType()
{
    // Fundamental types (int, float, ...) can always be sent safely
    if (std::is_fundamental<T>::value == true)
        return;

    if (std::is_pointer<T>::value == true)
        std::cerr << "Error: " << __FILE__ << ":" << __LINE__ << " the type "
                  << demangle(typeid(T).name())
                  << " is a pointer, sending pointer values makes no sense\n";

    if (std::is_lvalue_reference<T>::value == true)
        std::cerr << "Error: " << __FILE__ << ":" << __LINE__ << " the type "
                  << demangle(typeid(T).name())
                  << " is an lvalue reference, sending references makes no sense\n";

    if (std::is_rvalue_reference<T>::value == true)
        std::cerr << "Error: " << __FILE__ << ":" << __LINE__ << " the type "
                  << demangle(typeid(T).name())
                  << " is an rvalue reference, sending references makes no sense\n";

    // If the type cannot be introspected:
    std::cerr << "Warning: " << __FILE__ << ":" << __LINE__
              << " impossible to check the type " << demangle(typeid(T).name())
              << " please consider adding a static method \"static bool noPointers()\"\n";

    // If the type is reported to contain pointers:
    std::cerr << "Error: " << __FILE__ << ":" << __LINE__ << " the type "
              << demangle(typeid(T).name())
              << " has pointers inside, sending pointer values makes no sense\n";
}
return MPI_COMM_WORLD;
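// A minimal, illustrative example of the convention checkType() relies on (the
// struct name is hypothetical): a user aggregate can expose the static
// noPointers() method suggested by the warning above, so the check can
// classify it as safe to send byte-wise.
struct my_aggregate_example
{
    float pos[3];
    float mass;

    // Declares that this type contains no pointers
    static bool noPointers() { return true; }
};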
template<typename T> void sum(T & num)

template<typename T> void max(T & num)

template<typename T> void min(T & num)
void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,void *),

// Queue one asynchronous send per destination processor
for (size_t i = 0 ; i < prc.size() ; i++)
    send(prc.get(i),SEND_SPARSE + NBX_cnt,data.get(i).getPointer(),data.get(i).size());

// The receive sizes are already known, so the matching receives can be posted directly
for (size_t i = 0 ; i < prc_recv.size() ; i++)
{
    void * ptr_recv = msg_alloc(recv_sz.get(i),0,0,prc_recv.get(i),i,ptr_arg);

    recv(prc_recv.get(i),SEND_SPARSE + NBX_cnt,ptr_recv,recv_sz.get(i));
}

// The tag counter wraps so that consecutive NBX exchanges use distinct tags
NBX_cnt = (NBX_cnt + 1) % 1024;
void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,void *),
void * ptr_arg, long int opt=NONE)

checkType<typename T::value_type>();

// Collect the raw pointer and the size (in bytes) of every send buffer
for (size_t i = 0 ; i < prc.size() ; i++)
{
    ptr_send.get(i) = data.get(i).getPointer();
    sz_send.get(i) = data.get(i).size() * sizeof(typename T::value_type);
}
void * ptr[], size_t n_recv, size_t prc_recv[] ,
void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,void *),
void * ptr_arg, long int opt=NONE)

for (size_t i = 0 ; i < n_send ; i++)

for (size_t i = 0 ; i < n_recv ; i++)
{
    void * ptr_recv = msg_alloc(sz_recv[i],0,0,prc_recv[i],i,ptr_arg);

    recv(prc_recv[i],SEND_SPARSE + NBX_cnt,ptr_recv,sz_recv[i]);
}
void * ptr[], size_t n_recv, size_t prc_recv[] ,
void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,void *),
void * ptr_arg, long int opt=NONE)

sz_recv_tmp.resize(n_recv);

// First exchange the message sizes with the known communication partners
for (size_t i = 0 ; i < n_send ; i++)
{send(prc[i],SEND_SPARSE + NBX_cnt,&sz[i],sizeof(size_t));}

for (size_t i = 0 ; i < n_recv ; i++)
{recv(prc_recv[i],SEND_SPARSE + NBX_cnt,&sz_recv_tmp.get(i),sizeof(size_t));}

// Then exchange the actual messages, allocating each receive buffer via msg_alloc
for (size_t i = 0 ; i < n_send ; i++)

for (size_t i = 0 ; i < n_recv ; i++)
{
    void * ptr_recv = msg_alloc(sz_recv_tmp.get(i),0,0,prc_recv[i],i,ptr_arg);

    recv(prc_recv[i],SEND_SPARSE + NBX_cnt,ptr_recv,sz_recv_tmp.get(i));
}
void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,void *),
void * ptr_arg, long int opt = NONE)

std::cerr << "Error: " << __FILE__ << ":" << __LINE__
          << " this function must be called when no other requests are in progress."
          << " Remember that functions like max(), sum(), send() and recv() are asynchronous:"
          << " check that you did not forget to call execute()\n";

// Post a synchronous-mode send (MPI_Issend) for every outgoing message
for (size_t i = 0 ; i < n_send ; i++)
{
    check_valid(ptr[i],sz[i]);

    MPI_SAFE_CALL(MPI_Issend(ptr[i], sz[i], MPI_BYTE, prc[i], SEND_SPARSE + NBX_cnt, MPI_COMM_WORLD, &req.last()));
}

bool reached_bar_req = false;

// Probe for incoming messages carrying the tag of this NBX exchange
MPI_SAFE_CALL(MPI_Iprobe(MPI_ANY_SOURCE,SEND_SPARSE + NBX_cnt,MPI_COMM_WORLD,&stat,&stat_t));

MPI_SAFE_CALL(MPI_Get_count(&stat_t,MPI_BYTE,&msize));

// Ask the user callback for a buffer able to hold the probed message
void * ptr = msg_alloc(msize,0,0,stat_t.MPI_SOURCE,rid,ptr_arg);

check_valid(ptr,msize);

MPI_SAFE_CALL(MPI_Recv(ptr,msize,MPI_BYTE,stat_t.MPI_SOURCE,SEND_SPARSE + NBX_cnt,MPI_COMM_WORLD,&stat_t));

check_valid(ptr,msize);

// Until the non-blocking barrier has been posted, test whether all local sends completed
if (reached_bar_req == false)
{MPI_SAFE_CALL(MPI_Testall(req.size(),&req.get(0),&flag,MPI_STATUSES_IGNORE));}

// Once they have, post the non-blocking barrier; its completion terminates the loop
{MPI_SAFE_CALL(MPI_Ibarrier(MPI_COMM_WORLD,&bar_req));reached_bar_req = true;}

}
while (flag == false);
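// Hedged usage sketch of the callback-based receive-buffer allocation used by
// sendrecvMultipleMessagesNBX. The names 'RecvBufs', 'my_msg_alloc' and 'v_cl'
// are illustrative only; the parameter meaning is inferred from the calls
// above: the first argument is the probed message size in bytes, the fourth
// the sending processor, the fifth the index of the received message, and the
// last the user pointer forwarded as ptr_arg.
struct RecvBufs
{
    openfpm::vector<openfpm::vector<unsigned char>> bufs;
};

static void * my_msg_alloc(size_t msg_sz, size_t, size_t, size_t prc, size_t ri, void * ptr_arg)
{
    RecvBufs & rb = *static_cast<RecvBufs *>(ptr_arg);

    rb.bufs.add();                        // one buffer per incoming message
    rb.bufs.last().resize(msg_sz);        // sized from the probed message length
    return rb.bufs.last().getPointer();   // the message is received directly here
}

// ... later, with one payload per destination rank in 'prc' and 'data':
// RecvBufs rb;
// v_cl.sendrecvMultipleMessagesNBX(prc, data, my_msg_alloc, &rb);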
bool send(size_t proc, size_t tag, const void * mem, size_t sz)

MPI_IsendWB::send(proc,SEND_RECV_BASE + tag,mem,sz,req.last());
bool recv(size_t proc, size_t tag, void * v, size_t sz)
void sum(T &num)
Sum the numbers across all processors and get the result.
openfpm::vector< void * > ptr_send
vector of pointers to the send buffers
size_t getProcessUnitID()
Get the process unit id.
void execute()
Execute all the requests.
int m_size
number of processes
MPI_Comm getMPIComm()
Get the MPI_Communicator (or processor group) this VCluster is using.
bool Bcast(openfpm::vector< T, Mem, gr > &v, size_t root)
Broadcast the data to all processors.
size_t rank()
Get the process unit id.
Vcluster_base(const Vcluster_base &)
disable copy constructor
Set of wrapper classes for MPI_Iallreduce.
void max(T &num)
Get the maximum number across all processors (or reduction with infinity norm)
void sendrecvMultipleMessagesNBX(openfpm::vector< size_t > &prc, openfpm::vector< T > &data, openfpm::vector< size_t > prc_recv, openfpm::vector< size_t > &recv_sz, void *(*msg_alloc)(size_t, size_t, size_t, size_t, size_t, void *), void *ptr_arg, long int opt=NONE)
Send and receive multiple messages.
std::vector< int > post_exe
vector of functions to execute after all the requests have been performed
Vcluster_base(int *argc, char ***argv)
Virtual cluster constructor.
bool send(size_t proc, size_t tag, const void *mem, size_t sz)
Send data to a processor.
This class checks whether the type T has pointers inside.
openfpm::vector< MPI_Status > stat
vector of MPI status
void sendrecvMultipleMessagesNBX(size_t n_send, size_t sz[], size_t prc[], void *ptr[], void *(*msg_alloc)(size_t, size_t, size_t, size_t, size_t, void *), void *ptr_arg, long int opt=NONE)
Send and receive multiple messages.
unsigned short us
unsigned short
int numPE
number of processing units per process
void sendrecvMultipleMessagesNBX(size_t n_send, size_t sz[], size_t prc[], void *ptr[], size_t n_recv, size_t prc_recv[], size_t sz_recv[], void *(*msg_alloc)(size_t, size_t, size_t, size_t, size_t, void *), void *ptr_arg, long int opt=NONE)
Send and receive multiple messages.
size_t size()
Get the total number of processors.
This class virtualizes the cluster of PCs as a set of communicating processes.
unsigned int ui
unsigned integer
void clear()
Release the buffer used for communication.
static void recv(size_t proc, size_t tag, void *buf, size_t sz, MPI_Request &req)
General recv for general buffer.
openfpm::vector< size_t > proc_com
bool recv(size_t proc, size_t tag, void *v, size_t sz)
Recv data from a processor.
Vcluster_base & operator=(const Vcluster_base &)
disable operator=
MPI_Request bar_req
barrier request
unsigned char uc
unsigned char
void sendrecvMultipleMessagesNBX(size_t n_send, size_t sz[], size_t prc[], void *ptr[], size_t n_recv, size_t prc_recv[], void *(*msg_alloc)(size_t, size_t, size_t, size_t, size_t, void *), void *ptr_arg, long int opt=NONE)
Send and receive multiple messages.
bool send(size_t proc, size_t tag, openfpm::vector< T, Mem, gr > &v)
Send data to a processor.
MPI_Status bar_stat
barrier status
openfpm::vector< int > map_scatter
vector that contains the scatter map (it is basically an array of ones)
void min(T &num)
Get the minimum number across all processors (or reduction with infinity norm)
void sendrecvMultipleMessagesNBX(openfpm::vector< size_t > &prc, openfpm::vector< T > &data, void *(*msg_alloc)(size_t, size_t, size_t, size_t, size_t, void *), void *ptr_arg, long int opt=NONE)
Send and receive multiple messages.
size_t getProcessingUnits()
Get the total number of processors.
bool recv(size_t proc, size_t tag, openfpm::vector< T, Mem, gr > &v)
Recv data from a processor.
bool allGather(T &send, openfpm::vector< T, Mem, gr > &v)
Gather the data from all processors.
openfpm::vector< size_t > sz_send
vector of the sizes of the send buffers
openfpm::vector< MPI_Request > req
vector of MPI requests
openfpm::vector< BHeapMemory > recv_buf
Receive buffers.
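A hedged end-to-end sketch of the point-to-point members summarized above (the variable names and the way the cluster object 'v_cl' is obtained are illustrative; send() and recv() only enqueue requests, which complete in execute()):

size_t rank  = v_cl.getProcessUnitID();
size_t nproc = v_cl.getProcessingUnits();

openfpm::vector<float> payload;
openfpm::vector<float> incoming;

if (nproc >= 2)
{
    if (rank == 0)
    {
        payload.add(3.14f);
        v_cl.send(1, 0, payload);    // queue a send of the vector to rank 1, tag 0
    }
    else if (rank == 1)
    {
        incoming.resize(1);          // assumed pre-sizing of the receive vector
        v_cl.recv(0, 0, incoming);   // queue the matching receive from rank 0
    }

    v_cl.execute();                  // complete every queued request
}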