OpenFPM  5.2.0
Project that contains the implementation of distributed structures
VCluster.hpp
1 /*
2  * Vcluster.hpp
3  *
4  * Created on: Feb 8, 2016
5  * Author: Pietro Incardona
6  */
7 
8 #ifndef VCLUSTER_HPP
9 #define VCLUSTER_HPP
10 
11 #include <signal.h>
12 
13 #include "VCluster_base.hpp"
14 #include "VCluster_meta_function.hpp"
15 #include "util/math_util_complex.hpp"
16 #include "memory/mem_conf.hpp"
17 #include "util/cuda_util.hpp"
18 
19 #ifdef CUDA_GPU
20 extern CudaMemory mem_tmp;
21 
22 #ifndef MAX_NUMER_OF_PROPERTIES
23 #define MAX_NUMER_OF_PROPERTIES 20
24 #endif
25 
26 extern CudaMemory exp_tmp;
27 extern CudaMemory exp_tmp2[MAX_NUMER_OF_PROPERTIES];
28 
29 extern CudaMemory rem_tmp;
30 extern CudaMemory rem_tmp2[MAX_NUMER_OF_PROPERTIES];
31 
32 #endif
33 
34 extern size_t NBX_cnt;
35 
36 void bt_sighandler(int sig, siginfo_t * info, void * ctx);
37 
57 template<typename InternalMemory = HeapMemory>
58 class Vcluster: public Vcluster_base<InternalMemory>
59 {
60  // Internal memory
61  ExtPreAlloc<HeapMemory> * mem[NQUEUE];
62 
63  // Buffer that store the received bytes
64  openfpm::vector<size_t> sz_recv_byte[NQUEUE];
65 
66  // The sending buffer used by semantic calls
67  openfpm::vector<const void *> send_buf;
68  openfpm::vector<size_t> send_sz_byte;
69  openfpm::vector<size_t> prc_send_;
70 
71  unsigned int NBX_prc_scnt = 0;
72  unsigned int NBX_prc_pcnt = 0;
73 
75 
76  // Internal Heap memory
77  HeapMemory * pmem[NQUEUE];
78 
86  template<typename Memory>
87  struct base_info
88  {
89  // Receive buffer
90  openfpm::vector_fr<BMemory<Memory>> * recv_buf;
91  // receiving processor list
92  openfpm::vector<size_t> * prc;
93  // size of each message
94  openfpm::vector<size_t> * sz;
95  // tags
96  openfpm::vector<size_t> * tags;
97 
98  // options
99  size_t opt;
100 
101  // default constructor
102  base_info()
103  {}
104 
105  // constructor
106  base_info(openfpm::vector_fr<BMemory<Memory>> * recv_buf, openfpm::vector<size_t> & prc, openfpm::vector<size_t> & sz, openfpm::vector<size_t> & tags, size_t opt)
107  :recv_buf(recv_buf),prc(&prc),sz(&sz),tags(&tags),opt(opt)
108  {}
109 
110  void set(openfpm::vector_fr<BMemory<Memory>> * recv_buf, openfpm::vector<size_t> & prc, openfpm::vector<size_t> & sz, openfpm::vector<size_t> & tags, size_t opt)
111  {
112  this->recv_buf = recv_buf;
113  this->prc = &prc;
114  this->sz = &sz;
115  this->tags = &tags;
116  this->opt = opt;
117  }
118  };
119 
120  // Internal temporary buffer
121  base_info<InternalMemory> NBX_prc_bi[NQUEUE];
122 
123  typedef Vcluster_base<InternalMemory> self_base;
124 
125  template<typename T>
126  struct index_gen {};
127 
129  template<int ... prp>
130  struct index_gen<index_tuple<prp...>>
131  {
133  template<typename op,
134  typename T,
135  typename S,
136  template <typename> class layout_base = memory_traits_lin>
137  inline static void process_recv(
138  Vcluster & vcl, S & recv,
139  openfpm::vector<size_t> * sz_recv,
140  openfpm::vector<size_t> * sz_recv_byte,
141  op & op_param,size_t opt
142  ) {
143  if (opt == MPI_GPU_DIRECT && !std::is_same<InternalMemory,CudaMemory>::value)
144  {
145  // In order to have this option activated InternalMemory must be CudaMemory
146 
147  std::cout << __FILE__ << ":" << __LINE__ << " error: in order to have MPI_GPU_DIRECT VCluster must use CudaMemory internally, the most probable" <<
148  " cause of this problem is that you are using MPI_GPU_DIRECT option with a non-GPU data-structure" << std::endl;
149  }
150 
151  vcl.process_receive_buffer_with_prp<op,T,S,layout_base,prp...>(recv,sz_recv,sz_recv_byte,op_param,opt);
152  }
153  };
154 
173  template<typename op, typename T, typename S, template <typename> class layout_base>
174  void prepare_send_buffer(
175  openfpm::vector<T> & send,
176  S & recv,
177  openfpm::vector<size_t> & prc_send,
178  openfpm::vector<size_t> & prc_recv,
179  openfpm::vector<size_t> & sz_recv,
180  size_t opt
181  ) {
182  sz_recv_byte[NBX_prc_scnt].resize(sz_recv.size());
183 
184  // Reset the receive buffer
185  reset_recv_buf();
186 
187 #ifdef SE_CLASS1
188 
189  if (send.size() != prc_send.size())
190  std::cerr << __FILE__ << ":" << __LINE__ << " Error, the number of processor involved \"prc.size()\" must match the number of sending buffers \"send.size()\" " << std::endl;
191 
192 #endif
193 
194  // Prepare the sending buffer
195  send_buf.resize(0);
196  send_sz_byte.resize(0);
197  prc_send_.resize(0);
198 
199  size_t tot_size = 0;
200 
201  for (size_t i = 0; i < send.size() ; i++)
202  {
203  size_t req = 0;
204 
205  //Pack requesting
206  pack_unpack_cond_with_prp<has_max_prop<T, has_value_type_ofp<T>::value>::value,op, T, S, layout_base>::packingRequest(send.get(i), req, send_sz_byte);
207  tot_size += req;
208  }
209 
211 
241 
242  pmem[NBX_prc_scnt] = new HeapMemory;
243 
244  mem[NBX_prc_scnt] = new ExtPreAlloc<HeapMemory>(tot_size,*pmem[NBX_prc_scnt]);
245  mem[NBX_prc_scnt]->incRef();
246 
247  for (size_t i = 0; i < send.size() ; i++)
248  {
249  //Packing
250 
251  Pack_stat sts;
252 
253  pack_unpack_cond_with_prp<has_max_prop<T, has_value_type_ofp<T>::value>::value, op, T, S, layout_base>::packing(*mem[NBX_prc_scnt], send.get(i), sts, send_buf,opt);
254  }
255 
256  // receive information
257  NBX_prc_bi[NBX_prc_scnt].set(&this->recv_buf[NBX_prc_scnt],prc_recv,sz_recv_byte[NBX_prc_scnt],this->tags[NBX_prc_scnt],opt);
258 
259  // Send and recv multiple messages
260  if (opt & RECEIVE_KNOWN)
261  {
262  // If we are passing the number of elements but not the bytes, calculate the bytes
263  if (opt & KNOWN_ELEMENT_OR_BYTE)
264  {
265  // We know the number of elements; convert to bytes (ONLY if it is possible)
266  if (has_pack_gen<typename T::value_type>::value == false && is_vector<T>::value == true)
267  {
268  for (size_t i = 0 ; i < sz_recv.size() ; i++)
269  {sz_recv_byte[NBX_prc_scnt].get(i) = sz_recv.get(i) * sizeof(typename T::value_type);}
270  }
271  else
272  {
273 #ifndef DISABLE_ALL_RTTI
274  std::cout << __FILE__ << ":" << __LINE__ << " Error " << demangle(typeid(T).name()) << " the type does not work with the option or NO_CHANGE_ELEMENTS" << std::endl;
275 #endif
276  }
277 
278  self_base::sendrecvMultipleMessagesNBXAsync(prc_send.size(),(size_t *)send_sz_byte.getPointer(),(size_t *)prc_send.getPointer(),(void **)send_buf.getPointer(),
279  prc_recv.size(),(size_t *)prc_recv.getPointer(),(size_t *)sz_recv_byte[NBX_prc_scnt].getPointer(),msg_alloc_known,(void *)&NBX_prc_bi);
280  }
281  else
282  {
283  self_base::sendrecvMultipleMessagesNBXAsync(prc_send.size(),(size_t *)send_sz_byte.getPointer(),(size_t *)prc_send.getPointer(),(void **)send_buf.getPointer(),
284  prc_recv.size(),(size_t *)prc_recv.getPointer(),msg_alloc_known,(void *)&NBX_prc_bi);
285  sz_recv_byte[NBX_prc_scnt] = self_base::sz_recv_tmp;
286  }
287  }
288  else
289  {
290  self_base::tags[NBX_prc_scnt].clear();
291  prc_recv.clear();
292  self_base::sendrecvMultipleMessagesNBXAsync(prc_send_.size(),(size_t *)send_sz_byte.getPointer(),(size_t *)prc_send_.getPointer(),(void **)send_buf.getPointer(),msg_alloc,(void *)&NBX_prc_bi[NBX_prc_scnt]);
293  }
294  }
295 
296 
301  void reset_recv_buf()
302  {
303  for (size_t i = 0 ; i < self_base::recv_buf[NBX_prc_scnt].size() ; i++)
304  {self_base::recv_buf[NBX_prc_scnt].get(i).resize(0);}
305 
306  self_base::recv_buf[NBX_prc_scnt].resize(0);
307  }
308 
322  static void * msg_alloc(size_t msg_i ,size_t total_msg, size_t total_p, size_t i, size_t ri, size_t tag, void * ptr)
323  {
324  base_info<InternalMemory> & rinfo = *(base_info<InternalMemory> *)ptr;
325 
326  if (rinfo.recv_buf == NULL)
327  {
328  std::cerr << __FILE__ << ":" << __LINE__ << " Internal error: this processor is not supposed to receive\n";
329  return NULL;
330  }
331 
332  rinfo.recv_buf->resize(ri+1);
333 
334  rinfo.recv_buf->get(ri).resize(msg_i);
335 
336  // Receive info
337  rinfo.prc->add(i);
338  rinfo.sz->add(msg_i);
339  rinfo.tags->add(tag);
340 
341  // return the pointer
342 
343  // If we have GPU direct activated use directly the cuda buffer
344  if (rinfo.opt & MPI_GPU_DIRECT)
345  {
346 #if defined(MPIX_CUDA_AWARE_SUPPORT) && MPIX_CUDA_AWARE_SUPPORT
347  return rinfo.recv_buf->last().getDevicePointer();
348 #else
349  return rinfo.recv_buf->last().getPointer();
350 #endif
351  }
352 
353  return rinfo.recv_buf->last().getPointer();
354  }
355 
356 
370  static void * msg_alloc_known(size_t msg_i ,size_t total_msg, size_t total_p, size_t i, size_t ri, size_t tag, void * ptr)
371  {
372  base_info<InternalMemory> & rinfo = *(base_info<InternalMemory> *)ptr;
373 
374  if (rinfo.recv_buf == NULL)
375  {
376  std::cerr << __FILE__ << ":" << __LINE__ << " Internal error: this processor is not supposed to receive\n";
377  return NULL;
378  }
379 
380  rinfo.recv_buf->resize(ri+1);
381 
382  rinfo.recv_buf->get(ri).resize(msg_i);
383 
384  // return the pointer
385  return rinfo.recv_buf->last().getPointer();
386  }
387 
401  template<typename op, typename T, typename S, template <typename> class layout_base ,unsigned int ... prp >
402  void process_receive_buffer_with_prp(
403  S & recv,
404  openfpm::vector<size_t> * sz,
405  openfpm::vector<size_t> * sz_byte,
406  op & op_param,
407  size_t opt
408  ) {
409  if (sz != NULL)
410  {sz->resize(self_base::recv_buf[NBX_prc_pcnt].size());}
411 
412  pack_unpack_cond_with_prp<has_max_prop<T, has_value_type_ofp<T>::value>::value,op, T, S, layout_base, prp... >::unpacking(recv, self_base::recv_buf[NBX_prc_pcnt], sz, sz_byte, op_param,opt);
413  }
414 
415  public:
416 
423  Vcluster(int *argc, char ***argv, MPI_Comm ext_comm = MPI_COMM_WORLD)
424  :Vcluster_base<InternalMemory>(argc,argv, ext_comm)
425  {
426  }
427 
455  template<typename T, typename S, template <typename> class layout_base=memory_traits_lin> bool SGather(T & send, S & recv,size_t root)
456  {
457  openfpm::vector<size_t> prc;
458  openfpm::vector<size_t> sz;
459 
460  return SGather<T,S,layout_base>(send,recv,prc,sz,root);
461  }
462 
464  template<size_t index, size_t N> struct MetaFuncOrd {
465  enum { value = index };
466  };
467 
497  template<typename T,
498  typename S,
499  template <typename> class layout_base = memory_traits_lin>
500  bool SGather(
501  T & send,
502  S & recv,
503  openfpm::vector<size_t> & prc,
504  openfpm::vector<size_t> & sz,
505  size_t root)
506  {
507 #ifdef SE_CLASS1
508  if (&send == (T *)&recv)
509  {std::cerr << "Error: " << __FILE__ << ":" << __LINE__ << " using SGather in general the sending object and the receiving object must be different" << std::endl;}
510 #endif
511 
512  // Reset the receive buffer
513  reset_recv_buf();
514 
515  // If we are on master collect the information
516  if (self_base::getProcessUnitID() == root)
517  {
518  // send buffer (master does not send anything) so send req and send_buf
519  // remain buffer with size 0
520  openfpm::vector<size_t> send_req;
521 
522  self_base::tags[NBX_prc_scnt].clear();
523 
524  // receive information
525  base_info<InternalMemory> bi(&this->recv_buf[NBX_prc_scnt],prc,sz,this->tags[NBX_prc_scnt],0);
526 
527  // Send and recv multiple messages
528  self_base::sendrecvMultipleMessagesNBX(send_req.size(),NULL,NULL,NULL,msg_alloc,&bi);
529 
530  // we generate the list of the properties to unpack
531  typedef typename ::generate_indexes<int, has_max_prop<T, has_value_type_ofp<T>::value>::number, MetaFuncOrd>::result ind_prop_to_pack;
532 
533  // operation object
534  op_ssend_recv_add<void> opa;
535 
536  // Reorder the buffer
537  reorder_buffer(prc,self_base::tags[NBX_prc_scnt],sz);
538 
539  index_gen<ind_prop_to_pack>::template process_recv<op_ssend_recv_add<void>,T,S,layout_base>(*this,recv,&sz,NULL,opa,0);
540 
541  recv.add(send);
542  prc.add(root);
543  sz.add(send.size());
544  }
545  else
546  {
547  // send buffer (master does not send anything) so send req and send_buf
548  // remain buffer with size 0
549  openfpm::vector<size_t> send_prc;
550  openfpm::vector<size_t> send_prc_;
551  send_prc.add(root);
552 
554 
556 
557  //Pack requesting
558 
559  size_t tot_size = 0;
560 
561  pack_unpack_cond_with_prp<has_max_prop<T, has_value_type_ofp<T>::value>::value,op_ssend_recv_add<void>, T, S, layout_base>::packingRequest(send, tot_size, sz);
562 
563  HeapMemory pmem;
564 
565  ExtPreAlloc<HeapMemory> & mem = *(new ExtPreAlloc<HeapMemory>(tot_size,pmem));
566  mem.incRef();
567 
568  //Packing
569 
570  Pack_stat sts;
571 
572  pack_unpack_cond_with_prp<has_max_prop<T, has_value_type_ofp<T>::value>::value,op_ssend_recv_add<void>, T, S, layout_base>::packing(mem, send, sts, send_buf);
573 
575 
576  self_base::tags[NBX_prc_scnt].clear();
577 
578  // receive information
579  base_info<InternalMemory> bi(NULL,prc,sz,self_base::tags[NBX_prc_scnt],0);
580 
581  // Send and recv multiple messages
582  self_base::sendrecvMultipleMessagesNBX(send_prc_.size(),(size_t *)sz.getPointer(),(size_t *)send_prc_.getPointer(),(void **)send_buf.getPointer(),msg_alloc,(void *)&bi,NONE);
583 
584  mem.decRef();
585  delete &mem;
586  }
587 
588  return true;
589  }
590 
595  void barrier()
596  {
597  MPI_Barrier(this->getMPIComm());
598  }
599 
626  template<typename T, typename S, template <typename> class layout_base=memory_traits_lin>
627  bool SScatter(T & send, S & recv, openfpm::vector<size_t> & prc, openfpm::vector<size_t> & sz, size_t root)
628  {
629  // Reset the receive buffer
630  reset_recv_buf();
631 
632  // If we are on master scatter the information
633  if (self_base::getProcessUnitID() == root)
634  {
635  // Prepare the sending buffer
637 
638 
639  openfpm::vector<size_t> sz_byte;
640  sz_byte.resize(sz.size());
641 
642  size_t ptr = 0;
643 
644  for (size_t i = 0; i < sz.size() ; i++)
645  {
646  send_buf.add((char *)send.getPointer() + sizeof(typename T::value_type)*ptr );
647  sz_byte.get(i) = sz.get(i) * sizeof(typename T::value_type);
648  ptr += sz.get(i);
649  }
650 
651  self_base::tags[NBX_prc_scnt].clear();
652 
653  // receive information
654  base_info<InternalMemory> bi(&this->recv_buf[NBX_prc_scnt],prc,sz,this->tags[NBX_prc_scnt],0);
655 
656  // Send and recv multiple messages
657  self_base::sendrecvMultipleMessagesNBX(prc.size(),(size_t *)sz_byte.getPointer(),(size_t *)prc.getPointer(),(void **)send_buf.getPointer(),msg_alloc,(void *)&bi);
658 
659  // we generate the list of the properties to pack
660  typedef typename ::generate_indexes<int, has_max_prop<T, has_value_type_ofp<T>::value>::number, MetaFuncOrd>::result ind_prop_to_pack;
661 
662  // operation object
663  op_ssend_recv_add<void> opa;
664 
665  index_gen<ind_prop_to_pack>::template process_recv<op_ssend_recv_add<void>,T,S,layout_base>(*this,recv,NULL,NULL,opa,0);
666  }
667  else
668  {
669  // The non-root receive
670  openfpm::vector<size_t> send_req;
671 
672  self_base::tags[NBX_prc_scnt].clear();
673 
674  // receive information
675  base_info<InternalMemory> bi(&this->recv_buf[NBX_prc_scnt],prc,sz,this->tags[NBX_prc_scnt],0);
676 
677  // Send and recv multiple messages
678  self_base::sendrecvMultipleMessagesNBX(send_req.size(),NULL,NULL,NULL,msg_alloc,&bi);
679 
680  // we generate the list of the properties to pack
681  typedef typename ::generate_indexes<int, has_max_prop<T, has_value_type_ofp<T>::value>::number, MetaFuncOrd>::result ind_prop_to_pack;
682 
683  // operation object
684  op_ssend_recv_add<void> opa;
685 
686  index_gen<ind_prop_to_pack>::template process_recv<op_ssend_recv_add<void>,T,S,layout_base>(*this,recv,NULL,NULL,opa,0);
687  }
688 
689  return true;
690  }
691 
698  void reorder_buffer(openfpm::vector<size_t> & prc, const openfpm::vector<size_t> & tags, openfpm::vector<size_t> & sz_recv)
699  {
700 
701  struct recv_buff_reorder
702  {
704  size_t proc;
705 
706  size_t tag;
707 
709  size_t pos;
710 
712  recv_buff_reorder()
713  :proc(0),tag(0),pos(0)
714  {};
715 
717  bool operator<(const recv_buff_reorder & rd) const
718  {
719  if (proc == rd.proc)
720  {return tag < rd.tag;}
721 
722  return (proc < rd.proc);
723  }
724  };
725 
726  openfpm::vector<recv_buff_reorder> rcv;
727 
728  rcv.resize(self_base::recv_buf[NBX_prc_pcnt].size());
729 
730  for (size_t i = 0 ; i < rcv.size() ; i++)
731  {
732  rcv.get(i).proc = prc.get(i);
733  if (i < tags.size())
734  {rcv.get(i).tag = tags.get(i);}
735  else
736  {rcv.get(i).tag = (unsigned int)-1;}
737  rcv.get(i).pos = i;
738  }
739 
740  // we sort based on processor
741  rcv.sort();
742 
743  openfpm::vector_fr<BMemory<InternalMemory>> recv_ord;
744  recv_ord.resize(rcv.size());
745 
746  openfpm::vector<size_t> prc_ord;
747  prc_ord.resize(rcv.size());
748 
749  openfpm::vector<size_t> sz_recv_ord;
750  sz_recv_ord.resize(rcv.size());
751 
752  // Now we reorder rcv
753  for (size_t i = 0 ; i < rcv.size() ; i++)
754  {
755  recv_ord.get(i).swap(self_base::recv_buf[NBX_prc_pcnt].get(rcv.get(i).pos));
756  prc_ord.get(i) = rcv.get(i).proc;
757  sz_recv_ord.get(i) = sz_recv.get(rcv.get(i).pos);
758  }
759 
760  // move rcv into recv
761  // Now we swap back to recv_buf in an ordered way
762  for (size_t i = 0 ; i < rcv.size() ; i++)
763  {
764  self_base::recv_buf[NBX_prc_pcnt].get(i).swap(recv_ord.get(i));
765  }
766 
767  prc.swap(prc_ord);
768  sz_recv.swap(sz_recv_ord);
769 
770  // reorder prc_recv and recv_sz
771  }
772 
800  template<typename T,
801  typename S,
802  template <typename> class layout_base = memory_traits_lin>
803  bool SSendRecv(
804  openfpm::vector<T> & send,
805  S & recv,
806  openfpm::vector<size_t> & prc_send,
807  openfpm::vector<size_t> & prc_recv,
808  openfpm::vector<size_t> & sz_recv,
809  size_t opt = NONE)
810  {
811  prepare_send_buffer<op_ssend_recv_add<void>,T,S,layout_base>(send,recv,prc_send,prc_recv,sz_recv,opt);
812 
813  self_base::sendrecvMultipleMessagesNBXWait();
814 
815  // Reorder the buffer
816  reorder_buffer(prc_recv,self_base::tags[NBX_prc_scnt],sz_recv_byte[NBX_prc_scnt]);
817 
818  mem[NBX_prc_scnt]->decRef();
819  delete mem[NBX_prc_scnt];
820  delete pmem[NBX_prc_scnt];
821 
822  // we generate the list of the properties to pack
823  typedef typename ::generate_indexes<int, has_max_prop<T, has_value_type_ofp<T>::value>::number, MetaFuncOrd>::result ind_prop_to_pack;
824 
825  op_ssend_recv_add<void> opa;
826 
827  index_gen<ind_prop_to_pack>::template process_recv<op_ssend_recv_add<void>,T,S,layout_base>(*this,recv,&sz_recv,NULL,opa,opt);
828 
829  return true;
830  }
831 
862  template<typename T,
863  typename S,
864  template <typename> class layout_base = memory_traits_lin>
865  bool SSendRecvAsync(
866  openfpm::vector<T> & send,
867  S & recv,
868  openfpm::vector<size_t> & prc_send,
869  openfpm::vector<size_t> & prc_recv,
870  openfpm::vector<size_t> & sz_recv,
871  size_t opt = NONE
872  ) {
873  prepare_send_buffer<op_ssend_recv_add<void>,T,S,layout_base>(send,recv,prc_send,prc_recv,sz_recv,opt);
874 
875  NBX_prc_scnt++;
876 
877  return true;
878  }
879 
908  template<typename T, typename S, template <typename> class layout_base, int ... prp>
909  bool SSendRecvP(
910  openfpm::vector<T> & send,
911  S & recv,
912  openfpm::vector<size_t> & prc_send,
913  openfpm::vector<size_t> & prc_recv,
914  openfpm::vector<size_t> & sz_recv,
915  openfpm::vector<size_t> & sz_recv_byte_out,
916  size_t opt = NONE
917  ) {
918  prepare_send_buffer<op_ssend_recv_add<void>,T,S,layout_base>(send,recv,prc_send,prc_recv,sz_recv,opt);
919 
920  self_base::sendrecvMultipleMessagesNBXWait();
921 
922  // Reorder the buffer
923  reorder_buffer(prc_recv,self_base::tags[NBX_prc_scnt],sz_recv_byte[NBX_prc_scnt]);
924 
925  mem[NBX_prc_scnt]->decRef();
926  delete mem[NBX_prc_scnt];
927  delete pmem[NBX_prc_scnt];
928 
929  // operation object
930  op_ssend_recv_add<void> opa;
931 
932  // process the received information
933  process_receive_buffer_with_prp<op_ssend_recv_add<void>,T,S,layout_base,prp...>(recv,&sz_recv,&sz_recv_byte_out,opa,opt);
934 
935  return true;
936  }
937 
969  template<typename T, typename S, template <typename> class layout_base, int ... prp>
970  bool SSendRecvPAsync(
971  openfpm::vector<T> & send,
972  S & recv,
973  openfpm::vector<size_t> & prc_send,
974  openfpm::vector<size_t> & prc_recv,
975  openfpm::vector<size_t> & sz_recv,
976  openfpm::vector<size_t> & sz_recv_byte_out,
977  size_t opt = NONE
978  ) {
979  prepare_send_buffer<op_ssend_recv_add<void>,T,S,layout_base>(send,recv,prc_send,prc_recv,sz_recv,opt);
980 
981  NBX_prc_scnt++;
982 
983  return true;
984  }
985 
1013  template<typename T, typename S, template <typename> class layout_base, int ... prp>
1014  bool SSendRecvP(
1015  openfpm::vector<T> & send,
1016  S & recv,
1017  openfpm::vector<size_t> & prc_send,
1018  openfpm::vector<size_t> & prc_recv,
1019  openfpm::vector<size_t> & sz_recv,
1020  size_t opt = NONE
1021  ) {
1022  prepare_send_buffer<op_ssend_recv_add<void>,T,S,layout_base>(send,recv,prc_send,prc_recv,sz_recv,opt);
1023 
1024  self_base::sendrecvMultipleMessagesNBXWait();
1025 
1026  // Reorder the buffer
1027  reorder_buffer(prc_recv,self_base::tags[NBX_prc_scnt],sz_recv_byte[NBX_prc_scnt]);
1028 
1029  mem[NBX_prc_scnt]->decRef();
1030  delete mem[NBX_prc_scnt];
1031  delete pmem[NBX_prc_scnt];
1032 
1033  // operation object
1034  op_ssend_recv_add<void> opa;
1035 
1036  // process the received information
1037  process_receive_buffer_with_prp<op_ssend_recv_add<void>,T,S,layout_base,prp...>(recv,&sz_recv,NULL,opa,opt);
1038 
1039  return true;
1040  }
1041 
1072  template<typename T, typename S, template <typename> class layout_base, int ... prp>
1073  bool SSendRecvPAsync(
1074  openfpm::vector<T> & send,
1075  S & recv,
1076  openfpm::vector<size_t> & prc_send,
1077  openfpm::vector<size_t> & prc_recv,
1078  openfpm::vector<size_t> & sz_recv,
1079  size_t opt = NONE
1080  ) {
1081  prepare_send_buffer<op_ssend_recv_add<void>,T,S,layout_base>(send,recv,prc_send,prc_recv,sz_recv,opt);
1082 
1083  NBX_prc_scnt++;
1084 
1085  return true;
1086  }
1087 
1124  template<typename op,
1125  typename T,
1126  typename S,
1127  template <typename> class layout_base,
1128  int ... prp>
1129  bool SSendRecvP_op(
1130  openfpm::vector<T> & send,
1131  S & recv,
1132  openfpm::vector<size_t> & prc_send,
1133  op & op_param,
1134  openfpm::vector<size_t> & prc_recv,
1135  openfpm::vector<size_t> & recv_sz,
1136  size_t opt = NONE
1137  ) {
1138  prepare_send_buffer<op,T,S,layout_base>(send,recv,prc_send,prc_recv,recv_sz,opt);
1139 
1140  self_base::sendrecvMultipleMessagesNBXWait();
1141 
1142  // Reorder the buffer
1143  reorder_buffer(prc_recv,self_base::tags[NBX_prc_scnt],sz_recv_byte[NBX_prc_scnt]);
1144 
1145  mem[NBX_prc_scnt]->decRef();
1146  delete mem[NBX_prc_scnt];
1147  delete pmem[NBX_prc_scnt];
1148 
1149  // process the received information
1150  process_receive_buffer_with_prp<op,T,S,layout_base,prp...>(recv,NULL,NULL,op_param,opt);
1151 
1152  return true;
1153  }
1154 
1193  template<typename op,
1194  typename T,
1195  typename S,
1196  template <typename> class layout_base,
1197  int ... prp>
1198  bool SSendRecvP_opAsync(
1199  openfpm::vector<T> & send,
1200  S & recv,
1201  openfpm::vector<size_t> & prc_send,
1202  op & op_param,
1203  openfpm::vector<size_t> & prc_recv,
1204  openfpm::vector<size_t> & recv_sz,
1205  size_t opt = NONE
1206  ) {
1207  prepare_send_buffer<op,T,S,layout_base>(send,recv,prc_send,prc_recv,recv_sz,opt);
1208 
1209  NBX_prc_scnt++;
1210 
1211  return true;
1212  }
1213 
1219  template<typename T,
1220  typename S,
1221  template <typename> class layout_base = memory_traits_lin>
1222  bool SSendRecvWait(
1223  openfpm::vector<T> & send,
1224  S & recv,
1225  openfpm::vector<size_t> & prc_send,
1226  openfpm::vector<size_t> & prc_recv,
1227  openfpm::vector<size_t> & sz_recv,
1228  size_t opt = NONE
1229  ) {
1230  self_base::sendrecvMultipleMessagesNBXWait();
1231 
1232  // Reorder the buffer
1233  reorder_buffer(prc_recv,self_base::tags[NBX_prc_pcnt],sz_recv_byte[NBX_prc_pcnt]);
1234 
1235  mem[NBX_prc_pcnt]->decRef();
1236  delete mem[NBX_prc_pcnt];
1237  delete pmem[NBX_prc_pcnt];
1238 
1239  // we generate the list of the properties to pack
1240  typedef typename ::generate_indexes<int, has_max_prop<T, has_value_type_ofp<T>::value>::number, MetaFuncOrd>::result ind_prop_to_pack;
1241 
1242  op_ssend_recv_add<void> opa;
1243 
1244  index_gen<ind_prop_to_pack>::template process_recv<op_ssend_recv_add<void>,T,S,layout_base>(*this,recv,&sz_recv,NULL,opa,opt);
1245 
1246  NBX_prc_pcnt++;
1247  if (NBX_prc_scnt == NBX_prc_pcnt)
1248  {
1249  NBX_prc_scnt = 0;
1250  NBX_prc_pcnt = 0;
1251  }
1252 
1253  return true;
1254  }
1255 
1261  template<typename T, typename S, template <typename> class layout_base, int ... prp>
1262  bool SSendRecvPWait(
1263  openfpm::vector<T> & send,
1264  S & recv,
1265  openfpm::vector<size_t> & prc_send,
1266  openfpm::vector<size_t> & prc_recv,
1267  openfpm::vector<size_t> & sz_recv,
1268  openfpm::vector<size_t> & sz_recv_byte_out,
1269  size_t opt = NONE
1270  ) {
1271  self_base::sendrecvMultipleMessagesNBXWait();
1272 
1273  // Reorder the buffer
1274  reorder_buffer(prc_recv,self_base::tags[NBX_prc_pcnt],sz_recv_byte[NBX_prc_pcnt]);
1275 
1276  mem[NBX_prc_pcnt]->decRef();
1277  delete mem[NBX_prc_pcnt];
1278  delete pmem[NBX_prc_pcnt];
1279 
1280  // operation object
1281  op_ssend_recv_add<void> opa;
1282 
1283  // process the received information
1284  process_receive_buffer_with_prp<op_ssend_recv_add<void>,T,S,layout_base,prp...>(recv,&sz_recv,&sz_recv_byte_out,opa,opt);
1285 
1286  NBX_prc_pcnt++;
1287  if (NBX_prc_scnt == NBX_prc_pcnt)
1288  {
1289  NBX_prc_scnt = 0;
1290  NBX_prc_pcnt = 0;
1291  }
1292 
1293  return true;
1294  }
1295 
1301  template<typename T, typename S, template <typename> class layout_base, int ... prp>
1302  bool SSendRecvPWait(
1303  openfpm::vector<T> & send,
1304  S & recv,
1305  openfpm::vector<size_t> & prc_send,
1306  openfpm::vector<size_t> & prc_recv,
1307  openfpm::vector<size_t> & sz_recv,
1308  size_t opt = NONE
1309  ) {
1310  self_base::sendrecvMultipleMessagesNBXWait();
1311 
1312  // Reorder the buffer
1313  reorder_buffer(prc_recv,self_base::tags[NBX_prc_pcnt],sz_recv_byte[NBX_prc_pcnt]);
1314 
1315  mem[NBX_prc_pcnt]->decRef();
1316  delete mem[NBX_prc_pcnt];
1317  delete pmem[NBX_prc_pcnt];
1318 
1319  // operation object
1320  op_ssend_recv_add<void> opa;
1321 
1322  // process the received information
1323  process_receive_buffer_with_prp<op_ssend_recv_add<void>,T,S,layout_base,prp...>(recv,&sz_recv,NULL,opa,opt);
1324 
1325  NBX_prc_pcnt++;
1326  if (NBX_prc_scnt == NBX_prc_pcnt)
1327  {
1328  NBX_prc_scnt = 0;
1329  NBX_prc_pcnt = 0;
1330  }
1331 
1332  return true;
1333  }
1334 
1340  template<typename op,
1341  typename T,
1342  typename S,
1343  template <typename> class layout_base,
1344  int ... prp>
1345  bool SSendRecvP_opWait(
1346  openfpm::vector<T> & send,
1347  S & recv,
1348  openfpm::vector<size_t> & prc_send,
1349  op & op_param,
1350  openfpm::vector<size_t> & prc_recv,
1351  openfpm::vector<size_t> & recv_sz,
1352  size_t opt = NONE
1353  ) {
1354  self_base::sendrecvMultipleMessagesNBXWait();
1355 
1356  // Reorder the buffer
1357  reorder_buffer(prc_recv,self_base::tags[NBX_prc_pcnt],sz_recv_byte[NBX_prc_pcnt]);
1358 
1359  mem[NBX_prc_pcnt]->decRef();
1360  delete mem[NBX_prc_pcnt];
1361  delete pmem[NBX_prc_pcnt];
1362 
1363  // process the received information
1364  process_receive_buffer_with_prp<op,T,S,layout_base,prp...>(recv,NULL,NULL,op_param,opt);
1365 
1366  NBX_prc_pcnt++;
1367  if (NBX_prc_scnt == NBX_prc_pcnt)
1368  {
1369  NBX_prc_scnt = 0;
1370  NBX_prc_pcnt = 0;
1371  }
1372 
1373  return true;
1374  }
1375 
1376 };
1377 
1378 // Function to initialize the global VCluster //
1379 
1380 extern Vcluster<> * global_v_cluster_private_heap;
1381 extern Vcluster<CudaMemory> * global_v_cluster_private_cuda;
1382 
1389 static inline void init_global_v_cluster_private(int *argc, char ***argv, MPI_Comm ext_comm)
1390 {
1391  if (global_v_cluster_private_heap == NULL)
1392  {global_v_cluster_private_heap = new Vcluster<>(argc,argv,ext_comm);}
1393 
1394  if (global_v_cluster_private_cuda == NULL)
1395  {global_v_cluster_private_cuda = new Vcluster<CudaMemory>(argc,argv,ext_comm);}
1396 }
1397 
1398 static inline void delete_global_v_cluster_private()
1399 {
1400  delete global_v_cluster_private_heap;
1401  delete global_v_cluster_private_cuda;
1402 }
1403 
1404 template<typename Memory>
1405 struct get_vcl
1406 {
1407  static Vcluster<Memory> & get()
1408  {
1409  return *global_v_cluster_private_heap;
1410  }
1411 };
1412 
1413 template<>
1414  struct get_vcl<CudaMemory>
1415  {
1416  static Vcluster<CudaMemory> & get()
1417  {
1418  return *global_v_cluster_private_cuda;
1419  }
1420 };
1421 
1422 template<typename Memory = HeapMemory>
1423 static inline Vcluster<Memory> & create_vcluster()
1424 {
1425  if (global_v_cluster_private_heap == NULL)
1426  {std::cerr << __FILE__ << ":" << __LINE__ << " Error you must call openfpm_init before using any distributed data structures";}
1427 
1428  return get_vcl<Memory>::get();
1429 }
1430 
1431 
1432 
1438 static inline bool is_openfpm_init()
1439 {
1440  return ofp_initialized;
1441 }
1442 
1443 
1449 void openfpm_init_vcl(int *argc, char ***argv, MPI_Comm ext_comm);
1450 
1451 size_t openfpm_vcluster_compilation_mask();
1452 
1458 void openfpm_finalize();
1459 
1465 static void openfpm_init(int *argc, char ***argv, MPI_Comm ext_comm=MPI_COMM_WORLD)
1466 {
1467  if (ofp_initialized)
1468  {
1469  return;
1470  }
1471  openfpm_init_vcl(argc,argv, ext_comm);
1472 
1473  size_t compiler_mask = CUDA_ON_BACKEND;
1474 
1475  init_wrappers();
1476 
1477  if (compiler_mask != openfpm_vcluster_compilation_mask() || compiler_mask != openfpm_ofpmmemory_compilation_mask())
1478  {
1479  std::cout << __FILE__ << ":" << __LINE__ << " Error: the program has been compiled with CUDA_ON_BACKEND: " << compiler_mask << " but libvcluster has been compiled with CUDA_ON_BACKEND: " <<
1480  openfpm_vcluster_compilation_mask() << ", and libofpmmemory has been compiled with CUDA_ON_BACKEND: " << openfpm_ofpmmemory_compilation_mask() <<
1481  " recompile the library with the right CUDA_ON_BACKEND" << std::endl;
1482  }
1483 }
1484 
1485 #endif
1486 
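
Usage sketches (editor's examples, not part of VCluster.hpp)

The snippets below illustrate the API declared in the listing above. They are minimal sketches: the include path "VCluster/VCluster.hpp" is an assumption and may differ in your installation, and the examples are not taken from the library's test suite.

#include "VCluster/VCluster.hpp"
#include <iostream>

int main(int argc, char* argv[])
{
    // Initialize OpenFPM (MPI and the global VCluster objects)
    openfpm_init(&argc,&argv);

    // Get the global VCluster (HeapMemory backend by default)
    Vcluster<> & v_cl = create_vcluster();

    size_t rank  = v_cl.getProcessUnitID();  // id of this process
    size_t nproc = v_cl.size();              // total number of processes
    std::cout << "process " << rank << " of " << nproc << std::endl;

    v_cl.barrier();      // wraps MPI_Barrier on the cluster communicator

    openfpm_finalize();
    return 0;
}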
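
SGather collects every processor's send vector on the root; as the listing shows, on the root recv ends up holding the concatenation of all contributions, with the root's own data appended last. A sketch, assuming openfpm_init() has already been called as in the first example:

    Vcluster<> & v_cl = create_vcluster();

    openfpm::vector<float> send;
    send.add((float)v_cl.getProcessUnitID());   // each process contributes its rank
    send.add((float)v_cl.getProcessUnitID());

    openfpm::vector<float> recv;

    v_cl.SGather(send,recv,0);                  // gather everything on processor 0

    if (v_cl.getProcessUnitID() == 0)
    {std::cout << "gathered " << recv.size() << " elements" << std::endl;}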
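
SScatter distributes chunks of the root's send vector: on the root the caller fills prc (destination processors) and sz (how many elements each destination gets), while on the other processors prc and sz are outputs. A sketch based on my reading of the listing (the root can list itself in prc to keep its own chunk):

    Vcluster<> & v_cl = create_vcluster();

    openfpm::vector<float> send;
    openfpm::vector<float> recv;
    openfpm::vector<size_t> prc;
    openfpm::vector<size_t> sz;

    if (v_cl.getProcessUnitID() == 0)
    {
        for (size_t p = 0 ; p < v_cl.size() ; p++)
        {
            send.add(0.5f + p);   // two elements per destination
            send.add(1.5f + p);
            prc.add(p);           // destination processor
            sz.add(2);            // number of elements for that processor
        }
    }

    v_cl.SScatter(send,recv,prc,sz,0);   // recv holds this processor's chunk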
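
SSendRecv sends one message per entry of prc_send (send.get(i) goes to processor prc_send.get(i)) and appends whatever arrives to recv; prc_recv and sz_recv are outputs describing who sent and how many elements were received from each. A sketch of a simple ring exchange:

    Vcluster<> & v_cl = create_vcluster();

    openfpm::vector<openfpm::vector<float>> send;   // one message per destination
    openfpm::vector<size_t> prc_send;

    openfpm::vector<float> msg;
    msg.add(7.0f);
    msg.add(8.0f);

    prc_send.add((v_cl.getProcessUnitID() + 1) % v_cl.size());  // next processor
    send.add(msg);

    openfpm::vector<float> recv;          // received elements are appended here
    openfpm::vector<size_t> prc_recv;     // output: source processors
    openfpm::vector<size_t> sz_recv;      // output: elements received from each

    v_cl.SSendRecv(send,recv,prc_send,prc_recv,sz_recv);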
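
The *Async variants start the NBX exchange and return immediately; the matching *Wait call completes it, reorders the receive buffers, and unpacks into recv. The internal NBX_prc_scnt / NBX_prc_pcnt counters and the per-queue arrays (NQUEUE) allow several exchanges to be in flight before the corresponding Wait calls, which must be issued with the same arguments and in the same order. A sketch, reusing the buffers from the previous example:

    // start the communication, then overlap it with local work
    v_cl.SSendRecvAsync(send,recv,prc_send,prc_recv,sz_recv);

    // ... unrelated local computation here ...

    // complete the exchange; arguments mirror the Async call
    v_cl.SSendRecvWait(send,recv,prc_send,prc_recv,sz_recv);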
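
The low-level sendrecvMultipleMessagesNBX() / sendrecvMultipleMessagesNBXAsync() calls in Vcluster_base take an allocation callback with the same signature as msg_alloc() above; the NBX engine invokes it once per incoming message and writes the message into the returned pointer, passing through the ptr_arg supplied by the caller. A sketch of a user-provided callback modelled on msg_alloc_known(); the my_recv_state struct and the my_msg_alloc name are illustrative, not part of the library:

struct my_recv_state
{
    openfpm::vector_fr<BMemory<HeapMemory>> bufs;  // one buffer per incoming message
    openfpm::vector<size_t> prc;                   // source processor of each message
};

static void * my_msg_alloc(size_t msg_i, size_t total_msg, size_t total_p,
                           size_t i, size_t ri, size_t tag, void * ptr)
{
    my_recv_state & st = *static_cast<my_recv_state *>(ptr);

    st.bufs.resize(ri+1);           // ri indexes this message
    st.bufs.get(ri).resize(msg_i);  // allocate msg_i bytes for it
    st.prc.add(i);                  // remember the sender

    return st.bufs.get(ri).getPointer();
}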
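
When the data live on the GPU, the cluster must use CudaMemory internally (see the check in process_recv above): obtain it with create_vcluster<CudaMemory>() and pass MPI_GPU_DIRECT as opt, so that on a CUDA-aware MPI (MPIX_CUDA_AWARE_SUPPORT) the receive callback hands device pointers to MPI. A sketch under those assumptions:

    // GPU-backed cluster; requires openfpm_init() to have been called
    Vcluster<CudaMemory> & v_cl_gpu = create_vcluster<CudaMemory>();

    // ... fill device-resident send buffers ...

    // v_cl_gpu.SSendRecv(send,recv,prc_send,prc_recv,sz_recv,MPI_GPU_DIRECT);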
It overrides the behavior of size()
Definition: BHeapMemory.hpp:47
virtual void decRef()
Decrement the reference counter.
virtual void incRef()
Increment the reference counter.
This class allocates and destroys CPU memory.
Definition: HeapMemory.hpp:40
Packing status object.
Definition: Pack_stat.hpp:61
This class virtualizes the cluster of PCs as a set of processes that communicate.
MPI_Comm getMPIComm()
Get the MPI_Communicator (or processor group) this VCluster is using.
size_t size()
Get the total number of processors.
openfpm::vector_fr< BMemory< HeapMemory > > recv_buf[NQUEUE]
Receive buffers.
void sendrecvMultipleMessagesNBX(openfpm::vector< size_t > &prc, openfpm::vector< T > &data, openfpm::vector< size_t > &prc_recv, openfpm::vector< size_t > &recv_sz, void *(*msg_alloc)(size_t, size_t, size_t, size_t, size_t, size_t, void *), void *ptr_arg, long int opt=NONE)
Send and receive multiple messages.
size_t getProcessUnitID()
Get the process unit id.
openfpm::vector< size_t > tags[NQUEUE]
tags receiving
bool recv(size_t proc, size_t tag, void *v, size_t sz)
Recv data from a processor.
openfpm::vector< MPI_Request > req
vector of MPI requests
void sendrecvMultipleMessagesNBXWait()
Send and receive multiple messages wait NBX communication to complete.
MPI_Comm ext_comm
external communicator
void sendrecvMultipleMessagesNBXAsync(openfpm::vector< size_t > &prc, openfpm::vector< T > &data, openfpm::vector< size_t > &prc_recv, openfpm::vector< size_t > &recv_sz, void *(*msg_alloc)(size_t, size_t, size_t, size_t, size_t, size_t, void *), void *ptr_arg, long int opt=NONE)
Send and receive multiple messages asynchronous version.
bool send(size_t proc, size_t tag, const void *mem, size_t sz)
Send data to a processor.
Implementation of VCluster class.
Definition: VCluster.hpp:59
bool SSendRecvPAsync(openfpm::vector< T > &send, S &recv, openfpm::vector< size_t > &prc_send, openfpm::vector< size_t > &prc_recv, openfpm::vector< size_t > &sz_recv, size_t opt=NONE)
Semantic Send and receive, send the data to processors and receive from the other processors (with pr...
Definition: VCluster.hpp:1073
bool SGather(T &send, S &recv, openfpm::vector< size_t > &prc, openfpm::vector< size_t > &sz, size_t root)
Semantic Gather, gather the data from all processors into one node.
Definition: VCluster.hpp:500
Vcluster(int *argc, char ***argv, MPI_Comm ext_comm=MPI_COMM_WORLD)
Constructor.
Definition: VCluster.hpp:423
static void * msg_alloc(size_t msg_i, size_t total_msg, size_t total_p, size_t i, size_t ri, size_t tag, void *ptr)
Call-back to allocate buffer to receive data.
Definition: VCluster.hpp:322
bool SSendRecvP(openfpm::vector< T > &send, S &recv, openfpm::vector< size_t > &prc_send, openfpm::vector< size_t > &prc_recv, openfpm::vector< size_t > &sz_recv, size_t opt=NONE)
Semantic Send and receive, send the data to processors and receive from the other processors (with pr...
Definition: VCluster.hpp:1014
bool SSendRecvP_op(openfpm::vector< T > &send, S &recv, openfpm::vector< size_t > &prc_send, op &op_param, openfpm::vector< size_t > &prc_recv, openfpm::vector< size_t > &recv_sz, size_t opt=NONE)
Semantic Send and receive, send the data to processors and receive from the other processors.
Definition: VCluster.hpp:1129
void barrier()
Just a call to mpi_barrier.
Definition: VCluster.hpp:595
bool SSendRecvPWait(openfpm::vector< T > &send, S &recv, openfpm::vector< size_t > &prc_send, openfpm::vector< size_t > &prc_recv, openfpm::vector< size_t > &sz_recv, openfpm::vector< size_t > &sz_recv_byte_out, size_t opt=NONE)
Synchronize with SSendRecvP.
Definition: VCluster.hpp:1262
void prepare_send_buffer(openfpm::vector< T > &send, S &recv, openfpm::vector< size_t > &prc_send, openfpm::vector< size_t > &prc_recv, openfpm::vector< size_t > &sz_recv, size_t opt)
Prepare the send buffer and send the message to other processors.
Definition: VCluster.hpp:174
bool SSendRecvP_opAsync(openfpm::vector< T > &send, S &recv, openfpm::vector< size_t > &prc_send, op &op_param, openfpm::vector< size_t > &prc_recv, openfpm::vector< size_t > &recv_sz, size_t opt=NONE)
Semantic Send and receive, send the data to processors and receive from the other processors asynchro...
Definition: VCluster.hpp:1198
static void * msg_alloc_known(size_t msg_i, size_t total_msg, size_t total_p, size_t i, size_t ri, size_t tag, void *ptr)
Call-back to allocate buffer to receive data.
Definition: VCluster.hpp:370
bool SScatter(T &send, S &recv, openfpm::vector< size_t > &prc, openfpm::vector< size_t > &sz, size_t root)
Semantic Scatter, scatter the data from one processor to the other nodes.
Definition: VCluster.hpp:627
bool SSendRecvWait(openfpm::vector< T > &send, S &recv, openfpm::vector< size_t > &prc_send, openfpm::vector< size_t > &prc_recv, openfpm::vector< size_t > &sz_recv, size_t opt=NONE)
Synchronize with SSendRecv.
Definition: VCluster.hpp:1222
bool SGather(T &send, S &recv, size_t root)
Semantic Gather, gather the data from all processors into one node.
Definition: VCluster.hpp:455
bool SSendRecvP_opWait(openfpm::vector< T > &send, S &recv, openfpm::vector< size_t > &prc_send, op &op_param, openfpm::vector< size_t > &prc_recv, openfpm::vector< size_t > &recv_sz, size_t opt=NONE)
Synchronize with SSendRecvP_op.
Definition: VCluster.hpp:1345
void process_receive_buffer_with_prp(S &recv, openfpm::vector< size_t > *sz, openfpm::vector< size_t > *sz_byte, op &op_param, size_t opt)
Process the receive buffer.
Definition: VCluster.hpp:402
void reset_recv_buf()
Reset the receive buffer.
Definition: VCluster.hpp:301
void reorder_buffer(openfpm::vector< size_t > &prc, const openfpm::vector< size_t > &tags, openfpm::vector< size_t > &sz_recv)
reorder the receiving buffer
Definition: VCluster.hpp:698
bool SSendRecvP(openfpm::vector< T > &send, S &recv, openfpm::vector< size_t > &prc_send, openfpm::vector< size_t > &prc_recv, openfpm::vector< size_t > &sz_recv, openfpm::vector< size_t > &sz_recv_byte_out, size_t opt=NONE)
Semantic Send and receive, send the data to processors and receive from the other processors (with pr...
Definition: VCluster.hpp:909
bool SSendRecvPAsync(openfpm::vector< T > &send, S &recv, openfpm::vector< size_t > &prc_send, openfpm::vector< size_t > &prc_recv, openfpm::vector< size_t > &sz_recv, openfpm::vector< size_t > &sz_recv_byte_out, size_t opt=NONE)
Semantic Send and receive, send the data to processors and receive from the other processors (with pr...
Definition: VCluster.hpp:970
bool SSendRecvPWait(openfpm::vector< T > &send, S &recv, openfpm::vector< size_t > &prc_send, openfpm::vector< size_t > &prc_recv, openfpm::vector< size_t > &sz_recv, size_t opt=NONE)
Synchronize with SSendRecvP.
Definition: VCluster.hpp:1302
bool SSendRecvAsync(openfpm::vector< T > &send, S &recv, openfpm::vector< size_t > &prc_send, openfpm::vector< size_t > &prc_recv, openfpm::vector< size_t > &sz_recv, size_t opt=NONE)
Semantic Send and receive, send the data to processors and receive from the other processors asynchro...
Definition: VCluster.hpp:865
bool SSendRecv(openfpm::vector< T > &send, S &recv, openfpm::vector< size_t > &prc_send, openfpm::vector< size_t > &prc_recv, openfpm::vector< size_t > &sz_recv, size_t opt=NONE)
Semantic Send and receive, send the data to processors and receive from the other processors.
Definition: VCluster.hpp:803
size_t size()
Stub size.
Definition: map_vector.hpp:212
size_t opt
options
Definition: VCluster.hpp:99
openfpm::vector< size_t > * tags
tags
Definition: VCluster.hpp:96
base_info(openfpm::vector_fr< BMemory< Memory >> *recv_buf, openfpm::vector< size_t > &prc, openfpm::vector< size_t > &sz, openfpm::vector< size_t > &tags, size_t opt)
constructor
Definition: VCluster.hpp:106
openfpm::vector< size_t > * sz
size of each message
Definition: VCluster.hpp:94
openfpm::vector< size_t > * prc
receiving processor list
Definition: VCluster.hpp:92
openfpm::vector_fr< BMemory< Memory > > * recv_buf
Receive buffer.
Definition: VCluster.hpp:90
base_info()
default constructor
Definition: VCluster.hpp:102
static void process_recv(Vcluster &vcl, S &recv, openfpm::vector< size_t > *sz_recv, openfpm::vector< size_t > *sz_recv_byte, op &op_param, size_t opt)
Process the receive buffer.
Definition: VCluster.hpp:137
It returns true if the object T requires complex serialization.
This set of classes generates an array definition at compile-time.
Definition: ct_array.hpp:26
Transform the boost::fusion::vector into memory specification (memory_traits)
Helper class to add data.
There is max_prop inside.