OpenFPM_pdata  4.1.0
Project that contains the implementation of distributed structures
VCluster_base.hpp
1 #ifndef VCLUSTER_BASE_HPP_
2 #define VCLUSTER_BASE_HPP_
3 
4 
5 #include "util/cuda_util.hpp"
6 #ifdef OPENMPI
7 #include <mpi.h>
8 #include <mpi-ext.h>
9 #else
10 #include <mpi.h>
11 #endif
12 #include "MPI_wrapper/MPI_util.hpp"
13 #include "Vector/map_vector.hpp"
14 #include "MPI_wrapper/MPI_IallreduceW.hpp"
15 #include "MPI_wrapper/MPI_IrecvW.hpp"
16 #include "MPI_wrapper/MPI_IsendW.hpp"
17 #include "MPI_wrapper/MPI_IAllGather.hpp"
18 #include "MPI_wrapper/MPI_IBcastW.hpp"
19 #include <exception>
20 #include "Vector/map_vector.hpp"
21 #ifdef DEBUG
22 #include "util/check_no_pointers.hpp"
23 #include "util/util_debug.hpp"
24 #endif
25 #include "util/Vcluster_log.hpp"
26 #include "memory/BHeapMemory.hpp"
27 #include "Packer_Unpacker/has_max_prop.hpp"
28 #include "data_type/aggregate.hpp"
29 #include "util/cuda/ofp_context.hxx"
30 
31 #ifdef HAVE_PETSC
32 #include <petscvec.h>
33 #endif
34 
35 extern double time_spent;
36 
37 enum NBX_Type
38 {
39  NBX_UNACTIVE,
40  NBX_UNKNOWN,
41  NBX_KNOWN,
42  NBX_KNOWN_PRC
43 };
44 
45 constexpr int MSG_LENGTH = 1024;
46 constexpr int MSG_SEND_RECV = 1025;
47 constexpr int SEND_SPARSE = 8192;
48 constexpr int NONE = 1;
49 constexpr int NEED_ALL_SIZE = 2;
50 
51 constexpr int SERIVCE_MESSAGE_TAG = 16384;
52 constexpr int SEND_RECV_BASE = 4096;
53 constexpr int GATHER_BASE = 24576;
54 
55 constexpr int RECEIVE_KNOWN = 4;
56 constexpr int KNOWN_ELEMENT_OR_BYTE = 8;
57 constexpr int MPI_GPU_DIRECT = 16;
58 
59 constexpr int NQUEUE = 4;
60 
61 // number of vcluster instances
62 extern size_t n_vcluster;
63 // Global MPI initialization
64 extern bool global_mpi_init;
65 // initialization flag
66 extern bool ofp_initialized;
67 extern size_t tot_sent;
68 extern size_t tot_recv;
69 
71 
72 extern size_t NBX_cnt;
73 
74 template<typename T> void assign(T * ptr1, T * ptr2)
75 {
76  *ptr1 = *ptr2;
77 };
78 
79 
81 union red
82 {
84  char c;
86  unsigned char uc;
88  short s;
90  unsigned short us;
92  int i;
94  unsigned int ui;
96  float f;
98  double d;
99 };
100 
124 template<typename InternalMemory>
125 class Vcluster_base
126 {
129  Vcluster_log log;
133  openfpm::vector<size_t> proc_com;
136  openfpm::vector<int> map_scatter;
139  openfpm::vector<MPI_Request> req;
142  openfpm::vector<MPI_Status> stat;
144  std::vector<int> post_exe;
145 
147  mgpu::ofp_context_t * context;
148 
149  // Single objects
150 
152  int m_size;
154  int m_rank;
155 
157  int numPE = 1;
158 
160 
161  NBX_Type NBX_active[NQUEUE];
162 
164  size_t rid[NQUEUE];
165 
167  int NBX_prc_qcnt = -1;
168 
171  bool NBX_prc_reached_bar_req[NQUEUE];
173 
174  int NBX_prc_cnt_base = 0;
175  size_t NBX_prc_n_send[NQUEUE];
176  size_t * NBX_prc_prc[NQUEUE];
177  void ** NBX_prc_ptr[NQUEUE];
178  size_t * NBX_prc_sz[NQUEUE];
179  size_t NBX_prc_n_recv[NQUEUE];
180  void * (* NBX_prc_msg_alloc[NQUEUE])(size_t,size_t,size_t,size_t,size_t,size_t,void *);
181  size_t * NBX_prc_prc_recv[NQUEUE];
182  void * NBX_prc_ptr_arg[NQUEUE];
183 
185  int nbx_cycle;
193  std::vector<red> r;
194 
197  openfpm::vector<void *> ptr_send[NQUEUE];
200  openfpm::vector<size_t> sz_send[NQUEUE];
202  MPI_Request bar_req;
203 
205  MPI_Status bar_stat;
206 
208  Vcluster_base & operator=(const Vcluster_base &) {return *this;};
209 
211  int shmrank;
212 
215  Vcluster_base(const Vcluster_base &)
218  {};
219 
220  void queue_all_sends(size_t n_send , size_t sz[],
221  size_t prc[], void * ptr[])
222  {
223  if (stat.size() != 0 || (req.size() != 0 && NBX_prc_qcnt == 0))
224  {std::cerr << "Error: " << __FILE__ << ":" << __LINE__ << " this function must be called when no other requests are in progress. Remember that if you use functions like max(), sum(), send() or recv(), check that you did not forget to call execute() \n";}
225 
226 
227  if (NBX_prc_qcnt == 0)
228  {
229  stat.clear();
230  req.clear();
231  // Do MPI_Issend
232  }
233 
234  for (size_t i = 0 ; i < n_send ; i++)
235  {
236  if (sz[i] != 0)
237  {
238  req.add();
239 
240 #ifdef SE_CLASS2
241  check_valid(ptr[i],sz[i]);
242 #endif
243 
244  tot_sent += sz[i];
245 
246 // std::cout << "TAG: " << SEND_SPARSE + (NBX_cnt + NBX_prc_qcnt)*131072 + i << " " << NBX_cnt << " " << NBX_prc_qcnt << " " << " rank: " << rank() << " " << NBX_prc_cnt_base << " nbx_cycle: " << nbx_cycle << std::endl;
247 
248  MPI_SAFE_CALL(MPI_Issend(ptr[i], sz[i], MPI_BYTE, prc[i], SEND_SPARSE + (NBX_cnt + NBX_prc_qcnt)*131072 + i, MPI_COMM_WORLD,&req.last()));
249  log.logSend(prc[i]);
250  }
251  }
252  }
253 
254 protected:
255 
258  openfpm::vector_fr<BMemory<InternalMemory>> recv_buf[NQUEUE];
261  openfpm::vector<size_t> tags[NQUEUE];
262 public:
263 
264  // Finalize the MPI program
265  ~Vcluster_base()
266  {
267 #ifdef SE_CLASS2
268  check_delete(this);
269 #endif
270  n_vcluster--;
271 
272  // if there are no other vcluster instances finalize
273  if (n_vcluster == 0)
274  {
275  int already_finalised;
276 
277  MPI_Finalized(&already_finalised);
278  if (!already_finalised)
279  {
280  if (MPI_Finalize() != 0)
281  {
282  std::cerr << __FILE__ << ":" << __LINE__ << " MPI_Finalize FAILED \n";
283  }
284  }
285  }
286 
287  delete context;
288  }
289 
296  Vcluster_base(int *argc, char ***argv)
297  {
298  // reset NBX_Active
299 
300  for (unsigned int i = 0 ; i < NQUEUE ; i++)
301  {
302  NBX_active[i] = NBX_Type::NBX_UNACTIVE;
303  rid[i] = 0;
304  }
305 
306 #ifdef SE_CLASS2
307  check_new(this,8,VCLUSTER_EVENT,PRJ_VCLUSTER);
308 #endif
309 
310  n_vcluster++;
311 
312  int already_initialised;
313  MPI_Initialized(&already_initialised);
314 
315  // Check if MPI is already initialized
316  if (!already_initialised)
317  {
318  MPI_Init(argc,argv);
319  }
320 
321  // We try to get the local processors rank
322 
323  MPI_Comm shmcomm;
324  MPI_Comm_split_type(MPI_COMM_WORLD, MPI_COMM_TYPE_SHARED, 0,
325  MPI_INFO_NULL, &shmcomm);
326 
327  MPI_Comm_rank(shmcomm, &shmrank);
328  MPI_Comm_free(&shmcomm);
329 
330  // Get the total number of process
331  // and the rank of this process
332 
333  MPI_Comm_size(MPI_COMM_WORLD, &m_size);
334  MPI_Comm_rank(MPI_COMM_WORLD, &m_rank);
335 
336 #ifdef SE_CLASS2
337  process_v_cl = m_rank;
338 #endif
339 
340  // create and fill map scatter with one
341  map_scatter.resize(m_size);
342 
343  for (size_t i = 0 ; i < map_scatter.size() ; i++)
344  {
345  map_scatter.get(i) = 1;
346  }
347 
348  // open the log file
349  log.openLog(m_rank);
350 
351  // Initialize bar_req
352  bar_req = MPI_Request();
353  bar_stat = MPI_Status();
354 
355 #ifdef EXTERNAL_SET_GPU
356  int dev;
357  cudaGetDevice(&dev);
358  context = new mgpu::ofp_context_t(mgpu::gpu_context_opt::no_print_props,dev);
359 #else
360  context = new mgpu::ofp_context_t(mgpu::gpu_context_opt::no_print_props,shmrank);
361 #endif
362 
363 
364 #if defined(PRINT_RANK_TO_GPU) && defined(CUDA_GPU)
365 
366  char node_name[MPI_MAX_PROCESSOR_NAME];
367  int len;
368 
369  MPI_Get_processor_name(node_name,&len);
370 
371  std::cout << "Rank: " << m_rank << " on host: " << node_name << " work on GPU: " << context->getDevice() << "/" << context->getNDevice() << std::endl;
372 #endif
373 
374  int flag;
375  void *tag_ub_v;
376  int tag_ub;
377 
378  MPI_Comm_get_attr(MPI_COMM_WORLD, MPI_TAG_UB, &tag_ub_v, &flag);
379  tag_ub = *(int*)tag_ub_v;
380 
381  if (flag == true)
382  {
383  nbx_cycle = (tag_ub - SEND_SPARSE - 131072 - NQUEUE*131072) / 131072;
384 
385  if (nbx_cycle < NQUEUE*2)
386  {std::cerr << __FILE__ << ":" << __LINE__ << " Error MPI_TAG_UB is too small for OpenFPM" << std::endl;}
387  }
388  else
389  {nbx_cycle = 2048;}
390  }
391 
392 #ifdef SE_CLASS1
393 
404  template<typename T> void checkType()
405  {
406  // if T is a primitive like int, long int, float, double, ... it makes sense
407  // (pointers, l-value references and r-value references are not fundamental types)
408  if (std::is_fundamental<T>::value == true)
409  {return;}
410 
411  // if it is a pointer it makes no sense
412  if (std::is_pointer<T>::value == true)
413  {std::cerr << "Error: " << __FILE__ << ":" << __LINE__ << " the type " << demangle(typeid(T).name()) << " is a pointer, sending pointers values has no sense\n";}
414 
415  // if it is an l-value reference it makes no sense to send it
416  if (std::is_lvalue_reference<T>::value == true)
417  {std::cerr << "Error: " << __FILE__ << ":" << __LINE__ << " the type " << demangle(typeid(T).name()) << " is an l-value reference, sending references has no sense\n";}
418 
419  // if it is an r-value reference it makes no sense to send it
420  if (std::is_rvalue_reference<T>::value == true)
421  {std::cerr << "Error: " << __FILE__ << ":" << __LINE__ << " the type " << demangle(typeid(T).name()) << " is an r-value reference, sending references has no sense\n";}
422 
423  // ... if not, check that T has a method called noPointers
424  switch (check_no_pointers<T>::value())
425  {
426  case PNP::UNKNOWN:
427  {
428  std::cerr << "Warning: " << __FILE__ << ":" << __LINE__ << " impossible to check the type " << demangle(typeid(T).name()) << " please consider to add a static method \"static bool noPointers()\" \n" ;
429  break;
430  }
431  case PNP::POINTERS:
432  {
433  std::cerr << "Error: " << __FILE__ << ":" << __LINE__ << " the type " << demangle(typeid(T).name()) << " has pointers inside, sending pointers values has no sense\n";
434  break;
435  }
436  default:
437  {
438 
439  }
440  }
441  }
442 
443 #endif
444 
450  mgpu::ofp_context_t & getmgpuContext(bool iw = true)
451  {
452  if (context == NULL && iw == true)
453  {
454  std::cout << __FILE__ << ":" << __LINE__ << " Warning: it seems that the modern gpu context is not initialized. "
455  "Either a compatible working cuda device has not been found, or openfpm_init has been called in a file that was not compiled with NVCC" << std::endl;
456  }
457 
458  return *context;
459  }
460 
466  MPI_Comm getMPIComm()
467  {
468  return MPI_COMM_WORLD;
469  }
470 
476  size_t getProcessingUnits()
477  {
478  return m_size*numPE;
479  }
480 
490  size_t size()
491  {
492  return this->m_size*numPE;
493  }
494 
495  void print_stats()
496  {
497 #ifdef VCLUSTER_PERF_REPORT
498  std::cout << "-- REPORT COMMUNICATIONS -- " << std::endl;
499 
500  std::cout << "Processor " << this->rank() << " sent: " << tot_sent << std::endl;
501  std::cout << "Processor " << this->rank() << " received: " << tot_recv << std::endl;
502 
503  std::cout << "Processor " << this->rank() << " time spent: " << time_spent << std::endl;
504  std::cout << "Processor " << this->rank() << " Bandwidth: S:" << (double)tot_sent / time_spent * 1e-9 << "GB/s R:" << (double)tot_recv / time_spent * 1e-9 << "GB/s" << std::endl;
505 #else
506 
507  std::cout << "Error: to activate performance stats on VCluster, enable VCLUSTER_PERF_REPORT" << std::endl;
508 
509 #endif
510  }
511 
512  void clear_stats()
513  {
514 #ifdef VCLUSTER_PERF_REPORT
515 
516  tot_sent = 0;
517  tot_recv = 0;
518 
519  time_spent = 0;
520 #else
521 
522  std::cout << "Error: to activate performance stats on VCluster, enable VCLUSTER_PERF_REPORT" << std::endl;
523 
524 #endif
525  }
526 
532  size_t getProcessUnitID()
533  {
534  return m_rank;
535  }
536 
546  size_t rank()
547  {
548  return m_rank;
549  }
550 
551 
558  template<typename T> void sum(T & num)
559  {
560 #ifdef SE_CLASS1
561  checkType<T>();
562 #endif
563 
564  // reduce over MPI
565 
566  // Create one request
567  req.add();
568 
569  // reduce
570  MPI_IallreduceW<T>::reduce(num,MPI_SUM,req.last());
571  }
572 
578  template<typename T> void max(T & num)
579  {
580 #ifdef SE_CLASS1
581  checkType<T>();
582 #endif
583  // reduce over MPI
584 
585  // Create one request
586  req.add();
587 
588  // reduce
589  MPI_IallreduceW<T>::reduce(num,MPI_MAX,req.last());
590  }
591 
598  template<typename T> void min(T & num)
599  {
600 #ifdef SE_CLASS1
601  checkType<T>();
602 #endif
603  // reduce over MPI
604 
605  // Create one request
606  req.add();
607 
608  // reduce
609  MPI_IallreduceW<T>::reduce(num,MPI_MIN,req.last());
610  }
611 
617  void progressCommunication()
618  {
619  MPI_Status stat_t;
620  int stat = false;
621  MPI_SAFE_CALL(MPI_Iprobe(MPI_ANY_SOURCE,MPI_ANY_TAG,MPI_COMM_WORLD,&stat,&stat_t));
622 
623  // If I have an incoming message and it is related to this NBX communication
624  if (stat == true)
625  {
626  unsigned int i = (stat_t.MPI_TAG - SEND_SPARSE) / 131072 - NBX_prc_cnt_base;
627 
628  if (i >= NQUEUE || NBX_active[i] == NBX_Type::NBX_UNACTIVE || NBX_active[i] == NBX_Type::NBX_KNOWN || NBX_active[i] == NBX_Type::NBX_KNOWN_PRC)
629  {return;}
630 
631  int msize;
632 
633  // Get the message tag and size
634  MPI_SAFE_CALL(MPI_Get_count(&stat_t,MPI_BYTE,&msize));
635 
636  // Ok we check if the TAG come from one of our send TAG
637  if (stat_t.MPI_TAG >= (int)(SEND_SPARSE + NBX_prc_cnt_base*131072) && stat_t.MPI_TAG < (int)(SEND_SPARSE + (NBX_prc_cnt_base + NBX_prc_qcnt + 1)*131072))
638  {
639  // Get the pointer to receive the message
640  void * ptr = this->NBX_prc_msg_alloc[i](msize,0,0,stat_t.MPI_SOURCE,rid[i],stat_t.MPI_TAG,this->NBX_prc_ptr_arg[i]);
641 
642  // Log the receiving request
643  log.logRecv(stat_t);
644 
645  rid[i]++;
646 
647  // Check the pointer
648 #ifdef SE_CLASS2
649  check_valid(ptr,msize);
650 #endif
651  tot_recv += msize;
652  #ifdef VCLUSTER_GARBAGE_INJECTOR
653  #if defined (__NVCC__) && !defined(CUDA_ON_CPU)
654  cudaPointerAttributes cpa;
655  auto error = cudaPointerGetAttributes(&cpa,ptr);
656  if (error == cudaSuccess)
657  {
658  if(cpa.type == cudaMemoryTypeDevice)
659  {cudaMemset(ptr,0xFF,msize);}
660  else
661  {memset(ptr,0xFF,msize);}
662  }
663  #else
664  memset(ptr,0xFF,msize);
665  #endif
666  #endif
667  MPI_SAFE_CALL(MPI_Recv(ptr,msize,MPI_BYTE,stat_t.MPI_SOURCE,stat_t.MPI_TAG,MPI_COMM_WORLD,&stat_t));
668 
669 #ifdef SE_CLASS2
670  check_valid(ptr,msize);
671 #endif
672  }
673  }
674 
675  // Check the status of all the MPI_issend and call the barrier if finished
676 
677  for (unsigned int i = 0 ; i < NQUEUE ; i++)
678  {
679  if (i >= NQUEUE || NBX_active[i] == NBX_Type::NBX_UNACTIVE || NBX_active[i] == NBX_Type::NBX_KNOWN || NBX_active[i] == NBX_Type::NBX_KNOWN_PRC)
680  {continue;}
681 
682  if (NBX_prc_reached_bar_req[i] == false)
683  {
684  int flag = false;
685  if (req.size() != 0)
686  {MPI_SAFE_CALL(MPI_Testall(req.size(),&req.get(0),&flag,MPI_STATUSES_IGNORE));}
687  else
688  {flag = true;}
689 
690  // If all send has been completed
691  if (flag == true)
692  {MPI_SAFE_CALL(MPI_Ibarrier(MPI_COMM_WORLD,&bar_req));NBX_prc_reached_bar_req[i] = true;}
693  }
694  }
695  }
696 
740  template<typename T> void sendrecvMultipleMessagesNBX(openfpm::vector< size_t > & prc,
741  openfpm::vector< T > & data,
742  openfpm::vector< size_t > & prc_recv,
743  openfpm::vector< size_t > & recv_sz ,
744  void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,size_t,void *),
745  void * ptr_arg,
746  long int opt=NONE)
747  {
748  #ifdef VCLUSTER_PERF_REPORT
749  timer nbx_timer;
750  nbx_timer.start();
751 
752  #endif
753 
754  // Allocate the buffers
755 
756  for (size_t i = 0 ; i < prc.size() ; i++)
757  {send(prc.get(i),SEND_SPARSE + NBX_cnt*131072,data.get(i).getPointer(),data.get(i).size());}
758 
759  for (size_t i = 0 ; i < prc_recv.size() ; i++)
760  {
761  void * ptr_recv = msg_alloc(recv_sz.get(i),0,0,prc_recv.get(i),i,SEND_SPARSE + NBX_cnt*131072,ptr_arg);
762 
763  recv(prc_recv.get(i),SEND_SPARSE + NBX_cnt*131072,ptr_recv,recv_sz.get(i));
764  }
765 
766  execute();
767 
768  // Circular counter
769  NBX_cnt = (NBX_cnt + 1) % nbx_cycle;
770 
771  #ifdef VCLUSTER_PERF_REPORT
772  nbx_timer.stop();
773  time_spent += nbx_timer.getwct();
774  #endif
775  }
776 
820  template<typename T> void sendrecvMultipleMessagesNBXAsync(openfpm::vector< size_t > & prc,
821  openfpm::vector< T > & data,
822  openfpm::vector< size_t > & prc_recv,
823  openfpm::vector< size_t > & recv_sz ,
824  void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,size_t,void *),
825  void * ptr_arg,
826  long int opt=NONE)
827  {
828  NBX_prc_qcnt++;
829  if (NBX_prc_qcnt >= NQUEUE)
830  {
831  std::cout << __FILE__ << ":" << __LINE__ << " error: you can queue at most " << NQUEUE << " asynchronous communication functions " << std::endl;
832  return;
833  }
834 
835  // Allocate the buffers
836 
837  for (size_t i = 0 ; i < prc.size() ; i++)
838  {send(prc.get(i),SEND_SPARSE + NBX_cnt*131072,data.get(i).getPointer(),data.get(i).size());}
839 
840  for (size_t i = 0 ; i < prc_recv.size() ; i++)
841  {
842  void * ptr_recv = msg_alloc(recv_sz.get(i),0,0,prc_recv.get(i),i,SEND_SPARSE + NBX_cnt*131072,ptr_arg);
843 
844  recv(prc_recv.get(i),SEND_SPARSE + NBX_cnt*131072,ptr_recv,recv_sz.get(i));
845  }
846 
847  NBX_active[NBX_prc_qcnt] = NBX_Type::NBX_KNOWN;
848  if (NBX_prc_qcnt == 0)
849  {NBX_prc_cnt_base = NBX_cnt;}
850  }
851 
887  template<typename T>
888  void sendrecvMultipleMessagesNBX(openfpm::vector< size_t > & prc,
889  openfpm::vector< T > & data,
890  void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,size_t,void *),
891  void * ptr_arg, long int opt=NONE)
892  {
893 #ifdef SE_CLASS1
894  checkType<typename T::value_type>();
895 #endif
896 
897  // resize the pointer list
898  ptr_send[NBX_prc_qcnt+1].resize(prc.size());
899  sz_send[NBX_prc_qcnt+1].resize(prc.size());
900 
901  for (size_t i = 0 ; i < prc.size() ; i++)
902  {
903  ptr_send[NBX_prc_qcnt+1].get(i) = data.get(i).getPointer();
904  sz_send[NBX_prc_qcnt+1].get(i) = data.get(i).size() * sizeof(typename T::value_type);
905  }
906 
907  sendrecvMultipleMessagesNBX(prc.size(),(size_t *)sz_send[NBX_prc_qcnt+1].getPointer(),(size_t *)prc.getPointer(),(void **)ptr_send[NBX_prc_qcnt+1].getPointer(),msg_alloc,ptr_arg,opt);
908  }
909 
949  template<typename T>
950  void sendrecvMultipleMessagesNBXAsync(openfpm::vector< size_t > & prc,
951  openfpm::vector< T > & data,
952  void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,size_t,void *),
953  void * ptr_arg, long int opt=NONE)
954  {
955 #ifdef SE_CLASS1
956  checkType<typename T::value_type>();
957 #endif
958  // resize the pointer list
959  ptr_send[NBX_prc_qcnt+1].resize(prc.size());
960  sz_send[NBX_prc_qcnt+1].resize(prc.size());
961 
962  for (size_t i = 0 ; i < prc.size() ; i++)
963  {
964  ptr_send[NBX_prc_qcnt+1].get(i) = data.get(i).getPointer();
965  sz_send[NBX_prc_qcnt+1].get(i) = data.get(i).size() * sizeof(typename T::value_type);
966  }
967 
968  sendrecvMultipleMessagesNBXAsync(prc.size(),(size_t *)sz_send[NBX_prc_qcnt+1].getPointer(),(size_t *)prc.getPointer(),(void **)ptr_send[NBX_prc_qcnt+1].getPointer(),msg_alloc,ptr_arg,opt);
969  }
970 
1014  void sendrecvMultipleMessagesNBX(size_t n_send , size_t sz[],
1015  size_t prc[] , void * ptr[],
1016  size_t n_recv, size_t prc_recv[] ,
1017  size_t sz_recv[] ,void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t, size_t,void *),
1018  void * ptr_arg, long int opt=NONE)
1019  {
1020  #ifdef VCLUSTER_PERF_REPORT
1021  timer nbx_timer;
1022  nbx_timer.start();
1023 
1024  #endif
1025 
1026  // Allocate the buffers
1027 
1028  for (size_t i = 0 ; i < n_send ; i++)
1029  {send(prc[i],SEND_SPARSE + NBX_cnt*131072,ptr[i],sz[i]);}
1030 
1031  for (size_t i = 0 ; i < n_recv ; i++)
1032  {
1033  void * ptr_recv = msg_alloc(sz_recv[i],0,0,prc_recv[i],i,SEND_SPARSE + NBX_cnt*131072,ptr_arg);
1034 
1035  recv(prc_recv[i],SEND_SPARSE + NBX_cnt*131072,ptr_recv,sz_recv[i]);
1036  }
1037 
1038  execute();
1039 
1040  // Circular counter
1041  NBX_cnt = (NBX_cnt + 1) % nbx_cycle;
1042 
1043  #ifdef VCLUSTER_PERF_REPORT
1044  nbx_timer.stop();
1045  time_spent += nbx_timer.getwct();
1046  #endif
1047  }
1048 
1092  void sendrecvMultipleMessagesNBXAsync(size_t n_send , size_t sz[],
1093  size_t prc[] , void * ptr[],
1094  size_t n_recv, size_t prc_recv[] ,
1095  size_t sz_recv[] ,void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t, size_t,void *),
1096  void * ptr_arg, long int opt=NONE)
1097  {
1098  NBX_prc_qcnt++;
1099  if (NBX_prc_qcnt >= NQUEUE)
1100  {
1101  std::cout << __FILE__ << ":" << __LINE__ << " error: you can queue at most " << NQUEUE << " asynchronous communication functions " << std::endl;
1102  return;
1103  }
1104 
1105  // Allocate the buffers
1106 
1107  for (size_t i = 0 ; i < n_send ; i++)
1108  {send(prc[i],SEND_SPARSE + NBX_cnt*131072,ptr[i],sz[i]);}
1109 
1110  for (size_t i = 0 ; i < n_recv ; i++)
1111  {
1112  void * ptr_recv = msg_alloc(sz_recv[i],0,0,prc_recv[i],i,SEND_SPARSE + NBX_cnt*131072,ptr_arg);
1113 
1114  recv(prc_recv[i],SEND_SPARSE + NBX_cnt*131072,ptr_recv,sz_recv[i]);
1115  }
1116 
1117  NBX_active[NBX_prc_qcnt] = NBX_Type::NBX_KNOWN;
1118  if (NBX_prc_qcnt == 0)
1119  {NBX_prc_cnt_base = NBX_cnt;}
1120  }
1121 
1122  openfpm::vector<size_t> sz_recv_tmp;
1123 
1166  void sendrecvMultipleMessagesNBX(size_t n_send , size_t sz[], size_t prc[] ,
1167  void * ptr[], size_t n_recv, size_t prc_recv[] ,
1168  void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,size_t,void *),
1169  void * ptr_arg, long int opt=NONE)
1170  {
1171  #ifdef VCLUSTER_PERF_REPORT
1172  timer nbx_timer;
1173  nbx_timer.start();
1174  #endif
1175 
1176  sz_recv_tmp.resize(n_recv);
1177 
1178  // First we understand the receive size for each processor
1179 
1180  for (size_t i = 0 ; i < n_send ; i++)
1181  {send(prc[i],SEND_SPARSE + NBX_cnt*131072,&sz[i],sizeof(size_t));}
1182 
1183  for (size_t i = 0 ; i < n_recv ; i++)
1184  {recv(prc_recv[i],SEND_SPARSE + NBX_cnt*131072,&sz_recv_tmp.get(i),sizeof(size_t));}
1185 
1186  execute();
1187 
1188  // Circular counter
1189  NBX_cnt = (NBX_cnt + 1) % nbx_cycle;
1190 
1191  // Allocate the buffers
1192 
1193  for (size_t i = 0 ; i < n_send ; i++)
1194  {send(prc[i],SEND_SPARSE + NBX_cnt*131072,ptr[i],sz[i]);}
1195 
1196  for (size_t i = 0 ; i < n_recv ; i++)
1197  {
1198  void * ptr_recv = msg_alloc(sz_recv_tmp.get(i),0,0,prc_recv[i],i,0,ptr_arg);
1199 
1200  recv(prc_recv[i],SEND_SPARSE + NBX_cnt*131072,ptr_recv,sz_recv_tmp.get(i));
1201  }
1202 
1203  execute();
1204 
1205  // Circular counter
1206  NBX_cnt = (NBX_cnt + 1) % nbx_cycle;
1207 
1208  #ifdef VCLUSTER_PERF_REPORT
1209  nbx_timer.stop();
1210  time_spent += nbx_timer.getwct();
1211  #endif
1212  }
1213 
1256  void sendrecvMultipleMessagesNBXAsync(size_t n_send , size_t sz[], size_t prc[] ,
1257  void * ptr[], size_t n_recv, size_t prc_recv[] ,
1258  void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,size_t,void *),
1259  void * ptr_arg, long int opt=NONE)
1260  {
1261  NBX_prc_qcnt++;
1262  if (NBX_prc_qcnt >= NQUEUE)
1263  {
1264  std::cout << __FILE__ << ":" << __LINE__ << " error: you can queue at most " << NQUEUE << " asynchronous communication functions " << std::endl;
1265  return;
1266  }
1267 
1268  sz_recv_tmp.resize(n_recv);
1269 
1270  // First we understand the receive size for each processor
1271 
1272  for (size_t i = 0 ; i < n_send ; i++)
1273  {send(prc[i],SEND_SPARSE + NBX_cnt*131072,&sz[i],sizeof(size_t));}
1274 
1275  for (size_t i = 0 ; i < n_recv ; i++)
1276  {recv(prc_recv[i],SEND_SPARSE + NBX_cnt*131072,&sz_recv_tmp.get(i),sizeof(size_t));}
1277 
1279 
1280  NBX_prc_n_send[NBX_prc_qcnt] = n_send;
1281  NBX_prc_prc[NBX_prc_qcnt] = prc;
1282  NBX_prc_ptr[NBX_prc_qcnt] = ptr;
1283  NBX_prc_sz[NBX_prc_qcnt] = sz;
1284  NBX_prc_n_recv[NBX_prc_qcnt] = n_recv;
1285  NBX_prc_prc_recv[NBX_prc_qcnt] = prc_recv;
1286  NBX_prc_msg_alloc[NBX_prc_qcnt] = msg_alloc;
1287  NBX_prc_ptr_arg[NBX_prc_qcnt] = ptr_arg;
1288 
1289  NBX_active[NBX_prc_qcnt] = NBX_Type::NBX_KNOWN_PRC;
1290  if (NBX_prc_qcnt == 0)
1291  {NBX_prc_cnt_base = NBX_cnt;}
1292  }
1293 
1334  void sendrecvMultipleMessagesNBX(size_t n_send , size_t sz[],
1335  size_t prc[] , void * ptr[],
1336  void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,size_t,void *),
1337  void * ptr_arg, long int opt = NONE)
1338  {
1339  #ifdef VCLUSTER_PERF_REPORT
1340  timer nbx_timer;
1341  nbx_timer.start();
1342 
1343  #endif
1344 
1345  NBX_prc_qcnt++;
1346  if (NBX_prc_qcnt != 0)
1347  {
1348  std::cout << __FILE__ << ":" << __LINE__ << " error: there are asynchronous calls running, you have to complete them before going back to synchronous calls" << std::endl;
1349  return;
1350  }
1351 
1352  queue_all_sends(n_send,sz,prc,ptr);
1353 
1354  this->NBX_prc_ptr_arg[NBX_prc_qcnt] = ptr_arg;
1355  this->NBX_prc_msg_alloc[NBX_prc_qcnt] = msg_alloc;
1356 
1357  rid[NBX_prc_qcnt] = 0;
1358  int flag = false;
1359 
1361  NBX_active[NBX_prc_qcnt] = NBX_Type::NBX_UNKNOWN;
1362  NBX_prc_cnt_base = NBX_cnt;
1363 
1364  log.start(10);
1365 
1366  // Wait until all the sends are acknowledged
1367  do
1368  {
1369  progressCommunication();
1370 
1371  // Check if all processor reached the async barrier
1372  if (NBX_prc_reached_bar_req[NBX_prc_qcnt])
1373  {MPI_SAFE_CALL(MPI_Test(&bar_req,&flag,&bar_stat))};
1374 
1375  // produce a report if communication get stuck
1376  log.NBXreport(NBX_cnt,req,NBX_prc_reached_bar_req[NBX_prc_qcnt],bar_stat);
1377 
1378  } while (flag == false);
1379 
1380  // Remove the executed request
1381 
1382  req.clear();
1383  stat.clear();
1384  log.clear();
1385 
1386  // Circular counter
1387  NBX_cnt = (NBX_cnt + 1) % nbx_cycle;
1388  NBX_prc_qcnt = -1;
1389 
1390  #ifdef VCLUSTER_PERF_REPORT
1391  nbx_timer.stop();
1392  time_spent += nbx_timer.getwct();
1393  #endif
1394  }
1395 
1440  void sendrecvMultipleMessagesNBXAsync(size_t n_send , size_t sz[],
1441  size_t prc[] , void * ptr[],
1442  void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,size_t,void *),
1443  void * ptr_arg, long int opt = NONE)
1444  {
1445  NBX_prc_qcnt++;
1446  queue_all_sends(n_send,sz,prc,ptr);
1447 
1448  this->NBX_prc_ptr_arg[NBX_prc_qcnt] = ptr_arg;
1449  this->NBX_prc_msg_alloc[NBX_prc_qcnt] = msg_alloc;
1450 
1451  rid[NBX_prc_qcnt] = 0;
1452 
1454  NBX_active[NBX_prc_qcnt] = NBX_Type::NBX_UNKNOWN;
1455 
1456  log.start(10);
1457  if (NBX_prc_qcnt == 0)
1458  {NBX_prc_cnt_base = NBX_cnt;}
1459 
1460  return;
1461  }
1462 
1467  void sendrecvMultipleMessagesNBXWait()
1468  {
1469  for (unsigned int j = 0 ; j < NQUEUE ; j++)
1470  {
1471  if (NBX_active[j] == NBX_Type::NBX_UNACTIVE)
1472  {continue;}
1473 
1474  if (NBX_active[j] == NBX_Type::NBX_KNOWN_PRC)
1475  {
1476  execute();
1477 
1478  // Circular counter
1479  NBX_cnt = (NBX_cnt + 1) % nbx_cycle;
1480 
1481  // Allocate the buffers
1482 
1483  for (size_t i = 0 ; i < NBX_prc_n_send[j] ; i++)
1484  {send(NBX_prc_prc[j][i],SEND_SPARSE + NBX_cnt*131072,NBX_prc_ptr[j][i],NBX_prc_sz[j][i]);}
1485 
1486  for (size_t i = 0 ; i < NBX_prc_n_recv[j] ; i++)
1487  {
1488  void * ptr_recv = NBX_prc_msg_alloc[j](sz_recv_tmp.get(i),0,0,NBX_prc_prc_recv[j][i],i,0,this->NBX_prc_ptr_arg[j]);
1489 
1490  recv(NBX_prc_prc_recv[j][i],SEND_SPARSE + NBX_cnt*131072,ptr_recv,sz_recv_tmp.get(i));
1491  }
1492 
1493  NBX_active[j] = NBX_Type::NBX_KNOWN;
1494  }
1495 
1496  if (NBX_active[j] == NBX_Type::NBX_KNOWN)
1497  {
1498  execute();
1499 
1500  // Circular counter
1501  NBX_cnt = (NBX_cnt + 1) % nbx_cycle;
1502  NBX_active[j] = NBX_Type::NBX_UNACTIVE;
1503 
1504  continue;
1505  }
1506 
1507  int flag = false;
1508 
1509  // Wait until all the sends are acknowledged
1510  do
1511  {
1512  progressCommunication();
1513 
1514  // Check if all processor reached the async barrier
1515  if (NBX_prc_reached_bar_req[j])
1516  {MPI_SAFE_CALL(MPI_Test(&bar_req,&flag,&bar_stat))};
1517 
1518  // produce a report if communication get stuck
1519  log.NBXreport(NBX_cnt,req,NBX_prc_reached_bar_req[j],bar_stat);
1520 
1521  } while (flag == false);
1522 
1523  // Remove the executed request
1524 
1525  req.clear();
1526  stat.clear();
1527  log.clear();
1528 
1529  // Circular counter
1530  NBX_cnt = (NBX_cnt + 1) % nbx_cycle;
1531  NBX_active[j] = NBX_Type::NBX_UNACTIVE;
1532 
1533  }
1534 
1535  NBX_prc_qcnt = -1;
1536  return;
1537  }
1538 
1557  bool send(size_t proc, size_t tag, const void * mem, size_t sz)
1558  {
1559  // send over MPI
1560 
1561  // Create one request
1562  req.add();
1563 
1564  // send
1565  MPI_IsendWB::send(proc,SEND_RECV_BASE + tag,mem,sz,req.last());
1566 
1567  return true;
1568  }
1569 
1570 
1588  template<typename T, typename Mem, template<typename> class gr> bool send(size_t proc, size_t tag, openfpm::vector<T,Mem,gr> & v)
1589  {
1590 #ifdef SE_CLASS1
1591  checkType<T>();
1592 #endif
1593 
1594  // send over MPI
1595 
1596  // Create one request
1597  req.add();
1598 
1599  // send
1600  MPI_IsendW<T,Mem,gr>::send(proc,SEND_RECV_BASE + tag,v,req.last());
1601 
1602  return true;
1603  }
1604 
1623  bool recv(size_t proc, size_t tag, void * v, size_t sz)
1624  {
1625  // recv over MPI
1626 
1627  // Create one request
1628  req.add();
1629 
1630  // receive
1631  MPI_IrecvWB::recv(proc,SEND_RECV_BASE + tag,v,sz,req.last());
1632 
1633  return true;
1634  }
1635 
1653  template<typename T, typename Mem, template<typename> class gr> bool recv(size_t proc, size_t tag, openfpm::vector<T,Mem,gr> & v)
1654  {
1655 #ifdef SE_CLASS1
1656  checkType<T>();
1657 #endif
1658 
1659  // recv over MPI
1660 
1661  // Create one request
1662  req.add();
1663 
1664  // receive
1665  MPI_IrecvW<T>::recv(proc,SEND_RECV_BASE + tag,v,req.last());
1666 
1667  return true;
1668  }
1669 
1682  template<typename T, typename Mem, template<typename> class gr> bool allGather(T & send, openfpm::vector<T,Mem,gr> & v)
1683  {
1684 #ifdef SE_CLASS1
1685  checkType<T>();
1686 #endif
1687 
1688  // Create one request
1689  req.add();
1690 
1691  // Number of processors
1692  v.resize(getProcessingUnits());
1693 
1694  // gather
1695  MPI_IAllGatherW<T>::gather(&send,1,v.getPointer(),1,req.last());
1696 
1697  return true;
1698  }
1699 
1716  template<typename T, typename Mem, template<typename> class layout_base >
1717  bool Bcast(openfpm::vector<T,Mem,layout_base> & v, size_t root)
1718  {
1719 #ifdef SE_CLASS1
1720  checkType<T>();
1721 #endif
1722 
1723  b_cast_helper<openfpm::vect_isel<T>::value == STD_VECTOR || is_layout_mlin<layout_base<T>>::value >::bcast_(req,v,root);
1724 
1725  return true;
1726  }
1727 
1731  void execute()
1732  {
1733  // if req == 0 return
1734  if (req.size() == 0)
1735  return;
1736 
1737  // Wait for all the requests
1738  stat.resize(req.size());
1739 
1740  MPI_SAFE_CALL(MPI_Waitall(req.size(),&req.get(0),&stat.get(0)));
1741 
1742  // Remove executed request and status
1743  req.clear();
1744  stat.clear();
1745  }
1746 
1751  void clear()
1752  {
1753  for (size_t i = 0 ; i < NQUEUE ; i++)
1754  {recv_buf[i].clear();}
1755  }
1756 };
1757 
1758 
1759 
1760 
1761 #endif
1762 
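
A minimal usage sketch of the reduction interface follows. It assumes the usual OpenFPM entry points openfpm_init(), create_vcluster() and openfpm_finalize() from the wider library (they are not defined in this header). The key point is that sum(), max() and min() only queue non-blocking MPI_Iallreduce requests; the results are valid only after execute() returns.

    #include <iostream>
    #include "VCluster/VCluster.hpp"

    int main(int argc, char* argv[])
    {
        openfpm_init(&argc,&argv);                // initialize MPI and the global VCluster
        Vcluster<> & v_cl = create_vcluster();    // get the global VCluster instance

        float  local = 1.0f + v_cl.rank();        // some per-rank value
        size_t high  = v_cl.rank();

        v_cl.sum(local);                          // queue an MPI_Iallreduce with MPI_SUM
        v_cl.max(high);                           // queue an MPI_Iallreduce with MPI_MAX
        v_cl.execute();                           // wait for all queued requests

        if (v_cl.rank() == 0)
        {std::cout << "sum: " << local << "  max: " << high << std::endl;}

        openfpm_finalize();
        return 0;
    }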
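
send() and recv() follow the same deferred model: both sides must use the same user tag (internally offset by SEND_RECV_BASE), the buffers must stay alive until execute() returns, and sizes are expressed in bytes for the raw-pointer overloads. A sketch continuing the program above; the tag value 100 and the buffer size are arbitrary choices for illustration:

    // exchange 16 doubles between rank 0 and rank 1
    openfpm::vector<double> v_snd, v_rcv;
    v_snd.resize(16);
    v_rcv.resize(16);

    if (v_cl.rank() == 0)
    {v_cl.send(1,100,v_snd.getPointer(),v_snd.size()*sizeof(double));}   // queue the non-blocking send
    else if (v_cl.rank() == 1)
    {v_cl.recv(0,100,v_rcv.getPointer(),v_rcv.size()*sizeof(double));}   // queue the non-blocking receive

    v_cl.execute();    // completes both the send and the receive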
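
allGather() resizes the destination vector to getProcessingUnits() and gathers one element per rank, while Bcast() overwrites the vector content on every rank with the content owned by root; in this sketch the receiving ranks are assumed to size the vector beforehand. Both are queued and completed by execute():

    // gather one value per processor on every rank
    size_t mine = v_cl.rank() * 10;
    openfpm::vector<size_t> gathered;
    v_cl.allGather(mine,gathered);

    // rank 0 decides the content of params for everybody (2 elements here)
    openfpm::vector<int> params;
    params.resize(2);
    if (v_cl.rank() == 0)
    {params.get(0) = 42; params.get(1) = 7;}
    v_cl.Bcast(params,0);

    v_cl.execute();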
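
The sendrecvMultipleMessagesNBX() family discovers the receive side at runtime: whenever a matching message is probed, the user-supplied msg_alloc callback is asked for a destination buffer. Judging from how the callback is invoked in progressCommunication() above, its seven arguments are the message size in bytes, two unused zeros, the source processor, the running request id, the MPI tag and the user pointer passed as ptr_arg; the parameter names and the byte-buffer strategy below are illustrative only:

    // append one byte buffer per incoming message to the structure passed as ptr_arg
    void * msg_alloc(size_t msg_size, size_t total_msg, size_t total_p,
                     size_t proc_src, size_t req_id, size_t tag, void * ptr)
    {
        auto & recv = *static_cast<openfpm::vector<openfpm::vector<unsigned char>> *>(ptr);

        recv.add();
        recv.last().resize(msg_size);

        return recv.last().getPointer();    // where the library writes the message
    }

    // ... inside the program, after create_vcluster() ...
    openfpm::vector<size_t> prc;                              // destination ranks
    openfpm::vector<openfpm::vector<unsigned char>> data;     // one payload per destination
    openfpm::vector<openfpm::vector<unsigned char>> recv_msg; // filled through msg_alloc

    // fill prc and data, then let the NBX protocol figure out who receives what
    v_cl.sendrecvMultipleMessagesNBX(prc,data,msg_alloc,&recv_msg);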
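
The Async variants queue the same exchange without completing it (at most NQUEUE exchanges can be pending), progressCommunication() can be called while doing unrelated work to push the protocol forward, and sendrecvMultipleMessagesNBXWait() blocks until every queued exchange has finished. A sketch under the same assumptions as the previous example:

    v_cl.sendrecvMultipleMessagesNBXAsync(prc,data,msg_alloc,&recv_msg);

    // ... do unrelated local work here, optionally calling ...
    v_cl.progressCommunication();

    // block until every queued asynchronous exchange has completed
    v_cl.sendrecvMultipleMessagesNBXWait();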
Get the minimum number across all processors (or reduction with insinity norm)