OpenFPM_pdata  4.1.0
Project that contains the implementation of distributed structures
VCluster.hpp
1 /*
2  * Vcluster.hpp
3  *
4  * Created on: Feb 8, 2016
5  * Author: Pietro Incardona
6  */
7 
8 #ifndef VCLUSTER_HPP
9 #define VCLUSTER_HPP
10 
11 #include <signal.h>
12 
13 #include "VCluster_base.hpp"
14 #include "VCluster_meta_function.hpp"
15 #include "util/math_util_complex.hpp"
16 #include "memory/mem_conf.hpp"
17 #include "util/cudify/cudify.hpp"
18 
19 #ifdef CUDA_GPU
20 extern CudaMemory mem_tmp;
21 
22 #ifndef MAX_NUMER_OF_PROPERTIES
23 #define MAX_NUMER_OF_PROPERTIES 20
24 #endif
25 
26 extern CudaMemory exp_tmp;
27 extern CudaMemory exp_tmp2[MAX_NUMER_OF_PROPERTIES];
28 
29 extern CudaMemory rem_tmp;
30 extern CudaMemory rem_tmp2[MAX_NUMER_OF_PROPERTIES];
31 
32 #endif
33 
34 extern size_t NBX_cnt;
35 
36 void bt_sighandler(int sig, siginfo_t * info, void * ctx);
37 
57 template<typename InternalMemory = HeapMemory>
58 class Vcluster: public Vcluster_base<InternalMemory>
59 {
60  // Internal memory
61  ExtPreAlloc<HeapMemory> * mem[NQUEUE];
62 
 63  // Buffer that stores the received bytes
64  openfpm::vector<size_t> sz_recv_byte[NQUEUE];
65 
66  // The sending buffer used by semantic calls
 67  openfpm::vector<const void *> send_buf;
 68  openfpm::vector<size_t> send_sz_byte;
69  openfpm::vector<size_t> prc_send_;
70 
71  unsigned int NBX_prc_scnt = 0;
72  unsigned int NBX_prc_pcnt = 0;
73 
75 
76  // Internal Heap memory
77  HeapMemory * pmem[NQUEUE];
78 
86  template<typename Memory>
87  struct base_info
 88  {
 89  // Receive buffer
 90  openfpm::vector_fr<BMemory<Memory>> * recv_buf;
 91  // receiving processor list
 92  openfpm::vector<size_t> * prc;
 93  // size of each message
 94  openfpm::vector<size_t> * sz;
 95  // tags
 96  openfpm::vector<size_t> * tags;
 97 
 98  // options
 99  size_t opt;
 100 
 101  // default constructor
 102  base_info()
 103  {}
 104 
 105  // constructor
 106  base_info(openfpm::vector_fr<BMemory<Memory>> * recv_buf, openfpm::vector<size_t> & prc, openfpm::vector<size_t> & sz, openfpm::vector<size_t> & tags, size_t opt)
 107  :recv_buf(recv_buf),prc(&prc),sz(&sz),tags(&tags),opt(opt)
 108  {}
 109 
 110  void set(openfpm::vector_fr<BMemory<Memory>> * recv_buf, openfpm::vector<size_t> & prc, openfpm::vector<size_t> & sz, openfpm::vector<size_t> & tags, size_t opt)
 111  {
112  this->recv_buf = recv_buf;
113  this->prc = &prc;
114  this->sz = &sz;
115  this->tags = &tags;
116  this->opt = opt;
117  }
118  };
119 
 120  // Internal temporary buffer
121  base_info<InternalMemory> NBX_prc_bi[NQUEUE];
122 
123  typedef Vcluster_base<InternalMemory> self_base;
124 
125  template<typename T>
126  struct index_gen {};
127 
129  template<int ... prp>
130  struct index_gen<index_tuple<prp...>>
131  {
133  template<typename op,
134  typename T,
135  typename S,
136  template <typename> class layout_base = memory_traits_lin>
137  inline static void process_recv(Vcluster & vcl, S & recv, openfpm::vector<size_t> * sz_recv,
138  openfpm::vector<size_t> * sz_recv_byte, op & op_param,size_t opt)
139  {
140  if (opt == MPI_GPU_DIRECT && !std::is_same<InternalMemory,CudaMemory>::value)
141  {
 142  // In order to activate this option, InternalMemory must be CudaMemory
143 
 144  std::cout << __FILE__ << ":" << __LINE__ << " error: in order to use MPI_GPU_DIRECT, VCluster must use CudaMemory internally; the most probable" <<
 145  " cause of this problem is that you are using the MPI_GPU_DIRECT option with a non-GPU data-structure" << std::endl;
146  }
147 
148  vcl.process_receive_buffer_with_prp<op,T,S,layout_base,prp...>(recv,sz_recv,sz_recv_byte,op_param,opt);
149  }
150  };
151 
170  template<typename op, typename T, typename S, template <typename> class layout_base>
 171  void prepare_send_buffer(openfpm::vector<T> & send,
 172  S & recv,
173  openfpm::vector<size_t> & prc_send,
174  openfpm::vector<size_t> & prc_recv,
175  openfpm::vector<size_t> & sz_recv,
176  size_t opt)
177  {
178  sz_recv_byte[NBX_prc_scnt].resize(sz_recv.size());
179 
180  // Reset the receive buffer
181  reset_recv_buf();
182 
183  #ifdef SE_CLASS1
184 
185  if (send.size() != prc_send.size())
 186  std::cerr << __FILE__ << ":" << __LINE__ << " Error, the number of processors involved \"prc.size()\" must match the number of sending buffers \"send.size()\" " << std::endl;
187 
188  #endif
189 
190  // Prepare the sending buffer
191  send_buf.resize(0);
192  send_sz_byte.resize(0);
193  prc_send_.resize(0);
194 
195  size_t tot_size = 0;
196 
197  for (size_t i = 0; i < send.size() ; i++)
198  {
199  size_t req = 0;
200 
201  //Pack requesting
202  pack_unpack_cond_with_prp<has_max_prop<T, has_value_type_ofp<T>::value>::value,op, T, S, layout_base>::packingRequest(send.get(i), req, send_sz_byte);
203  tot_size += req;
204  }
205 
207 
237 
238  pmem[NBX_prc_scnt] = new HeapMemory;
239 
240  mem[NBX_prc_scnt] = new ExtPreAlloc<HeapMemory>(tot_size,*pmem[NBX_prc_scnt]);
241  mem[NBX_prc_scnt]->incRef();
242 
243  for (size_t i = 0; i < send.size() ; i++)
244  {
245  //Packing
246 
247  Pack_stat sts;
248 
249  pack_unpack_cond_with_prp<has_max_prop<T, has_value_type_ofp<T>::value>::value, op, T, S, layout_base>::packing(*mem[NBX_prc_scnt], send.get(i), sts, send_buf,opt);
250  }
251 
252  // receive information
253  NBX_prc_bi[NBX_prc_scnt].set(&this->recv_buf[NBX_prc_scnt],prc_recv,sz_recv_byte[NBX_prc_scnt],this->tags[NBX_prc_scnt],opt);
254 
255  // Send and recv multiple messages
256  if (opt & RECEIVE_KNOWN)
257  {
 258  // If we are passing the number of elements but not the bytes, calculate the bytes
259  if (opt & KNOWN_ELEMENT_OR_BYTE)
260  {
 261  // We know the number of elements: convert to bytes (ONLY if possible)
263  {
264  for (size_t i = 0 ; i < sz_recv.size() ; i++)
265  {sz_recv_byte[NBX_prc_scnt].get(i) = sz_recv.get(i) * sizeof(typename T::value_type);}
266  }
267  else
268  {
269 #ifndef DISABLE_ALL_RTTI
270  std::cout << __FILE__ << ":" << __LINE__ << " Error " << demangle(typeid(T).name()) << " the type does not work with the option or NO_CHANGE_ELEMENTS" << std::endl;
271 #endif
272  }
273 
274  self_base::sendrecvMultipleMessagesNBXAsync(prc_send.size(),(size_t *)send_sz_byte.getPointer(),(size_t *)prc_send.getPointer(),(void **)send_buf.getPointer(),
275  prc_recv.size(),(size_t *)prc_recv.getPointer(),(size_t *)sz_recv_byte[NBX_prc_scnt].getPointer(),msg_alloc_known,(void *)&NBX_prc_bi);
276  }
277  else
278  {
279  self_base::sendrecvMultipleMessagesNBXAsync(prc_send.size(),(size_t *)send_sz_byte.getPointer(),(size_t *)prc_send.getPointer(),(void **)send_buf.getPointer(),
280  prc_recv.size(),(size_t *)prc_recv.getPointer(),msg_alloc_known,(void *)&NBX_prc_bi);
281  sz_recv_byte[NBX_prc_scnt] = self_base::sz_recv_tmp;
282  }
283  }
284  else
285  {
286  self_base::tags[NBX_prc_scnt].clear();
287  prc_recv.clear();
288  self_base::sendrecvMultipleMessagesNBXAsync(prc_send_.size(),(size_t *)send_sz_byte.getPointer(),(size_t *)prc_send_.getPointer(),(void **)send_buf.getPointer(),msg_alloc,(void *)&NBX_prc_bi[NBX_prc_scnt]);
289  }
290  }
291 
292 
 297  void reset_recv_buf()
 298  {
299  for (size_t i = 0 ; i < self_base::recv_buf[NBX_prc_scnt].size() ; i++)
300  {self_base::recv_buf[NBX_prc_scnt].get(i).resize(0);}
301 
302  self_base::recv_buf[NBX_prc_scnt].resize(0);
303  }
304 
318  static void * msg_alloc(size_t msg_i ,size_t total_msg, size_t total_p, size_t i, size_t ri, size_t tag, void * ptr)
319  {
320  base_info<InternalMemory> & rinfo = *(base_info<InternalMemory> *)ptr;
321 
322  if (rinfo.recv_buf == NULL)
323  {
 324  std::cerr << __FILE__ << ":" << __LINE__ << " Internal error: this processor is not supposed to receive\n";
325  return NULL;
326  }
327 
328  rinfo.recv_buf->resize(ri+1);
329 
330  rinfo.recv_buf->get(ri).resize(msg_i);
331 
332  // Receive info
333  rinfo.prc->add(i);
334  rinfo.sz->add(msg_i);
335  rinfo.tags->add(tag);
336 
337  // return the pointer
338 
 339  // If GPU direct is activated, use the CUDA buffer directly
340  if (rinfo.opt & MPI_GPU_DIRECT)
341  {
342 #if defined(MPIX_CUDA_AWARE_SUPPORT) && MPIX_CUDA_AWARE_SUPPORT
343  return rinfo.recv_buf->last().getDevicePointer();
344 #else
345  return rinfo.recv_buf->last().getPointer();
346 #endif
347  }
348 
349  return rinfo.recv_buf->last().getPointer();
350  }
351 
352 
366  static void * msg_alloc_known(size_t msg_i ,size_t total_msg, size_t total_p, size_t i, size_t ri, size_t tag, void * ptr)
367  {
368  base_info<InternalMemory> & rinfo = *(base_info<InternalMemory> *)ptr;
369 
370  if (rinfo.recv_buf == NULL)
371  {
 372  std::cerr << __FILE__ << ":" << __LINE__ << " Internal error: this processor is not supposed to receive\n";
373  return NULL;
374  }
375 
376  rinfo.recv_buf->resize(ri+1);
377 
378  rinfo.recv_buf->get(ri).resize(msg_i);
379 
380  // return the pointer
381  return rinfo.recv_buf->last().getPointer();
382  }
383 
397  template<typename op, typename T, typename S, template <typename> class layout_base ,unsigned int ... prp >
 398  void process_receive_buffer_with_prp(S & recv,
 399  openfpm::vector<size_t> * sz,
 400  openfpm::vector<size_t> * sz_byte,
401  op & op_param,
402  size_t opt)
403  {
404  if (sz != NULL)
405  {sz->resize(self_base::recv_buf[NBX_prc_pcnt].size());}
406 
407  pack_unpack_cond_with_prp<has_max_prop<T, has_value_type_ofp<T>::value>::value,op, T, S, layout_base, prp... >::unpacking(recv, self_base::recv_buf[NBX_prc_pcnt], sz, sz_byte, op_param,opt);
408  }
409 
410  public:
411 
418  Vcluster(int *argc, char ***argv)
419  :Vcluster_base<InternalMemory>(argc,argv)
420  {
421  }
422 
450  template<typename T, typename S, template <typename> class layout_base=memory_traits_lin> bool SGather(T & send, S & recv,size_t root)
451  {
 452  openfpm::vector<size_t> prc;
 453  openfpm::vector<size_t> sz;
 454 
455  return SGather<T,S,layout_base>(send,recv,prc,sz,root);
456  }
457 
459  template<size_t index, size_t N> struct MetaFuncOrd {
460  enum { value = index };
461  };
462 
492  template<typename T,
493  typename S,
494  template <typename> class layout_base = memory_traits_lin>
495  bool SGather(T & send,
496  S & recv,
 497  openfpm::vector<size_t> & prc,
 498  openfpm::vector<size_t> & sz,
 499  size_t root)
500  {
501 #ifdef SE_CLASS1
502  if (&send == (T *)&recv)
503  {std::cerr << "Error: " << __FILE__ << ":" << __LINE__ << " using SGather in general the sending object and the receiving object must be different" << std::endl;}
504 #endif
505 
506  // Reset the receive buffer
507  reset_recv_buf();
508 
 509  // If we are the master, collect the information
510  if (self_base::getProcessUnitID() == root)
511  {
 512  // send buffer (the master does not send anything), so send_req and send_buf
 513  // remain buffers with size 0
514  openfpm::vector<size_t> send_req;
515 
516  self_base::tags[NBX_prc_scnt].clear();
517 
518  // receive information
519  base_info<InternalMemory> bi(&this->recv_buf[NBX_prc_scnt],prc,sz,this->tags[NBX_prc_scnt],0);
520 
521  // Send and recv multiple messages
522  self_base::sendrecvMultipleMessagesNBX(send_req.size(),NULL,NULL,NULL,msg_alloc,&bi);
523 
524  // we generate the list of the properties to unpack
525  typedef typename ::generate_indexes<int, has_max_prop<T, has_value_type_ofp<T>::value>::number, MetaFuncOrd>::result ind_prop_to_pack;
526 
527  // operation object
 528  op_ssend_recv_add<void> opa;
 529 
530  // Reorder the buffer
531  reorder_buffer(prc,self_base::tags[NBX_prc_scnt],sz);
532 
533  index_gen<ind_prop_to_pack>::template process_recv<op_ssend_recv_add<void>,T,S,layout_base>(*this,recv,&sz,NULL,opa,0);
534 
535  recv.add(send);
536  prc.add(root);
537  sz.add(send.size());
538  }
539  else
540  {
 541  // The non-root processes send their data to the root,
 542  // so prepare the destination processor list
543  openfpm::vector<size_t> send_prc;
544  openfpm::vector<size_t> send_prc_;
545  send_prc.add(root);
546 
548 
550 
551  //Pack requesting
552 
553  size_t tot_size = 0;
554 
555  pack_unpack_cond_with_prp<has_max_prop<T, has_value_type_ofp<T>::value>::value,op_ssend_recv_add<void>, T, S, layout_base>::packingRequest(send, tot_size, sz);
556 
557  HeapMemory pmem;
558 
559  ExtPreAlloc<HeapMemory> & mem = *(new ExtPreAlloc<HeapMemory>(tot_size,pmem));
560  mem.incRef();
561 
562  //Packing
563 
564  Pack_stat sts;
565 
566  pack_unpack_cond_with_prp<has_max_prop<T, has_value_type_ofp<T>::value>::value,op_ssend_recv_add<void>, T, S, layout_base>::packing(mem, send, sts, send_buf);
567 
569 
570  self_base::tags[NBX_prc_scnt].clear();
571 
572  // receive information
573  base_info<InternalMemory> bi(NULL,prc,sz,self_base::tags[NBX_prc_scnt],0);
574 
575  // Send and recv multiple messages
576  self_base::sendrecvMultipleMessagesNBX(send_prc_.size(),(size_t *)sz.getPointer(),(size_t *)send_prc_.getPointer(),(void **)send_buf.getPointer(),msg_alloc,(void *)&bi,NONE);
577 
578  mem.decRef();
579  delete &mem;
580  }
581 
582  return true;
583  }
584 
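/*! Usage sketch for SGather (illustrative: the float payload and the variable
 *  names below are placeholders, any serializable openfpm::vector works the same way):
 *
 * \code
 * Vcluster<> & vcl = create_vcluster();
 *
 * openfpm::vector<float> v_local;      // filled independently on every processor
 * openfpm::vector<float> v_gathered;   // on root: concatenation of all contributions
 *
 * size_t root = 0;
 * vcl.SGather(v_local,v_gathered,root);   // after the call only root holds the full data
 * \endcode
 */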
589  void barrier()
590  {
591  MPI_Barrier(MPI_COMM_WORLD);
592  }
593 
620  template<typename T, typename S, template <typename> class layout_base=memory_traits_lin>
621  bool SScatter(T & send, S & recv, openfpm::vector<size_t> & prc, openfpm::vector<size_t> & sz, size_t root)
622  {
623  // Reset the receive buffer
624  reset_recv_buf();
625 
 626  // If we are the master, scatter the information
627  if (self_base::getProcessUnitID() == root)
628  {
629  // Prepare the sending buffer
631 
632 
633  openfpm::vector<size_t> sz_byte;
634  sz_byte.resize(sz.size());
635 
636  size_t ptr = 0;
637 
638  for (size_t i = 0; i < sz.size() ; i++)
639  {
640  send_buf.add((char *)send.getPointer() + sizeof(typename T::value_type)*ptr );
641  sz_byte.get(i) = sz.get(i) * sizeof(typename T::value_type);
642  ptr += sz.get(i);
643  }
644 
645  self_base::tags[NBX_prc_scnt].clear();
646 
647  // receive information
648  base_info<InternalMemory> bi(&this->recv_buf[NBX_prc_scnt],prc,sz,this->tags[NBX_prc_scnt],0);
649 
650  // Send and recv multiple messages
651  self_base::sendrecvMultipleMessagesNBX(prc.size(),(size_t *)sz_byte.getPointer(),(size_t *)prc.getPointer(),(void **)send_buf.getPointer(),msg_alloc,(void *)&bi);
652 
653  // we generate the list of the properties to pack
654  typedef typename ::generate_indexes<int, has_max_prop<T, has_value_type_ofp<T>::value>::number, MetaFuncOrd>::result ind_prop_to_pack;
655 
656  // operation object
 657  op_ssend_recv_add<void> opa;
 658 
659  index_gen<ind_prop_to_pack>::template process_recv<op_ssend_recv_add<void>,T,S,layout_base>(*this,recv,NULL,NULL,opa,0);
660  }
661  else
662  {
 663  // The non-root processes only receive
664  openfpm::vector<size_t> send_req;
665 
666  self_base::tags[NBX_prc_scnt].clear();
667 
668  // receive information
669  base_info<InternalMemory> bi(&this->recv_buf[NBX_prc_scnt],prc,sz,this->tags[NBX_prc_scnt],0);
670 
671  // Send and recv multiple messages
672  self_base::sendrecvMultipleMessagesNBX(send_req.size(),NULL,NULL,NULL,msg_alloc,&bi);
673 
674  // we generate the list of the properties to pack
675  typedef typename ::generate_indexes<int, has_max_prop<T, has_value_type_ofp<T>::value>::number, MetaFuncOrd>::result ind_prop_to_pack;
676 
677  // operation object
 678  op_ssend_recv_add<void> opa;
 679 
680  index_gen<ind_prop_to_pack>::template process_recv<op_ssend_recv_add<void>,T,S,layout_base>(*this,recv,NULL,NULL,opa,0);
681  }
682 
683  return true;
684  }
685 
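/*! Usage sketch for SScatter (illustrative: names and the float payload are placeholders;
 *  prc and sz must be filled on the root to describe how the send vector is split):
 *
 * \code
 * Vcluster<> & vcl = create_vcluster();
 *
 * openfpm::vector<float> v_send;    // on root: all the data to distribute
 * openfpm::vector<float> v_recv;    // on each processor: its chunk
 * openfpm::vector<size_t> prc;      // destination processors (filled on root)
 * openfpm::vector<size_t> sz;       // number of elements for each destination (filled on root)
 *
 * vcl.SScatter(v_send,v_recv,prc,sz,0);
 * \endcode
 */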
 692  void reorder_buffer(openfpm::vector<size_t> & prc, const openfpm::vector<size_t> & tags, openfpm::vector<size_t> & sz_recv)
 693  {
694 
695  struct recv_buff_reorder
696  {
698  size_t proc;
699 
700  size_t tag;
701 
703  size_t pos;
704 
706  recv_buff_reorder()
707  :proc(0),tag(0),pos(0)
708  {};
709 
711  bool operator<(const recv_buff_reorder & rd) const
712  {
713  if (proc == rd.proc)
714  {return tag < rd.tag;}
715 
716  return (proc < rd.proc);
717  }
718  };
719 
 720  openfpm::vector<recv_buff_reorder> rcv;
 721 
722  rcv.resize(self_base::recv_buf[NBX_prc_pcnt].size());
723 
724  for (size_t i = 0 ; i < rcv.size() ; i++)
725  {
726  rcv.get(i).proc = prc.get(i);
727  if (i < tags.size())
728  {rcv.get(i).tag = tags.get(i);}
729  else
730  {rcv.get(i).tag = (unsigned int)-1;}
731  rcv.get(i).pos = i;
732  }
733 
734  // we sort based on processor
735  rcv.sort();
736 
 737  openfpm::vector_fr<BMemory<InternalMemory>> recv_ord;
 738  recv_ord.resize(rcv.size());
739 
740  openfpm::vector<size_t> prc_ord;
741  prc_ord.resize(rcv.size());
742 
743  openfpm::vector<size_t> sz_recv_ord;
744  sz_recv_ord.resize(rcv.size());
745 
746  // Now we reorder rcv
747  for (size_t i = 0 ; i < rcv.size() ; i++)
748  {
749  recv_ord.get(i).swap(self_base::recv_buf[NBX_prc_pcnt].get(rcv.get(i).pos));
750  prc_ord.get(i) = rcv.get(i).proc;
751  sz_recv_ord.get(i) = sz_recv.get(rcv.get(i).pos);
752  }
753 
754  // move rcv into recv
755  // Now we swap back to recv_buf in an ordered way
756  for (size_t i = 0 ; i < rcv.size() ; i++)
757  {
758  self_base::recv_buf[NBX_prc_pcnt].get(i).swap(recv_ord.get(i));
759  }
760 
761  prc.swap(prc_ord);
762  sz_recv.swap(sz_recv_ord);
763 
764  // reorder prc_recv and recv_sz
765  }
766 
794  template<typename T,
795  typename S,
796  template <typename> class layout_base = memory_traits_lin>
 797  bool SSendRecv(openfpm::vector<T> & send,
 798  S & recv,
799  openfpm::vector<size_t> & prc_send,
800  openfpm::vector<size_t> & prc_recv,
801  openfpm::vector<size_t> & sz_recv,
802  size_t opt = NONE)
803  {
804  prepare_send_buffer<op_ssend_recv_add<void>,T,S,layout_base>(send,recv,prc_send,prc_recv,sz_recv,opt);
805 
 806  self_base::sendrecvMultipleMessagesNBXWait();
 807 
808  // Reorder the buffer
809  reorder_buffer(prc_recv,self_base::tags[NBX_prc_scnt],sz_recv_byte[NBX_prc_scnt]);
810 
811  mem[NBX_prc_scnt]->decRef();
812  delete mem[NBX_prc_scnt];
813  delete pmem[NBX_prc_scnt];
814 
815  // we generate the list of the properties to pack
816  typedef typename ::generate_indexes<int, has_max_prop<T, has_value_type_ofp<T>::value>::number, MetaFuncOrd>::result ind_prop_to_pack;
817 
 818  op_ssend_recv_add<void> opa;
 819 
820  index_gen<ind_prop_to_pack>::template process_recv<op_ssend_recv_add<void>,T,S,layout_base>(*this,recv,&sz_recv,NULL,opa,opt);
821 
822  return true;
823  }
824 
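/*! Usage sketch for SSendRecv (illustrative: names and the float payload are placeholders).
 *  Each processor prepares one send buffer per destination; the received elements are
 *  appended to the receive vector, and prc_recv/sz_recv report who sent how much:
 *
 * \code
 * Vcluster<> & vcl = create_vcluster();
 *
 * openfpm::vector<openfpm::vector<float>> v_send;   // one buffer per destination
 * openfpm::vector<size_t> prc_send;                 // destination ranks, same size as v_send
 *
 * openfpm::vector<float> v_recv;                    // filled with the received elements
 * openfpm::vector<size_t> prc_recv;                 // filled with the source ranks
 * openfpm::vector<size_t> sz_recv;                  // filled with the element count per source
 *
 * vcl.SSendRecv(v_send,v_recv,prc_send,prc_recv,sz_recv);
 * \endcode
 */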
855  template<typename T,
856  typename S,
857  template <typename> class layout_base = memory_traits_lin>
 858  bool SSendRecvAsync(openfpm::vector<T> & send,
 859  S & recv,
860  openfpm::vector<size_t> & prc_send,
861  openfpm::vector<size_t> & prc_recv,
862  openfpm::vector<size_t> & sz_recv,
863  size_t opt = NONE)
864  {
865  prepare_send_buffer<op_ssend_recv_add<void>,T,S,layout_base>(send,recv,prc_send,prc_recv,sz_recv,opt);
866 
867  NBX_prc_scnt++;
868 
869  return true;
870  }
871 
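/*! Usage sketch for the asynchronous variant (illustrative; the buffers are the same as
 *  in the SSendRecv sketch above, and the same arguments must be passed to the matching
 *  Wait call, which reorders and unpacks the received data):
 *
 * \code
 * vcl.SSendRecvAsync(v_send,v_recv,prc_send,prc_recv,sz_recv);
 *
 * // ... overlap local computation with the NBX communication ...
 *
 * vcl.SSendRecvWait(v_send,v_recv,prc_send,prc_recv,sz_recv);
 * \endcode
 */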
900  template<typename T, typename S, template <typename> class layout_base, int ... prp>
 901  bool SSendRecvP(openfpm::vector<T> & send,
 902  S & recv,
903  openfpm::vector<size_t> & prc_send,
904  openfpm::vector<size_t> & prc_recv,
905  openfpm::vector<size_t> & sz_recv,
906  openfpm::vector<size_t> & sz_recv_byte_out,
907  size_t opt = NONE)
908  {
909  prepare_send_buffer<op_ssend_recv_add<void>,T,S,layout_base>(send,recv,prc_send,prc_recv,sz_recv,opt);
910 
 911  self_base::sendrecvMultipleMessagesNBXWait();
 912 
913  // Reorder the buffer
914  reorder_buffer(prc_recv,self_base::tags[NBX_prc_scnt],sz_recv_byte[NBX_prc_scnt]);
915 
916  mem[NBX_prc_scnt]->decRef();
917  delete mem[NBX_prc_scnt];
918  delete pmem[NBX_prc_scnt];
919 
920  // operation object
 921  op_ssend_recv_add<void> opa;
 922 
923  // process the received information
924  process_receive_buffer_with_prp<op_ssend_recv_add<void>,T,S,layout_base,prp...>(recv,&sz_recv,&sz_recv_byte_out,opa,opt);
925 
926  return true;
927  }
928 
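/*! Usage sketch for SSendRecvP (illustrative: an aggregate with two properties is
 *  exchanged but only property 0 is packed/unpacked; the layout and the property list
 *  must be given explicitly because they cannot be deduced):
 *
 * \code
 * typedef openfpm::vector<aggregate<float,float[3]>> part_prp;
 *
 * openfpm::vector<part_prp> v_send;
 * part_prp v_recv;
 * openfpm::vector<size_t> prc_send, prc_recv, sz_recv, sz_recv_byte;
 *
 * vcl.SSendRecvP<part_prp,part_prp,memory_traits_lin,0>
 *        (v_send,v_recv,prc_send,prc_recv,sz_recv,sz_recv_byte);
 * \endcode
 */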
960  template<typename T, typename S, template <typename> class layout_base, int ... prp>
 961  bool SSendRecvPAsync(openfpm::vector<T> & send,
 962  S & recv,
963  openfpm::vector<size_t> & prc_send,
964  openfpm::vector<size_t> & prc_recv,
965  openfpm::vector<size_t> & sz_recv,
966  openfpm::vector<size_t> & sz_recv_byte_out,
967  size_t opt = NONE)
968  {
969  prepare_send_buffer<op_ssend_recv_add<void>,T,S,layout_base>(send,recv,prc_send,prc_recv,sz_recv,opt);
970 
971  NBX_prc_scnt++;
972 
973  return true;
974  }
975 
1003  template<typename T, typename S, template <typename> class layout_base, int ... prp>
 1004  bool SSendRecvP(openfpm::vector<T> & send,
 1005  S & recv,
1006  openfpm::vector<size_t> & prc_send,
1007  openfpm::vector<size_t> & prc_recv,
1008  openfpm::vector<size_t> & sz_recv,
1009  size_t opt = NONE)
1010  {
1011  prepare_send_buffer<op_ssend_recv_add<void>,T,S,layout_base>(send,recv,prc_send,prc_recv,sz_recv,opt);
1012 
 1013  self_base::sendrecvMultipleMessagesNBXWait();
 1014 
1015  // Reorder the buffer
1016  reorder_buffer(prc_recv,self_base::tags[NBX_prc_scnt],sz_recv_byte[NBX_prc_scnt]);
1017 
1018  mem[NBX_prc_scnt]->decRef();
1019  delete mem[NBX_prc_scnt];
1020  delete pmem[NBX_prc_scnt];
1021 
1022  // operation object
 1023  op_ssend_recv_add<void> opa;
 1024 
1025  // process the received information
1026  process_receive_buffer_with_prp<op_ssend_recv_add<void>,T,S,layout_base,prp...>(recv,&sz_recv,NULL,opa,opt);
1027 
1028  return true;
1029  }
1030 
1061  template<typename T, typename S, template <typename> class layout_base, int ... prp>
 1062  bool SSendRecvPAsync(openfpm::vector<T> & send,
 1063  S & recv,
1064  openfpm::vector<size_t> & prc_send,
1065  openfpm::vector<size_t> & prc_recv,
1066  openfpm::vector<size_t> & sz_recv,
1067  size_t opt = NONE)
1068  {
1069  prepare_send_buffer<op_ssend_recv_add<void>,T,S,layout_base>(send,recv,prc_send,prc_recv,sz_recv,opt);
1070 
1071  NBX_prc_scnt++;
1072 
1073  return true;
1074  }
1075 
1112  template<typename op,
1113  typename T,
1114  typename S,
1115  template <typename> class layout_base,
1116  int ... prp>
 1117  bool SSendRecvP_op(openfpm::vector<T> & send,
 1118  S & recv,
1119  openfpm::vector<size_t> & prc_send,
1120  op & op_param,
1121  openfpm::vector<size_t> & prc_recv,
1122  openfpm::vector<size_t> & recv_sz,
1123  size_t opt = NONE)
1124  {
1125  prepare_send_buffer<op,T,S,layout_base>(send,recv,prc_send,prc_recv,recv_sz,opt);
1126 
 1127  self_base::sendrecvMultipleMessagesNBXWait();
 1128 
1129  // Reorder the buffer
1130  reorder_buffer(prc_recv,self_base::tags[NBX_prc_scnt],sz_recv_byte[NBX_prc_scnt]);
1131 
1132  mem[NBX_prc_scnt]->decRef();
1133  delete mem[NBX_prc_scnt];
1134  delete pmem[NBX_prc_scnt];
1135 
1136  // process the received information
1137  process_receive_buffer_with_prp<op,T,S,layout_base,prp...>(recv,NULL,NULL,op_param,opt);
1138 
1139  return true;
1140  }
1141 
1180  template<typename op,
1181  typename T,
1182  typename S,
1183  template <typename> class layout_base,
1184  int ... prp>
 1185  bool SSendRecvP_opAsync(openfpm::vector<T> & send,
 1186  S & recv,
1187  openfpm::vector<size_t> & prc_send,
1188  op & op_param,
1189  openfpm::vector<size_t> & prc_recv,
1190  openfpm::vector<size_t> & recv_sz,
1191  size_t opt = NONE)
1192  {
1193  prepare_send_buffer<op,T,S,layout_base>(send,recv,prc_send,prc_recv,recv_sz,opt);
1194 
1195  NBX_prc_scnt++;
1196 
1197  return true;
1198  }
1199 
1205  template<typename T,
1206  typename S,
1207  template <typename> class layout_base = memory_traits_lin>
 1208  bool SSendRecvWait(openfpm::vector<T> & send,
 1209  S & recv,
1210  openfpm::vector<size_t> & prc_send,
1211  openfpm::vector<size_t> & prc_recv,
1212  openfpm::vector<size_t> & sz_recv,
1213  size_t opt = NONE)
1214  {
 1215  self_base::sendrecvMultipleMessagesNBXWait();
 1216 
1217  // Reorder the buffer
1218  reorder_buffer(prc_recv,self_base::tags[NBX_prc_pcnt],sz_recv_byte[NBX_prc_pcnt]);
1219 
1220  mem[NBX_prc_pcnt]->decRef();
1221  delete mem[NBX_prc_pcnt];
1222  delete pmem[NBX_prc_pcnt];
1223 
1224  // we generate the list of the properties to pack
1225  typedef typename ::generate_indexes<int, has_max_prop<T, has_value_type_ofp<T>::value>::number, MetaFuncOrd>::result ind_prop_to_pack;
1226 
 1227  op_ssend_recv_add<void> opa;
 1228 
1229  index_gen<ind_prop_to_pack>::template process_recv<op_ssend_recv_add<void>,T,S,layout_base>(*this,recv,&sz_recv,NULL,opa,opt);
1230 
1231  NBX_prc_pcnt++;
1232  if (NBX_prc_scnt == NBX_prc_pcnt)
1233  {
1234  NBX_prc_scnt = 0;
1235  NBX_prc_pcnt = 0;
1236  }
1237 
1238  return true;
1239  }
1240 
1246  template<typename T, typename S, template <typename> class layout_base, int ... prp>
 1247  bool SSendRecvPWait(openfpm::vector<T> & send,
 1248  S & recv,
1249  openfpm::vector<size_t> & prc_send,
1250  openfpm::vector<size_t> & prc_recv,
1251  openfpm::vector<size_t> & sz_recv,
1252  openfpm::vector<size_t> & sz_recv_byte_out,
1253  size_t opt = NONE)
1254  {
 1255  self_base::sendrecvMultipleMessagesNBXWait();
 1256 
1257  // Reorder the buffer
1258  reorder_buffer(prc_recv,self_base::tags[NBX_prc_pcnt],sz_recv_byte[NBX_prc_pcnt]);
1259 
1260  mem[NBX_prc_pcnt]->decRef();
1261  delete mem[NBX_prc_pcnt];
1262  delete pmem[NBX_prc_pcnt];
1263 
1264  // operation object
 1265  op_ssend_recv_add<void> opa;
 1266 
1267  // process the received information
1268  process_receive_buffer_with_prp<op_ssend_recv_add<void>,T,S,layout_base,prp...>(recv,&sz_recv,&sz_recv_byte_out,opa,opt);
1269 
1270  NBX_prc_pcnt++;
1271  if (NBX_prc_scnt == NBX_prc_pcnt)
1272  {
1273  NBX_prc_scnt = 0;
1274  NBX_prc_pcnt = 0;
1275  }
1276 
1277  return true;
1278  }
1279 
1285  template<typename T, typename S, template <typename> class layout_base, int ... prp>
 1286  bool SSendRecvPWait(openfpm::vector<T> & send,
 1287  S & recv,
1288  openfpm::vector<size_t> & prc_send,
1289  openfpm::vector<size_t> & prc_recv,
1290  openfpm::vector<size_t> & sz_recv,
1291  size_t opt = NONE)
1292  {
 1293  self_base::sendrecvMultipleMessagesNBXWait();
 1294 
1295  // Reorder the buffer
1296  reorder_buffer(prc_recv,self_base::tags[NBX_prc_pcnt],sz_recv_byte[NBX_prc_pcnt]);
1297 
1298  mem[NBX_prc_pcnt]->decRef();
1299  delete mem[NBX_prc_pcnt];
1300  delete pmem[NBX_prc_pcnt];
1301 
1302  // operation object
 1303  op_ssend_recv_add<void> opa;
 1304 
1305  // process the received information
1306  process_receive_buffer_with_prp<op_ssend_recv_add<void>,T,S,layout_base,prp...>(recv,&sz_recv,NULL,opa,opt);
1307 
1308  NBX_prc_pcnt++;
1309  if (NBX_prc_scnt == NBX_prc_pcnt)
1310  {
1311  NBX_prc_scnt = 0;
1312  NBX_prc_pcnt = 0;
1313  }
1314 
1315  return true;
1316  }
1317 
1323  template<typename op,
1324  typename T,
1325  typename S,
1326  template <typename> class layout_base,
1327  int ... prp>
 1328  bool SSendRecvP_opWait(openfpm::vector<T> & send,
 1329  S & recv,
1330  openfpm::vector<size_t> & prc_send,
1331  op & op_param,
1332  openfpm::vector<size_t> & prc_recv,
1333  openfpm::vector<size_t> & recv_sz,
1334  size_t opt = NONE)
1335  {
 1336  self_base::sendrecvMultipleMessagesNBXWait();
 1337 
1338  // Reorder the buffer
1339  reorder_buffer(prc_recv,self_base::tags[NBX_prc_pcnt],sz_recv_byte[NBX_prc_pcnt]);
1340 
1341  mem[NBX_prc_pcnt]->decRef();
1342  delete mem[NBX_prc_pcnt];
1343  delete pmem[NBX_prc_pcnt];
1344 
1345  // process the received information
1346  process_receive_buffer_with_prp<op,T,S,layout_base,prp...>(recv,NULL,NULL,op_param,opt);
1347 
1348  NBX_prc_pcnt++;
1349  if (NBX_prc_scnt == NBX_prc_pcnt)
1350  {
1351  NBX_prc_scnt = 0;
1352  NBX_prc_pcnt = 0;
1353  }
1354 
1355  return true;
1356  }
1357 
1358 };
1359 
1360 
1361 
1362 // Function to initialize the global VCluster //
1363 
1364 extern Vcluster<> * global_v_cluster_private_heap;
1365 extern Vcluster<CudaMemory> * global_v_cluster_private_cuda;
1366 
1373 static inline void init_global_v_cluster_private(int *argc, char ***argv)
1374 {
1375  if (global_v_cluster_private_heap == NULL)
1376  {global_v_cluster_private_heap = new Vcluster<>(argc,argv);}
1377 
1378  if (global_v_cluster_private_cuda == NULL)
1379  {global_v_cluster_private_cuda = new Vcluster<CudaMemory>(argc,argv);}
1380 }
1381 
1382 static inline void delete_global_v_cluster_private()
1383 {
1384  delete global_v_cluster_private_heap;
1385  delete global_v_cluster_private_cuda;
1386 }
1387 
1388 template<typename Memory>
1389 struct get_vcl
1390 {
1391  static Vcluster<Memory> & get()
1392  {
1393  return *global_v_cluster_private_heap;
1394  }
1395 };
1396 
1397 template<>
 1398 struct get_vcl<CudaMemory>
 1399 {
1400  static Vcluster<CudaMemory> & get()
1401  {
1402  return *global_v_cluster_private_cuda;
1403  }
1404 };
1405 
1406 template<typename Memory = HeapMemory>
1407 static inline Vcluster<Memory> & create_vcluster()
1408 {
1409  if (global_v_cluster_private_heap == NULL)
 1410  {std::cerr << __FILE__ << ":" << __LINE__ << " Error: you must call openfpm_init before using any distributed data structures";}
1411 
1412  return get_vcl<Memory>::get();
1413 }
1414 
1415 
1416 
1422 static inline bool is_openfpm_init()
1423 {
1424  return ofp_initialized;
1425 }
1426 
1427 
1433 void openfpm_init_vcl(int *argc, char ***argv);
1434 
1435 size_t openfpm_vcluster_compilation_mask();
1436 
1442 void openfpm_finalize();
1443 
1449 static void openfpm_init(int *argc, char ***argv)
1450 {
1451  openfpm_init_vcl(argc,argv);
1452 
1453  size_t compiler_mask = CUDA_ON_BACKEND;
1454 
1455  init_wrappers();
1456 
1457  if (compiler_mask != openfpm_vcluster_compilation_mask() || compiler_mask != openfpm_ofpmmemory_compilation_mask())
1458  {
1459  std::cout << __FILE__ << ":" << __LINE__ << " Error: the program has been compiled with CUDA_ON_BACKEND: " << compiler_mask << " but libvcluster has been compiled with CUDA_ON_BACKEND: " <<
1460  openfpm_vcluster_compilation_mask() << ", and libofpmmemory has been compiled with CUDA_ON_BACKEND: " << openfpm_ofpmmemory_compilation_mask() <<
1461  " recompile the library with the right CUDA_ON_BACKEND" << std::endl;
1462  }
1463 }
1464 
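/*! Typical initialization sketch (illustrative): openfpm_init must be called before any
 *  distributed data structure is created, and openfpm_finalize at the end of the program.
 *
 * \code
 * int main(int argc, char* argv[])
 * {
 *     openfpm_init(&argc,&argv);
 *
 *     Vcluster<> & vcl = create_vcluster();
 *     size_t rank = vcl.getProcessUnitID();
 *
 *     // ... distributed work ...
 *
 *     openfpm_finalize();
 *     return 0;
 * }
 * \endcode
 */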
1465 #endif
1466 