OpenFPM 5.2.0
Project that contains the implementation of distributed structures
VCluster_base.hpp
1 #ifndef VCLUSTER_BASE_HPP_
2 #define VCLUSTER_BASE_HPP_
3 
4 
5 #include "util/cuda_util.hpp"
6 #ifdef OPENMPI
7 #include <mpi.h>
8 #include <mpi-ext.h>
9 #else
10 #include <mpi.h>
11 #endif
12 #include "MPI_wrapper/MPI_util.hpp"
13 #include "Vector/map_vector.hpp"
14 #include "MPI_wrapper/MPI_IallreduceW.hpp"
15 #include "MPI_wrapper/MPI_IrecvW.hpp"
16 #include "MPI_wrapper/MPI_IsendW.hpp"
17 #include "MPI_wrapper/MPI_IAllGather.hpp"
18 #include "MPI_wrapper/MPI_IBcastW.hpp"
19 #include <exception>
20 #include "Vector/map_vector.hpp"
21 #ifdef DEBUG
22 #include "util/check_no_pointers.hpp"
23 #include "util/util_debug.hpp"
24 #endif
25 #include "util/Vcluster_log.hpp"
26 #include "memory/BHeapMemory.hpp"
27 #include "Packer_Unpacker/has_max_prop.hpp"
28 #include "data_type/aggregate.hpp"
29 #include "util/ofp_context.hpp"
30 
31 #if defined (ENABLE_NUMERICS) && defined (HAVE_PETSC)
32 #include <petscvec.h>
33 #endif
34 
35 extern double time_spent;
36 
37 enum NBX_Type
38 {
39  NBX_UNACTIVE,
40  NBX_UNKNOWN,
41  NBX_KNOWN,
42  NBX_KNOWN_PRC
43 };
44 
45 constexpr int MSG_LENGTH = 1024;
46 constexpr int MSG_SEND_RECV = 1025;
47 constexpr int SEND_SPARSE = 8192;
48 constexpr int NONE = 1;
49 constexpr int NEED_ALL_SIZE = 2;
50 
51 constexpr int SERIVCE_MESSAGE_TAG = 16384;
52 constexpr int SEND_RECV_BASE = 4096;
53 constexpr int GATHER_BASE = 24576;
54 
55 constexpr int RECEIVE_KNOWN = 4;
56 constexpr int KNOWN_ELEMENT_OR_BYTE = 8;
57 constexpr int MPI_GPU_DIRECT = 16;
58 
59 constexpr int NQUEUE = 4;
60 
61 // number of vcluster instances
62 extern size_t n_vcluster;
63 // Global MPI initialization
64 extern bool global_mpi_init;
65 // initialization flag
66 extern bool ofp_initialized;
67 extern size_t tot_sent;
68 extern size_t tot_recv;
69 
71 
72 //extern size_t NBX_cnt;
73 
74 template<typename T> void assign(T * ptr1, T * ptr2)
75 {
76  *ptr1 = *ptr2;
77 };
78 
79 
81 union red
82 {
84  char c;
86  unsigned char uc;
88  short s;
90  unsigned short us;
92  int i;
94  unsigned int ui;
96  float f;
98  double d;
99 };
100 
124 template<typename InternalMemory>
125 class Vcluster_base
126 {
128  MPI_Comm ext_comm;
129 
144  size_t NBX_cnt;
145 
148 
152 
155 
158 
161 
163  std::vector<int> post_exe;
164 
167 
168  // Single objects
169 
171  int m_size;
173  int m_rank;
174 
176  int numPE = 1;
177 
179 
180  NBX_Type NBX_active[NQUEUE];
181 
183  size_t rid[NQUEUE];
184 
186  int NBX_prc_qcnt = -1;
187 
190 
192 
193  int NBX_prc_cnt_base = 0;
194  size_t NBX_prc_n_send[NQUEUE];
195  size_t * NBX_prc_prc[NQUEUE];
196  void ** NBX_prc_ptr[NQUEUE];
197  size_t * NBX_prc_sz[NQUEUE];
198  size_t NBX_prc_n_recv[NQUEUE];
199  void * (* NBX_prc_msg_alloc[NQUEUE])(size_t,size_t,size_t,size_t,size_t,size_t,void *);
200  size_t * NBX_prc_prc_recv[NQUEUE];
201  void * NBX_prc_ptr_arg[NQUEUE];
202 
204 
212  std::vector<red> r;
213 
216 
219 
221  MPI_Request bar_req;
222 
224  MPI_Status bar_stat;
225 
227  Vcluster_base & operator=(const Vcluster_base &) {return *this;};
228 
230  int shmrank;
231 
234 
236  Vcluster_base(const Vcluster_base &)
237  {};
238 
239  void queue_all_sends(size_t n_send , size_t sz[],
240  size_t prc[], void * ptr[])
241  {
242  if (stat.size() != 0 || (req.size() != 0 && NBX_prc_qcnt == 0))
243  {std::cerr << "Error: " << __FILE__ << ":" << __LINE__ << " this function must be called when no other requests are in progress. Remember that if you use functions like max(), sum(), send() or recv(), you must call execute() before starting a new communication\n";}
244 
245 
246  if (NBX_prc_qcnt == 0)
247  {
248  stat.clear();
249  req.clear();
250  // Do MPI_Issend
251  }
252 
253  for (size_t i = 0 ; i < n_send ; i++)
254  {
255  if (sz[i] != 0)
256  {
257  req.add();
258 
259 #ifdef SE_CLASS2
260  check_valid(ptr[i],sz[i]);
261 #endif
262 
263  tot_sent += sz[i];
264 
265 // std::cout << "TAG: " << SEND_SPARSE + (NBX_cnt + NBX_prc_qcnt)*131072 + i << " " << NBX_cnt << " " << NBX_prc_qcnt << " " << " rank: " << rank() << " " << NBX_prc_cnt_base << " nbx_cycle: " << nbx_cycle << std::endl;
266 
267  if (sz[i] > 2147483647)
268  {MPI_SAFE_CALL(MPI_Issend(ptr[i], (sz[i] >> 3) + 1 , MPI_DOUBLE, prc[i], SEND_SPARSE + (NBX_cnt + NBX_prc_qcnt)*131072 + i, ext_comm,&req.last()));}
269  else
270  {MPI_SAFE_CALL(MPI_Issend(ptr[i], sz[i], MPI_BYTE, prc[i], SEND_SPARSE + (NBX_cnt + NBX_prc_qcnt)*131072 + i, ext_comm,&req.last()));}
271  log.logSend(prc[i]);
272  }
273  }
274  }
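Note on the tag scheme used above: every NBX round owns a disjoint block of 131072 tags starting at SEND_SPARSE, so messages belonging to different queued rounds cannot collide, and payloads larger than 2147483647 bytes are shipped as MPI_DOUBLE chunks to respect MPI's int count argument. A worked tag example follows; the round index, queue slot and message index are purely illustrative.

// Tag of message i in round (NBX_cnt + NBX_prc_qcnt):
//   tag = SEND_SPARSE + (NBX_cnt + NBX_prc_qcnt) * 131072 + i
// e.g. NBX_cnt = 3, NBX_prc_qcnt = 0, i = 5 (illustrative values):
constexpr size_t example_tag = SEND_SPARSE + 3 * 131072 + 5;   // == 401413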
275 
276 protected:
277 
279  openfpm::vector_fr<BMemory<InternalMemory>> recv_buf[NQUEUE];
280 
283 
284 public:
285 
286  // Finalize the MPI program
287  ~Vcluster_base()
288  {
289 #ifdef SE_CLASS2
290  check_delete(this);
291 #endif
292  n_vcluster--;
293 
294  // if there are no other vcluster instances finalize
295  if (n_vcluster == 0)
296  {
297  int already_finalised;
298 
299  MPI_Finalized(&already_finalised);
300  if (!already_finalised && ext_comm == MPI_COMM_WORLD)
301  {
302  if (MPI_Finalize() != 0)
303  {
304  std::cerr << __FILE__ << ":" << __LINE__ << " MPI_Finalize FAILED \n";
305  }
306  }
307  }
308 
309  delete gpuContext;
310  }
317  Vcluster_base(int *argc, char ***argv, MPI_Comm ext_comm)
319  {
320  // reset NBX_Active
321 
322  for (unsigned int i = 0 ; i < NQUEUE ; i++)
323  {
324  NBX_active[i] = NBX_Type::NBX_UNACTIVE;
325  rid[i] = 0;
326  }
327 
328 #ifdef SE_CLASS2
329  check_new(this,8,VCLUSTER_EVENT,PRJ_VCLUSTER);
330 #endif
331 
332  n_vcluster++;
333 
334  int already_initialised;
335  MPI_Initialized(&already_initialised);
336 
337  // Check if MPI is already initialized
338  if (!already_initialised)
339  {
340  MPI_Init(argc,argv);
341  }
342 
344  // We try to get the local processor's rank within the node
344 
345  MPI_Comm shmcomm;
346  MPI_Comm_split_type(ext_comm, MPI_COMM_TYPE_SHARED, 0,
347  MPI_INFO_NULL, &shmcomm);
348 
349  MPI_Comm_rank(shmcomm, &shmrank);
350  MPI_Comm_free(&shmcomm);
351 
352  // Get the total number of processes
353  // and the rank of this process
354 
355  MPI_Comm_size(ext_comm, &m_size);
356  MPI_Comm_rank(ext_comm, &m_rank);
357 
358 #ifdef SE_CLASS2
359  process_v_cl = m_rank;
360 #endif
361 
362  // create and fill map scatter with one
363  map_scatter.resize(m_size);
364 
365  for (size_t i = 0 ; i < map_scatter.size() ; i++)
366  {
367  map_scatter.get(i) = 1;
368  }
369 
370  // open the log file
371  log.openLog(m_rank);
372 
373  // Initialize bar_req
374  bar_req = MPI_Request();
375  bar_stat = MPI_Status();
376 
377 #ifdef EXTERNAL_SET_GPU
378  int dev;
379  cudaGetDevice(&dev);
380  gpuContext = new gpu::ofp_context_t(gpu::gpu_context_opt::no_print_props,dev);
381 #else
382  gpuContext = new gpu::ofp_context_t(gpu::gpu_context_opt::no_print_props,shmrank);
383 #endif
384 
385 
386 #if defined(PRINT_RANK_TO_GPU) && defined(CUDA_GPU)
387 
388  char node_name[MPI_MAX_PROCESSOR_NAME];
389  int len;
390 
391  MPI_Get_processor_name(node_name,&len);
392 
393  std::cout << "Rank: " << m_rank << " on host: " << node_name << " work on GPU: " << gpuContext->getDevice() << "/" << gpuContext->getNDevice() << std::endl;
394 #endif
395 
396  int flag;
397  void *tag_ub_v;
398  int tag_ub;
399 
400  MPI_Comm_get_attr(ext_comm, MPI_TAG_UB, &tag_ub_v, &flag);
401 
402  if (flag == true)
403  {
404  tag_ub = *(int*)tag_ub_v;
405  nbx_cycle = (tag_ub - SEND_SPARSE - 131072 - NQUEUE*131072) / 131072;
406 
407  if (nbx_cycle < NQUEUE*2) {
408  std::cerr << __FILE__ << ":" << __LINE__ << " Error MPI_TAG_UB is too small for OpenFPM" << std::endl;
409  }
410  }
411  else
412  {
413  nbx_cycle = 2048;
414  }
415  }
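For orientation, nbx_cycle computed in the constructor bounds how many NBX rounds can be issued before the tag blocks wrap around. A worked example, assuming a typical MPI_TAG_UB of 2^31 - 1 (the actual value is implementation dependent and is queried above; the names below are illustrative only):

// Illustrative only: nbx_cycle for tag_ub = 2147483647, SEND_SPARSE = 8192, NQUEUE = 4
constexpr long long tag_ub_example = 2147483647LL;   // typical, not guaranteed by MPI
constexpr long long nbx_cycle_example =
    (tag_ub_example - SEND_SPARSE - 131072LL - NQUEUE * 131072LL) / 131072LL;
// nbx_cycle_example == 16378, far above the NQUEUE*2 == 8 minimum checked above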
416 
417 #ifdef SE_CLASS1
418 
429  template<typename T> void checkType()
430  {
431  // if T is a primitive type like int, long int, float, double, ... it makes sense
432  // (pointers, l-value references and r-value references are not fundamental types)
433  if (std::is_fundamental<T>::value == true)
434  {return;}
435 
436  // if it is a pointer it makes no sense
437  if (std::is_pointer<T>::value == true)
438  {std::cerr << "Error: " << __FILE__ << ":" << __LINE__ << " the type " << demangle(typeid(T).name()) << " is a pointer, sending pointer values makes no sense\n";}
439 
440  // if it is an l-value reference it makes no sense
441  if (std::is_lvalue_reference<T>::value == true)
442  {std::cerr << "Error: " << __FILE__ << ":" << __LINE__ << " the type " << demangle(typeid(T).name()) << " is an l-value reference, sending references makes no sense\n";}
443 
444  // if it is an r-value reference it makes no sense
445  if (std::is_rvalue_reference<T>::value == true)
446  {std::cerr << "Error: " << __FILE__ << ":" << __LINE__ << " the type " << demangle(typeid(T).name()) << " is an r-value reference, sending references makes no sense\n";}
447 
448  // ... if not, check that T has a method called noPointers
449  switch (check_no_pointers<T>::value())
450  {
451  case PNP::UNKNOWN:
452  {
453  std::cerr << "Warning: " << __FILE__ << ":" << __LINE__ << " impossible to check the type " << demangle(typeid(T).name()) << ", please consider adding a static method \"static bool noPointers()\"\n";
454  break;
455  }
456  case PNP::POINTERS:
457  {
458  std::cerr << "Error: " << __FILE__ << ":" << __LINE__ << " the type " << demangle(typeid(T).name()) << " has pointers inside, sending pointer values makes no sense\n";
459  break;
460  }
461  default:
462  {
463 
464  }
465  }
466  }
467 
468 #endif
469 
475  gpu::ofp_context_t & getGpuContext(bool iw = true)
476  {
477  if (gpuContext == NULL && iw == true)
478  {
479  std::cout << __FILE__ << ":" << __LINE__ << " Warning: it seems that a GPU context is not initialized."
480  " Either a compatible working CUDA device has not been found, or openfpm_init has been called in a file that was not compiled with NVCC" << std::endl;
481  }
482 
483  return *gpuContext;
484  }
485 
491  MPI_Comm getMPIComm()
492  {
493  return ext_comm;
494  }
495 
501  size_t getProcessingUnits()
502  {
503  return m_size*numPE;
504  }
505 
515  size_t size()
516  {
517  return this->m_size*numPE;
518  }
519 
520  void print_stats()
521  {
522 #ifdef VCLUSTER_PERF_REPORT
523  std::cout << "-- REPORT COMMUNICATIONS -- " << std::endl;
524 
525  std::cout << "Processor " << this->rank() << " sent: " << tot_sent << std::endl;
526  std::cout << "Processor " << this->rank() << " received: " << tot_recv << std::endl;
527 
528  std::cout << "Processor " << this->rank() << " time spent: " << time_spent << std::endl;
529  std::cout << "Processor " << this->rank() << " Bandwidth: S:" << (double)tot_sent / time_spent * 1e-9 << "GB/s R:" << (double)tot_recv / time_spent * 1e-9 << "GB/s" << std::endl;
530 #else
531 
532  std::cout << "Error: performance statistics are disabled, compile VCluster with VCLUSTER_PERF_REPORT to enable them" << std::endl;
533 
534 #endif
535  }
536 
537  void clear_stats()
538  {
539 #ifdef VCLUSTER_PERF_REPORT
540 
541  tot_sent = 0;
542  tot_recv = 0;
543 
544  time_spent = 0;
545 #else
546 
547  std::cout << "Error: performance statistics are disabled, compile VCluster with VCLUSTER_PERF_REPORT to enable them" << std::endl;
548 
549 #endif
550  }
551 
557  size_t getProcessUnitID()
558  {
559  return m_rank;
560  }
561 
571  size_t rank()
572  {
573  return m_rank;
574  }
575 
576 
583  template<typename T> void sum(T & num)
584  {
585 #ifdef SE_CLASS1
586  checkType<T>();
587 #endif
588 
589  // reduce over MPI
590 
591  // Create one request
592  req.add();
593 
594  // reduce
595  MPI_IallreduceW<T>::reduce(num,MPI_SUM,req.last(), ext_comm);
596  }
597 
603  template<typename T> void max(T & num)
604  {
605 #ifdef SE_CLASS1
606  checkType<T>();
607 #endif
608  // reduce over MPI
609 
610  // Create one request
611  req.add();
612 
613  // reduce
614  MPI_IallreduceW<T>::reduce(num,MPI_MAX,req.last(), ext_comm);
615  }
616 
623  template<typename T> void min(T & num)
624  {
625 #ifdef SE_CLASS1
626  checkType<T>();
627 #endif
628  // reduce over MPI
629 
630  // Create one request
631  req.add();
632 
633  // reduce
634  MPI_IallreduceW<T>::reduce(num,MPI_MIN,req.last(), ext_comm);
635  }
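sum(), max() and min() only post non-blocking MPI_Iallreduce requests; the reduced value is defined only after execute() completes them. A minimal usage sketch, assuming the usual user-facing helpers from VCluster.hpp (Vcluster<>, create_vcluster()); compute_local_residual() is a hypothetical user function:

// Sketch: reduce a value across all processors
Vcluster<> & v_cl = create_vcluster();              // the usual OpenFPM helper

double local_residual = compute_local_residual();   // hypothetical user function
v_cl.sum(local_residual);                           // posts a non-blocking MPI_Iallreduce
v_cl.execute();                                     // completes it: local_residual now holds the global sum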
636 
642  void progressCommunication()
643  {
644  MPI_Status stat_t;
645  int stat = false;
646  MPI_SAFE_CALL(MPI_Iprobe(MPI_ANY_SOURCE,MPI_ANY_TAG, ext_comm,&stat,&stat_t));
647 
648  // If there is an incoming message and it is related to this NBX communication
649  if (stat == true)
650  {
651  unsigned int i = (stat_t.MPI_TAG - SEND_SPARSE) / 131072 - NBX_prc_cnt_base;
652 
653  if (i >= NQUEUE || NBX_active[i] == NBX_Type::NBX_UNACTIVE || NBX_active[i] == NBX_Type::NBX_KNOWN || NBX_active[i] == NBX_Type::NBX_KNOWN_PRC)
654  {return;}
655 
656  int msize_;
657  long int msize;
658  bool big_data = true;
659 
660  // Get the message tag and size
661 
662  MPI_SAFE_CALL(MPI_Get_count(&stat_t,MPI_DOUBLE,&msize_));
663  if (msize_ == MPI_UNDEFINED)
664  {
665  big_data = false;
666  MPI_SAFE_CALL(MPI_Get_count(&stat_t,MPI_BYTE,&msize_));
667  msize = msize_;
668  }
669  else
670  {
671  msize = ((size_t)msize_) << 3;
672  }
673 
674  // Check whether the TAG comes from one of our send TAGs
675  if (stat_t.MPI_TAG >= (int)(SEND_SPARSE + NBX_prc_cnt_base*131072) && stat_t.MPI_TAG < (int)(SEND_SPARSE + (NBX_prc_cnt_base + NBX_prc_qcnt + 1)*131072))
676  {
677  // Get the pointer to receive the message
678  void * ptr = this->NBX_prc_msg_alloc[i](msize,0,0,stat_t.MPI_SOURCE,rid[i],stat_t.MPI_TAG,this->NBX_prc_ptr_arg[i]);
679 
680  // Log the receiving request
681  log.logRecv(stat_t);
682 
683  rid[i]++;
684 
685  // Check the pointer
686 #ifdef SE_CLASS2
687  check_valid(ptr,msize);
688 #endif
689  tot_recv += msize;
690 #ifdef VCLUSTER_GARBAGE_INJECTOR
691 #if defined (__NVCC__) && !defined(CUDA_ON_CPU)
692  cudaPointerAttributes cpa;
693  auto error = cudaPointerGetAttributes(&cpa,ptr);
694  if (error == cudaSuccess)
695  {
696  if(cpa.type == cudaMemoryTypeDevice)
697  {cudaMemset(ptr,0xFF,msize);}
698  else
699  {memset(ptr,0xFF,msize);}
700  }
701 #else
702  memset(ptr,0xFF,msize);
703 #endif
704 #endif
705  if (big_data == true)
706  {
707 // std::cout << "RECEVING BIG MESSAGE " << msize_ << " " << msize << std::endl;
708  MPI_SAFE_CALL(MPI_Recv(ptr,msize >> 3,MPI_DOUBLE,stat_t.MPI_SOURCE,stat_t.MPI_TAG,ext_comm,&stat_t));
709  }
710  else
711  {
712  MPI_SAFE_CALL(MPI_Recv(ptr,msize,MPI_BYTE,stat_t.MPI_SOURCE,stat_t.MPI_TAG,ext_comm,&stat_t));
713  }
714 #ifdef SE_CLASS2
715  check_valid(ptr,msize);
716 #endif
717  }
718  }
719 
720  // Check the status of all the MPI_Issend requests and call the barrier if they have finished
721 
722  for (unsigned int i = 0 ; i < NQUEUE ; i++)
723  {
724  if (i >= NQUEUE || NBX_active[i] == NBX_Type::NBX_UNACTIVE || NBX_active[i] == NBX_Type::NBX_KNOWN || NBX_active[i] == NBX_Type::NBX_KNOWN_PRC)
725  {continue;}
726 
727  if (NBX_prc_reached_bar_req[i] == false)
728  {
729  int flag = false;
730  if (req.size() != 0)
731  {MPI_SAFE_CALL(MPI_Testall(req.size(),&req.get(0),&flag,MPI_STATUSES_IGNORE));}
732  else
733  {flag = true;}
734 
735  // If all the sends have been completed
736  if (flag == true)
737  {MPI_SAFE_CALL(MPI_Ibarrier(ext_comm,&bar_req));NBX_prc_reached_bar_req[i] = true;}
738  }
739  }
740  }
741 
785  template<typename T> void sendrecvMultipleMessagesNBX(
786  openfpm::vector< size_t > & prc,
787  openfpm::vector< T > & data,
788  openfpm::vector< size_t > & prc_recv,
789  openfpm::vector< size_t > & recv_sz ,
790  void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,size_t,void *),
791  void * ptr_arg,
792  long int opt=NONE
793  ) {
794 #ifdef VCLUSTER_PERF_REPORT
795  timer nbx_timer;
796  nbx_timer.start();
797 
798 #endif
799 
800  // Allocate the buffers
801 
802  for (size_t i = 0 ; i < prc.size() ; i++)
803  {send(prc.get(i),SEND_SPARSE + NBX_cnt*131072,data.get(i).getPointer(),data.get(i).size());}
804 
805  for (size_t i = 0 ; i < prc_recv.size() ; i++)
806  {
807  void * ptr_recv = msg_alloc(recv_sz.get(i),0,0,prc_recv.get(i),i,SEND_SPARSE + NBX_cnt*131072,ptr_arg);
808 
809  recv(prc_recv.get(i),SEND_SPARSE + NBX_cnt*131072,ptr_recv,recv_sz.get(i));
810  }
811 
812  execute();
813 
814  // Circular counter
815  NBX_cnt = (NBX_cnt + 1) % nbx_cycle;
816 
817 #ifdef VCLUSTER_PERF_REPORT
818  nbx_timer.stop();
819  time_spent += nbx_timer.getwct();
820 #endif
821  }
822 
866  template<typename T> void sendrecvMultipleMessagesNBXAsync(
867  openfpm::vector< size_t > & prc,
868  openfpm::vector< T > & data,
869  openfpm::vector< size_t > & prc_recv,
870  openfpm::vector< size_t > & recv_sz ,
871  void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,size_t,void *),
872  void * ptr_arg,
873  long int opt=NONE
874  ) {
875  NBX_prc_qcnt++;
876  if (NBX_prc_qcnt >= NQUEUE)
877  {
878  std::cout << __FILE__ << ":" << __LINE__ << " error: you can queue at most " << NQUEUE << " asynchronous communication functions " << std::endl;
879  return;
880  }
881 
882  // Allocate the buffers
883 
884  for (size_t i = 0 ; i < prc.size() ; i++)
885  {send(prc.get(i),SEND_SPARSE + NBX_cnt*131072,data.get(i).getPointer(),data.get(i).size());}
886 
887  for (size_t i = 0 ; i < prc_recv.size() ; i++)
888  {
889  void * ptr_recv = msg_alloc(recv_sz.get(i),0,0,prc_recv.get(i),i,SEND_SPARSE + NBX_cnt*131072,ptr_arg);
890 
891  recv(prc_recv.get(i),SEND_SPARSE + NBX_cnt*131072,ptr_recv,recv_sz.get(i));
892  }
893 
894  NBX_active[NBX_prc_qcnt] = NBX_Type::NBX_KNOWN;
895  if (NBX_prc_qcnt == 0)
896  {NBX_prc_cnt_base = NBX_cnt;}
897  }
898 
934  template<typename T>
935  void sendrecvMultipleMessagesNBX(
936  openfpm::vector< size_t > & prc,
937  openfpm::vector< T > & data,
938  void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,size_t,void *),
939  void * ptr_arg, long int opt=NONE)
940  {
941 #ifdef SE_CLASS1
942  checkType<typename T::value_type>();
943 #endif
944 
945  // resize the pointer list
946  ptr_send[NBX_prc_qcnt+1].resize(prc.size());
947  sz_send[NBX_prc_qcnt+1].resize(prc.size());
948 
949  for (size_t i = 0 ; i < prc.size() ; i++)
950  {
951  ptr_send[NBX_prc_qcnt+1].get(i) = data.get(i).getPointer();
952  sz_send[NBX_prc_qcnt+1].get(i) = data.get(i).size() * sizeof(typename T::value_type);
953  }
954 
955  sendrecvMultipleMessagesNBX(prc.size(),(size_t *)sz_send[NBX_prc_qcnt+1].getPointer(),(size_t *)prc.getPointer(),(void **)ptr_send[NBX_prc_qcnt+1].getPointer(),msg_alloc,ptr_arg,opt);
956  }
957 
997  template<typename T>
998  void sendrecvMultipleMessagesNBXAsync(openfpm::vector< size_t > & prc,
999  openfpm::vector< T > & data,
1000  void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,size_t,void *),
1001  void * ptr_arg, long int opt=NONE)
1002  {
1003 #ifdef SE_CLASS1
1004  checkType<typename T::value_type>();
1005 #endif
1006  // resize the pointer list
1007  ptr_send[NBX_prc_qcnt+1].resize(prc.size());
1008  sz_send[NBX_prc_qcnt+1].resize(prc.size());
1009 
1010  for (size_t i = 0 ; i < prc.size() ; i++)
1011  {
1012  ptr_send[NBX_prc_qcnt+1].get(i) = data.get(i).getPointer();
1013  sz_send[NBX_prc_qcnt+1].get(i) = data.get(i).size() * sizeof(typename T::value_type);
1014  }
1015 
1016  sendrecvMultipleMessagesNBXAsync(prc.size(),(size_t *)sz_send[NBX_prc_qcnt+1].getPointer(),(size_t *)prc.getPointer(),(void **)ptr_send[NBX_prc_qcnt+1].getPointer(),msg_alloc,ptr_arg,opt);
1017  }
1018 
1062  void sendrecvMultipleMessagesNBX(size_t n_send , size_t sz[],
1063  size_t prc[] , void * ptr[],
1064  size_t n_recv, size_t prc_recv[] ,
1065  size_t sz_recv[] ,void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t, size_t,void *),
1066  void * ptr_arg, long int opt=NONE)
1067  {
1068 #ifdef VCLUSTER_PERF_REPORT
1069  timer nbx_timer;
1070  nbx_timer.start();
1071 
1072 #endif
1073 
1074  // Allocate the buffers
1075 
1076  for (size_t i = 0 ; i < n_send ; i++)
1077  {send(prc[i],SEND_SPARSE + NBX_cnt*131072,ptr[i],sz[i]);}
1078 
1079  for (size_t i = 0 ; i < n_recv ; i++)
1080  {
1081  void * ptr_recv = msg_alloc(sz_recv[i],0,0,prc_recv[i],i,SEND_SPARSE + NBX_cnt*131072,ptr_arg);
1082 
1083  recv(prc_recv[i],SEND_SPARSE + NBX_cnt*131072,ptr_recv,sz_recv[i]);
1084  }
1085 
1086  execute();
1087 
1088  // Circular counter
1089  NBX_cnt = (NBX_cnt + 1) % nbx_cycle;
1090 
1091 #ifdef VCLUSTER_PERF_REPORT
1092  nbx_timer.stop();
1093  time_spent += nbx_timer.getwct();
1094 #endif
1095  }
1096 
1140  void sendrecvMultipleMessagesNBXAsync(size_t n_send , size_t sz[],
1141  size_t prc[] , void * ptr[],
1142  size_t n_recv, size_t prc_recv[] ,
1143  size_t sz_recv[] ,void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t, size_t,void *),
1144  void * ptr_arg, long int opt=NONE)
1145  {
1146  NBX_prc_qcnt++;
1147  if (NBX_prc_qcnt >= NQUEUE)
1148  {
1149  std::cout << __FILE__ << ":" << __LINE__ << " error: you can queue at most " << NQUEUE << " asynchronous communication functions " << std::endl;
1150  return;
1151  }
1152 
1153  // Allocate the buffers
1154 
1155  for (size_t i = 0 ; i < n_send ; i++)
1156  {send(prc[i],SEND_SPARSE + NBX_cnt*131072,ptr[i],sz[i]);}
1157 
1158  for (size_t i = 0 ; i < n_recv ; i++)
1159  {
1160  void * ptr_recv = msg_alloc(sz_recv[i],0,0,prc_recv[i],i,SEND_SPARSE + NBX_cnt*131072,ptr_arg);
1161 
1162  recv(prc_recv[i],SEND_SPARSE + NBX_cnt*131072,ptr_recv,sz_recv[i]);
1163  }
1164 
1165  NBX_active[NBX_prc_qcnt] = NBX_Type::NBX_KNOWN;
1166  if (NBX_prc_qcnt == 0)
1167  {NBX_prc_cnt_base = NBX_cnt;}
1168  }
1169 
1170  openfpm::vector<size_t> sz_recv_tmp;
1171 
1214  void sendrecvMultipleMessagesNBX(size_t n_send , size_t sz[], size_t prc[] ,
1215  void * ptr[], size_t n_recv, size_t prc_recv[] ,
1216  void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,size_t,void *),
1217  void * ptr_arg, long int opt=NONE)
1218  {
1219 #ifdef VCLUSTER_PERF_REPORT
1220  timer nbx_timer;
1221  nbx_timer.start();
1222 #endif
1223 
1224  sz_recv_tmp.resize(n_recv);
1225 
1226  // First we exchange the message sizes, so that each processor knows how much it will receive
1227 
1228  for (size_t i = 0 ; i < n_send ; i++)
1229  {send(prc[i],SEND_SPARSE + NBX_cnt*131072,&sz[i],sizeof(size_t));}
1230 
1231  for (size_t i = 0 ; i < n_recv ; i++)
1232  {recv(prc_recv[i],SEND_SPARSE + NBX_cnt*131072,&sz_recv_tmp.get(i),sizeof(size_t));}
1233 
1234  execute();
1235 
1236  // Circular counter
1237  NBX_cnt = (NBX_cnt + 1) % nbx_cycle;
1238 
1239  // Allocate the buffers
1240 
1241  for (size_t i = 0 ; i < n_send ; i++)
1242  {send(prc[i],SEND_SPARSE + NBX_cnt*131072,ptr[i],sz[i]);}
1243 
1244  for (size_t i = 0 ; i < n_recv ; i++)
1245  {
1246  void * ptr_recv = msg_alloc(sz_recv_tmp.get(i),0,0,prc_recv[i],i,0,ptr_arg);
1247 
1248  recv(prc_recv[i],SEND_SPARSE + NBX_cnt*131072,ptr_recv,sz_recv_tmp.get(i));
1249  }
1250 
1251  execute();
1252 
1253  // Circular counter
1254  NBX_cnt = (NBX_cnt + 1) % nbx_cycle;
1255 
1256 #ifdef VCLUSTER_PERF_REPORT
1257  nbx_timer.stop();
1258  time_spent += nbx_timer.getwct();
1259 #endif
1260  }
1261 
1304  void sendrecvMultipleMessagesNBXAsync(size_t n_send , size_t sz[], size_t prc[] ,
1305  void * ptr[], size_t n_recv, size_t prc_recv[] ,
1306  void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,size_t,void *),
1307  void * ptr_arg, long int opt=NONE)
1308  {
1309  NBX_prc_qcnt++;
1310  if (NBX_prc_qcnt >= NQUEUE)
1311  {
1312  std::cout << __FILE__ << ":" << __LINE__ << " error: you can queue at most " << NQUEUE << " asynchronous communication functions " << std::endl;
1313  return;
1314  }
1315 
1316  sz_recv_tmp.resize(n_recv);
1317 
1318  // First we exchange the message sizes, so that each processor knows how much it will receive
1319 
1320  for (size_t i = 0 ; i < n_send ; i++)
1321  {send(prc[i],SEND_SPARSE + NBX_cnt*131072,&sz[i],sizeof(size_t));}
1322 
1323  for (size_t i = 0 ; i < n_recv ; i++)
1324  {recv(prc_recv[i],SEND_SPARSE + NBX_cnt*131072,&sz_recv_tmp.get(i),sizeof(size_t));}
1325 
1327 
1328  NBX_prc_n_send[NBX_prc_qcnt] = n_send;
1329  NBX_prc_prc[NBX_prc_qcnt] = prc;
1330  NBX_prc_ptr[NBX_prc_qcnt] = ptr;
1331  NBX_prc_sz[NBX_prc_qcnt] = sz;
1332  NBX_prc_n_recv[NBX_prc_qcnt] = n_recv;
1333  NBX_prc_prc_recv[NBX_prc_qcnt] = prc_recv;
1334  NBX_prc_msg_alloc[NBX_prc_qcnt] = msg_alloc;
1335  NBX_prc_ptr_arg[NBX_prc_qcnt] = ptr_arg;
1336 
1337  NBX_active[NBX_prc_qcnt] = NBX_Type::NBX_KNOWN_PRC;
1338  if (NBX_prc_qcnt == 0)
1339  {NBX_prc_cnt_base = NBX_cnt;}
1340  }
1341 
1382  void sendrecvMultipleMessagesNBX(size_t n_send , size_t sz[],
1383  size_t prc[] , void * ptr[],
1384  void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,size_t,void *),
1385  void * ptr_arg, long int opt = NONE)
1386  {
1387 #ifdef VCLUSTER_PERF_REPORT
1388  timer nbx_timer;
1389  nbx_timer.start();
1390 
1391 #endif
1392 
1393  NBX_prc_qcnt++;
1394  if (NBX_prc_qcnt != 0)
1395  {
1396  std::cout << __FILE__ << ":" << __LINE__ << " error: there are asynchronous calls still running, you have to complete them before going back to synchronous calls" << std::endl;
1397  return;
1398  }
1399 
1400  queue_all_sends(n_send,sz,prc,ptr);
1401 
1402  this->NBX_prc_ptr_arg[NBX_prc_qcnt] = ptr_arg;
1403  this->NBX_prc_msg_alloc[NBX_prc_qcnt] = msg_alloc;
1404 
1405  rid[NBX_prc_qcnt] = 0;
1406  int flag = false;
1407 
1409  NBX_active[NBX_prc_qcnt] = NBX_Type::NBX_UNKNOWN;
1410  NBX_prc_cnt_base = NBX_cnt;
1411 
1412  log.start(10);
1413 
1414  // Wait until all the sends are acknowledged
1415  do
1416  {
1418 
1419  // Check if all processors reached the async barrier
1421  {MPI_SAFE_CALL(MPI_Test(&bar_req,&flag,&bar_stat))};
1422 
1423  // produce a report if communication gets stuck
1425 
1426  } while (flag == false);
1427 
1428  // Remove the executed request
1429 
1430  req.clear();
1431  stat.clear();
1432  log.clear();
1433 
1434  // Circular counter
1435  NBX_cnt = (NBX_cnt + 1) % nbx_cycle;
1436  NBX_prc_qcnt = -1;
1437 
1438 #ifdef VCLUSTER_PERF_REPORT
1439  nbx_timer.stop();
1440  time_spent += nbx_timer.getwct();
1441 #endif
1442  }
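The variant above implements the NBX (non-blocking consensus) pattern: each processor knows only whom it sends to, and incoming messages are discovered via MPI_Iprobe plus the MPI_Ibarrier consensus. The msg_alloc callback must return a buffer of at least the reported size for every incoming message. A hedged usage sketch follows; example_msg_alloc, recv_map, bufs, n_send, sz, prc and ptrs are illustrative names, not part of this API, and v_cl is the Vcluster instance from the earlier sketch.

#include <map>       // assumed available for this sketch
#include <vector>

// Hypothetical receive-buffer store handed through ptr_arg
using recv_map = std::map<size_t, std::vector<char>>;

// Allocator callback: (msg_size, total_msg, total_n, source rank, request id, tag, ptr_arg)
// must return the address where the incoming message has to be stored
static void * example_msg_alloc(size_t msg_size, size_t, size_t,
                                size_t src, size_t, size_t, void * ptr_arg)
{
    recv_map & bufs = *static_cast<recv_map *>(ptr_arg);
    bufs[src].resize(msg_size);
    return bufs[src].data();
}

// On every processor: n_send destinations in prc[], payload pointers/sizes in ptrs[]/sz[]
recv_map bufs;
v_cl.sendrecvMultipleMessagesNBX(n_send, sz, prc, ptrs, example_msg_alloc, &bufs);
// On return, bufs holds one entry per processor that sent a message to this rank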
1443 
1488  void sendrecvMultipleMessagesNBXAsync(size_t n_send , size_t sz[],
1489  size_t prc[] , void * ptr[],
1490  void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,size_t,void *),
1491  void * ptr_arg, long int opt = NONE)
1492  {
1493  NBX_prc_qcnt++;
1494  queue_all_sends(n_send,sz,prc,ptr);
1495 
1496  this->NBX_prc_ptr_arg[NBX_prc_qcnt] = ptr_arg;
1497  this->NBX_prc_msg_alloc[NBX_prc_qcnt] = msg_alloc;
1498 
1499  rid[NBX_prc_qcnt] = 0;
1500 
1502  NBX_active[NBX_prc_qcnt] = NBX_Type::NBX_UNKNOWN;
1503 
1504  log.start(10);
1505  if (NBX_prc_qcnt == 0)
1506  {NBX_prc_cnt_base = NBX_cnt;}
1507 
1508  return;
1509  }
1510 
1515  void sendrecvMultipleMessagesNBXWait()
1516  {
1517  for (unsigned int j = 0 ; j < NQUEUE ; j++)
1518  {
1519  if (NBX_active[j] == NBX_Type::NBX_UNACTIVE)
1520  {continue;}
1521 
1522  if (NBX_active[j] == NBX_Type::NBX_KNOWN_PRC)
1523  {
1524  execute();
1525 
1526  // Circular counter
1527  NBX_cnt = (NBX_cnt + 1) % nbx_cycle;
1528 
1529  // Allocate the buffers
1530 
1531  for (size_t i = 0 ; i < NBX_prc_n_send[j] ; i++)
1532  {send(NBX_prc_prc[j][i],SEND_SPARSE + NBX_cnt*131072,NBX_prc_ptr[j][i],NBX_prc_sz[j][i]);}
1533 
1534  for (size_t i = 0 ; i < NBX_prc_n_recv[j] ; i++)
1535  {
1536  void * ptr_recv = NBX_prc_msg_alloc[j](sz_recv_tmp.get(i),0,0,NBX_prc_prc_recv[j][i],i,0,this->NBX_prc_ptr_arg[j]);
1537 
1538  recv(NBX_prc_prc_recv[j][i],SEND_SPARSE + NBX_cnt*131072,ptr_recv,sz_recv_tmp.get(i));
1539  }
1540 
1541  NBX_active[j] = NBX_Type::NBX_KNOWN;
1542  }
1543 
1544  if (NBX_active[j] == NBX_Type::NBX_KNOWN)
1545  {
1546  execute();
1547 
1548  // Circular counter
1549  NBX_cnt = (NBX_cnt + 1) % nbx_cycle;
1550  NBX_active[j] = NBX_Type::NBX_UNACTIVE;
1551 
1552  continue;
1553  }
1554 
1555  int flag = false;
1556 
1557  // Wait until all the sends are acknowledged
1558  do
1559  {
1561 
1562  // Check if all processors reached the async barrier
1563  if (NBX_prc_reached_bar_req[j])
1564  {MPI_SAFE_CALL(MPI_Test(&bar_req,&flag,&bar_stat))};
1565 
1566  // produce a report if communication gets stuck
1568 
1569  } while (flag == false);
1570 
1571  // Remove the executed request
1572 
1573  req.clear();
1574  stat.clear();
1575  log.clear();
1576 
1577  // Circular counter
1578  NBX_cnt = (NBX_cnt + 1) % nbx_cycle;
1579  NBX_active[j] = NBX_Type::NBX_UNACTIVE;
1580 
1581  }
1582 
1583  NBX_prc_qcnt = -1;
1584  return;
1585  }
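The asynchronous variants queue the communication (up to NQUEUE rounds at a time); progressCommunication() can be called while doing local work to advance the probes and receives, and sendrecvMultipleMessagesNBXWait() completes all queued exchanges. A hedged sketch of the intended call pattern, reusing the illustrative names from the previous sketch; more_local_work() and do_local_work() are hypothetical user-side placeholders:

// Sketch: overlap local computation with a queued NBX exchange
recv_map bufs;
v_cl.sendrecvMultipleMessagesNBXAsync(n_send, sz, prc, ptrs, example_msg_alloc, &bufs);

while (more_local_work())              // hypothetical user-side predicate
{
    do_local_work();                   // hypothetical computation to overlap
    v_cl.progressCommunication();      // advance probes/receives of the queued exchange
}

v_cl.sendrecvMultipleMessagesNBXWait();   // block until the queued exchange completes
// bufs is guaranteed to be filled only from this point on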
1586 
1605  bool send(size_t proc, size_t tag, const void * mem, size_t sz)
1606  {
1607  // send over MPI
1608 
1609  // Create one request
1610  req.add();
1611 
1612  // send
1613  MPI_IsendWB::send(proc,SEND_RECV_BASE + tag,mem,sz,req.last(),ext_comm);
1614 
1615  return true;
1616  }
1617 
1618 
1636  template<typename T, typename Mem, template<typename> class gr> bool send(size_t proc, size_t tag, openfpm::vector<T,Mem,gr> & v)
1637  {
1638 #ifdef SE_CLASS1
1639  checkType<T>();
1640 #endif
1641 
1642  // send over MPI
1643 
1644  // Create one request
1645  req.add();
1646 
1647  // send
1648  MPI_IsendW<T,Mem,gr>::send(proc,SEND_RECV_BASE + tag,v,req.last(),ext_comm);
1649 
1650  return true;
1651  }
1652 
1671  bool recv(size_t proc, size_t tag, void * v, size_t sz)
1672  {
1673  // recv over MPI
1674 
1675  // Create one request
1676  req.add();
1677 
1678  // receive
1679  MPI_IrecvWB::recv(proc,SEND_RECV_BASE + tag,v,sz,req.last(),ext_comm);
1680 
1681  return true;
1682  }
1683 
1701  template<typename T, typename Mem, template<typename> class gr> bool recv(size_t proc, size_t tag, openfpm::vector<T,Mem,gr> & v)
1702  {
1703 #ifdef SE_CLASS1
1704  checkType<T>();
1705 #endif
1706 
1707  // recv over MPI
1708 
1709  // Create one request
1710  req.add();
1711 
1712  // receive
1713  MPI_IrecvW<T>::recv(proc,SEND_RECV_BASE + tag,v,req.last(),ext_comm);
1714 
1715  return true;
1716  }
1717 
1730  template<typename T, typename Mem, template<typename> class gr> bool allGather(T & send, openfpm::vector<T,Mem,gr> & v)
1731  {
1732 #ifdef SE_CLASS1
1733  checkType<T>();
1734 #endif
1735 
1736  // Create one request
1737  req.add();
1738 
1739  // Number of processors
1740  v.resize(getProcessingUnits());
1741 
1742  // gather
1743  MPI_IAllGatherW<T>::gather(&send,1,v.getPointer(),1,req.last(),ext_comm);
1744 
1745  return true;
1746  }
1747 
1764  template<typename T, typename Mem, template<typename> class layout_base >
1765  bool Bcast(openfpm::vector<T,Mem,layout_base> & v, size_t root)
1766  {
1767 #ifdef SE_CLASS1
1768  checkType<T>();
1769 #endif
1770 
1771  b_cast_helper<openfpm::vect_isel<T>::value == STD_VECTOR || is_layout_mlin<layout_base<T>>::value >::bcast_(req,v,root, ext_comm);
1772 
1773  return true;
1774  }
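Like the reductions, allGather() and Bcast() only post requests; execute() must run before the gathered or broadcast data can be read. A short sketch with v_cl as in the earlier sketches; local_count, counts and params are illustrative, and the sketch assumes the broadcast vector is resized to the same length on every rank:

// Sketch: gather one value per processor, then broadcast a vector from rank 0
size_t local_count = 42;                     // illustrative local value
openfpm::vector<size_t> counts;
v_cl.allGather(local_count, counts);         // counts is resized to the number of processors

openfpm::vector<double> params;
params.resize(4);                            // same size on every rank (assumption of this sketch)
if (v_cl.rank() == 0)
{ params.get(0) = 1.0; /* ... fill on the root ... */ }
v_cl.Bcast(params, 0);                       // broadcast from root 0

v_cl.execute();                              // counts and params are valid only from here on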
1775 
1779  void execute()
1780  {
1781  // if req == 0 return
1782  if (req.size() == 0)
1783  return;
1784 
1785  // Wait for all the requests
1786  stat.resize(req.size());
1787 
1788  MPI_SAFE_CALL(MPI_Waitall(req.size(),&req.get(0),&stat.get(0)));
1789 
1790  // Remove executed request and status
1791  req.clear();
1792  stat.clear();
1793  }
1794 
1799  void clear()
1800  {
1801  for (size_t i = 0 ; i < NQUEUE ; i++)
1802  {recv_buf[i].clear();}
1803  }
1804 };
1805 
1806 
1807 
1808 
1809 #endif
1810 
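To put the pieces together, a minimal end-to-end sketch of how this class is normally reached from user code. The include path and the openfpm_init / create_vcluster / openfpm_finalize flow follow the usual OpenFPM examples and may need adjusting to your installation; error handling is omitted:

#include "VCluster/VCluster.hpp"   // user-facing header (path as in the OpenFPM examples)

int main(int argc, char* argv[])
{
    openfpm_init(&argc, &argv);              // initializes MPI and the global VCluster

    Vcluster<> & v_cl = create_vcluster();   // the communicator wrapper documented above

    size_t local = v_cl.rank();
    v_cl.sum(local);
    v_cl.execute();                          // local == 0 + 1 + ... + (size - 1)

    openfpm_finalize();                      // finalizes MPI when the last instance goes away
    return 0;
}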