OpenFPM_pdata  4.1.0
Project that contains the implementation of distributed structures
 
VCluster_base.hpp
1#ifndef VCLUSTER_BASE_HPP_
2#define VCLUSTER_BASE_HPP_
3
4
5#include "util/cuda_util.hpp"
6#ifdef OPENMPI
7#include <mpi.h>
8#include <mpi-ext.h>
9#else
10#include <mpi.h>
11#endif
12#include "MPI_wrapper/MPI_util.hpp"
13#include "Vector/map_vector.hpp"
14#include "MPI_wrapper/MPI_IallreduceW.hpp"
15#include "MPI_wrapper/MPI_IrecvW.hpp"
16#include "MPI_wrapper/MPI_IsendW.hpp"
17#include "MPI_wrapper/MPI_IAllGather.hpp"
18#include "MPI_wrapper/MPI_IBcastW.hpp"
19#include <exception>
20#include "Vector/map_vector.hpp"
21#ifdef DEBUG
22#include "util/check_no_pointers.hpp"
23#include "util/util_debug.hpp"
24#endif
25#include "util/Vcluster_log.hpp"
26#include "memory/BHeapMemory.hpp"
27#include "Packer_Unpacker/has_max_prop.hpp"
28#include "data_type/aggregate.hpp"
29#include "util/ofp_context.hpp"
30
31#ifdef HAVE_PETSC
32#include <petscvec.h>
33#endif
34
35extern double time_spent;
36
37enum NBX_Type
38{
39 NBX_UNACTIVE,
40 NBX_UNKNOWN,
41 NBX_KNOWN,
42 NBX_KNOWN_PRC
43};
44
45constexpr int MSG_LENGTH = 1024;
46constexpr int MSG_SEND_RECV = 1025;
47constexpr int SEND_SPARSE = 8192;
48constexpr int NONE = 1;
49constexpr int NEED_ALL_SIZE = 2;
50
51constexpr int SERIVCE_MESSAGE_TAG = 16384;
52constexpr int SEND_RECV_BASE = 4096;
53constexpr int GATHER_BASE = 24576;
54
55constexpr int RECEIVE_KNOWN = 4;
56constexpr int KNOWN_ELEMENT_OR_BYTE = 8;
57constexpr int MPI_GPU_DIRECT = 16;
58
59constexpr int NQUEUE = 4;
60
61// number of vcluster instances
62extern size_t n_vcluster;
63// Global MPI initialization
64extern bool global_mpi_init;
65// initialization flag
66extern bool ofp_initialized;
67extern size_t tot_sent;
68extern size_t tot_recv;
69
71
72extern size_t NBX_cnt;
73
74template<typename T> void assign(T * ptr1, T * ptr2)
75{
76 *ptr1 = *ptr2;
77};
78
79
81union red
82{
84 char c;
86 unsigned char uc;
88 short s;
90 unsigned short us;
92 int i;
94 unsigned int ui;
96 float f;
98 double d;
99};
100
 124template<typename InternalMemory>
 125class Vcluster_base
 126{
 129 Vcluster_log log; // log file
 133 openfpm::vector<size_t> proc_com; // temporal vector used for meta-communication
 136 openfpm::vector<int> map_scatter; // scatter map (basically an array of ones)
 139 openfpm::vector<MPI_Request> req; // vector of MPI requests
 142 openfpm::vector<MPI_Status> stat; // vector of MPI status
 144 std::vector<int> post_exe;
 145
 148 gpu::ofp_context_t * context; // gpu context (if CUDA is detected, otherwise unused)
 149 // Single objects
 150
 152 int m_size; // number of processes
 155 int m_rank; // actual rank
 157 int numPE = 1;
 158
 160
 161 NBX_Type NBX_active[NQUEUE];
 162
 164 size_t rid[NQUEUE]; // request id / receive counter per queue
 165
 167 int NBX_prc_qcnt = -1; // NBX communication on queue (-1 means none queued)
 168
 171 bool NBX_prc_reached_bar_req[NQUEUE]; // has the barrier request been reached
 173
 174 int NBX_prc_cnt_base = 0;
 175 size_t NBX_prc_n_send[NQUEUE];
 176 size_t * NBX_prc_prc[NQUEUE];
 177 void ** NBX_prc_ptr[NQUEUE];
 178 size_t * NBX_prc_sz[NQUEUE];
 179 size_t NBX_prc_n_recv[NQUEUE];
 180 void * (* NBX_prc_msg_alloc[NQUEUE])(size_t,size_t,size_t,size_t,size_t,size_t,void *);
 181 size_t * NBX_prc_prc_recv[NQUEUE];
 182 void * NBX_prc_ptr_arg[NQUEUE];
 183
 185
 193 std::vector<red> r;
 194
 197 openfpm::vector<void *> ptr_send[NQUEUE]; // pointers of the send buffers
 200 openfpm::vector<size_t> sz_send[NQUEUE]; // sizes of the send buffers
 202 MPI_Request bar_req;
 203
 205 MPI_Status bar_stat;
 206
 208 Vcluster_base & operator=(const Vcluster_base &) {return *this;};
 209
 212 int shmrank; // rank within the node
 215 int nbx_cycle; // length of the NBX tag cycle
 217 Vcluster_base(const Vcluster_base &) // disabled copy constructor
 218 {};
219
220 void queue_all_sends(size_t n_send , size_t sz[],
221 size_t prc[], void * ptr[])
222 {
223 if (stat.size() != 0 || (req.size() != 0 && NBX_prc_qcnt == 0))
 224 {std::cerr << "Error: " << __FILE__ << ":" << __LINE__ << " this function must be called when no other requests are in progress. Remember that if you use functions like max(), sum(), send() or recv(), you must call execute() before starting a new communication\n";}
225
226
227 if (NBX_prc_qcnt == 0)
228 {
229 stat.clear();
230 req.clear();
231 // Do MPI_Issend
232 }
233
234 for (size_t i = 0 ; i < n_send ; i++)
235 {
236 if (sz[i] != 0)
237 {
238 req.add();
239
240#ifdef SE_CLASS2
241 check_valid(ptr[i],sz[i]);
242#endif
243
244 tot_sent += sz[i];
245
246// std::cout << "TAG: " << SEND_SPARSE + (NBX_cnt + NBX_prc_qcnt)*131072 + i << " " << NBX_cnt << " " << NBX_prc_qcnt << " " << " rank: " << rank() << " " << NBX_prc_cnt_base << " nbx_cycle: " << nbx_cycle << std::endl;
247
248 if (sz[i] > 2147483647)
249 {MPI_SAFE_CALL(MPI_Issend(ptr[i], (sz[i] >> 3) + 1 , MPI_DOUBLE, prc[i], SEND_SPARSE + (NBX_cnt + NBX_prc_qcnt)*131072 + i, MPI_COMM_WORLD,&req.last()));}
250 else
251 {MPI_SAFE_CALL(MPI_Issend(ptr[i], sz[i], MPI_BYTE, prc[i], SEND_SPARSE + (NBX_cnt + NBX_prc_qcnt)*131072 + i, MPI_COMM_WORLD,&req.last()));}
252 log.logSend(prc[i]);
253 }
254 }
255 }
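/* Editorial note (not part of the original header): each NBX round reserves its own
   block of 131072 MPI tags, so messages from successive or queued rounds cannot be
   confused. The tag used above is
       SEND_SPARSE + (NBX_cnt + NBX_prc_qcnt)*131072 + i
   and the constructor sizes nbx_cycle so that the highest tag stays below MPI_TAG_UB.
   For example, with a hypothetical MPI_TAG_UB of 2097151 the constructor computes
       nbx_cycle = (2097151 - 8192 - 131072 - 4*131072) / 131072 = 10
   i.e. NBX_cnt wraps around every 10 rounds. */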
256
257protected:
258
260 openfpm::vector_fr<BMemory<InternalMemory>> recv_buf[NQUEUE];
261
 264 openfpm::vector<size_t> tags[NQUEUE]; // tags of the receives in progress
265public:
266
267 // Finalize the MPI program
 268 ~Vcluster_base()
 269 {
270#ifdef SE_CLASS2
271 check_delete(this);
272#endif
273 n_vcluster--;
274
275 // if there are no other vcluster instances finalize
276 if (n_vcluster == 0)
277 {
278 int already_finalised;
279
280 MPI_Finalized(&already_finalised);
281 if (!already_finalised)
282 {
283 if (MPI_Finalize() != 0)
284 {
285 std::cerr << __FILE__ << ":" << __LINE__ << " MPI_Finalize FAILED \n";
286 }
287 }
288 }
289
290 delete context;
291 }
292
299 Vcluster_base(int *argc, char ***argv)
300 {
301 // reset NBX_Active
302
303 for (unsigned int i = 0 ; i < NQUEUE ; i++)
304 {
305 NBX_active[i] = NBX_Type::NBX_UNACTIVE;
306 rid[i] = 0;
307 }
308
309#ifdef SE_CLASS2
310 check_new(this,8,VCLUSTER_EVENT,PRJ_VCLUSTER);
311#endif
312
313 n_vcluster++;
314
315 int already_initialised;
316 MPI_Initialized(&already_initialised);
317
318 // Check if MPI is already initialized
319 if (!already_initialised)
320 {
321 MPI_Init(argc,argv);
322 }
323
324 // We try to get the local processors rank
325
326 MPI_Comm shmcomm;
327 MPI_Comm_split_type(MPI_COMM_WORLD, MPI_COMM_TYPE_SHARED, 0,
328 MPI_INFO_NULL, &shmcomm);
329
330 MPI_Comm_rank(shmcomm, &shmrank);
331 MPI_Comm_free(&shmcomm);
332
333 // Get the total number of process
334 // and the rank of this process
335
336 MPI_Comm_size(MPI_COMM_WORLD, &m_size);
337 MPI_Comm_rank(MPI_COMM_WORLD, &m_rank);
338
339#ifdef SE_CLASS2
340 process_v_cl = m_rank;
341#endif
342
343 // create and fill map scatter with one
344 map_scatter.resize(m_size);
345
346 for (size_t i = 0 ; i < map_scatter.size() ; i++)
347 {
348 map_scatter.get(i) = 1;
349 }
350
351 // open the log file
352 log.openLog(m_rank);
353
354 // Initialize bar_req
355 bar_req = MPI_Request();
356 bar_stat = MPI_Status();
357
358#ifdef EXTERNAL_SET_GPU
359 int dev;
360 cudaGetDevice(&dev);
361 context = new gpu::ofp_context_t(gpu::gpu_context_opt::no_print_props,dev);
362#else
363 context = new gpu::ofp_context_t(gpu::gpu_context_opt::no_print_props,shmrank);
364#endif
365
366
367#if defined(PRINT_RANK_TO_GPU) && defined(CUDA_GPU)
368
369 char node_name[MPI_MAX_PROCESSOR_NAME];
370 int len;
371
372 MPI_Get_processor_name(node_name,&len);
373
374 std::cout << "Rank: " << m_rank << " on host: " << node_name << " work on GPU: " << context->getDevice() << "/" << context->getNDevice() << std::endl;
375#endif
376
377 int flag;
378 void *tag_ub_v;
379 int tag_ub;
380
381 MPI_Comm_get_attr(MPI_COMM_WORLD, MPI_TAG_UB, &tag_ub_v, &flag);
 382 // tag_ub_v is only valid when the attribute was found (flag == true)
 383
 384 if (flag == true)
 385 { tag_ub = *(int*)tag_ub_v;
386 nbx_cycle = (tag_ub - SEND_SPARSE - 131072 - NQUEUE*131072) / 131072;
387
388 if (nbx_cycle < NQUEUE*2)
389 {std::cerr << __FILE__ << ":" << __LINE__ << " Error MPI_TAG_UB is too small for OpenFPM" << std::endl;}
390 }
391 else
392 {nbx_cycle = 2048;}
393 }
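/* Example (editorial sketch, not part of the original header): user code does not
   normally construct Vcluster_base directly; in OpenFPM the cluster object is created
   by openfpm_init() and retrieved with create_vcluster(), both assumed here from the
   OpenFPM public API.

	#include "VCluster/VCluster.hpp"

	int main(int argc, char* argv[])
	{
		openfpm_init(&argc,&argv);          // initializes MPI and the global VCluster instance

		auto & v_cl = create_vcluster();    // reference to the cluster object
		std::cout << "rank " << v_cl.rank() << " of " << v_cl.size() << std::endl;

		openfpm_finalize();                 // destroys the instance; the destructor above finalizes MPI
		return 0;
	}
*/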
394
395#ifdef SE_CLASS1
396
407 template<typename T> void checkType()
408 {
409 // if T is a primitive like int, long int, float, double, ... make sense
410 // (pointers, l-references and r-references are not fundamentals)
411 if (std::is_fundamental<T>::value == true)
412 {return;}
413
 414 // if it is a pointer it makes no sense
 415 if (std::is_pointer<T>::value == true)
 416 {std::cerr << "Error: " << __FILE__ << ":" << __LINE__ << " the type " << demangle(typeid(T).name()) << " is a pointer, sending pointer values makes no sense\n";}
 417
 418 // if it is an l-value reference it makes no sense
 419 if (std::is_lvalue_reference<T>::value == true)
 420 {std::cerr << "Error: " << __FILE__ << ":" << __LINE__ << " the type " << demangle(typeid(T).name()) << " is an l-value reference, sending reference values makes no sense\n";}
 421
 422 // if it is an r-value reference it makes no sense
 423 if (std::is_rvalue_reference<T>::value == true)
 424 {std::cerr << "Error: " << __FILE__ << ":" << __LINE__ << " the type " << demangle(typeid(T).name()) << " is an r-value reference, sending reference values makes no sense\n";}
 425
 426 // ... if not, check that T has a method called noPointers
 427 switch (check_no_pointers<T>::value())
 428 {
429 case PNP::UNKNOWN:
430 {
431 std::cerr << "Warning: " << __FILE__ << ":" << __LINE__ << " impossible to check the type " << demangle(typeid(T).name()) << " please consider to add a static method \"static bool noPointers()\" \n" ;
432 break;
433 }
434 case PNP::POINTERS:
435 {
436 std::cerr << "Error: " << __FILE__ << ":" << __LINE__ << " the type " << demangle(typeid(T).name()) << " has pointers inside, sending pointers values has no sense\n";
437 break;
438 }
439 default:
440 {
441
442 }
443 }
444 }
445
446#endif
447
 453 gpu::ofp_context_t & getgpuContext(bool iw = true)
 454 {
455 if (context == NULL && iw == true)
456 {
 457 std::cout << __FILE__ << ":" << __LINE__ << " Warning: it seems that the GPU context is not initialized. "
 458 "Either a compatible CUDA device has not been found, or openfpm_init has been called from a file that was not compiled with NVCC" << std::endl;
459 }
460
461 return *context;
462 }
463
469 MPI_Comm getMPIComm()
470 {
471 return MPI_COMM_WORLD;
472 }
473
 479 size_t getProcessingUnits()
 480 {
481 return m_size*numPE;
482 }
483
493 size_t size()
494 {
495 return this->m_size*numPE;
496 }
497
498 void print_stats()
499 {
500#ifdef VCLUSTER_PERF_REPORT
501 std::cout << "-- REPORT COMMUNICATIONS -- " << std::endl;
502
503 std::cout << "Processor " << this->rank() << " sent: " << tot_sent << std::endl;
504 std::cout << "Processor " << this->rank() << " received: " << tot_recv << std::endl;
505
506 std::cout << "Processor " << this->rank() << " time spent: " << time_spent << std::endl;
507 std::cout << "Processor " << this->rank() << " Bandwidth: S:" << (double)tot_sent / time_spent * 1e-9 << "GB/s R:" << (double)tot_recv / time_spent * 1e-9 << "GB/s" << std::endl;
508#else
509
 510 std::cout << "Error: to activate performance statistics on VCluster, compile with VCLUSTER_PERF_REPORT enabled" << std::endl;
511
512#endif
513 }
514
515 void clear_stats()
516 {
517#ifdef VCLUSTER_PERF_REPORT
518
519 tot_sent = 0;
520 tot_recv = 0;
521
522 time_spent = 0;
523#else
524
 525 std::cout << "Error: to activate performance statistics on VCluster, compile with VCLUSTER_PERF_REPORT enabled" << std::endl;
526
527#endif
528 }
529
 535 size_t getProcessUnitID()
 536 {
537 return m_rank;
538 }
539
549 size_t rank()
550 {
551 return m_rank;
552 }
553
554
561 template<typename T> void sum(T & num)
562 {
563#ifdef SE_CLASS1
564 checkType<T>();
565#endif
566
567 // reduce over MPI
568
569 // Create one request
570 req.add();
571
572 // reduce
573 MPI_IallreduceW<T>::reduce(num,MPI_SUM,req.last());
574 }
575
581 template<typename T> void max(T & num)
582 {
583#ifdef SE_CLASS1
584 checkType<T>();
585#endif
586 // reduce over MPI
587
588 // Create one request
589 req.add();
590
591 // reduce
592 MPI_IallreduceW<T>::reduce(num,MPI_MAX,req.last());
593 }
594
601 template<typename T> void min(T & num)
602 {
603#ifdef SE_CLASS1
604 checkType<T>();
605#endif
606 // reduce over MPI
607
608 // Create one request
609 req.add();
610
611 // reduce
612 MPI_IallreduceW<T>::reduce(num,MPI_MIN,req.last());
613 }
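/* Example (editorial sketch): sum(), max() and min() only queue an asynchronous
   MPI_Iallreduce request; the result is defined only after execute() completes.
   v_cl is assumed to be a Vcluster/Vcluster_base object, compute_local_energy()
   is a hypothetical user function.

	double local_energy = compute_local_energy();

	v_cl.sum(local_energy);   // queue the reduction
	v_cl.execute();           // wait; local_energy now holds the global sum
*/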
614
 620 void progressCommunication()
 621 {
622 MPI_Status stat_t;
623 int stat = false;
624 MPI_SAFE_CALL(MPI_Iprobe(MPI_ANY_SOURCE,MPI_ANY_TAG,MPI_COMM_WORLD,&stat,&stat_t));
625
626 // If I have an incoming message and is related to this NBX communication
627 if (stat == true)
628 {
629 unsigned int i = (stat_t.MPI_TAG - SEND_SPARSE) / 131072 - NBX_prc_cnt_base;
630
631 if (i >= NQUEUE || NBX_active[i] == NBX_Type::NBX_UNACTIVE || NBX_active[i] == NBX_Type::NBX_KNOWN || NBX_active[i] == NBX_Type::NBX_KNOWN_PRC)
632 {return;}
633
634 int msize_;
635 long int msize;
636 bool big_data = true;
637
638 // Get the message tag and size
639
640 MPI_SAFE_CALL(MPI_Get_count(&stat_t,MPI_DOUBLE,&msize_));
641 if (msize_ == MPI_UNDEFINED)
642 {
643 big_data = false;
644 MPI_SAFE_CALL(MPI_Get_count(&stat_t,MPI_BYTE,&msize_));
645 msize = msize_;
646 }
647 else
648 {
649 msize = ((size_t)msize_) << 3;
650 }
651
652 // Ok we check if the TAG come from one of our send TAG
653 if (stat_t.MPI_TAG >= (int)(SEND_SPARSE + NBX_prc_cnt_base*131072) && stat_t.MPI_TAG < (int)(SEND_SPARSE + (NBX_prc_cnt_base + NBX_prc_qcnt + 1)*131072))
654 {
655 // Get the pointer to receive the message
656 void * ptr = this->NBX_prc_msg_alloc[i](msize,0,0,stat_t.MPI_SOURCE,rid[i],stat_t.MPI_TAG,this->NBX_prc_ptr_arg[i]);
657
658 // Log the receiving request
659 log.logRecv(stat_t);
660
661 rid[i]++;
662
663 // Check the pointer
664#ifdef SE_CLASS2
665 check_valid(ptr,msize);
666#endif
667 tot_recv += msize;
668 #ifdef VCLUSTER_GARBAGE_INJECTOR
669 #if defined (__NVCC__) && !defined(CUDA_ON_CPU)
670 cudaPointerAttributes cpa;
671 auto error = cudaPointerGetAttributes(&cpa,ptr);
672 if (error == cudaSuccess)
673 {
674 if(cpa.type == cudaMemoryTypeDevice)
675 {cudaMemset(ptr,0xFF,msize);}
676 else
677 {memset(ptr,0xFF,msize);}
678 }
679 #else
680 memset(ptr,0xFF,msize);
681 #endif
682 #endif
683 if (big_data == true)
684 {
685// std::cout << "RECEVING BIG MESSAGE " << msize_ << " " << msize << std::endl;
686 MPI_SAFE_CALL(MPI_Recv(ptr,msize >> 3,MPI_DOUBLE,stat_t.MPI_SOURCE,stat_t.MPI_TAG,MPI_COMM_WORLD,&stat_t));
687 }
688 else
689 {
690 MPI_SAFE_CALL(MPI_Recv(ptr,msize,MPI_BYTE,stat_t.MPI_SOURCE,stat_t.MPI_TAG,MPI_COMM_WORLD,&stat_t));
691 }
692#ifdef SE_CLASS2
693 check_valid(ptr,msize);
694#endif
695 }
696 }
697
698 // Check the status of all the MPI_issend and call the barrier if finished
699
700 for (unsigned int i = 0 ; i < NQUEUE ; i++)
701 {
702 if (i >= NQUEUE || NBX_active[i] == NBX_Type::NBX_UNACTIVE || NBX_active[i] == NBX_Type::NBX_KNOWN || NBX_active[i] == NBX_Type::NBX_KNOWN_PRC)
703 {continue;}
704
705 if (NBX_prc_reached_bar_req[i] == false)
706 {
707 int flag = false;
708 if (req.size() != 0)
709 {MPI_SAFE_CALL(MPI_Testall(req.size(),&req.get(0),&flag,MPI_STATUSES_IGNORE));}
710 else
711 {flag = true;}
712
713 // If all send has been completed
714 if (flag == true)
715 {MPI_SAFE_CALL(MPI_Ibarrier(MPI_COMM_WORLD,&bar_req));NBX_prc_reached_bar_req[i] = true;}
716 }
717 }
718 }
719
 763 template<typename T> void sendrecvMultipleMessagesNBX(openfpm::vector< size_t > & prc,
 764 openfpm::vector< T > & data,
 765 openfpm::vector< size_t > & prc_recv,
766 openfpm::vector< size_t > & recv_sz ,
767 void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,size_t,void *),
768 void * ptr_arg,
769 long int opt=NONE)
770 {
771 #ifdef VCLUSTER_PERF_REPORT
772 timer nbx_timer;
773 nbx_timer.start();
774
775 #endif
776
777 // Allocate the buffers
778
779 for (size_t i = 0 ; i < prc.size() ; i++)
780 {send(prc.get(i),SEND_SPARSE + NBX_cnt*131072,data.get(i).getPointer(),data.get(i).size());}
781
782 for (size_t i = 0 ; i < prc_recv.size() ; i++)
783 {
784 void * ptr_recv = msg_alloc(recv_sz.get(i),0,0,prc_recv.get(i),i,SEND_SPARSE + NBX_cnt*131072,ptr_arg);
785
786 recv(prc_recv.get(i),SEND_SPARSE + NBX_cnt*131072,ptr_recv,recv_sz.get(i));
787 }
788
789 execute();
790
791 // Circular counter
792 NBX_cnt = (NBX_cnt + 1) % nbx_cycle;
793
794 #ifdef VCLUSTER_PERF_REPORT
795 nbx_timer.stop();
796 time_spent += nbx_timer.getwct();
797 #endif
798 }
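/* Example (editorial sketch): a message-allocation callback. As used above, the
   library calls it once per incoming message with (message size in bytes, two unused
   values, sending processor, receive index, message tag, the user pointer passed as
   ptr_arg) and expects back a writable buffer of at least that size. Here ptr_arg is
   assumed to point to an openfpm::vector of byte vectors (illustrative only).

	static void * example_msg_alloc(size_t msg_sz, size_t, size_t,
	                                size_t prc, size_t ri, size_t tag, void * ptr_arg)
	{
		auto & buf = *static_cast<openfpm::vector<openfpm::vector<unsigned char>> *>(ptr_arg);

		if (ri >= buf.size())
		{buf.resize(ri+1);}

		buf.get(ri).resize(msg_sz);        // reserve space for this message
		return buf.get(ri).getPointer();   // the library receives directly into it
	}
*/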
799
 843 template<typename T> void sendrecvMultipleMessagesNBXAsync(openfpm::vector< size_t > & prc,
 844 openfpm::vector< T > & data,
 845 openfpm::vector< size_t > & prc_recv,
846 openfpm::vector< size_t > & recv_sz ,
847 void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,size_t,void *),
848 void * ptr_arg,
849 long int opt=NONE)
850 {
851 NBX_prc_qcnt++;
852 if (NBX_prc_qcnt >= NQUEUE)
853 {
 854 std::cout << __FILE__ << ":" << __LINE__ << " error: you can queue at most " << NQUEUE << " asynchronous communication functions" << std::endl;
855 return;
856 }
857
858 // Allocate the buffers
859
860 for (size_t i = 0 ; i < prc.size() ; i++)
861 {send(prc.get(i),SEND_SPARSE + NBX_cnt*131072,data.get(i).getPointer(),data.get(i).size());}
862
863 for (size_t i = 0 ; i < prc_recv.size() ; i++)
864 {
865 void * ptr_recv = msg_alloc(recv_sz.get(i),0,0,prc_recv.get(i),i,SEND_SPARSE + NBX_cnt*131072,ptr_arg);
866
867 recv(prc_recv.get(i),SEND_SPARSE + NBX_cnt*131072,ptr_recv,recv_sz.get(i));
868 }
869
870 NBX_active[NBX_prc_qcnt] = NBX_Type::NBX_KNOWN;
871 if (NBX_prc_qcnt == 0)
872 {NBX_prc_cnt_base = NBX_cnt;}
873 }
874
910 template<typename T>
 911 void sendrecvMultipleMessagesNBX(openfpm::vector< size_t > & prc,
 912 openfpm::vector< T > & data,
 913 void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,size_t,void *),
914 void * ptr_arg, long int opt=NONE)
915 {
916#ifdef SE_CLASS1
917 checkType<typename T::value_type>();
918#endif
919
920 // resize the pointer list
921 ptr_send[NBX_prc_qcnt+1].resize(prc.size());
922 sz_send[NBX_prc_qcnt+1].resize(prc.size());
923
924 for (size_t i = 0 ; i < prc.size() ; i++)
925 {
926 ptr_send[NBX_prc_qcnt+1].get(i) = data.get(i).getPointer();
927 sz_send[NBX_prc_qcnt+1].get(i) = data.get(i).size() * sizeof(typename T::value_type);
928 }
929
930 sendrecvMultipleMessagesNBX(prc.size(),(size_t *)sz_send[NBX_prc_qcnt+1].getPointer(),(size_t *)prc.getPointer(),(void **)ptr_send[NBX_prc_qcnt+1].getPointer(),msg_alloc,ptr_arg,opt);
931 }
932
972 template<typename T>
 973 void sendrecvMultipleMessagesNBXAsync(openfpm::vector< size_t > & prc,
 974 openfpm::vector< T > & data,
 975 void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,size_t,void *),
976 void * ptr_arg, long int opt=NONE)
977 {
978#ifdef SE_CLASS1
979 checkType<typename T::value_type>();
980#endif
981 // resize the pointer list
982 ptr_send[NBX_prc_qcnt+1].resize(prc.size());
983 sz_send[NBX_prc_qcnt+1].resize(prc.size());
984
985 for (size_t i = 0 ; i < prc.size() ; i++)
986 {
987 ptr_send[NBX_prc_qcnt+1].get(i) = data.get(i).getPointer();
988 sz_send[NBX_prc_qcnt+1].get(i) = data.get(i).size() * sizeof(typename T::value_type);
989 }
990
991 sendrecvMultipleMessagesNBXAsync(prc.size(),(size_t *)sz_send[NBX_prc_qcnt+1].getPointer(),(size_t *)prc.getPointer(),(void **)ptr_send[NBX_prc_qcnt+1].getPointer(),msg_alloc,ptr_arg,opt);
992 }
993
1037 void sendrecvMultipleMessagesNBX(size_t n_send , size_t sz[],
1038 size_t prc[] , void * ptr[],
1039 size_t n_recv, size_t prc_recv[] ,
1040 size_t sz_recv[] ,void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t, size_t,void *),
1041 void * ptr_arg, long int opt=NONE)
1042 {
1043 #ifdef VCLUSTER_PERF_REPORT
1044 timer nbx_timer;
1045 nbx_timer.start();
1046
1047 #endif
1048
1049 // Allocate the buffers
1050
1051 for (size_t i = 0 ; i < n_send ; i++)
1052 {send(prc[i],SEND_SPARSE + NBX_cnt*131072,ptr[i],sz[i]);}
1053
1054 for (size_t i = 0 ; i < n_recv ; i++)
1055 {
1056 void * ptr_recv = msg_alloc(sz_recv[i],0,0,prc_recv[i],i,SEND_SPARSE + NBX_cnt*131072,ptr_arg);
1057
1058 recv(prc_recv[i],SEND_SPARSE + NBX_cnt*131072,ptr_recv,sz_recv[i]);
1059 }
1060
1061 execute();
1062
1063 // Circular counter
1064 NBX_cnt = (NBX_cnt + 1) % nbx_cycle;
1065
1066 #ifdef VCLUSTER_PERF_REPORT
1067 nbx_timer.stop();
1068 time_spent += nbx_timer.getwct();
1069 #endif
1070 }
1071
1115 void sendrecvMultipleMessagesNBXAsync(size_t n_send , size_t sz[],
1116 size_t prc[] , void * ptr[],
1117 size_t n_recv, size_t prc_recv[] ,
1118 size_t sz_recv[] ,void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t, size_t,void *),
1119 void * ptr_arg, long int opt=NONE)
1120 {
1121 NBX_prc_qcnt++;
1122 if (NBX_prc_qcnt >= NQUEUE)
1123 {
 1124 std::cout << __FILE__ << ":" << __LINE__ << " error: you can queue at most " << NQUEUE << " asynchronous communication functions" << std::endl;
1125 return;
1126 }
1127
1128 // Allocate the buffers
1129
1130 for (size_t i = 0 ; i < n_send ; i++)
1131 {send(prc[i],SEND_SPARSE + NBX_cnt*131072,ptr[i],sz[i]);}
1132
1133 for (size_t i = 0 ; i < n_recv ; i++)
1134 {
1135 void * ptr_recv = msg_alloc(sz_recv[i],0,0,prc_recv[i],i,SEND_SPARSE + NBX_cnt*131072,ptr_arg);
1136
1137 recv(prc_recv[i],SEND_SPARSE + NBX_cnt*131072,ptr_recv,sz_recv[i]);
1138 }
1139
1140 NBX_active[NBX_prc_qcnt] = NBX_Type::NBX_KNOWN;
1141 if (NBX_prc_qcnt == 0)
1142 {NBX_prc_cnt_base = NBX_cnt;}
1143 }
1144
1145 openfpm::vector<size_t> sz_recv_tmp;
1146
1189 void sendrecvMultipleMessagesNBX(size_t n_send , size_t sz[], size_t prc[] ,
1190 void * ptr[], size_t n_recv, size_t prc_recv[] ,
1191 void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,size_t,void *),
1192 void * ptr_arg, long int opt=NONE)
1193 {
1194 #ifdef VCLUSTER_PERF_REPORT
1195 timer nbx_timer;
1196 nbx_timer.start();
1197 #endif
1198
1199 sz_recv_tmp.resize(n_recv);
1200
1201 // First we understand the receive size for each processor
1202
1203 for (size_t i = 0 ; i < n_send ; i++)
1204 {send(prc[i],SEND_SPARSE + NBX_cnt*131072,&sz[i],sizeof(size_t));}
1205
1206 for (size_t i = 0 ; i < n_recv ; i++)
1207 {recv(prc_recv[i],SEND_SPARSE + NBX_cnt*131072,&sz_recv_tmp.get(i),sizeof(size_t));}
1208
1209 execute();
1210
1211 // Circular counter
1212 NBX_cnt = (NBX_cnt + 1) % nbx_cycle;
1213
1214 // Allocate the buffers
1215
1216 for (size_t i = 0 ; i < n_send ; i++)
1217 {send(prc[i],SEND_SPARSE + NBX_cnt*131072,ptr[i],sz[i]);}
1218
1219 for (size_t i = 0 ; i < n_recv ; i++)
1220 {
1221 void * ptr_recv = msg_alloc(sz_recv_tmp.get(i),0,0,prc_recv[i],i,0,ptr_arg);
1222
1223 recv(prc_recv[i],SEND_SPARSE + NBX_cnt*131072,ptr_recv,sz_recv_tmp.get(i));
1224 }
1225
1226 execute();
1227
1228 // Circular counter
1229 NBX_cnt = (NBX_cnt + 1) % nbx_cycle;
1230
1231 #ifdef VCLUSTER_PERF_REPORT
1232 nbx_timer.stop();
1233 time_spent += nbx_timer.getwct();
1234 #endif
1235 }
1236
1279 void sendrecvMultipleMessagesNBXAsync(size_t n_send , size_t sz[], size_t prc[] ,
1280 void * ptr[], size_t n_recv, size_t prc_recv[] ,
1281 void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,size_t,void *),
1282 void * ptr_arg, long int opt=NONE)
1283 {
1284 NBX_prc_qcnt++;
1285 if (NBX_prc_qcnt >= NQUEUE)
1286 {
 1287 std::cout << __FILE__ << ":" << __LINE__ << " error: you can queue at most " << NQUEUE << " asynchronous communication functions" << std::endl;
1288 return;
1289 }
1290
1291 sz_recv_tmp.resize(n_recv);
1292
1293 // First we understand the receive size for each processor
1294
1295 for (size_t i = 0 ; i < n_send ; i++)
1296 {send(prc[i],SEND_SPARSE + NBX_cnt*131072,&sz[i],sizeof(size_t));}
1297
1298 for (size_t i = 0 ; i < n_recv ; i++)
1299 {recv(prc_recv[i],SEND_SPARSE + NBX_cnt*131072,&sz_recv_tmp.get(i),sizeof(size_t));}
1300
1302
1303 NBX_prc_n_send[NBX_prc_qcnt] = n_send;
1304 NBX_prc_prc[NBX_prc_qcnt] = prc;
1305 NBX_prc_ptr[NBX_prc_qcnt] = ptr;
1306 NBX_prc_sz[NBX_prc_qcnt] = sz;
1307 NBX_prc_n_recv[NBX_prc_qcnt] = n_recv;
1308 NBX_prc_prc_recv[NBX_prc_qcnt] = prc_recv;
1309 NBX_prc_msg_alloc[NBX_prc_qcnt] = msg_alloc;
1310 NBX_prc_ptr_arg[NBX_prc_qcnt] = ptr_arg;
1311
1312 NBX_active[NBX_prc_qcnt] = NBX_Type::NBX_KNOWN_PRC;
1313 if (NBX_prc_qcnt == 0)
1314 {NBX_prc_cnt_base = NBX_cnt;}
1315 }
1316
1357 void sendrecvMultipleMessagesNBX(size_t n_send , size_t sz[],
1358 size_t prc[] , void * ptr[],
1359 void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,size_t,void *),
1360 void * ptr_arg, long int opt = NONE)
1361 {
1362 #ifdef VCLUSTER_PERF_REPORT
1363 timer nbx_timer;
1364 nbx_timer.start();
1365
1366 #endif
1367
1368 NBX_prc_qcnt++;
1369 if (NBX_prc_qcnt != 0)
1370 {
 1371 std::cout << __FILE__ << ":" << __LINE__ << " error: there are asynchronous calls still running, complete them before returning to synchronous communication" << std::endl;
1372 return;
1373 }
1374
1375 queue_all_sends(n_send,sz,prc,ptr);
1376
1377 this->NBX_prc_ptr_arg[NBX_prc_qcnt] = ptr_arg;
1378 this->NBX_prc_msg_alloc[NBX_prc_qcnt] = msg_alloc;
1379
1380 rid[NBX_prc_qcnt] = 0;
1381 int flag = false;
1382
1384 NBX_active[NBX_prc_qcnt] = NBX_Type::NBX_UNKNOWN;
1385 NBX_prc_cnt_base = NBX_cnt;
1386
1387 log.start(10);
1388
 1389 // Wait until all the sends have been acknowledged
 1390 do
 1391 {
 1392 progressCommunication();
 1393
 1394 // Check if all processor reached the async barrier
 1395 if (NBX_prc_reached_bar_req[NBX_prc_qcnt])
 1396 {MPI_SAFE_CALL(MPI_Test(&bar_req,&flag,&bar_stat))};
 1397
 1398 // produce a report if communication gets stuck
 1399 log.NBXreport(NBX_cnt,req,NBX_prc_reached_bar_req[NBX_prc_qcnt],bar_stat);
 1400
1401 } while (flag == false);
1402
1403 // Remove the executed request
1404
1405 req.clear();
1406 stat.clear();
1407 log.clear();
1408
1409 // Circular counter
1410 NBX_cnt = (NBX_cnt + 1) % nbx_cycle;
1411 NBX_prc_qcnt = -1;
1412
1413 #ifdef VCLUSTER_PERF_REPORT
1414 nbx_timer.stop();
1415 time_spent += nbx_timer.getwct();
1416 #endif
1417 }
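/* Example (editorial sketch): the NBX pattern above lets every processor send to an
   arbitrary set of destinations without the receivers knowing the senders in advance.
   A minimal ring exchange, reusing example_msg_alloc from the sketch above
   (create_vcluster() is assumed from the OpenFPM public API):

	auto & v_cl = create_vcluster();
	openfpm::vector<openfpm::vector<unsigned char>> recv_buffers;

	size_t dest[1] = {(v_cl.rank() + 1) % v_cl.size()};  // send to the next rank
	unsigned char payload[4] = {1,2,3,4};
	size_t sz[1]  = {sizeof(payload)};
	void * ptr[1] = {payload};

	v_cl.sendrecvMultipleMessagesNBX(1,sz,dest,ptr,example_msg_alloc,&recv_buffers);
	// on return every processor has received its incoming messages into recv_buffers
*/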
1418
1463 void sendrecvMultipleMessagesNBXAsync(size_t n_send , size_t sz[],
1464 size_t prc[] , void * ptr[],
1465 void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,size_t,void *),
1466 void * ptr_arg, long int opt = NONE)
1467 {
1468 NBX_prc_qcnt++;
1469 queue_all_sends(n_send,sz,prc,ptr);
1470
1471 this->NBX_prc_ptr_arg[NBX_prc_qcnt] = ptr_arg;
1472 this->NBX_prc_msg_alloc[NBX_prc_qcnt] = msg_alloc;
1473
1474 rid[NBX_prc_qcnt] = 0;
1475
1477 NBX_active[NBX_prc_qcnt] = NBX_Type::NBX_UNKNOWN;
1478
1479 log.start(10);
1480 if (NBX_prc_qcnt == 0)
1481 {NBX_prc_cnt_base = NBX_cnt;}
1482
1483 return;
1484 }
1485
 1490 void sendrecvMultipleMessagesNBXWait()
 1491 {
1492 for (unsigned int j = 0 ; j < NQUEUE ; j++)
1493 {
1494 if (NBX_active[j] == NBX_Type::NBX_UNACTIVE)
1495 {continue;}
1496
1497 if (NBX_active[j] == NBX_Type::NBX_KNOWN_PRC)
1498 {
1499 execute();
1500
1501 // Circular counter
1502 NBX_cnt = (NBX_cnt + 1) % nbx_cycle;
1503
1504 // Allocate the buffers
1505
1506 for (size_t i = 0 ; i < NBX_prc_n_send[j] ; i++)
1507 {send(NBX_prc_prc[j][i],SEND_SPARSE + NBX_cnt*131072,NBX_prc_ptr[j][i],NBX_prc_sz[j][i]);}
1508
1509 for (size_t i = 0 ; i < NBX_prc_n_recv[j] ; i++)
1510 {
1511 void * ptr_recv = NBX_prc_msg_alloc[j](sz_recv_tmp.get(i),0,0,NBX_prc_prc_recv[j][i],i,0,this->NBX_prc_ptr_arg[j]);
1512
1513 recv(NBX_prc_prc_recv[j][i],SEND_SPARSE + NBX_cnt*131072,ptr_recv,sz_recv_tmp.get(i));
1514 }
1515
1516 NBX_active[j] = NBX_Type::NBX_KNOWN;
1517 }
1518
1519 if (NBX_active[j] == NBX_Type::NBX_KNOWN)
1520 {
1521 execute();
1522
1523 // Circular counter
1524 NBX_cnt = (NBX_cnt + 1) % nbx_cycle;
1525 NBX_active[j] = NBX_Type::NBX_UNACTIVE;
1526
1527 continue;
1528 }
1529
1530 int flag = false;
1531
 1532 // Wait until all the sends have been acknowledged
 1533 do
 1534 {
 1535 progressCommunication();
 1536
 1537 // Check if all processor reached the async barrier
 1538 if (NBX_prc_reached_bar_req[j])
 1539 {MPI_SAFE_CALL(MPI_Test(&bar_req,&flag,&bar_stat))};
1540
 1541 // produce a report if communication gets stuck
1542 log.NBXreport(NBX_cnt,req,NBX_prc_reached_bar_req[j],bar_stat);
1543
1544 } while (flag == false);
1545
1546 // Remove the executed request
1547
1548 req.clear();
1549 stat.clear();
1550 log.clear();
1551
1552 // Circular counter
1553 NBX_cnt = (NBX_cnt + 1) % nbx_cycle;
1554 NBX_active[j] = NBX_Type::NBX_UNACTIVE;
1555
1556 }
1557
1558 NBX_prc_qcnt = -1;
1559 return;
1560 }
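/* Example (editorial sketch): the asynchronous variants split the exchange into three
   phases, allowing communication to overlap with computation. work_left() and
   do_some_work() are hypothetical user functions.

	v_cl.sendrecvMultipleMessagesNBXAsync(1,sz,dest,ptr,example_msg_alloc,&recv_buffers);

	while (work_left())
	{
		do_some_work();
		v_cl.progressCommunication();        // keep probing/receiving in the background
	}

	v_cl.sendrecvMultipleMessagesNBXWait();  // complete the queued NBX round(s)
*/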
1561
1580 bool send(size_t proc, size_t tag, const void * mem, size_t sz)
1581 {
1582 // send over MPI
1583
1584 // Create one request
1585 req.add();
1586
1587 // send
1588 MPI_IsendWB::send(proc,SEND_RECV_BASE + tag,mem,sz,req.last());
1589
1590 return true;
1591 }
1592
1593
1611 template<typename T, typename Mem, template<typename> class gr> bool send(size_t proc, size_t tag, openfpm::vector<T,Mem,gr> & v)
1612 {
1613#ifdef SE_CLASS1
1614 checkType<T>();
1615#endif
1616
1617 // send over MPI
1618
1619 // Create one request
1620 req.add();
1621
1622 // send
1623 MPI_IsendW<T,Mem,gr>::send(proc,SEND_RECV_BASE + tag,v,req.last());
1624
1625 return true;
1626 }
1627
1646 bool recv(size_t proc, size_t tag, void * v, size_t sz)
1647 {
1648 // recv over MPI
1649
1650 // Create one request
1651 req.add();
1652
1653 // receive
1654 MPI_IrecvWB::recv(proc,SEND_RECV_BASE + tag,v,sz,req.last());
1655
1656 return true;
1657 }
1658
1676 template<typename T, typename Mem, template<typename> class gr> bool recv(size_t proc, size_t tag, openfpm::vector<T,Mem,gr> & v)
1677 {
1678#ifdef SE_CLASS1
1679 checkType<T>();
1680#endif
1681
1682 // recv over MPI
1683
1684 // Create one request
1685 req.add();
1686
1687 // receive
1688 MPI_IrecvW<T>::recv(proc,SEND_RECV_BASE + tag,v,req.last());
1689
1690 return true;
1691 }
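/* Example (editorial sketch): send() and recv() only queue non-blocking requests; the
   transfer is guaranteed complete only after execute(). Matching tags must be used on
   both sides; the receiving vector is assumed to be pre-sized to the expected number
   of elements.

	openfpm::vector<double> v_snd;
	openfpm::vector<double> v_rcv;

	if (v_cl.rank() == 0)
	{
		v_snd.add(1.0); v_snd.add(2.0);
		v_cl.send(1,123,v_snd);      // to processor 1, tag 123
	}
	else if (v_cl.rank() == 1)
	{
		v_rcv.resize(2);             // expected size (assumption)
		v_cl.recv(0,123,v_rcv);      // from processor 0, same tag
	}

	v_cl.execute();                  // wait for the queued requests
*/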
1692
1705 template<typename T, typename Mem, template<typename> class gr> bool allGather(T & send, openfpm::vector<T,Mem,gr> & v)
1706 {
1707#ifdef SE_CLASS1
1708 checkType<T>();
1709#endif
1710
1711 // Create one request
1712 req.add();
1713
1714 // Number of processors
1715 v.resize(getProcessingUnits());
1716
1717 // gather
1718 MPI_IAllGatherW<T>::gather(&send,1,v.getPointer(),1,req.last());
1719
1720 return true;
1721 }
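/* Example (editorial sketch): allGather() collects one value from every processor into
   a vector of size getProcessingUnits(); as with the other calls, the result is valid
   only after execute().

	size_t my_rank = v_cl.rank();
	openfpm::vector<size_t> all_ranks;

	v_cl.allGather(my_rank,all_ranks);
	v_cl.execute();                    // all_ranks.get(i) == i for every processor i
*/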
1722
1739 template<typename T, typename Mem, template<typename> class layout_base >
 1740 bool Bcast(openfpm::vector<T,Mem,layout_base> & v, size_t root)
 1741 {
1742#ifdef SE_CLASS1
1743 checkType<T>();
1744#endif
1745
1746 b_cast_helper<openfpm::vect_isel<T>::value == STD_VECTOR || is_layout_mlin<layout_base<T>>::value >::bcast_(req,v,root);
1747
1748 return true;
1749 }
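/* Example (editorial sketch): Bcast() queues a broadcast of an openfpm::vector from
   the given root processor; the vector is assumed to be sized to the same number of
   elements on every processor before the call.

	openfpm::vector<int> params;
	params.resize(3);
	if (v_cl.rank() == 0)
	{params.get(0) = 10; params.get(1) = 20; params.get(2) = 30;}

	v_cl.Bcast(params,0);   // root = processor 0
	v_cl.execute();         // now every processor sees {10,20,30}
*/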
1750
1754 void execute()
1755 {
1756 // if req == 0 return
1757 if (req.size() == 0)
1758 return;
1759
1760 // Wait for all the requests
1761 stat.resize(req.size());
1762
1763 MPI_SAFE_CALL(MPI_Waitall(req.size(),&req.get(0),&stat.get(0)));
1764
1765 // Remove executed request and status
1766 req.clear();
1767 stat.clear();
1768 }
1769
1774 void clear()
1775 {
1776 for (size_t i = 0 ; i < NQUEUE ; i++)
1777 {recv_buf[i].clear();}
1778 }
1779};
1780
1781
1782
1783
1784#endif
1785