OpenFPM_pdata  1.1.0
Project that contains the implementation of distributed structures
VCluster_base.hpp
#ifndef VCLUSTER_BASE_HPP_
#define VCLUSTER_BASE_HPP_

#include "config.h"
#include <mpi.h>
#include "MPI_wrapper/MPI_util.hpp"
#include "Vector/map_vector.hpp"
#include "MPI_wrapper/MPI_IallreduceW.hpp"
#include "MPI_wrapper/MPI_IrecvW.hpp"
#include "MPI_wrapper/MPI_IsendW.hpp"
#include "MPI_wrapper/MPI_IAllGather.hpp"
#include "MPI_wrapper/MPI_IBcastW.hpp"
#include <exception>
#include "Vector/map_vector.hpp"
#ifdef DEBUG
#include "util/check_no_pointers.hpp"
#include "util/util_debug.hpp"
#endif
#include "util/Vcluster_log.hpp"
#include "memory/BHeapMemory.hpp"
#include "Packer_Unpacker/has_max_prop.hpp"
#include "data_type/aggregate.hpp"

#ifdef HAVE_PETSC
#include <petscvec.h>
#endif

#define MSG_LENGTH 1024
#define MSG_SEND_RECV 1025
#define SEND_SPARSE 4096
#define NONE 1
#define NEED_ALL_SIZE 2

#define SERIVCE_MESSAGE_TAG 16384
#define SEND_RECV_BASE 8192
#define GATHER_BASE 24576

#define RECEIVE_KNOWN 4
#define KNOWN_ELEMENT_OR_BYTE 8

// number of vcluster instances
extern size_t n_vcluster;
// Global MPI initialization
extern bool global_mpi_init;
// initialization flag
extern bool ofp_initialized;
extern size_t tot_sent;
extern size_t tot_recv;


template<typename T> void assign(T * ptr1, T * ptr2)
{
    *ptr1 = *ptr2;
};


//! temporal buffer type for reductions, one member per supported primitive type
union red
{
    //! char
    char c;
    //! unsigned char
    unsigned char uc;
    //! short
    short s;
    //! unsigned short
    unsigned short us;
    //! integer
    int i;
    //! unsigned integer
    unsigned int ui;
    //! float
    float f;
    //! double
    double d;
};

/*! \brief This class virtualizes the cluster of PCs as a set of processes that communicate
 *
 */
class Vcluster_base
{
    //! log file
    Vcluster_log log;

    //! incremental counter, added to the message tag to distinguish
    //! subsequent NBX communications from each other
    size_t NBX_cnt;

    //! temporal vector used for meta-communication
    openfpm::vector<size_t> proc_com;

    //! vector that contains the scatter map (it is basically an array of one)
    openfpm::vector<int> map_scatter;

    //! vector of MPI requests
    openfpm::vector<MPI_Request> req;

    //! vector of MPI status
    openfpm::vector<MPI_Status> stat;

    //! vector of functions to execute after all the requests have been performed
    std::vector<int> post_exe;

    // Object array

    // Single objects

    //! number of processes
    int m_size;
    //! actual rank
    int m_rank;

    //! number of processing units per process
    int numPE = 1;

    //! temporal buffer for reductions
    std::vector<red> r;

    //! vector of pointers of send buffers
    openfpm::vector<void *> ptr_send;

    //! vector of the size of send buffers
    openfpm::vector<size_t> sz_send;

    //! barrier request
    MPI_Request bar_req;

    //! barrier status
    MPI_Status bar_stat;

    //! disable operator=
    Vcluster_base & operator=(const Vcluster_base &) {return *this;};

    //! disable copy constructor
    Vcluster_base(const Vcluster_base &)
    :NBX_cnt(0)
    {};

protected:

    //! Receive buffers
    openfpm::vector<BHeapMemory> recv_buf;

public:

    // Finalize the MPI program
    ~Vcluster_base()
    {
#ifdef SE_CLASS2
        check_delete(this);
#endif
        n_vcluster--;

        // if there are no other vcluster instances finalize
        if (n_vcluster == 0)
        {
            int already_finalised;

            MPI_Finalized(&already_finalised);
            if (!already_finalised)
            {
                if (MPI_Finalize() != 0)
                {
                    std::cerr << __FILE__ << ":" << __LINE__ << " MPI_Finalize FAILED \n";
                }
            }
        }
    }

    /*! \brief Virtual cluster constructor
     *
     * \param argc pointer to the argument count passed to the program
     * \param argv pointer to the argument vector passed to the program
     *
     */
    Vcluster_base(int *argc, char ***argv)
    :NBX_cnt(0)
    {
#ifdef SE_CLASS2
        check_new(this,8,VCLUSTER_EVENT,PRJ_VCLUSTER);
#endif

        n_vcluster++;

        int already_initialised;
        MPI_Initialized(&already_initialised);

        // Check if MPI is already initialized
        if (!already_initialised)
        {
            MPI_Init(argc,argv);
        }

        // Get the total number of processes
        // and the rank of this process

        MPI_Comm_size(MPI_COMM_WORLD, &m_size);
        MPI_Comm_rank(MPI_COMM_WORLD, &m_rank);

#ifdef SE_CLASS2
        process_v_cl = m_rank;
#endif

        // create and fill the scatter map with ones
        map_scatter.resize(m_size);

        for (size_t i = 0 ; i < map_scatter.size() ; i++)
        {
            map_scatter.get(i) = 1;
        }

        // open the log file
        log.openLog(m_rank);

        // Initialize bar_req
        bar_req = MPI_Request();
        bar_stat = MPI_Status();
    }
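
    /* Illustrative sketch: constructing the communicator directly and querying
     * rank and size. In OpenFPM programs the instance is normally obtained from
     * the library initialization rather than built by hand, so this only shows
     * the interface; v_cl, argc and argv are names local to the example.
     *
     *   int main(int argc, char* argv[])
     *   {
     *       Vcluster_base v_cl(&argc,&argv);
     *
     *       if (v_cl.rank() == 0)
     *           std::cout << "running on " << v_cl.size() << " processors" << std::endl;
     *
     *       return 0;
     *   }
     */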

#ifdef SE_CLASS1

    /*! \brief Check that the type T is suitable to be sent across processors
     *
     * Fundamental types are accepted; pointers and references are reported as
     * errors; for any other type the static method noPointers() is queried
     *
     */
    template<typename T> void checkType()
    {
        // if T is a primitive like int, long int, float, double, ... it makes sense
        // (pointers, l-references and r-references are not fundamental)
        if (std::is_fundamental<T>::value == true)
            return;

        // if it is a pointer it makes no sense
        if (std::is_pointer<T>::value == true)
            std::cerr << "Error: " << __FILE__ << ":" << __LINE__ << " the type " << demangle(typeid(T).name()) << " is a pointer, sending pointer values makes no sense\n";

        // if it is an l-value reference it makes no sense
        if (std::is_lvalue_reference<T>::value == true)
            std::cerr << "Error: " << __FILE__ << ":" << __LINE__ << " the type " << demangle(typeid(T).name()) << " is an l-value reference, sending references makes no sense\n";

        // if it is an r-value reference it makes no sense
        if (std::is_rvalue_reference<T>::value == true)
            std::cerr << "Error: " << __FILE__ << ":" << __LINE__ << " the type " << demangle(typeid(T).name()) << " is an r-value reference, sending references makes no sense\n";

        // ... if not, check that T has a method called noPointers
        switch (check_no_pointers<T>::value())
        {
        case PNP::UNKNOWN:
        {
            std::cerr << "Warning: " << __FILE__ << ":" << __LINE__ << " impossible to check the type " << demangle(typeid(T).name()) << " please consider adding a static method \"static bool noPointers()\" \n" ;
            break;
        }
        case PNP::POINTERS:
        {
            std::cerr << "Error: " << __FILE__ << ":" << __LINE__ << " the type " << demangle(typeid(T).name()) << " has pointers inside, sending pointer values makes no sense\n";
            break;
        }
        default:
        {

        }
        }
    }

#endif

    /*! \brief Get the MPI_Communicator (or processor group) this VCluster is using
     *
     * \return the MPI communicator
     *
     */
    MPI_Comm getMPIComm()
    {
        return MPI_COMM_WORLD;
    }

    /*! \brief Get the total number of processors
     *
     * \return the total number of processors
     *
     */
    size_t getProcessingUnits()
    {
        return m_size*numPE;
    }

    /*! \brief Get the total number of processors
     *
     * It is the same as getProcessingUnits()
     *
     * \return the total number of processors
     *
     */
    size_t size()
    {
        return this->m_size*numPE;
    }

    /*! \brief Get the process unit id
     *
     * \return the process unit id
     *
     */
    size_t getProcessUnitID()
    {
        return m_rank;
    }

    /*! \brief Get the process unit id
     *
     * It is the same as getProcessUnitID()
     *
     * \return the process unit id
     *
     */
    size_t rank()
    {
        return m_rank;
    }

    /*! \brief Sum the numbers across all processors and get the result
     *
     * \param num number to reduce, input and output
     *
     */
    template<typename T> void sum(T & num)
    {
#ifdef SE_CLASS1
        checkType<T>();
#endif

        // reduce over MPI

        // Create one request
        req.add();

        // reduce
        MPI_IallreduceW<T>::reduce(num,MPI_SUM,req.last());
    }

    /*! \brief Get the maximum number across all processors (or reduction with infinity norm)
     *
     * \param num number to reduce, input and output
     *
     */
    template<typename T> void max(T & num)
    {
#ifdef SE_CLASS1
        checkType<T>();
#endif
        // reduce over MPI

        // Create one request
        req.add();

        // reduce
        MPI_IallreduceW<T>::reduce(num,MPI_MAX,req.last());
    }

    /*! \brief Get the minimum number across all processors (or reduction with infinity norm)
     *
     * \param num number to reduce, input and output
     *
     */
    template<typename T> void min(T & num)
    {
#ifdef SE_CLASS1
        checkType<T>();
#endif
        // reduce over MPI

        // Create one request
        req.add();

        // reduce
        MPI_IallreduceW<T>::reduce(num,MPI_MIN,req.last());
    }
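
    /* Illustrative sketch: the reductions are asynchronous, each call only
     * queues an MPI_Iallreduce request; the result is available after
     * execute(). Several reductions can be queued and completed together.
     * v_cl is assumed to be a Vcluster_base instance.
     *
     *   double local_max = 1.0 + v_cl.rank();   // some locally computed value
     *   size_t local_cnt = 10;                  // some locally computed counter
     *
     *   v_cl.max(local_max);      // queue a reduction with MPI_MAX
     *   v_cl.sum(local_cnt);      // queue a reduction with MPI_SUM
     *   v_cl.execute();           // local_max / local_cnt now hold the global values
     */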

    /*! \brief Send and receive multiple messages
     *
     * One openfpm::vector of data is sent to each processor in prc; the
     * receive sizes are known, so the receive buffers are allocated up-front
     * through the msg_alloc call-back
     *
     */
    template<typename T> void sendrecvMultipleMessagesNBX(openfpm::vector< size_t > & prc,
                                                          openfpm::vector< T > & data,
                                                          openfpm::vector< size_t > prc_recv,
                                                          openfpm::vector< size_t > & recv_sz ,
                                                          void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,void *),
                                                          void * ptr_arg,
                                                          long int opt=NONE)
    {
        // Allocate the buffers

        for (size_t i = 0 ; i < prc.size() ; i++)
            send(prc.get(i),SEND_SPARSE + NBX_cnt,data.get(i).getPointer(),data.get(i).size());

        for (size_t i = 0 ; i < prc_recv.size() ; i++)
        {
            void * ptr_recv = msg_alloc(recv_sz.get(i),0,0,prc_recv.get(i),i,ptr_arg);

            recv(prc_recv.get(i),SEND_SPARSE + NBX_cnt,ptr_recv,recv_sz.get(i));
        }

        execute();

        // Circular counter
        NBX_cnt = (NBX_cnt + 1) % 1024;
    }

    /*! \brief Send and receive multiple messages
     *
     * It takes one openfpm::vector of data per destination processor and
     * forwards the raw pointers and sizes to the low-level NBX call
     *
     */
    template<typename T>
    void sendrecvMultipleMessagesNBX(openfpm::vector< size_t > & prc,
                                     openfpm::vector< T > & data,
                                     void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,void *),
                                     void * ptr_arg, long int opt=NONE)
    {
#ifdef SE_CLASS1
        checkType<typename T::value_type>();
#endif
        // resize the pointer list
        ptr_send.resize(prc.size());
        sz_send.resize(prc.size());

        for (size_t i = 0 ; i < prc.size() ; i++)
        {
            ptr_send.get(i) = data.get(i).getPointer();
            sz_send.get(i) = data.get(i).size() * sizeof(typename T::value_type);
        }

        sendrecvMultipleMessagesNBX(prc.size(),(size_t *)sz_send.getPointer(),(size_t *)prc.getPointer(),(void **)ptr_send.getPointer(),msg_alloc,ptr_arg,opt);
    }

    /*! \brief Send and receive multiple messages
     *
     * Raw-pointer version: the number, sources and sizes of the messages to
     * receive are known, so the receive buffers are allocated up-front
     * through the msg_alloc call-back
     *
     */
    void sendrecvMultipleMessagesNBX(size_t n_send , size_t sz[], size_t prc[] ,
                                     void * ptr[], size_t n_recv, size_t prc_recv[] ,
                                     size_t sz_recv[] ,
                                     void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,void *),
                                     void * ptr_arg, long int opt=NONE)
    {
        // Allocate the buffers

        for (size_t i = 0 ; i < n_send ; i++)
            send(prc[i],SEND_SPARSE + NBX_cnt,ptr[i],sz[i]);

        for (size_t i = 0 ; i < n_recv ; i++)
        {
            void * ptr_recv = msg_alloc(sz_recv[i],0,0,prc_recv[i],i,ptr_arg);

            recv(prc_recv[i],SEND_SPARSE + NBX_cnt,ptr_recv,sz_recv[i]);
        }

        execute();

        // Circular counter
        NBX_cnt = (NBX_cnt + 1) % 1024;
    }

    //! temporary buffer for the receive sizes, used by the overload below
    openfpm::vector<size_t> sz_recv_tmp;

    /*! \brief Send and receive multiple messages
     *
     * Raw-pointer version: the sources of the messages to receive are known
     * but their sizes are not, so a first NBX round exchanges the sizes and
     * a second one exchanges the actual data
     *
     */
    void sendrecvMultipleMessagesNBX(size_t n_send , size_t sz[], size_t prc[] ,
                                     void * ptr[], size_t n_recv, size_t prc_recv[] ,
                                     void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,void *),
                                     void * ptr_arg, long int opt=NONE)
    {
        sz_recv_tmp.resize(n_recv);

        // First we exchange the receive size for each processor

        for (size_t i = 0 ; i < n_send ; i++)
        {send(prc[i],SEND_SPARSE + NBX_cnt,&sz[i],sizeof(size_t));}

        for (size_t i = 0 ; i < n_recv ; i++)
        {recv(prc_recv[i],SEND_SPARSE + NBX_cnt,&sz_recv_tmp.get(i),sizeof(size_t));}

        execute();

        // Circular counter
        NBX_cnt = (NBX_cnt + 1) % 1024;

        // Allocate the buffers

        for (size_t i = 0 ; i < n_send ; i++)
        {send(prc[i],SEND_SPARSE + NBX_cnt,ptr[i],sz[i]);}

        for (size_t i = 0 ; i < n_recv ; i++)
        {
            void * ptr_recv = msg_alloc(sz_recv_tmp.get(i),0,0,prc_recv[i],i,ptr_arg);

            recv(prc_recv[i],SEND_SPARSE + NBX_cnt,ptr_recv,sz_recv_tmp.get(i));
        }

        execute();

        // Circular counter
        NBX_cnt = (NBX_cnt + 1) % 1024;
    }

    /*! \brief Send and receive multiple messages (NBX pattern)
     *
     * Neither the number nor the size of the incoming messages is known:
     * the messages are sent with MPI_Issend, incoming messages are probed
     * and received through the msg_alloc call-back, and an MPI_Ibarrier
     * is used to detect global termination
     *
     */
    void sendrecvMultipleMessagesNBX(size_t n_send , size_t sz[], size_t prc[] ,
                                     void * ptr[],
                                     void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,void *),
                                     void * ptr_arg, long int opt = NONE)
    {
        if (stat.size() != 0 || req.size() != 0)
            std::cerr << "Error: " << __FILE__ << ":" << __LINE__ << " this function must be called when no other requests are in progress. Please remember that if you use functions like max(), sum(), send() or recv(), you must call execute() to complete them before starting a new communication\n";

        stat.clear();
        req.clear();
        // Do MPI_Issend

        for (size_t i = 0 ; i < n_send ; i++)
        {
            if (sz[i] != 0)
            {
                req.add();

#ifdef SE_CLASS2
                check_valid(ptr[i],sz[i]);
#endif

                tot_sent += sz[i];
                MPI_SAFE_CALL(MPI_Issend(ptr[i], sz[i], MPI_BYTE, prc[i], SEND_SPARSE + NBX_cnt, MPI_COMM_WORLD,&req.last()));
                log.logSend(prc[i]);
            }
        }

        size_t rid = 0;
        int flag = false;

        bool reached_bar_req = false;

        log.start(10);

        // Wait until all the sends are acknowledged
        do
        {
            // flag that notifies that this processor reached the barrier
            // Barrier request

            MPI_Status stat_t;
            int stat = false;
            MPI_SAFE_CALL(MPI_Iprobe(MPI_ANY_SOURCE,SEND_SPARSE + NBX_cnt,MPI_COMM_WORLD,&stat,&stat_t));

            // If I have an incoming message and it is related to this NBX communication
            if (stat == true)
            {
                // Get the message size
                int msize;
                MPI_SAFE_CALL(MPI_Get_count(&stat_t,MPI_BYTE,&msize));

                // Get the pointer to receive the message
                void * ptr = msg_alloc(msize,0,0,stat_t.MPI_SOURCE,rid,ptr_arg);

                // Log the receiving request
                log.logRecv(stat_t);

                rid++;

                // Check the pointer
#ifdef SE_CLASS2
                check_valid(ptr,msize);
#endif
                tot_recv += msize;
                MPI_SAFE_CALL(MPI_Recv(ptr,msize,MPI_BYTE,stat_t.MPI_SOURCE,SEND_SPARSE+NBX_cnt,MPI_COMM_WORLD,&stat_t));

#ifdef SE_CLASS2
                check_valid(ptr,msize);
#endif
            }

            // Check the status of all the MPI_Issend and call the barrier when they are finished

            if (reached_bar_req == false)
            {
                int flag = false;
                if (req.size() != 0)
                {MPI_SAFE_CALL(MPI_Testall(req.size(),&req.get(0),&flag,MPI_STATUSES_IGNORE));}
                else
                    flag = true;

                // If all sends have been completed
                if (flag == true)
                {MPI_SAFE_CALL(MPI_Ibarrier(MPI_COMM_WORLD,&bar_req));reached_bar_req = true;}
            }

            // Check if all processors reached the asynchronous barrier
            if (reached_bar_req)
            {MPI_SAFE_CALL(MPI_Test(&bar_req,&flag,&bar_stat))};

            // produce a report if the communication gets stuck
            log.NBXreport(NBX_cnt,req,reached_bar_req,bar_stat);

        } while (flag == false);

        // Remove the executed requests

        req.clear();
        stat.clear();
        log.clear();

        // Circular counter
        NBX_cnt = (NBX_cnt + 1) % 1024;
    }
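
    /* Illustrative sketch of the NBX pattern above: each processor sends one
     * buffer to each processor in prc[], the number and size of the incoming
     * messages are unknown, and msg_alloc is called once per incoming message
     * to provide storage. The names below (recv_store, example_alloc, msg, ...)
     * are local to the example, not part of this header.
     *
     *   struct recv_store
     *   { openfpm::vector<BHeapMemory> bufs; };
     *
     *   static void * example_alloc(size_t msg_sz, size_t, size_t,
     *                               size_t proc, size_t ri, void * ptr_arg)
     *   {
     *       recv_store & store = *static_cast<recv_store *>(ptr_arg);
     *       store.bufs.add();
     *       store.bufs.last().resize(msg_sz);       // allocate msg_sz bytes
     *       return store.bufs.last().getPointer();  // where the message is written
     *   }
     *
     *   // inside some function, with v_cl a Vcluster_base instance and
     *   // msg an openfpm::vector<unsigned char> already filled with the payload:
     *   size_t prc[] = {1};                  // destination processor(s)
     *   size_t sz[]  = {msg.size()};         // size in bytes of each message
     *   void * ptr[] = {msg.getPointer()};   // pointer to each message
     *   recv_store store;
     *
     *   v_cl.sendrecvMultipleMessagesNBX(1,sz,prc,ptr,example_alloc,&store,NONE);
     */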

    /*! \brief Send data to a processor
     *
     * The send is non-blocking, it is completed by execute()
     *
     * \param proc destination processor
     * \param tag message tag
     * \param mem pointer to the data to send
     * \param sz size of the data in bytes
     *
     * \return true if succeeded
     *
     */
    bool send(size_t proc, size_t tag, const void * mem, size_t sz)
    {
        // send over MPI

        // Create one request
        req.add();

        // send
        MPI_IsendWB::send(proc,SEND_RECV_BASE + tag,mem,sz,req.last());

        return true;
    }

    /*! \brief Send data to a processor
     *
     * The send is non-blocking, it is completed by execute()
     *
     * \param proc destination processor
     * \param tag message tag
     * \param v openfpm::vector to send
     *
     * \return true if succeeded
     *
     */
    template<typename T, typename Mem, typename gr> bool send(size_t proc, size_t tag, openfpm::vector<T,Mem,gr> & v)
    {
#ifdef SE_CLASS1
        checkType<T>();
#endif

        // send over MPI

        // Create one request
        req.add();

        // send
        MPI_IsendW<T,Mem,gr>::send(proc,SEND_RECV_BASE + tag,v,req.last());

        return true;
    }

    /*! \brief Recv data from a processor
     *
     * The receive is non-blocking, it is completed by execute()
     *
     * \param proc source processor
     * \param tag message tag
     * \param v pointer to the buffer where the data will be written
     * \param sz size of the buffer in bytes
     *
     * \return true if succeeded
     *
     */
    bool recv(size_t proc, size_t tag, void * v, size_t sz)
    {
        // recv over MPI

        // Create one request
        req.add();

        // receive
        MPI_IrecvWB::recv(proc,SEND_RECV_BASE + tag,v,sz,req.last());

        return true;
    }

    /*! \brief Recv data from a processor
     *
     * The receive is non-blocking, it is completed by execute()
     *
     * \param proc source processor
     * \param tag message tag
     * \param v openfpm::vector that will receive the data
     *
     * \return true if succeeded
     *
     */
    template<typename T, typename Mem, typename gr> bool recv(size_t proc, size_t tag, openfpm::vector<T,Mem,gr> & v)
    {
#ifdef SE_CLASS1
        checkType<T>();
#endif

        // recv over MPI

        // Create one request
        req.add();

        // receive
        MPI_IrecvW<T>::recv(proc,SEND_RECV_BASE + tag,v,req.last());

        return true;
    }
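
    /* Illustrative sketch: point-to-point exchange of an openfpm::vector.
     * send()/recv() only queue the requests; the transfer completes in
     * execute(). v_cl is assumed to be a Vcluster_base instance running on
     * at least two processors, and the receiving vector is assumed to be
     * sized before recv() is queued.
     *
     *   openfpm::vector<float> v_snd;
     *   openfpm::vector<float> v_rcv;
     *
     *   if (v_cl.rank() == 0)
     *   {
     *       v_snd.add(1.0f); v_snd.add(2.0f);
     *       v_cl.send(1,0,v_snd);          // send to processor 1 with tag 0
     *   }
     *   else if (v_cl.rank() == 1)
     *   {
     *       v_rcv.resize(2);
     *       v_cl.recv(0,0,v_rcv);          // receive from processor 0 with tag 0
     *   }
     *
     *   v_cl.execute();                    // complete the pending requests
     */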

    /*! \brief Gather the data from all processors
     *
     * Each processor contributes one element; the result is collected into v
     * on every processor and the operation is completed by execute()
     *
     * \param send element to send
     * \param v vector that will contain one element per processor
     *
     * \return true if succeeded
     *
     */
    template<typename T, typename Mem, typename gr> bool allGather(T & send, openfpm::vector<T,Mem,gr> & v)
    {
#ifdef SE_CLASS1
        checkType<T>();
#endif

        // Create one request
        req.add();

        // Number of processors
        v.resize(getProcessingUnits());

        // gather
        MPI_IAllGatherW<T>::gather(&send,1,v.getPointer(),1,req.last());

        return true;
    }

    /*! \brief Broadcast the data to all processors
     *
     * The vector v of the root processor is broadcast to all the others;
     * the operation is completed by execute()
     *
     * \param v vector to broadcast (input on root, output on the others)
     * \param root processor that broadcasts
     *
     * \return true if succeeded
     *
     */
    template<typename T, typename Mem, typename gr> bool Bcast(openfpm::vector<T,Mem,gr> & v, size_t root)
    {
#ifdef SE_CLASS1
        checkType<T>();
#endif

        // Create one request
        req.add();

        // broadcast
        MPI_IBcastW<T>::bcast(root,v,req.last());

        return true;
    }
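
    /* Illustrative sketch: collecting one value per processor and broadcasting
     * a vector from processor 0; both calls are completed by execute(). v_cl is
     * assumed to be a Vcluster_base instance, and the non-root vectors are
     * assumed to be sized before Bcast() is queued.
     *
     *   // allGather: one entry per processor
     *   int local_id = (int)v_cl.rank();
     *   openfpm::vector<int> ids;
     *   v_cl.allGather(local_id,ids);
     *
     *   // Bcast: the content of processor 0 reaches everybody
     *   openfpm::vector<double> params;
     *   if (v_cl.rank() == 0)
     *   {params.add(0.1); params.add(0.2);}
     *   else
     *   {params.resize(2);}
     *   v_cl.Bcast(params,0);
     *
     *   v_cl.execute();
     */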

    /*! \brief Execute all the requests
     *
     * Wait for the completion of all the pending send/receive/reduction
     * requests and clear the request list
     *
     */
    void execute()
    {
        // if there are no requests return
        if (req.size() == 0)
            return;

        // Wait for all the requests
        stat.resize(req.size());
        MPI_SAFE_CALL(MPI_Waitall(req.size(),&req.get(0),&stat.get(0)));

        // Remove executed requests and status
        req.clear();
        stat.clear();
    }

    /*! \brief Release the buffers used for communication
     *
     */
    void clear()
    {
        recv_buf.clear();
    }
};

#endif