OpenFPM_pdata  4.1.0
Project that contains the implementation of distributed structures
 
VCluster.hpp
1/*
2 * Vcluster.hpp
3 *
4 * Created on: Feb 8, 2016
5 * Author: Pietro Incardona
6 */
7
8#ifndef VCLUSTER_HPP
9#define VCLUSTER_HPP
10
11#include <signal.h>
12
13#include "VCluster_base.hpp"
14#include "VCluster_meta_function.hpp"
15#include "util/math_util_complex.hpp"
16#include "memory/mem_conf.hpp"
17#include "util/cudify/cudify.hpp"
18
19#ifdef CUDA_GPU
20extern CudaMemory mem_tmp;
21
22#ifndef MAX_NUMER_OF_PROPERTIES
23#define MAX_NUMER_OF_PROPERTIES 20
24#endif
25
26extern CudaMemory exp_tmp;
27extern CudaMemory exp_tmp2[MAX_NUMER_OF_PROPERTIES];
28
29extern CudaMemory rem_tmp;
30extern CudaMemory rem_tmp2[MAX_NUMER_OF_PROPERTIES];
31
32#endif
33
34extern size_t NBX_cnt;
35
36void bt_sighandler(int sig, siginfo_t * info, void * ctx);
37
57template<typename InternalMemory = HeapMemory>
58class Vcluster: public Vcluster_base<InternalMemory>
59{
60 // Internal memory
61 ExtPreAlloc<HeapMemory> * mem[NQUEUE];
62
63 // Buffer that stores the received bytes
64 openfpm::vector<size_t> sz_recv_byte[NQUEUE];
65
66 // The sending buffer used by semantic calls
67 openfpm::vector<const void *> send_buf;
68 openfpm::vector<size_t> send_sz_byte;
69 openfpm::vector<size_t> prc_send_;
70
71 unsigned int NBX_prc_scnt = 0;
72 unsigned int NBX_prc_pcnt = 0;
73
75
76 // Internal Heap memory
77 HeapMemory * pmem[NQUEUE];
78
86 template<typename Memory>
87 struct base_info
88 {
90 openfpm::vector_fr<BMemory<Memory>> * recv_buf;
91 // receiving processor list
92 openfpm::vector<size_t> * prc;
93 // size of each message
94 openfpm::vector<size_t> * sz;
95 // tags
96 openfpm::vector<size_t> * tags;
97
99 size_t opt;
100
101 // default constructor
102 base_info()
103 {}
104
105 // constructor
106 base_info(openfpm::vector_fr<BMemory<Memory>> * recv_buf, openfpm::vector<size_t> & prc, openfpm::vector<size_t> & sz, openfpm::vector<size_t> & tags, size_t opt)
107 :recv_buf(recv_buf),prc(&prc),sz(&sz),tags(&tags),opt(opt)
108 {}
109
110 void set(openfpm::vector_fr<BMemory<Memory>> * recv_buf, openfpm::vector<size_t> & prc, openfpm::vector<size_t> & sz, openfpm::vector<size_t> & tags, size_t opt)
111 {
112 this->recv_buf = recv_buf;
113 this->prc = &prc;
114 this->sz = &sz;
115 this->tags = &tags;
116 this->opt = opt;
117 }
118 };
119
120 // Internal temporary buffer
121 base_info<InternalMemory> NBX_prc_bi[NQUEUE];
122
123 typedef Vcluster_base<InternalMemory> self_base;
124
125 template<typename T>
126 struct index_gen {};
127
129 template<int ... prp>
130 struct index_gen<index_tuple<prp...>>
131 {
133 template<typename op,
134 typename T,
135 typename S,
136 template <typename> class layout_base = memory_traits_lin>
137 inline static void process_recv(Vcluster & vcl, S & recv, openfpm::vector<size_t> * sz_recv,
138 openfpm::vector<size_t> * sz_recv_byte, op & op_param,size_t opt)
139 {
140 if (opt == MPI_GPU_DIRECT && !std::is_same<InternalMemory,CudaMemory>::value)
141 {
142 // In order to have this option activated InternalMemory must be CudaMemory
143
144 std::cout << __FILE__ << ":" << __LINE__ << " error: in order to have MPI_GPU_DIRECT VCluster must use CudaMemory internally, the most probable" <<
145 " cause of this problem is that you are using MPI_GPU_DIRECT option with a non-GPU data-structure" << std::endl;
146 }
147
148 vcl.process_receive_buffer_with_prp<op,T,S,layout_base,prp...>(recv,sz_recv,sz_recv_byte,op_param,opt);
149 }
150 };
151
170 template<typename op, typename T, typename S, template <typename> class layout_base>
171 void prepare_send_buffer(openfpm::vector<T> & send,
172 S & recv,
173 openfpm::vector<size_t> & prc_send,
174 openfpm::vector<size_t> & prc_recv,
175 openfpm::vector<size_t> & sz_recv,
176 size_t opt)
177 {
178 sz_recv_byte[NBX_prc_scnt].resize(sz_recv.size());
179
180 // Reset the receive buffer
181 reset_recv_buf();
182
183 #ifdef SE_CLASS1
184
185 if (send.size() != prc_send.size())
186 std::cerr << __FILE__ << ":" << __LINE__ << " Error, the number of processor involved \"prc.size()\" must match the number of sending buffers \"send.size()\" " << std::endl;
187
188 #endif
189
190 // Prepare the sending buffer
191 send_buf.resize(0);
192 send_sz_byte.resize(0);
193 prc_send_.resize(0);
194
195 size_t tot_size = 0;
196
197 for (size_t i = 0; i < send.size() ; i++)
198 {
199 size_t req = 0;
200
201 //Pack requesting
202 pack_unpack_cond_with_prp<has_max_prop<T, has_value_type_ofp<T>::value>::value,op, T, S, layout_base>::packingRequest(send.get(i), req, send_sz_byte);
203 tot_size += req;
204 }
205
207
237
238 pmem[NBX_prc_scnt] = new HeapMemory;
239
240 mem[NBX_prc_scnt] = new ExtPreAlloc<HeapMemory>(tot_size,*pmem[NBX_prc_scnt]);
241 mem[NBX_prc_scnt]->incRef();
242
243 for (size_t i = 0; i < send.size() ; i++)
244 {
245 //Packing
246
247 Pack_stat sts;
248
249 pack_unpack_cond_with_prp<has_max_prop<T, has_value_type_ofp<T>::value>::value, op, T, S, layout_base>::packing(*mem[NBX_prc_scnt], send.get(i), sts, send_buf,opt);
250 }
251
252 // receive information
253 NBX_prc_bi[NBX_prc_scnt].set(&this->recv_buf[NBX_prc_scnt],prc_recv,sz_recv_byte[NBX_prc_scnt],this->tags[NBX_prc_scnt],opt);
254
255 // Send and recv multiple messages
256 if (opt & RECEIVE_KNOWN)
257 {
258 // When we are passing the number of elements but not the bytes, calculate the bytes
259 if (opt & KNOWN_ELEMENT_OR_BYTE)
260 {
261 // We know the number of elements, convert to bytes (ONLY if it is possible)
263 {
264 for (size_t i = 0 ; i < sz_recv.size() ; i++)
265 {sz_recv_byte[NBX_prc_scnt].get(i) = sz_recv.get(i) * sizeof(typename T::value_type);}
266 }
267 else
268 {
269#ifndef DISABLE_ALL_RTTI
270 std::cout << __FILE__ << ":" << __LINE__ << " Error " << demangle(typeid(T).name()) << " the type does not work with the option or NO_CHANGE_ELEMENTS" << std::endl;
271#endif
272 }
273
274 self_base::sendrecvMultipleMessagesNBXAsync(prc_send.size(),(size_t *)send_sz_byte.getPointer(),(size_t *)prc_send.getPointer(),(void **)send_buf.getPointer(),
275 prc_recv.size(),(size_t *)prc_recv.getPointer(),(size_t *)sz_recv_byte[NBX_prc_scnt].getPointer(),msg_alloc_known,(void *)&NBX_prc_bi);
276 }
277 else
278 {
279 self_base::sendrecvMultipleMessagesNBXAsync(prc_send.size(),(size_t *)send_sz_byte.getPointer(),(size_t *)prc_send.getPointer(),(void **)send_buf.getPointer(),
280 prc_recv.size(),(size_t *)prc_recv.getPointer(),msg_alloc_known,(void *)&NBX_prc_bi);
281 sz_recv_byte[NBX_prc_scnt] = self_base::sz_recv_tmp;
282 }
283 }
284 else
285 {
286 self_base::tags[NBX_prc_scnt].clear();
287 prc_recv.clear();
288 self_base::sendrecvMultipleMessagesNBXAsync(prc_send_.size(),(size_t *)send_sz_byte.getPointer(),(size_t *)prc_send_.getPointer(),(void **)send_buf.getPointer(),msg_alloc,(void *)&NBX_prc_bi[NBX_prc_scnt]);
289 }
290 }
291
292
297 void reset_recv_buf()
298 {
299 for (size_t i = 0 ; i < self_base::recv_buf[NBX_prc_scnt].size() ; i++)
300 {self_base::recv_buf[NBX_prc_scnt].get(i).resize(0);}
301
302 self_base::recv_buf[NBX_prc_scnt].resize(0);
303 }
304
318 static void * msg_alloc(size_t msg_i ,size_t total_msg, size_t total_p, size_t i, size_t ri, size_t tag, void * ptr)
319 {
320 base_info<InternalMemory> & rinfo = *(base_info<InternalMemory> *)ptr;
321
322 if (rinfo.recv_buf == NULL)
323 {
324 std::cerr << __FILE__ << ":" << __LINE__ << " Internal error: this processor is not supposed to receive\n";
325 return NULL;
326 }
327
328 rinfo.recv_buf->resize(ri+1);
329
330 rinfo.recv_buf->get(ri).resize(msg_i);
331
332 // Receive info
333 rinfo.prc->add(i);
334 rinfo.sz->add(msg_i);
335 rinfo.tags->add(tag);
336
337 // return the pointer
338
339 // If we have GPU direct activated use directly the cuda buffer
340 if (rinfo.opt & MPI_GPU_DIRECT)
341 {
342#if defined(MPIX_CUDA_AWARE_SUPPORT) && MPIX_CUDA_AWARE_SUPPORT
343 return rinfo.recv_buf->last().getDevicePointer();
344#else
345 return rinfo.recv_buf->last().getPointer();
346#endif
347 }
348
349 return rinfo.recv_buf->last().getPointer();
350 }
351
352
366 static void * msg_alloc_known(size_t msg_i ,size_t total_msg, size_t total_p, size_t i, size_t ri, size_t tag, void * ptr)
367 {
368 base_info<InternalMemory> & rinfo = *(base_info<InternalMemory> *)ptr;
369
370 if (rinfo.recv_buf == NULL)
371 {
372 std::cerr << __FILE__ << ":" << __LINE__ << " Internal error: this processor is not supposed to receive\n";
373 return NULL;
374 }
375
376 rinfo.recv_buf->resize(ri+1);
377
378 rinfo.recv_buf->get(ri).resize(msg_i);
379
380 // return the pointer
381 return rinfo.recv_buf->last().getPointer();
382 }
383
397 template<typename op, typename T, typename S, template <typename> class layout_base ,unsigned int ... prp >
398 void process_receive_buffer_with_prp(S & recv,
399 openfpm::vector<size_t> * sz,
400 openfpm::vector<size_t> * sz_byte,
401 op & op_param,
402 size_t opt)
403 {
404 if (sz != NULL)
405 {sz->resize(self_base::recv_buf[NBX_prc_pcnt].size());}
406
407 pack_unpack_cond_with_prp<has_max_prop<T, has_value_type_ofp<T>::value>::value,op, T, S, layout_base, prp... >::unpacking(recv, self_base::recv_buf[NBX_prc_pcnt], sz, sz_byte, op_param,opt);
408 }
409
410 public:
411
418 Vcluster(int *argc, char ***argv)
419 :Vcluster_base<InternalMemory>(argc,argv)
420 {
421 }
422
450 template<typename T, typename S, template <typename> class layout_base=memory_traits_lin> bool SGather(T & send, S & recv,size_t root)
451 {
452 openfpm::vector<size_t> prc;
453 openfpm::vector<size_t> sz;
454
455 return SGather<T,S,layout_base>(send,recv,prc,sz,root);
456 }
457
459 template<size_t index, size_t N> struct MetaFuncOrd {
460 enum { value = index };
461 };
462
492 template<typename T,
493 typename S,
494 template <typename> class layout_base = memory_traits_lin>
495 bool SGather(T & send,
496 S & recv,
497 openfpm::vector<size_t> & prc,
498 openfpm::vector<size_t> & sz,
499 size_t root)
500 {
501#ifdef SE_CLASS1
502 if (&send == (T *)&recv)
503 {std::cerr << "Error: " << __FILE__ << ":" << __LINE__ << " when using SGather the sending object and the receiving object must in general be different" << std::endl;}
504#endif
505
506 // Reset the receive buffer
507 reset_recv_buf();
508
509 // If we are on master collect the information
510 if (self_base::getProcessUnitID() == root)
511 {
512 // send buffer (the master does not send anything) so send_req and send_buf
513 // remain buffers with size 0
514 openfpm::vector<size_t> send_req;
515
516 self_base::tags[NBX_prc_scnt].clear();
517
518 // receive information
519 base_info<InternalMemory> bi(&this->recv_buf[NBX_prc_scnt],prc,sz,this->tags[NBX_prc_scnt],0);
520
521 // Send and recv multiple messages
522 self_base::sendrecvMultipleMessagesNBX(send_req.size(),NULL,NULL,NULL,msg_alloc,&bi);
523
524 // we generate the list of the properties to unpack
525 typedef typename ::generate_indexes<int, has_max_prop<T, has_value_type_ofp<T>::value>::number, MetaFuncOrd>::result ind_prop_to_pack;
526
527 // operation object
528 op_ssend_recv_add<void> opa;
529
530 // Reorder the buffer
531 reorder_buffer(prc,self_base::tags[NBX_prc_scnt],sz);
532
533 index_gen<ind_prop_to_pack>::template process_recv<op_ssend_recv_add<void>,T,S,layout_base>(*this,recv,&sz,NULL,opa,0);
534
535 recv.add(send);
536 prc.add(root);
537 sz.add(send.size());
538 }
539 else
540 {
541 // send buffer (the master does not send anything) so send_prc and send_buf
542 // remain buffers with size 0
544 openfpm::vector<size_t> send_prc_;
545 send_prc.add(root);
546
548
550
551 //Pack requesting
552
553 size_t tot_size = 0;
554
555 pack_unpack_cond_with_prp<has_max_prop<T, has_value_type_ofp<T>::value>::value,op_ssend_recv_add<void>, T, S, layout_base>::packingRequest(send, tot_size, sz);
556
557 HeapMemory pmem;
558
559 ExtPreAlloc<HeapMemory> & mem = *(new ExtPreAlloc<HeapMemory>(tot_size,pmem));
560 mem.incRef();
561
562 //Packing
563
564 Pack_stat sts;
565
566 pack_unpack_cond_with_prp<has_max_prop<T, has_value_type_ofp<T>::value>::value,op_ssend_recv_add<void>, T, S, layout_base>::packing(mem, send, sts, send_buf);
567
569
570 self_base::tags[NBX_prc_scnt].clear();
571
572 // receive information
573 base_info<InternalMemory> bi(NULL,prc,sz,self_base::tags[NBX_prc_scnt],0);
574
575 // Send and recv multiple messages
576 self_base::sendrecvMultipleMessagesNBX(send_prc_.size(),(size_t *)sz.getPointer(),(size_t *)send_prc_.getPointer(),(void **)send_buf.getPointer(),msg_alloc,(void *)&bi,NONE);
577
578 mem.decRef();
579 delete &mem;
580 }
581
582 return true;
583 }
584
589 void barrier()
590 {
591 MPI_Barrier(MPI_COMM_WORLD);
592 }
593
620 template<typename T, typename S, template <typename> class layout_base=memory_traits_lin>
621 bool SScatter(T & send, S & recv, openfpm::vector<size_t> & prc, openfpm::vector<size_t> & sz, size_t root)
622 {
623 // Reset the receive buffer
624 reset_recv_buf();
625
626 // If we are on master scatter the information
627 if (self_base::getProcessUnitID() == root)
628 {
629 // Prepare the sending buffer
631
632
633 openfpm::vector<size_t> sz_byte;
634 sz_byte.resize(sz.size());
635
636 size_t ptr = 0;
637
638 for (size_t i = 0; i < sz.size() ; i++)
639 {
640 send_buf.add((char *)send.getPointer() + sizeof(typename T::value_type)*ptr );
641 sz_byte.get(i) = sz.get(i) * sizeof(typename T::value_type);
642 ptr += sz.get(i);
643 }
644
645 self_base::tags[NBX_prc_scnt].clear();
646
647 // receive information
648 base_info<InternalMemory> bi(&this->recv_buf[NBX_prc_scnt],prc,sz,this->tags[NBX_prc_scnt],0);
649
650 // Send and recv multiple messages
651 self_base::sendrecvMultipleMessagesNBX(prc.size(),(size_t *)sz_byte.getPointer(),(size_t *)prc.getPointer(),(void **)send_buf.getPointer(),msg_alloc,(void *)&bi);
652
653 // we generate the list of the properties to pack
654 typedef typename ::generate_indexes<int, has_max_prop<T, has_value_type_ofp<T>::value>::number, MetaFuncOrd>::result ind_prop_to_pack;
655
656 // operation object
657 op_ssend_recv_add<void> opa;
658
659 index_gen<ind_prop_to_pack>::template process_recv<op_ssend_recv_add<void>,T,S,layout_base>(*this,recv,NULL,NULL,opa,0);
660 }
661 else
662 {
663 // The non-root receive
664 openfpm::vector<size_t> send_req;
665
666 self_base::tags[NBX_prc_scnt].clear();
667
668 // receive information
669 base_info<InternalMemory> bi(&this->recv_buf[NBX_prc_scnt],prc,sz,this->tags[NBX_prc_scnt],0);
670
671 // Send and recv multiple messages
672 self_base::sendrecvMultipleMessagesNBX(send_req.size(),NULL,NULL,NULL,msg_alloc,&bi);
673
674 // we generate the list of the properties to pack
675 typedef typename ::generate_indexes<int, has_max_prop<T, has_value_type_ofp<T>::value>::number, MetaFuncOrd>::result ind_prop_to_pack;
676
677 // operation object
678 op_ssend_recv_add<void> opa;
679
680 index_gen<ind_prop_to_pack>::template process_recv<op_ssend_recv_add<void>,T,S,layout_base>(*this,recv,NULL,NULL,opa,0);
681 }
682
683 return true;
684 }
685
692 void reorder_buffer(openfpm::vector<size_t> & prc, const openfpm::vector<size_t> & tags, openfpm::vector<size_t> & sz_recv)
693 {
694
695 struct recv_buff_reorder
696 {
698 size_t proc;
699
700 size_t tag;
701
703 size_t pos;
704
706 recv_buff_reorder()
707 :proc(0),tag(0),pos(0)
708 {};
709
711 bool operator<(const recv_buff_reorder & rd) const
712 {
713 if (proc == rd.proc)
714 {return tag < rd.tag;}
715
716 return (proc < rd.proc);
717 }
718 };
719
720 openfpm::vector<recv_buff_reorder> rcv;
721
722 rcv.resize(self_base::recv_buf[NBX_prc_pcnt].size());
723
724 for (size_t i = 0 ; i < rcv.size() ; i++)
725 {
726 rcv.get(i).proc = prc.get(i);
727 if (i < tags.size())
728 {rcv.get(i).tag = tags.get(i);}
729 else
730 {rcv.get(i).tag = (unsigned int)-1;}
731 rcv.get(i).pos = i;
732 }
733
734 // we sort based on processor
735 rcv.sort();
736
737 openfpm::vector_fr<BMemory<InternalMemory>> recv_ord;
738 recv_ord.resize(rcv.size());
739
740 openfpm::vector<size_t> prc_ord;
741 prc_ord.resize(rcv.size());
742
743 openfpm::vector<size_t> sz_recv_ord;
744 sz_recv_ord.resize(rcv.size());
745
746 // Now we reorder rcv
747 for (size_t i = 0 ; i < rcv.size() ; i++)
748 {
749 recv_ord.get(i).swap(self_base::recv_buf[NBX_prc_pcnt].get(rcv.get(i).pos));
750 prc_ord.get(i) = rcv.get(i).proc;
751 sz_recv_ord.get(i) = sz_recv.get(rcv.get(i).pos);
752 }
753
754 // move rcv into recv
755 // Now we swap back to recv_buf in an ordered way
756 for (size_t i = 0 ; i < rcv.size() ; i++)
757 {
758 self_base::recv_buf[NBX_prc_pcnt].get(i).swap(recv_ord.get(i));
759 }
760
761 prc.swap(prc_ord);
762 sz_recv.swap(sz_recv_ord);
763
764 // reorder prc_recv and recv_sz
765 }
766
794 template<typename T,
795 typename S,
796 template <typename> class layout_base = memory_traits_lin>
797 bool SSendRecv(openfpm::vector<T> & send,
798 S & recv,
799 openfpm::vector<size_t> & prc_send,
800 openfpm::vector<size_t> & prc_recv,
801 openfpm::vector<size_t> & sz_recv,
802 size_t opt = NONE)
803 {
804 prepare_send_buffer<op_ssend_recv_add<void>,T,S,layout_base>(send,recv,prc_send,prc_recv,sz_recv,opt);
805
806 self_base::sendrecvMultipleMessagesNBXWait();
807
808 // Reorder the buffer
809 reorder_buffer(prc_recv,self_base::tags[NBX_prc_scnt],sz_recv_byte[NBX_prc_scnt]);
810
811 mem[NBX_prc_scnt]->decRef();
812 delete mem[NBX_prc_scnt];
813 delete pmem[NBX_prc_scnt];
814
815 // we generate the list of the properties to pack
816 typedef typename ::generate_indexes<int, has_max_prop<T, has_value_type_ofp<T>::value>::number, MetaFuncOrd>::result ind_prop_to_pack;
817
818 op_ssend_recv_add<void> opa;
819
820 index_gen<ind_prop_to_pack>::template process_recv<op_ssend_recv_add<void>,T,S,layout_base>(*this,recv,&sz_recv,NULL,opa,opt);
821
822 return true;
823 }
824
855 template<typename T,
856 typename S,
857 template <typename> class layout_base = memory_traits_lin>
858 bool SSendRecvAsync(openfpm::vector<T> & send,
859 S & recv,
860 openfpm::vector<size_t> & prc_send,
861 openfpm::vector<size_t> & prc_recv,
862 openfpm::vector<size_t> & sz_recv,
863 size_t opt = NONE)
864 {
865 prepare_send_buffer<op_ssend_recv_add<void>,T,S,layout_base>(send,recv,prc_send,prc_recv,sz_recv,opt);
866
867 NBX_prc_scnt++;
868
869 return true;
870 }
871
900 template<typename T, typename S, template <typename> class layout_base, int ... prp>
901 bool SSendRecvP(openfpm::vector<T> & send,
902 S & recv,
903 openfpm::vector<size_t> & prc_send,
904 openfpm::vector<size_t> & prc_recv,
905 openfpm::vector<size_t> & sz_recv,
906 openfpm::vector<size_t> & sz_recv_byte_out,
907 size_t opt = NONE)
908 {
909 prepare_send_buffer<op_ssend_recv_add<void>,T,S,layout_base>(send,recv,prc_send,prc_recv,sz_recv,opt);
910
911 self_base::sendrecvMultipleMessagesNBXWait();
912
913 // Reorder the buffer
914 reorder_buffer(prc_recv,self_base::tags[NBX_prc_scnt],sz_recv_byte[NBX_prc_scnt]);
915
916 mem[NBX_prc_scnt]->decRef();
917 delete mem[NBX_prc_scnt];
918 delete pmem[NBX_prc_scnt];
919
920 // operation object
921 op_ssend_recv_add<void> opa;
922
923 // process the received information
924 process_receive_buffer_with_prp<op_ssend_recv_add<void>,T,S,layout_base,prp...>(recv,&sz_recv,&sz_recv_byte_out,opa,opt);
925
926 return true;
927 }
928
960 template<typename T, typename S, template <typename> class layout_base, int ... prp>
961 bool SSendRecvPAsync(openfpm::vector<T> & send,
962 S & recv,
963 openfpm::vector<size_t> & prc_send,
964 openfpm::vector<size_t> & prc_recv,
965 openfpm::vector<size_t> & sz_recv,
966 openfpm::vector<size_t> & sz_recv_byte_out,
967 size_t opt = NONE)
968 {
969 prepare_send_buffer<op_ssend_recv_add<void>,T,S,layout_base>(send,recv,prc_send,prc_recv,sz_recv,opt);
970
971 NBX_prc_scnt++;
972
973 return true;
974 }
975
1003 template<typename T, typename S, template <typename> class layout_base, int ... prp>
1004 bool SSendRecvP(openfpm::vector<T> & send,
1005 S & recv,
1006 openfpm::vector<size_t> & prc_send,
1007 openfpm::vector<size_t> & prc_recv,
1008 openfpm::vector<size_t> & sz_recv,
1009 size_t opt = NONE)
1010 {
1011 prepare_send_buffer<op_ssend_recv_add<void>,T,S,layout_base>(send,recv,prc_send,prc_recv,sz_recv,opt);
1012
1013 self_base::sendrecvMultipleMessagesNBXWait();
1014
1015 // Reorder the buffer
1016 reorder_buffer(prc_recv,self_base::tags[NBX_prc_scnt],sz_recv_byte[NBX_prc_scnt]);
1017
1018 mem[NBX_prc_scnt]->decRef();
1019 delete mem[NBX_prc_scnt];
1020 delete pmem[NBX_prc_scnt];
1021
1022 // operation object
1023 op_ssend_recv_add<void> opa;
1024
1025 // process the received information
1026 process_receive_buffer_with_prp<op_ssend_recv_add<void>,T,S,layout_base,prp...>(recv,&sz_recv,NULL,opa,opt);
1027
1028 return true;
1029 }
1030
1061 template<typename T, typename S, template <typename> class layout_base, int ... prp>
1062 bool SSendRecvPAsync(openfpm::vector<T> & send,
1063 S & recv,
1064 openfpm::vector<size_t> & prc_send,
1065 openfpm::vector<size_t> & prc_recv,
1066 openfpm::vector<size_t> & sz_recv,
1067 size_t opt = NONE)
1068 {
1069 prepare_send_buffer<op_ssend_recv_add<void>,T,S,layout_base>(send,recv,prc_send,prc_recv,sz_recv,opt);
1070
1071 NBX_prc_scnt++;
1072
1073 return true;
1074 }
1075
1112 template<typename op,
1113 typename T,
1114 typename S,
1115 template <typename> class layout_base,
1116 int ... prp>
1117 bool SSendRecvP_op(openfpm::vector<T> & send,
1118 S & recv,
1119 openfpm::vector<size_t> & prc_send,
1120 op & op_param,
1121 openfpm::vector<size_t> & prc_recv,
1122 openfpm::vector<size_t> & recv_sz,
1123 size_t opt = NONE)
1124 {
1125 prepare_send_buffer<op,T,S,layout_base>(send,recv,prc_send,prc_recv,recv_sz,opt);
1126
1127 self_base::sendrecvMultipleMessagesNBXWait();
1128
1129 // Reorder the buffer
1130 reorder_buffer(prc_recv,self_base::tags[NBX_prc_scnt],sz_recv_byte[NBX_prc_scnt]);
1131
1132 mem[NBX_prc_scnt]->decRef();
1133 delete mem[NBX_prc_scnt];
1134 delete pmem[NBX_prc_scnt];
1135
1136 // process the received information
1137 process_receive_buffer_with_prp<op,T,S,layout_base,prp...>(recv,NULL,NULL,op_param,opt);
1138
1139 return true;
1140 }
1141
1180 template<typename op,
1181 typename T,
1182 typename S,
1183 template <typename> class layout_base,
1184 int ... prp>
1185 bool SSendRecvP_opAsync(openfpm::vector<T> & send,
1186 S & recv,
1187 openfpm::vector<size_t> & prc_send,
1188 op & op_param,
1189 openfpm::vector<size_t> & prc_recv,
1190 openfpm::vector<size_t> & recv_sz,
1191 size_t opt = NONE)
1192 {
1193 prepare_send_buffer<op,T,S,layout_base>(send,recv,prc_send,prc_recv,recv_sz,opt);
1194
1195 NBX_prc_scnt++;
1196
1197 return true;
1198 }
1199
1205 template<typename T,
1206 typename S,
1207 template <typename> class layout_base = memory_traits_lin>
1208 bool SSendRecvWait(openfpm::vector<T> & send,
1209 S & recv,
1210 openfpm::vector<size_t> & prc_send,
1211 openfpm::vector<size_t> & prc_recv,
1212 openfpm::vector<size_t> & sz_recv,
1213 size_t opt = NONE)
1214 {
1215 self_base::sendrecvMultipleMessagesNBXWait();
1216
1217 // Reorder the buffer
1218 reorder_buffer(prc_recv,self_base::tags[NBX_prc_pcnt],sz_recv_byte[NBX_prc_pcnt]);
1219
1220 mem[NBX_prc_pcnt]->decRef();
1221 delete mem[NBX_prc_pcnt];
1222 delete pmem[NBX_prc_pcnt];
1223
1224 // we generate the list of the properties to pack
1225 typedef typename ::generate_indexes<int, has_max_prop<T, has_value_type_ofp<T>::value>::number, MetaFuncOrd>::result ind_prop_to_pack;
1226
1227 op_ssend_recv_add<void> opa;
1228
1229 index_gen<ind_prop_to_pack>::template process_recv<op_ssend_recv_add<void>,T,S,layout_base>(*this,recv,&sz_recv,NULL,opa,opt);
1230
1231 NBX_prc_pcnt++;
1232 if (NBX_prc_scnt == NBX_prc_pcnt)
1233 {
1234 NBX_prc_scnt = 0;
1235 NBX_prc_pcnt = 0;
1236 }
1237
1238 return true;
1239 }
1240
1246 template<typename T, typename S, template <typename> class layout_base, int ... prp>
1247 bool SSendRecvPWait(openfpm::vector<T> & send,
1248 S & recv,
1249 openfpm::vector<size_t> & prc_send,
1250 openfpm::vector<size_t> & prc_recv,
1251 openfpm::vector<size_t> & sz_recv,
1252 openfpm::vector<size_t> & sz_recv_byte_out,
1253 size_t opt = NONE)
1254 {
1255 self_base::sendrecvMultipleMessagesNBXWait();
1256
1257 // Reorder the buffer
1258 reorder_buffer(prc_recv,self_base::tags[NBX_prc_pcnt],sz_recv_byte[NBX_prc_pcnt]);
1259
1260 mem[NBX_prc_pcnt]->decRef();
1261 delete mem[NBX_prc_pcnt];
1262 delete pmem[NBX_prc_pcnt];
1263
1264 // operation object
1265 op_ssend_recv_add<void> opa;
1266
1267 // process the received information
1268 process_receive_buffer_with_prp<op_ssend_recv_add<void>,T,S,layout_base,prp...>(recv,&sz_recv,&sz_recv_byte_out,opa,opt);
1269
1270 NBX_prc_pcnt++;
1271 if (NBX_prc_scnt == NBX_prc_pcnt)
1272 {
1273 NBX_prc_scnt = 0;
1274 NBX_prc_pcnt = 0;
1275 }
1276
1277 return true;
1278 }
1279
1285 template<typename T, typename S, template <typename> class layout_base, int ... prp>
1286 bool SSendRecvPWait(openfpm::vector<T> & send,
1287 S & recv,
1288 openfpm::vector<size_t> & prc_send,
1289 openfpm::vector<size_t> & prc_recv,
1290 openfpm::vector<size_t> & sz_recv,
1291 size_t opt = NONE)
1292 {
1293 self_base::sendrecvMultipleMessagesNBXWait();
1294
1295 // Reorder the buffer
1296 reorder_buffer(prc_recv,self_base::tags[NBX_prc_pcnt],sz_recv_byte[NBX_prc_pcnt]);
1297
1298 mem[NBX_prc_pcnt]->decRef();
1299 delete mem[NBX_prc_pcnt];
1300 delete pmem[NBX_prc_pcnt];
1301
1302 // operation object
1303 op_ssend_recv_add<void> opa;
1304
1305 // process the received information
1306 process_receive_buffer_with_prp<op_ssend_recv_add<void>,T,S,layout_base,prp...>(recv,&sz_recv,NULL,opa,opt);
1307
1308 NBX_prc_pcnt++;
1309 if (NBX_prc_scnt == NBX_prc_pcnt)
1310 {
1311 NBX_prc_scnt = 0;
1312 NBX_prc_pcnt = 0;
1313 }
1314
1315 return true;
1316 }
1317
1323 template<typename op,
1324 typename T,
1325 typename S,
1326 template <typename> class layout_base,
1327 int ... prp>
1328 bool SSendRecvP_opWait(openfpm::vector<T> & send,
1329 S & recv,
1330 openfpm::vector<size_t> & prc_send,
1331 op & op_param,
1332 openfpm::vector<size_t> & prc_recv,
1333 openfpm::vector<size_t> & recv_sz,
1334 size_t opt = NONE)
1335 {
1336 self_base::sendrecvMultipleMessagesNBXWait();
1337
1338 // Reorder the buffer
1339 reorder_buffer(prc_recv,self_base::tags[NBX_prc_pcnt],sz_recv_byte[NBX_prc_pcnt]);
1340
1341 mem[NBX_prc_pcnt]->decRef();
1342 delete mem[NBX_prc_pcnt];
1343 delete pmem[NBX_prc_pcnt];
1344
1345 // process the received information
1346 process_receive_buffer_with_prp<op,T,S,layout_base,prp...>(recv,NULL,NULL,op_param,opt);
1347
1348 NBX_prc_pcnt++;
1349 if (NBX_prc_scnt == NBX_prc_pcnt)
1350 {
1351 NBX_prc_scnt = 0;
1352 NBX_prc_pcnt = 0;
1353 }
1354
1355 return true;
1356 }
1357
1358};
1359
1360
1361
1362// Function to initialize the global VCluster //
1363
1364extern Vcluster<> * global_v_cluster_private_heap;
1365extern Vcluster<CudaMemory> * global_v_cluster_private_cuda;
1366
1373static inline void init_global_v_cluster_private(int *argc, char ***argv)
1374{
1375 if (global_v_cluster_private_heap == NULL)
1376 {global_v_cluster_private_heap = new Vcluster<>(argc,argv);}
1377
1378 if (global_v_cluster_private_cuda == NULL)
1379 {global_v_cluster_private_cuda = new Vcluster<CudaMemory>(argc,argv);}
1380}
1381
1382static inline void delete_global_v_cluster_private()
1383{
1384 delete global_v_cluster_private_heap;
1385 delete global_v_cluster_private_cuda;
1386}
1387
1388template<typename Memory>
1390{
1391 static Vcluster<Memory> & get()
1392 {
1393 return *global_v_cluster_private_heap;
1394 }
1395};
1396
1397template<>
1399{
1400 static Vcluster<CudaMemory> & get()
1401 {
1402 return *global_v_cluster_private_cuda;
1403 }
1404};
1405
1406template<typename Memory = HeapMemory>
1407static inline Vcluster<Memory> & create_vcluster()
1408{
1409 if (global_v_cluster_private_heap == NULL)
1410 {std::cerr << __FILE__ << ":" << __LINE__ << " Error: you must call openfpm_init before using any distributed data structures";}
1411
1412 return get_vcl<Memory>::get();
1413}
1414
1415
1416
1422static inline bool is_openfpm_init()
1423{
1424 return ofp_initialized;
1425}
1426
1427
1433void openfpm_init_vcl(int *argc, char ***argv);
1434
1435size_t openfpm_vcluster_compilation_mask();
1436
1442void openfpm_finalize();
1443
1449static void openfpm_init(int *argc, char ***argv)
1450{
1451 openfpm_init_vcl(argc,argv);
1452
1453 size_t compiler_mask = CUDA_ON_BACKEND;
1454
1455 init_wrappers();
1456
1457 if (compiler_mask != openfpm_vcluster_compilation_mask() || compiler_mask != openfpm_ofpmmemory_compilation_mask())
1458 {
1459 std::cout << __FILE__ << ":" << __LINE__ << " Error: the program has been compiled with CUDA_ON_BACKEND: " << compiler_mask << " but libvcluster has been compiled with CUDA_ON_BACKEND: " <<
1460 openfpm_vcluster_compilation_mask() << ", and libofpmmemory has been compiled with CUDA_ON_BACKEND: " << openfpm_ofpmmemory_compilation_mask() <<
1461 " recompile the library with the right CUDA_ON_BACKEND" << std::endl;
1462 }
1463}
1464
1465#endif
1466
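A minimal usage sketch of the initialization sequence defined above, assuming the usual OpenFPM include path "VCluster/VCluster.hpp": openfpm_init() creates the global Vcluster instances, create_vcluster() returns the HeapMemory-backed one, and openfpm_finalize() releases them.

#include <iostream>
#include "VCluster/VCluster.hpp"

int main(int argc, char* argv[])
{
    // Initialize the global Vcluster instances (heap and CUDA variants)
    openfpm_init(&argc,&argv);

    // Reference to the HeapMemory-backed global cluster
    Vcluster<> & vcl = create_vcluster();

    size_t rank  = vcl.getProcessUnitID(); // id of this process
    size_t nproc = vcl.size();             // total number of processes

    std::cout << "Process " << rank << " of " << nproc << std::endl;

    vcl.barrier();      // MPI_Barrier on MPI_COMM_WORLD

    openfpm_finalize(); // release the global Vcluster instances
    return 0;
}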
It overrides the behavior of size()
virtual void decRef()
Decrement the reference counter.
virtual void incRef()
Increment the reference counter.
This class allocates and destroys CPU memory.
Packing status object.
Definition Pack_stat.hpp:61
This class virtualizes the cluster of PCs as a set of processes that communicate.
size_t size()
Get the total number of processors.
openfpm::vector_fr< BMemory< InternalMemory > > recv_buf[NQUEUE]
Receive buffers.
void sendrecvMultipleMessagesNBX(openfpm::vector< size_t > &prc, openfpm::vector< T > &data, openfpm::vector< size_t > &prc_recv, openfpm::vector< size_t > &recv_sz, void *(*msg_alloc)(size_t, size_t, size_t, size_t, size_t, size_t, void *), void *ptr_arg, long int opt=NONE)
Send and receive multiple messages.
size_t getProcessUnitID()
Get the process unit id.
openfpm::vector< size_t > tags[NQUEUE]
Receiving tags.
bool recv(size_t proc, size_t tag, void *v, size_t sz)
Recv data from a processor.
openfpm::vector< MPI_Request > req
vector of MPI requests
void sendrecvMultipleMessagesNBXWait()
Send and receive multiple messages wait NBX communication to complete.
void sendrecvMultipleMessagesNBXAsync(openfpm::vector< size_t > &prc, openfpm::vector< T > &data, openfpm::vector< size_t > &prc_recv, openfpm::vector< size_t > &recv_sz, void *(*msg_alloc)(size_t, size_t, size_t, size_t, size_t, size_t, void *), void *ptr_arg, long int opt=NONE)
Send and receive multiple messages asynchronous version.
bool send(size_t proc, size_t tag, const void *mem, size_t sz)
Send data to a processor.
Implementation of VCluster class.
Definition VCluster.hpp:59
bool SSendRecvPAsync(openfpm::vector< T > &send, S &recv, openfpm::vector< size_t > &prc_send, openfpm::vector< size_t > &prc_recv, openfpm::vector< size_t > &sz_recv, size_t opt=NONE)
Semantic Send and receive, send the data to processors and receive from the other processors (with pr...
bool SGather(T &send, S &recv, openfpm::vector< size_t > &prc, openfpm::vector< size_t > &sz, size_t root)
Semantic Gather, gather the data from all processors into one node.
Definition VCluster.hpp:495
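A minimal usage sketch for this overload, inferred from the signature above: each process contributes a vector, and the root collects all elements together with the source processors and the per-processor element counts.

Vcluster<> & vcl = create_vcluster();

openfpm::vector<size_t> send;
send.add(vcl.getProcessUnitID());   // each process sends its own rank

openfpm::vector<size_t> recv;       // filled on the root only
openfpm::vector<size_t> prc;        // processor of origin of each chunk
openfpm::vector<size_t> sz;         // number of elements from each processor

vcl.SGather(send,recv,prc,sz,0);    // gather everything on processor 0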
bool SSendRecvP(openfpm::vector< T > &send, S &recv, openfpm::vector< size_t > &prc_send, openfpm::vector< size_t > &prc_recv, openfpm::vector< size_t > &sz_recv, size_t opt=NONE)
Semantic Send and receive, send the data to processors and receive from the other processors (with pr...
Vcluster(int *argc, char ***argv)
Constructor.
Definition VCluster.hpp:418
bool SSendRecvP_op(openfpm::vector< T > &send, S &recv, openfpm::vector< size_t > &prc_send, op &op_param, openfpm::vector< size_t > &prc_recv, openfpm::vector< size_t > &recv_sz, size_t opt=NONE)
Semantic Send and receive, send the data to processors and receive from the other processors.
void barrier()
Just a call to mpi_barrier.
Definition VCluster.hpp:589
bool SSendRecvPWait(openfpm::vector< T > &send, S &recv, openfpm::vector< size_t > &prc_send, openfpm::vector< size_t > &prc_recv, openfpm::vector< size_t > &sz_recv, openfpm::vector< size_t > &sz_recv_byte_out, size_t opt=NONE)
Synchronize with SSendRecvP.
void prepare_send_buffer(openfpm::vector< T > &send, S &recv, openfpm::vector< size_t > &prc_send, openfpm::vector< size_t > &prc_recv, openfpm::vector< size_t > &sz_recv, size_t opt)
Prepare the send buffer and send the message to other processors.
Definition VCluster.hpp:171
static void * msg_alloc_known(size_t msg_i, size_t total_msg, size_t total_p, size_t i, size_t ri, size_t tag, void *ptr)
Call-back to allocate buffer to receive data.
Definition VCluster.hpp:366
bool SSendRecvP_opAsync(openfpm::vector< T > &send, S &recv, openfpm::vector< size_t > &prc_send, op &op_param, openfpm::vector< size_t > &prc_recv, openfpm::vector< size_t > &recv_sz, size_t opt=NONE)
Semantic Send and receive, send the data to processors and receive from the other processors asynchro...
bool SScatter(T &send, S &recv, openfpm::vector< size_t > &prc, openfpm::vector< size_t > &sz, size_t root)
Semantic Scatter, scatter the data from one processor to the other nodes.
Definition VCluster.hpp:621
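A minimal usage sketch, inferred from the signature: on the root, prc and sz describe the destination processor and the number of elements of each chunk of send; every process gets its chunk in recv.

Vcluster<> & vcl = create_vcluster();

openfpm::vector<size_t> send;   // meaningful on the root only
openfpm::vector<size_t> recv;
openfpm::vector<size_t> prc;    // destination processor of each chunk
openfpm::vector<size_t> sz;     // number of elements of each chunk

if (vcl.getProcessUnitID() == 0)
{
    for (size_t i = 0 ; i < vcl.size() ; i++)
    {
        send.add(i);   // one element per destination
        prc.add(i);
        sz.add(1);
    }
}

vcl.SScatter(send,recv,prc,sz,0);   // processor i receives its element in recv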
bool SSendRecvWait(openfpm::vector< T > &send, S &recv, openfpm::vector< size_t > &prc_send, openfpm::vector< size_t > &prc_recv, openfpm::vector< size_t > &sz_recv, size_t opt=NONE)
Synchronize with SSendRecv.
bool SGather(T &send, S &recv, size_t root)
Semantic Gather, gather the data from all processors into one node.
Definition VCluster.hpp:450
bool SSendRecvP_opWait(openfpm::vector< T > &send, S &recv, openfpm::vector< size_t > &prc_send, op &op_param, openfpm::vector< size_t > &prc_recv, openfpm::vector< size_t > &recv_sz, size_t opt=NONE)
Synchronize with SSendRecvP_op.
void process_receive_buffer_with_prp(S &recv, openfpm::vector< size_t > *sz, openfpm::vector< size_t > *sz_byte, op &op_param, size_t opt)
Process the receive buffer.
Definition VCluster.hpp:398
static void * msg_alloc(size_t msg_i, size_t total_msg, size_t total_p, size_t i, size_t ri, size_t tag, void *ptr)
Call-back to allocate buffer to receive data.
Definition VCluster.hpp:318
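msg_alloc is the receive-buffer allocator that Vcluster hands to the low-level sendrecvMultipleMessagesNBX interface. A sketch of a user-supplied callback with the same signature, with the parameter meaning inferred from the implementation in the listing (msg_i: message size in bytes, i: sending processor, ri: sequential receive index, ptr: the user argument passed as ptr_arg); the buffer type is an assumption chosen to mirror the internal one.

static void * my_msg_alloc(size_t msg_i, size_t total_msg, size_t total_p,
                           size_t i, size_t ri, size_t tag, void * ptr)
{
    // ptr is the ptr_arg given to sendrecvMultipleMessagesNBX
    openfpm::vector_fr<BMemory<HeapMemory>> & buf = *(openfpm::vector_fr<BMemory<HeapMemory>> *)ptr;

    buf.resize(ri + 1);
    buf.get(ri).resize(msg_i);       // reserve msg_i bytes for this message

    return buf.get(ri).getPointer(); // MPI writes the incoming message here
}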
void reset_recv_buf()
Reset the receive buffer.
Definition VCluster.hpp:297
void reorder_buffer(openfpm::vector< size_t > &prc, const openfpm::vector< size_t > &tags, openfpm::vector< size_t > &sz_recv)
reorder the receiving buffer
Definition VCluster.hpp:692
bool SSendRecvP(openfpm::vector< T > &send, S &recv, openfpm::vector< size_t > &prc_send, openfpm::vector< size_t > &prc_recv, openfpm::vector< size_t > &sz_recv, openfpm::vector< size_t > &sz_recv_byte_out, size_t opt=NONE)
Semantic Send and receive, send the data to processors and receive from the other processors (with pr...
Definition VCluster.hpp:901
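A usage sketch for the property-wise variant, assuming an aggregate-based vector as commonly used in OpenFPM; since layout_base and the property pack have no defaults here, the template arguments are spelled out, and only property 0 is sent and unpacked.

Vcluster<> & vcl = create_vcluster();

openfpm::vector<openfpm::vector<aggregate<float,float[3]>>> send; // one buffer per destination
openfpm::vector<aggregate<float,float[3]>> recv;
openfpm::vector<size_t> prc_send;
openfpm::vector<size_t> prc_recv;
openfpm::vector<size_t> sz_recv;
openfpm::vector<size_t> sz_recv_byte;

// ... fill send and prc_send ...

vcl.SSendRecvP<openfpm::vector<aggregate<float,float[3]>>,
               openfpm::vector<aggregate<float,float[3]>>,
               memory_traits_lin,0>
              (send,recv,prc_send,prc_recv,sz_recv,sz_recv_byte);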
bool SSendRecvPAsync(openfpm::vector< T > &send, S &recv, openfpm::vector< size_t > &prc_send, openfpm::vector< size_t > &prc_recv, openfpm::vector< size_t > &sz_recv, openfpm::vector< size_t > &sz_recv_byte_out, size_t opt=NONE)
Semantic Send and receive, send the data to processors and receive from the other processors (with pr...
Definition VCluster.hpp:961
bool SSendRecvPWait(openfpm::vector< T > &send, S &recv, openfpm::vector< size_t > &prc_send, openfpm::vector< size_t > &prc_recv, openfpm::vector< size_t > &sz_recv, size_t opt=NONE)
Synchronize with SSendRecvP.
bool SSendRecvAsync(openfpm::vector< T > &send, S &recv, openfpm::vector< size_t > &prc_send, openfpm::vector< size_t > &prc_recv, openfpm::vector< size_t > &sz_recv, size_t opt=NONE)
Semantic Send and receive, send the data to processors and receive from the other processors asynchro...
Definition VCluster.hpp:858
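A sketch of the asynchronous pattern, as an assumption based on the Async/Wait pair above: the Async call only posts the non-blocking NBX exchange, and the matching SSendRecvWait with the same arguments completes it, reorders the receive buffers and unpacks them, so local work can be overlapped in between.

Vcluster<> & vcl = create_vcluster();

openfpm::vector<openfpm::vector<double>> send;  // one buffer per destination
openfpm::vector<size_t> prc_send;
openfpm::vector<double> recv;
openfpm::vector<size_t> prc_recv;
openfpm::vector<size_t> sz_recv;

// ... fill send and prc_send ...

vcl.SSendRecvAsync(send,recv,prc_send,prc_recv,sz_recv);

// overlap local computation with the communication here

vcl.SSendRecvWait(send,recv,prc_send,prc_recv,sz_recv);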
bool SSendRecv(openfpm::vector< T > &send, S &recv, openfpm::vector< size_t > &prc_send, openfpm::vector< size_t > &prc_recv, openfpm::vector< size_t > &sz_recv, size_t opt=NONE)
Semantic Send and receive, send the data to processors and receive from the other processors.
Definition VCluster.hpp:797
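A minimal usage sketch, inferred from the signature: send holds one buffer per destination listed in prc_send; after the call the received chunks are appended to recv, while prc_recv and sz_recv report the source processor and the element count of each chunk.

Vcluster<> & vcl = create_vcluster();

size_t rank = vcl.getProcessUnitID();
size_t next = (rank + 1) % vcl.size();

openfpm::vector<openfpm::vector<size_t>> send;  // one buffer per destination
openfpm::vector<size_t> prc_send;

prc_send.add(next);        // send one message to the next processor
send.add();
send.last().add(rank);     // payload: our own rank

openfpm::vector<size_t> recv;
openfpm::vector<size_t> prc_recv;
openfpm::vector<size_t> sz_recv;

vcl.SSendRecv(send,recv,prc_send,prc_recv,sz_recv);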
Implementation of a 1-D std::vector-like structure.
size_t size()
Stub size.
size_t opt
options
Definition VCluster.hpp:99
base_info(openfpm::vector_fr< BMemory< Memory > > *recv_buf, openfpm::vector< size_t > &prc, openfpm::vector< size_t > &sz, openfpm::vector< size_t > &tags, size_t opt)
constructor
Definition VCluster.hpp:106
openfpm::vector< size_t > * tags
tags
Definition VCluster.hpp:96
openfpm::vector< size_t > * sz
size of each message
Definition VCluster.hpp:94
openfpm::vector< size_t > * prc
receiving processor list
Definition VCluster.hpp:92
openfpm::vector_fr< BMemory< Memory > > * recv_buf
Receive buffer.
Definition VCluster.hpp:90
base_info()
default constructor
Definition VCluster.hpp:102
static void process_recv(Vcluster &vcl, S &recv, openfpm::vector< size_t > *sz_recv, openfpm::vector< size_t > *sz_recv_byte, op &op_param, size_t opt)
Process the receive buffer.
Definition VCluster.hpp:137
It returns true if the object T requires complex serialization.
These set of classes generate an array definition at compile-time.
Definition ct_array.hpp:26
Transform the boost::fusion::vector into memory specification (memory_traits)
Helper class to add data.