OpenFPM_pdata  4.1.0
Project that contains the implementation of distributed structures
 
Loading...
Searching...
No Matches
VCluster_unit_test_util.hpp
1/*
2 * VCluster_unit_test_util.hpp
3 *
4 * Created on: May 30, 2015
5 * Author: i-bird
6 */
7
8#ifndef VCLUSTER_UNIT_TEST_UTIL_HPP_
9#define VCLUSTER_UNIT_TEST_UTIL_HPP_
10
11#include "Point_test.hpp"
12#include "VCluster_base.hpp"
13#include "Vector/vector_test_util.hpp"
14#include "VCluster/VCluster.hpp"
15
// Receive modes, selected through the opt argument of test_short / test_random
constexpr int RECEIVE_UNKNOWN{1};      // receivers register no sender list
constexpr int RECEIVE_SIZE_UNKNOWN{2}; // receivers list the senders but not the message sizes

// Communication methods (template argument ip) and the known-processor option of test_known
constexpr int NBX{1};        // blocking sendrecvMultipleMessagesNBX
constexpr int NBX_ASYNC{2};  // sendrecvMultipleMessagesNBXAsync
constexpr int KNOWN_PRC{3};  // test_known: go through the raw pointer/size overload

// Test sizing
constexpr int N_TRY{2};           // how many times every pattern is repeated
constexpr int N_LOOP{67108864};   // exclusive upper bound of the message size j (doubled from 32)
constexpr int BUFF_STEP{524288};  // not referenced in the visible part of this file
constexpr int P_STRIDE{17};       // not referenced in the visible part of this file

// State shared between the tests and the receive callbacks
bool totp_check = false;  // when true, msg_alloc also checks the number of sending processors
size_t global_step = 0;   // size every received message is required to have
size_t global_rank = 0;   // rank of this process, set at the start of each test
31
/*! \brief Output containers for msg_alloc4, handed to it through its void * argument
 *
 * msg_alloc4 appends one entry per received message to the containers pointed by
 * these members.
 *
 * NOTE(review): msg_alloc4 also dereferences a recv_message member; its declaration
 * is not visible in this view of the file.
 */
32struct rcv_rm
33{
 // Rank of the sender of each received message, one entry appended per message
34 openfpm::vector<size_t> * prc_recv;
36};
37
/*! \brief Floored modulo: the result is zero or has the same sign as m
 *
 * Gives the same result as (x%m + m)%m, but never forms x%m + m, which
 * overflows int when m is large (e.g. x = INT_MAX-1, m = INT_MAX).
 * Declared inline because it is defined in a header.
 *
 * \param x dividend
 * \param m divisor, must not be 0
 * \return x modulo m, in [0,m) when m > 0 and in (m,0] when m < 0
 */
inline int mod(int x, int m)
{
	int r = x % m;

	// C++ % truncates toward zero: move the remainder into the sign of m when they differ.
	// |r| < |m| and the signs differ here, so r + m cannot overflow.
	if (r != 0 && ((r < 0) != (m < 0)))
	{r += m;}

	return r;
}
47
48// Alloc the buffer to receive the messages
49
51
/*! \brief Receive callback: store the message sent by processor i into slot i of the receive buffers
 *
 * \param msg_i size of the incoming message; must equal global_step
 * \param total_msg not used here
 * \param total_p number of processors sending to this one; checked only when totp_check is set
 *        (n_proc-1 when there are at most 8 processors, 8 otherwise)
 * \param i rank of the sending processor, used as the slot index
 * \param ri not used here
 * \param tag not used here
 * \param ptr receive buffers, forwarded unchanged by the communication call
 * \return pointer to the first element of slot i after resizing it to msg_i
 *
 * NOTE(review): the declaration of v (presumably ptr cast to the vector of receive
 * buffers) is not visible in this view. If msg_i were 0, get(0) would be taken on
 * an empty buffer — confirm this cannot happen.
 */
52void * msg_alloc(size_t msg_i ,size_t total_msg, size_t total_p, size_t i,size_t ri, size_t tag, void * ptr)
53{
54 // convert the void pointer argument into a pointer to receiving buffers
56
59
 // Every test pattern talks to at most 8 peers, hence the two expected sender counts
60 if (create_vcluster().getProcessingUnits() <= 8)
61 {if (totp_check) BOOST_REQUIRE_EQUAL(total_p,create_vcluster().getProcessingUnits()-1);}
62 else
63 {if (totp_check) BOOST_REQUIRE_EQUAL(total_p,(size_t)8);}
64
65 BOOST_REQUIRE_EQUAL(msg_i, global_step);
66
68
69 // Create the memory to receive the message
70 // msg_i contain the size of the message to receive
71 // i contain the processor id
72 v->get(i).resize(msg_i);
73
74 // return the pointer of the allocated memory
75 return &(v->get(i).get(0));
76}
77
79
80// Alloc the buffer to receive the messages
81
// Slot counter for msg_alloc2: how many messages it has stored so far.
// test_random sets it back to 0 before each exchange.
size_t id{0};
84
/*! \brief Receive callback that stores messages in arrival order, using the global counter id
 *
 * On every call it resizes v (the receive buffers) and the global prc_recv to total_p
 * entries. It then increments id and uses slot id-1 for the new message. The sender
 * rank i is recorded in the same slot of prc_recv.
 *
 * \param msg_i size of the incoming message; must equal global_step
 * \param total_p number of sending processors, used as the number of slots
 * \param i rank of the sending processor
 * \return pointer to the first element of the slot sized to msg_i
 *
 * NOTE(review): the declaration of v (presumably ptr cast to the receive buffers) is
 * not visible in this view.
 * NOTE(review): unlike the other msg_alloc* callbacks this one has no tag parameter,
 * so it does not match the callback type taken by commFunc.
 * NOTE(review): id is not bound-checked. More than total_p calls without resetting
 * id would index past the resized buffers.
 */
85void * msg_alloc2(size_t msg_i ,size_t total_msg, size_t total_p, size_t i, size_t ri, void * ptr)
86{
88
89 v->resize(total_p);
90 prc_recv.resize(total_p);
91
92 BOOST_REQUIRE_EQUAL(msg_i, global_step);
93
94 id++;
95 v->get(id-1).resize(msg_i);
96 prc_recv.get(id-1) = i;
97 return &(v->get(id-1).get(0));
98}
99
/*! \brief Receive callback that appends each incoming message to the receive buffers
 *
 * Appends one buffer (sized msg_i) to v and the sender rank i to the global prc_recv.
 * Entry k of both therefore refers to the k-th message received.
 *
 * \param msg_i size of the incoming message; must equal global_step
 * \param i rank of the sending processor
 * \return pointer to the first element of the appended buffer
 *
 * NOTE(review): the declaration of v (presumably ptr cast to the receive buffers) is
 * not visible in this view.
 */
100void * msg_alloc3(size_t msg_i ,size_t total_msg, size_t total_p, size_t i, size_t ri, size_t tag, void * ptr)
101{
103
104 v->add();
105
106 prc_recv.add();
107
108 BOOST_REQUIRE_EQUAL(msg_i, global_step);
109
110 v->last().resize(msg_i);
111 prc_recv.last() = i;
112 return &(v->last().get(0));
113}
114
115void * msg_alloc4(size_t msg_i ,size_t total_msg, size_t total_p, size_t i, size_t ri, size_t tag, void * ptr)
116{
117 rcv_rm * v = static_cast<rcv_rm *>(ptr);
118
119 v->recv_message->add();
120
121 v->prc_recv->add();
122
123 BOOST_REQUIRE_EQUAL(msg_i, global_step);
124
125 v->recv_message->last().resize(msg_i);
126 v->prc_recv->last() = i;
127 return &(v->recv_message->last().get(0));
128}
129
/*! \brief Exchange messages with a list of processors, without telling the receivers who sends
 *
 * \tparam ip NBX selects sendrecvMultipleMessagesNBX, any other value the NBXAsync variant
 * \param vcl cluster communicator
 * \param prc ranks to send to
 * \param data one message per entry of prc
 * \param msg_alloc receive callback
 * \param ptr_arg opaque pointer forwarded to msg_alloc
 *
 * NOTE(review): some lines of the async branch are not visible in this view. The
 * tests read the receive buffers right after this call, so the async exchange must be
 * completed here — confirm a wait call sits next to the sleeps.
 */
130template<unsigned int ip, typename T> void commFunc(Vcluster<> & vcl,openfpm::vector< size_t > & prc, openfpm::vector< T > & data, void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,size_t,void *), void * ptr_arg)
131{
132 if (ip == NBX)
133 {vcl.sendrecvMultipleMessagesNBX(prc,data,msg_alloc,ptr_arg);}
134 else
135 {
136 vcl.sendrecvMultipleMessagesNBXAsync(prc,data,msg_alloc,ptr_arg);
137
139 usleep(1000);
141 usleep(1000);
143 usleep(1000);
145 usleep(1000);
146
148 }
149}
150
151
/*! \brief Exchange messages through the raw pointer/size overload, with the sender list known
 *
 * Always calls the blocking sendrecvMultipleMessagesNBX; ip is not used.
 *
 * \param prc ranks to send to
 * \param sz_send size of each message, parallel to prc
 * \param msg_alloc receive callback
 * \param ptr_arg opaque pointer forwarded to msg_alloc
 *
 * NOTE(review): the parameter line declaring ptr and prc_recv is not visible in this
 * view. On a single-processor run prc is empty, and &prc.get(0) reads element 0 of
 * an empty vector — confirm openfpm::vector::get tolerates that.
 */
152template<unsigned int ip>
153void commFunc_low(Vcluster<> & vcl,openfpm::vector< size_t > & prc, openfpm::vector<size_t> & sz_send ,
155 void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,size_t,void *),
156 void * ptr_arg)
157{
158 // Send and receive
159 vcl.sendrecvMultipleMessagesNBX(prc.size(),&sz_send.get(0),&prc.get(0),
160 &ptr.get(0),prc_recv.size(),&prc_recv.get(0),msg_alloc,ptr_arg);
161}
162
163template<unsigned int ip, typename T>
164void commFunc_null_odd(Vcluster<> & vcl,openfpm::vector< size_t > & prc, openfpm::vector< T > & data, void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,size_t,void *), void * ptr_arg)
165{
166 if (vcl.getProcessUnitID() % 2 == 0)
167 vcl.sendrecvMultipleMessagesNBX(prc,data,msg_alloc,ptr_arg);
168 else
169 {
170 // No send check if passing null to sendrecv work
171 vcl.sendrecvMultipleMessagesNBX(prc.size(),(size_t *)NULL,(size_t *)NULL,(void **)NULL,msg_alloc,ptr_arg,NONE);
172 }
173}
174
/*! \brief Exchange messages when the receivers know both the senders and the message sizes
 *
 * \tparam ip NBX selects the blocking sendrecvMultipleMessagesNBX, any other value the NBXAsync variant
 * \param recv_sz expected size of each message, parallel to prc_recv
 * \param ptr opaque pointer forwarded to the receive callback
 *
 * NOTE(review): the parameter line declaring prc, message and prc_recv is not visible
 * in this view. msg_alloc is not among the visible parameters; presumably it is the
 * free function of that name.
 */
175template<unsigned int ip>
176void commFunc_kn(Vcluster<> & vcl,
178 openfpm::vector<size_t> & recv_sz,
179 void * ptr)
180{
181 if (ip == NBX)
182 {
183 vcl.sendrecvMultipleMessagesNBX(prc,message,prc_recv,recv_sz,msg_alloc,ptr);
184 }
185 else
186 {
187 vcl.sendrecvMultipleMessagesNBXAsync(prc,message,prc_recv,recv_sz,msg_alloc,ptr);
188
190 usleep(1000);
192 usleep(1000);
194 usleep(1000);
196 usleep(1000);
197
199 }
200}
201
/*! \brief Known-sender exchange through the raw pointer/size overload
 *
 * Builds parallel arrays of message pointers (ptr) and message sizes (sz) from message.
 * It then calls sendrecvMultipleMessagesNBX (ip == NBX) or the NBXAsync variant, passing
 * the expected senders prc_recv.
 *
 * \param prc_recv ranks this processor expects messages from
 * \param recv_sz expected message sizes
 * \param ptr_arg opaque pointer forwarded to the receive callback
 *
 * NOTE(review): recv_sz is not referenced in the visible body. The declarations of
 * prc, message, ptr and sz are not visible in this view.
 */
202template<unsigned int ip>
203void commFunc_kn_prc(Vcluster<> & vcl,
206 openfpm::vector<size_t> & prc_recv,
207 openfpm::vector<size_t> & recv_sz,
208 void * ptr_arg)
209{
212
213 ptr.resize(message.size());
214 sz.resize(message.size());
215
216 for (size_t i = 0 ; i < ptr.size() ; i++)
217 {
218 ptr.get(i) = message.get(i).getPointer();
219 sz.get(i) = message.get(i).size();
220 }
221
222 if (ip == NBX)
223 {
224 vcl.sendrecvMultipleMessagesNBX(ptr.size(),(size_t *)sz.getPointer(),(size_t *)prc.getPointer(),(void **)ptr.getPointer(),
225 prc_recv.size(),(size_t *)prc_recv.getPointer(),msg_alloc,ptr_arg);
226 }
227 else
228 {
229 vcl.sendrecvMultipleMessagesNBXAsync(ptr.size(),(size_t *)sz.getPointer(),(size_t *)prc.getPointer(),(void **)ptr.getPointer(),
230 prc_recv.size(),(size_t *)prc_recv.getPointer(),msg_alloc,ptr_arg);
231
233 usleep(1000);
235 usleep(1000);
237 usleep(1000);
239 usleep(1000);
240
242 }
243}
244
/*! \brief Name of the communication algorithm, used to label the VERBOSE_TEST bandwidth report
 *
 * The blocking and the asynchronous variants both run the NBX algorithm
 * (sendrecvMultipleMessagesNBX / sendrecvMultipleMessagesNBXAsync), so every ip
 * gets the same label.
 *
 * \return the string "NBX"
 */
template <unsigned int ip> std::string method()
{
	const char * label = "NBX";
	return label;
}
249
/*! \brief Long-stride exchange in which only even ranks send
 *
 * Even ranks send a 4567-byte "Hello" message to up to 8 processors at stride
 * ps = n_proc/9. Odd ranks take part with NULL send buffers (commFunc_null_odd).
 * Every rank then checks the messages expected from even ranks at the mirrored
 * offsets, and that no data was stored in its own slot.
 *
 * NOTE(review): for n_proc < 9, ps is 0, so every p_id equals the own rank. No
 * message is exchanged and the content check never runs.
 */
250template<unsigned int ip> void test_no_send_some_peer()
251{
252 Vcluster<> & vcl = create_vcluster();
253
254 size_t n_proc = vcl.getProcessingUnits();
255
256 // Check long communication with some peer not comunication
257
258 size_t j = 4567;
259
260 global_step = j;
261 // Processor step
262 long int ps = n_proc / (8 + 1);
263
264 // send message
266 // recv message
268
270
271 // only even communicate
272
273 if (vcl.getProcessUnitID() % 2 == 0)
274 {
275 for (size_t i = 0 ; i < 8 && i < n_proc ; i++)
276 {
277 size_t p_id = ((i+1) * ps + vcl.getProcessUnitID()) % n_proc;
278 if (p_id != vcl.getProcessUnitID())
279 {
280 prc.add(p_id);
281 message.add();
282 std::ostringstream msg;
283 msg << "Hello from " << vcl.getProcessUnitID() << " to " << p_id;
284 std::string str(msg.str());
285 message.last().resize(j);
286 memset(message.last().getPointer(),0,j);
287 std::copy(str.c_str(),&(str.c_str())[msg.str().size()],&(message.last().get(0)));
288 }
289 }
290 }
291
292 recv_message.resize(n_proc);
293
294#ifdef VERBOSE_TEST
295 timer t;
296 t.start();
297#endif
298
299 commFunc_null_odd<ip>(vcl,prc,message,msg_alloc,&recv_message);
300
301#ifdef VERBOSE_TEST
302 t.stop();
303 double clk = t.getwct();
304 double clk_max = clk;
305
306 size_t size_send_recv = 2 * j * (prc.size());
307 vcl.sum(size_send_recv);
308 vcl.max(clk_max);
309 vcl.execute();
310
311 if (vcl.getProcessUnitID() == 0)
312 std::cout << "(Long pattern: " << method<ip>() << ")Buffer size: " << j << " Bandwidth (Average): " << size_send_recv / vcl.getProcessingUnits() / clk / 1e6 << " MB/s " << " Bandwidth (Total): " << size_send_recv / clk / 1e6 << " MB/s Clock: " << clk << " Clock MAX: " << clk_max <<"\n";
313#endif
314
315 // Check the message
 // The sender of the k-th expected message sits at the same stride, in the opposite direction
316 for (long int i = 0 ; i < 8 && i < (long int)n_proc ; i++)
317 {
318 long int p_id = (- (i+1) * ps + (long int)vcl.getProcessUnitID());
319 if (p_id < 0)
320 p_id += n_proc;
321 else
322 p_id = p_id % n_proc;
323
324 if (p_id != (long int)vcl.getProcessUnitID())
325 {
326 // only even processor communicate
327 if (p_id % 2 == 1)
328 continue;
329
330 std::ostringstream msg;
331 msg << "Hello from " << p_id << " to " << vcl.getProcessUnitID();
332 std::string str(msg.str());
333 BOOST_REQUIRE_EQUAL(std::equal(str.c_str(),str.c_str() + str.size() ,&(recv_message.get(p_id).get(0))),true);
334 }
335 else
336 {
337 BOOST_REQUIRE_EQUAL((size_t)0,recv_message.get(p_id).size());
338 }
339 }
340}
341
/*! \brief Short-pattern exchange in which the receivers know senders and sizes in advance
 *
 * The test runs N_TRY rounds, with message sizes j = 32, 64, ... < N_LOOP. In each
 * round, every rank:
 * - sends a j-byte "Hello" message to the next (up to 8) ranks;
 * - pre-sizes the buffers for the previous (up to 8) ranks;
 * - exchanges through commFunc_kn_prc (opt == KNOWN_PRC) or commFunc_kn;
 * - checks every received message.
 *
 * \tparam ip NBX or NBX_ASYNC
 * \param opt KNOWN_PRC to use the raw pointer/size overload, anything else for the vector overload
 *
 * NOTE(review): the declarations of message, recv_message, prc, prc_recv and recv_sz
 * are not visible in this view.
 */
342template<unsigned int ip> void test_known(size_t opt)
343{
344 Vcluster<> & vcl = create_vcluster();
345
346 // send/recv messages
347
348 global_rank = vcl.getProcessUnitID();
349 size_t n_proc = vcl.getProcessingUnits();
350
351 // Checking short communication pattern
352
353 for (size_t s = 0 ; s < N_TRY ; s++)
354 {
355 for (size_t j = 32 ; j < N_LOOP ; j*=2)
356 {
357 global_step = j;
358 // send message
360 // recv message
362 recv_message.reserve(n_proc);
363
366
368
369 for (size_t i = 0 ; i < 8 && i < n_proc ; i++)
370 {
371 size_t p_id = (i + 1 + vcl.getProcessUnitID()) % n_proc;
372 if (p_id != vcl.getProcessUnitID())
373 {
374 prc.add(p_id);
375 message.add();
376 std::ostringstream msg;
377 msg << "Hello from " << vcl.getProcessUnitID() << " to " << p_id;
378 std::string str(msg.str());
379 message.last().resize(j);
380 memset(message.last().getPointer(),0,j);
381 std::copy(str.c_str(),&(str.c_str())[msg.str().size()],&(message.last().get(0)));
382 }
383 }
384
385 recv_message.resize(n_proc);
386 // The pattern is not really random preallocate the receive buffer
387 for (size_t i = 0 ; i < 8 && i < n_proc ; i++)
388 {
 // NOTE(review): the subtraction is done in size_t and wraps; it relies on the conversion to long int giving the negative value
389 long int p_id = vcl.getProcessUnitID() - i - 1;
390 if (p_id < 0)
391 p_id += n_proc;
392 else
393 p_id = p_id % n_proc;
394
395 if (p_id != (long int)vcl.getProcessUnitID())
396 {
397 prc_recv.add(p_id);
398 recv_message.get(p_id).resize(j);
399 recv_sz.add(j);
400 }
401 }
402
403#ifdef VERBOSE_TEST
404 timer t;
405 t.start();
406#endif
407
408 if (opt == KNOWN_PRC)
409 {commFunc_kn_prc<ip>(vcl,prc,message,prc_recv,recv_sz,&recv_message);}
410 else
411 {commFunc_kn<ip>(vcl,prc,message,prc_recv,recv_sz,&recv_message);}
412
413#ifdef VERBOSE_TEST
414 t.stop();
415
416 double clk = t.getwct();
417 double clk_max = clk;
418
419 size_t size_send_recv = 2 * j * (prc.size());
420 vcl.sum(size_send_recv);
421 vcl.max(clk_max);
422 vcl.execute();
423
424 if (vcl.getProcessUnitID() == 0)
425 std::cout << "(Short pattern: " << method<ip>() << ")Buffer size: " << j << " Bandwidth (Average): " << size_send_recv / vcl.getProcessingUnits() / clk / 1e6 << " MB/s " << " Bandwidth (Total): " << size_send_recv / clk / 1e6 << " MB/s Clock: " << clk << " Clock MAX: " << clk_max <<"\n";
426#endif
427
428 // Check the message
429 for (size_t i = 0 ; i < 8 && i < n_proc ; i++)
430 {
431 long int p_id = vcl.getProcessUnitID() - i - 1;
432 if (p_id < 0)
433 p_id += n_proc;
434 else
435 p_id = p_id % n_proc;
436
437 if (p_id != (long int)vcl.getProcessUnitID())
438 {
439 std::ostringstream msg;
440 msg << "Hello from " << p_id << " to " << vcl.getProcessUnitID();
441 std::string str(msg.str());
442 BOOST_REQUIRE_EQUAL(std::equal(str.c_str(),str.c_str() + str.size() ,&(recv_message.get(p_id).get(0))),true);
443 }
444 else
445 {
446 BOOST_REQUIRE_EQUAL((size_t)0,recv_message.get(p_id).size());
447 }
448 }
449 }
450 }
451}
452
/*! \brief Like test_known, but with NQUEUE exchanges in flight at the same time
 *
 * Every queue k sends its own "Hello k" messages to the same processors. Each queue
 * uses the NBXAsync calls: the raw pointer/size overload when opt == KNOWN_PRC,
 * the vector overload otherwise. The template argument ip only changes the
 * VERBOSE_TEST label.
 *
 * \param opt KNOWN_PRC to use the raw pointer/size overload
 *
 * NOTE(review): the declarations of message, recv_message and prc, and the lines
 * between the sleeps, are not visible in this view. All NQUEUE exchanges must be
 * completed before the content check.
 */
453template<unsigned int ip> void test_known_multiple(size_t opt)
454{
455 Vcluster<> & vcl = create_vcluster();
456
457 // send/recv messages
458
459 global_rank = vcl.getProcessUnitID();
460 size_t n_proc = vcl.getProcessingUnits();
461
462 // Checking short communication pattern
463
464 for (size_t s = 0 ; s < N_TRY ; s++)
465 {
466 for (size_t j = 32 ; j < N_LOOP ; j*=2)
467 {
468 global_step = j;
469 // send message
471 // recv message
473
474 openfpm::vector<size_t> prc_recv[NQUEUE];
475 openfpm::vector<size_t> recv_sz[NQUEUE];
476
477 openfpm::vector<void *> ptr[NQUEUE];
478 openfpm::vector<size_t> sz[NQUEUE];
479
480
482
483 for (size_t i = 0 ; i < 8 && i < n_proc ; i++)
484 {
485 size_t p_id = (i + 1 + vcl.getProcessUnitID()) % n_proc;
486 if (p_id != vcl.getProcessUnitID())
487 {
488 prc.add(p_id);
489 for (size_t k = 0 ; k < NQUEUE ; k++)
490 {
491 message[k].add();
492 std::ostringstream msg;
493 msg << "Hello " << k << " from " << vcl.getProcessUnitID() << " to " << p_id;
494 std::string str(msg.str());
495 message[k].last().resize(j);
496 memset(message[k].last().getPointer(),0,j);
497 std::copy(str.c_str(),&(str.c_str())[msg.str().size()],&(message[k].last().get(0)));
498 }
499 }
500 }
501
502 for (size_t k = 0 ; k < NQUEUE ; k++)
503 {
504 recv_message[k].resize(n_proc);
505 // The pattern is not really random preallocate the receive buffer
506 for (size_t i = 0 ; i < 8 && i < n_proc ; i++)
507 {
508 long int p_id = vcl.getProcessUnitID() - i - 1;
509 if (p_id < 0)
510 p_id += n_proc;
511 else
512 p_id = p_id % n_proc;
513
514 if (p_id != (long int)vcl.getProcessUnitID())
515 {
516 prc_recv[k].add(p_id);
517 recv_message[k].get(p_id).resize(j);
518 recv_sz[k].add(j);
519 }
520 }
521 }
522
523#ifdef VERBOSE_TEST
524 timer t;
525 t.start();
526#endif
527
 // Start one asynchronous exchange per queue; ptr[k]/sz[k] must stay alive until completion
528 for (size_t k = 0 ; k < NQUEUE ; k++)
529 {
530 if (opt == KNOWN_PRC)
531 {
532 ptr[k].resize(message[k].size());
533 sz[k].resize(message[k].size());
534
535 for (size_t i = 0 ; i < ptr[k].size() ; i++)
536 {
537 ptr[k].get(i) = message[k].get(i).getPointer();
538 sz[k].get(i) = message[k].get(i).size();
539 }
540
541 vcl.sendrecvMultipleMessagesNBXAsync(ptr[k].size(),(size_t *)sz[k].getPointer(),(size_t *)prc.getPointer(),(void **)ptr[k].getPointer(),
542 prc_recv[k].size(),(size_t *)prc_recv[k].getPointer(),msg_alloc,&recv_message[k]);
543
544 }
545 else
546 {
547 vcl.sendrecvMultipleMessagesNBXAsync(prc,message[k],prc_recv[k],recv_sz[k],msg_alloc,&recv_message[k]);
548 }
549 }
550
551
553 usleep(1000);
555 usleep(1000);
557 usleep(1000);
559 usleep(1000);
560
562
563#ifdef VERBOSE_TEST
564 t.stop();
565
566 double clk = t.getwct();
567 double clk_max = clk;
568
569 size_t size_send_recv = 2 * j * (prc.size());
570 vcl.sum(size_send_recv);
571 vcl.max(clk_max);
572 vcl.execute();
573
574 if (vcl.getProcessUnitID() == 0)
575 std::cout << "(Short pattern: " << method<ip>() << ")Buffer size: " << j << " Bandwidth (Average): " << size_send_recv / vcl.getProcessingUnits() / clk / 1e6 << " MB/s " << " Bandwidth (Total): " << size_send_recv / clk / 1e6 << " MB/s Clock: " << clk << " Clock MAX: " << clk_max <<"\n";
576#endif
577
578 for (size_t k = 0 ; k < NQUEUE ; k++)
579 {
580 // Check the message
581 for (size_t i = 0 ; i < 8 && i < n_proc ; i++)
582 {
583 long int p_id = vcl.getProcessUnitID() - i - 1;
584 if (p_id < 0)
585 p_id += n_proc;
586 else
587 p_id = p_id % n_proc;
588
589 if (p_id != (long int)vcl.getProcessUnitID())
590 {
591 std::ostringstream msg;
592 msg << "Hello " << k << " from " << p_id << " to " << vcl.getProcessUnitID();
593 std::string str(msg.str());
594 BOOST_REQUIRE_EQUAL(std::equal(str.c_str(),str.c_str() + str.size() ,&(recv_message[k].get(p_id).get(0))),true);
595 }
596 else
597 {
598 BOOST_REQUIRE_EQUAL((size_t)0,recv_message[k].get(p_id).size());
599 }
600 }
601 }
602 }
603 }
604}
605
/*! \brief Short-pattern exchange in which the receivers do not know the message sizes
 *
 * The test runs N_TRY rounds, with sizes j = 32, 64, ... < N_LOOP. In each round,
 * every rank sends a j-byte "Hello" message to the next (up to 8) ranks and checks
 * the messages received from the previous (up to 8) ranks.
 *
 * \tparam ip NBX or NBX_ASYNC (used by commFunc; commFunc_low always blocks)
 * \param opt RECEIVE_UNKNOWN: commFunc, no sender list;
 *            RECEIVE_SIZE_UNKNOWN: commFunc_low, sender list but no sizes
 *
 * NOTE(review): the VERBOSE_TEST timer is started after the exchange and stopped right
 * away, so the reported clock and bandwidth do not include the communication.
 * NOTE(review): the declarations of message, recv_message, prc, sz_send, ptr and
 * prc_recv are not visible in this view.
 */
606template<unsigned int ip> void test_short(unsigned int opt)
607{
608 Vcluster<> & vcl = create_vcluster();
609
610 // send/recv messages
611
612 global_rank = vcl.getProcessUnitID();
613 size_t n_proc = vcl.getProcessingUnits();
614
615 // Checking short communication pattern
616
617 for (size_t s = 0 ; s < N_TRY ; s++)
618 {
619 for (size_t j = 32 ; j < N_LOOP ; j*=2)
620 {
621 global_step = j;
622
624
625 // We send one message for each processor (one message is an openfpm::vector<unsigned char>)
626 // or an array of bytes
628
629 // receving messages. Each receiving message is an openfpm::vector<unsigned char>
630 // or an array if bytes
632
633 // each processor communicate based on a list of processor
635
636 // We construct the processor list in particular in this case
637 // each processor communicate with the 8 next (in id) processors
638 for (size_t i = 0 ; i < 8 && i < n_proc ; i++)
639 {
640 size_t p_id = (i + 1 + vcl.getProcessUnitID()) % n_proc;
641
642 // avoid to communicate with yourself
643 if (p_id != vcl.getProcessUnitID())
644 {
645 // Create an hello message
646 prc.add(p_id);
647 message.add();
648 std::ostringstream msg;
649 msg << "Hello from " << vcl.getProcessUnitID() << " to " << p_id;
650 std::string str(msg.str());
651 message.last().resize(j);
652 memset(message.last().getPointer(),0,j);
653 std::copy(str.c_str(),&(str.c_str())[msg.str().size()],&(message.last().get(0)));
654 }
655 }
656
657 // For simplicity we create in advance a receiving buffer for all processors
658 recv_message.resize(n_proc);
659
660 // The pattern is not really random preallocate the receive buffer
661 for (size_t i = 0 ; i < 8 && i < n_proc ; i++)
662 {
663 long int p_id = vcl.getProcessUnitID() - i - 1;
664 if (p_id < 0)
665 p_id += n_proc;
666 else
667 p_id = p_id % n_proc;
668
669 if (p_id != (long int)vcl.getProcessUnitID())
670 recv_message.get(p_id).resize(j);
671 }
672
673 if (opt == RECEIVE_UNKNOWN)
674 {
675 // Send and receive
676 commFunc<ip>(vcl,prc,message,msg_alloc,&recv_message);
677
678 }
680 else if (opt == RECEIVE_SIZE_UNKNOWN)
681 {
684
686
687 sz_send.resize(prc.size());
688 ptr.resize(prc.size());
689
690 for (size_t i = 0 ; i < prc.size() ; i++)
691 {
692 sz_send.get(i) = message.get(i).size();
693 ptr.get(i) = &message.get(i).get(0);
694 }
695
696 // Calculate the receiving part
697
698 // Check the message
699 for (size_t i = 0 ; i < 8 && i < n_proc ; i++)
700 {
701 long int p_id = vcl.getProcessUnitID() - i - 1;
702 if (p_id < 0)
703 {p_id += n_proc;}
704 else
705 {p_id = p_id % n_proc;}
706
707 if (p_id != (long int)vcl.getProcessUnitID())
708 {
709 prc_recv.add(p_id);
710 }
711 }
712
714
715 commFunc_low<ip>(vcl,prc,sz_send,ptr,prc_recv,msg_alloc,&recv_message);
716 }
717
718#ifdef VERBOSE_TEST
719 timer t;
720 t.start();
721#endif
722
723
724#ifdef VERBOSE_TEST
725 t.stop();
726
727 double clk = t.getwct();
728 double clk_max = clk;
729
730 size_t size_send_recv = 2 * j * (prc.size());
731 vcl.sum(size_send_recv);
732 vcl.max(clk_max);
733 vcl.execute();
734
735 if (vcl.getProcessUnitID() == 0)
736 {std::cout << "(Short pattern: " << method<ip>() << ")Buffer size: " << j << " Bandwidth (Average): " << size_send_recv / vcl.getProcessingUnits() / clk / 1e6 << " MB/s " << " Bandwidth (Total): " << size_send_recv / clk / 1e6 << " MB/s Clock: " << clk << " Clock MAX: " << clk_max <<"\n";}
737#endif
738
739 // Check the message
740 for (size_t i = 0 ; i < 8 && i < n_proc ; i++)
741 {
742 long int p_id = vcl.getProcessUnitID() - i - 1;
743 if (p_id < 0)
744 p_id += n_proc;
745 else
746 p_id = p_id % n_proc;
747
748 if (p_id != (long int)vcl.getProcessUnitID())
749 {
750 std::ostringstream msg;
751 msg << "Hello from " << p_id << " to " << vcl.getProcessUnitID();
752 std::string str(msg.str());
753 BOOST_REQUIRE_EQUAL(std::equal(str.c_str(),str.c_str() + str.size() ,&(recv_message.get(p_id).get(0))),true);
754 }
755 else
756 {
757 BOOST_REQUIRE_EQUAL((size_t)0,recv_message.get(p_id).size());
758 }
759 }
760 }
761 }
762}
763
/*! \brief Like test_short, but with NQUEUE asynchronous exchanges in flight at the same time
 *
 * Every queue k sends its own "Hk" messages to the next (up to 8) ranks with the
 * NBXAsync calls:
 * - opt == RECEIVE_UNKNOWN: vector overload, no sender list;
 * - opt == RECEIVE_SIZE_UNKNOWN: raw pointer/size overload, with sender list.
 *
 * The template argument ip only changes the VERBOSE_TEST label.
 *
 * NOTE(review): recv_message[k] is resized to n_proc twice (before building the send
 * list and again in the per-queue loop). The VERBOSE_TEST timer starts after the
 * sleeps, so it does not measure the communication. The declarations of message,
 * recv_message and prc, and the lines between the sleeps, are not visible in this view.
 */
764template<unsigned int ip> void test_short_multiple(unsigned int opt)
765{
766 Vcluster<> & vcl = create_vcluster();
767
768 // send/recv messages
769
770 global_rank = vcl.getProcessUnitID();
771 size_t n_proc = vcl.getProcessingUnits();
772
773 // Checking short communication pattern
774
775 for (size_t s = 0 ; s < N_TRY ; s++)
776 {
777 for (size_t j = 32 ; j < N_LOOP ; j*=2)
778 {
779 global_step = j;
780
782
783 // We send one message for each processor (one message is an openfpm::vector<unsigned char>)
784 // or an array of bytes
786
787 // receving messages. Each receiving message is an openfpm::vector<unsigned char>
788 // or an array if bytes
789
791
792 for (size_t i = 0 ; i < NQUEUE ; i++)
793 {recv_message[i].resize(n_proc);}
794
795 // each processor communicate based on a list of processor
797
798 // We construct the processor list in particular in this case
799 // each processor communicate with the 8 next (in id) processors
800 for (size_t i = 0 ; i < 8 && i < n_proc ; i++)
801 {
802 size_t p_id = (i + 1 + vcl.getProcessUnitID()) % n_proc;
803
804 // avoid to communicate with yourself
805 if (p_id != vcl.getProcessUnitID())
806 {
807 // Create an hello message
808 prc.add(p_id);
809 for (size_t k = 0 ; k < NQUEUE ; k++)
810 {
811 message[k].add();
812 std::ostringstream msg;
813 msg << "H" << k << " from " << vcl.getProcessUnitID() << " to " << p_id;
814 std::string str(msg.str());
815 message[k].last().resize(j);
816 memset(message[k].last().getPointer(),0,j);
817 std::copy(str.c_str(),&(str.c_str())[msg.str().size()],&(message[k].last().get(0)));
818 }
819 }
820 }
821
 // Per-queue send descriptors; they must stay alive until the asynchronous exchanges complete
822 openfpm::vector<size_t> sz_send[NQUEUE];
823 openfpm::vector<void *> ptr[NQUEUE];
824
825 openfpm::vector<size_t> prc_recv[NQUEUE];
826
827 // For simplicity we create in advance a receiving buffer for all processors
828 for (size_t k = 0 ; k < NQUEUE ; k++)
829 {
830 recv_message[k].resize(n_proc);
831
832 // The pattern is not really random preallocate the receive buffer
833 for (size_t i = 0 ; i < 8 && i < n_proc ; i++)
834 {
835 long int p_id = vcl.getProcessUnitID() - i - 1;
836 if (p_id < 0)
837 p_id += n_proc;
838 else
839 p_id = p_id % n_proc;
840
841 if (p_id != (long int)vcl.getProcessUnitID())
842 recv_message[k].get(p_id).resize(j);
843 }
844
845 if (opt == RECEIVE_UNKNOWN)
846 {
847 vcl.sendrecvMultipleMessagesNBXAsync(prc,message[k],msg_alloc,&recv_message[k]);
848 }
850 else if (opt == RECEIVE_SIZE_UNKNOWN)
851 {
852 sz_send[k].resize(prc.size());
853 ptr[k].resize(prc.size());
854
855 for (size_t i = 0 ; i < prc.size() ; i++)
856 {
857 sz_send[k].get(i) = message[k].get(i).size();
858 ptr[k].get(i) = &message[k].get(i).get(0);
859 }
860
861 // Calculate the receiving part
862
863 // Check the message
864 for (size_t i = 0 ; i < 8 && i < n_proc ; i++)
865 {
866 long int p_id = vcl.getProcessUnitID() - i - 1;
867 if (p_id < 0)
868 {p_id += n_proc;}
869 else
870 {p_id = p_id % n_proc;}
871
872 if (p_id != (long int)vcl.getProcessUnitID())
873 {
874 prc_recv[k].add(p_id);
875 }
876 }
877
879
880 vcl.sendrecvMultipleMessagesNBXAsync(prc.size(),&sz_send[k].get(0),&prc.get(0),
881 &ptr[k].get(0),prc_recv[k].size(),&prc_recv[k].get(0),msg_alloc,&recv_message[k]);
882 }
883
884 }
885
887 usleep(1000);
889 usleep(1000);
891 usleep(1000);
893 usleep(1000);
894
896
897#ifdef VERBOSE_TEST
898 timer t;
899 t.start();
900#endif
901
902
903#ifdef VERBOSE_TEST
904 t.stop();
905
906 double clk = t.getwct();
907 double clk_max = clk;
908
909 size_t size_send_recv = 2 * j * (prc.size());
910 vcl.sum(size_send_recv);
911 vcl.max(clk_max);
912 vcl.execute();
913
914 if (vcl.getProcessUnitID() == 0)
915 {std::cout << "(Short pattern: " << method<ip>() << ")Buffer size: " << j << " Bandwidth (Average): " << size_send_recv / vcl.getProcessingUnits() / clk / 1e6 << " MB/s " << " Bandwidth (Total): " << size_send_recv / clk / 1e6 << " MB/s Clock: " << clk << " Clock MAX: " << clk_max <<"\n";}
916#endif
917
918 for (size_t k = 0 ; k < NQUEUE ; k++)
919 {
920 // Check the message
921 for (size_t i = 0 ; i < 8 && i < n_proc ; i++)
922 {
923 long int p_id = vcl.getProcessUnitID() - i - 1;
924 if (p_id < 0)
925 p_id += n_proc;
926 else
927 p_id = p_id % n_proc;
928
929 if (p_id != (long int)vcl.getProcessUnitID())
930 {
931 std::ostringstream msg;
932 msg << "H" << k << " from " << p_id << " to " << vcl.getProcessUnitID();
933 std::string str(msg.str());
934 BOOST_REQUIRE_EQUAL(std::equal(str.c_str(),str.c_str() + str.size() ,&(recv_message[k].get(p_id).get(0))),true);
935 }
936 else
937 {
938 BOOST_REQUIRE_EQUAL((size_t)0,recv_message[k].get(p_id).size());
939 }
940 }
941 }
942 }
943 }
944}
945
/*! \brief Random-pattern exchange with a reply, followed by a long-stride pattern
 *
 * The random phase runs only when n_proc < 16. Each rank:
 * - picks its destinations at random and sends them "Hello" messages (received
 *   through msg_alloc3);
 * - checks the messages it received;
 * - replies "Hey" to every sender;
 * - checks that replies come back from exactly the ranks it originally sent to.
 *
 * A long-stride pattern (stride n_proc/9, up to 8 peers, received through msg_alloc)
 * then follows. The function returns immediately when opt == RECEIVE_SIZE_UNKNOWN.
 *
 * NOTE(review): std::srand seeds only std::rand. eg is a default-constructed
 * std::default_random_engine, so every rank draws the same sequence.
 * NOTE(review): the declarations of message, recv_message, prc and o_send are not
 * visible in this view.
 */
946template<unsigned int ip> void test_random(unsigned int opt)
947{
948 Vcluster<> & vcl = create_vcluster();
949
950 // send/recv messages
951
952 global_rank = vcl.getProcessUnitID();
953 size_t n_proc = vcl.getProcessingUnits();
954
955 for (size_t s = 0 ; s < N_TRY ; s++)
956 {
957 if (opt == RECEIVE_SIZE_UNKNOWN)
958 {return;}
959
960 std::srand(create_vcluster().getProcessUnitID());
961 std::default_random_engine eg;
962 std::uniform_int_distribution<int> d(0,n_proc/8);
963
964 // Check random pattern (maximum 16 processors)
965
966 for (size_t j = 32 ; j < N_LOOP && n_proc < 16 ; j*=2)
967 {
968 global_step = j;
969 // original send
971 // send message
973 // recv message
975// recv_message.reserve(n_proc);
976
978
979 for (size_t i = 0 ; i < n_proc ; i++)
980 {
981 // randomly with which processor communicate
982 if (d(eg) == 0)
983 {
984 prc.add(i);
985 o_send.add(i);
986 message.add();
987 message.last().fill(0);
988 std::ostringstream msg;
989 msg << "Hello from " << vcl.getProcessUnitID() << " to " << i;
990 std::string str(msg.str());
991 message.last().resize(str.size());
992 std::copy(str.c_str(),&(str.c_str())[msg.str().size()],&(message.last().get(0)));
993 message.last().resize(j);
994 }
995 }
996
997 id = 0;
998 prc_recv.clear();
999
1000
1001#ifdef VERBOSE_TEST
1002 timer t;
1003 t.start();
1004#endif
1005
1006 commFunc<ip>(vcl,prc,message,msg_alloc3,&recv_message);
1007
1008#ifdef VERBOSE_TEST
1009 t.stop();
1010 double clk = t.getwct();
1011 double clk_max = clk;
1012
1013 size_t size_send_recv = (prc.size() + recv_message.size()) * j;
1014 vcl.sum(size_send_recv);
1015 vcl.sum(clk);
1016 vcl.max(clk_max);
1017 vcl.execute();
1018 clk /= vcl.getProcessingUnits();
1019
1020 if (vcl.getProcessUnitID() == 0)
1021 std::cout << "(Random Pattern: " << method<ip>() << ") Buffer size: " << j << " Bandwidth (Average): " << size_send_recv / vcl.getProcessingUnits() / clk / 1e6 << " MB/s " << " Bandwidth (Total): " << size_send_recv / clk / 1e6 << " MB/s Clock: " << clk << " Clock MAX: " << clk_max << "\n";
1022#endif
1023
1024 // Check the message
1025
1026 for (size_t i = 0 ; i < recv_message.size() ; i++)
1027 {
1028 std::ostringstream msg;
1029 msg << "Hello from " << prc_recv.get(i) << " to " << vcl.getProcessUnitID();
1030 std::string str(msg.str());
1031 BOOST_REQUIRE_EQUAL(std::equal(str.c_str(),str.c_str() + str.size() ,&(recv_message.get(i).get(0))),true);
1032 }
1033
1034 // Reply back
1035
1036 // Create the message
1037
1038 prc.clear();
1039 message.clear();
1040 for (size_t i = 0 ; i < prc_recv.size() ; i++)
1041 {
1042 prc.add(prc_recv.get(i));
1043 message.add();
1044 std::ostringstream msg;
1045 msg << "Hey from " << vcl.getProcessUnitID() << " to " << prc_recv.get(i);
1046 std::string str(msg.str());
1047 message.last().resize(str.size());
1048 std::copy(str.c_str(),&(str.c_str())[msg.str().size()],&(message.last().get(0)));
1049 message.last().resize(j);
1050 }
1051
1052 id = 0;
1053 prc_recv.clear();
1054 recv_message.clear();
1055
1056 commFunc<ip>(vcl,prc,message,msg_alloc3,&recv_message);
1057
1058 // Check if the received hey message match the original send
1059
1060 BOOST_REQUIRE_EQUAL(o_send.size(),prc_recv.size());
1061
1062 for (size_t i = 0 ; i < o_send.size() ; i++)
1063 {
 // NOTE(review): this j shadows the message-size j of the enclosing loop
1064 size_t j = 0;
1065 for ( ; j < prc_recv.size() ; j++)
1066 {
1067 if (o_send.get(i) == prc_recv.get(j))
1068 {
1069 // found the message check it
1070
 // NOTE(review): the match is at index j, but the check below reads entry i
 // (prc_recv.get(i) / recv_message.get(i)); it probably should use j
1071 std::ostringstream msg;
1072 msg << "Hey from " << prc_recv.get(i) << " to " << vcl.getProcessUnitID();
1073 std::string str(msg.str());
1074 BOOST_REQUIRE_EQUAL(std::equal(str.c_str(),str.c_str() + str.size() ,&(recv_message.get(i).get(0))),true);
1075 break;
1076 }
1077 }
1078 // Check that we find always a match
1079 BOOST_REQUIRE_EQUAL(j != prc_recv.size(),true);
1080 }
1081 }
1082
1083 // Check long communication pattern
1084
1085 for (size_t j = 32 ; j < N_LOOP ; j*=2)
1086 {
1087 global_step = j;
1088 // Processor step
1089 long int ps = n_proc / (8 + 1);
1090
1091 // send message
1093 // recv message
1095
1097
1098 for (size_t i = 0 ; i < 8 && i < n_proc ; i++)
1099 {
1100 size_t p_id = ((i+1) * ps + vcl.getProcessUnitID()) % n_proc;
1101 if (p_id != vcl.getProcessUnitID())
1102 {
1103 prc.add(p_id);
1104 message.add();
1105 std::ostringstream msg;
1106 msg << "Hello from " << vcl.getProcessUnitID() << " to " << p_id;
1107 std::string str(msg.str());
1108 message.last().resize(j);
1109 memset(message.last().getPointer(),0,j);
1110 std::copy(str.c_str(),&(str.c_str())[msg.str().size()],&(message.last().get(0)));
1111 }
1112 }
1113
1114 recv_message.resize(n_proc);
1115 // The pattern is not really random preallocate the receive buffer
1116 for (size_t i = 0 ; i < 8 && i < n_proc ; i++)
1117 {
1118 long int p_id = (- (i+1) * ps + (long int)vcl.getProcessUnitID());
1119 if (p_id < 0)
1120 p_id += n_proc;
1121 else
1122 p_id = p_id % n_proc;
1123
1124 if (p_id != (long int)vcl.getProcessUnitID())
1125 recv_message.get(p_id).resize(j);
1126 }
1127
1128#ifdef VERBOSE_TEST
1129 timer t;
1130 t.start();
1131#endif
1132
1133 commFunc<ip>(vcl,prc,message,msg_alloc,&recv_message);
1134
1135#ifdef VERBOSE_TEST
1136 t.stop();
1137 double clk = t.getwct();
1138 double clk_max = clk;
1139
1140 size_t size_send_recv = 2 * j * (prc.size());
1141 vcl.sum(size_send_recv);
1142 vcl.max(clk_max);
1143 vcl.execute();
1144
1145 if (vcl.getProcessUnitID() == 0)
1146 std::cout << "(Long pattern: " << method<ip>() << ")Buffer size: " << j << " Bandwidth (Average): " << size_send_recv / vcl.getProcessingUnits() / clk / 1e6 << " MB/s " << " Bandwidth (Total): " << size_send_recv / clk / 1e6 << " MB/s Clock: " << clk << " Clock MAX: " << clk_max <<"\n";
1147#endif
1148
1149 // Check the message
1150 for (long int i = 0 ; i < 8 && i < (long int)n_proc ; i++)
1151 {
1152 long int p_id = (- (i+1) * ps + (long int)vcl.getProcessUnitID());
1153 if (p_id < 0)
1154 p_id += n_proc;
1155 else
1156 p_id = p_id % n_proc;
1157
1158 if (p_id != (long int)vcl.getProcessUnitID())
1159 {
1160 std::ostringstream msg;
1161 msg << "Hello from " << p_id << " to " << vcl.getProcessUnitID();
1162 std::string str(msg.str());
1163 BOOST_REQUIRE_EQUAL(std::equal(str.c_str(),str.c_str() + str.size() ,&(recv_message.get(p_id).get(0))),true);
1164 }
1165 else
1166 {
1167 BOOST_REQUIRE_EQUAL((size_t)0,recv_message.get(p_id).size());
1168 }
1169 }
1170 }
1171 }
1172}
1173
/*! \brief Random-pattern stress test of sendrecvMultipleMessagesNBXAsync with NQUEUE concurrent exchanges
 *
 * For each of N_TRY tries and each buffer length j = 32, 64, ... (while j < N_LOOP), and
 * only when fewer than 16 processors are running, every processor:
 *  - picks a random set of destinations;
 *  - posts NQUEUE asynchronous NBX exchanges whose payloads start with the text
 *    "H<k> from <sender> to <receiver>" and are resized to j elements;
 *  - verifies the payloads it received;
 *  - replies to every sender;
 *  - checks that the replies come back from exactly the processors it originally sent to.
 *
 * \tparam ip only used (under VERBOSE_TEST) to label the bandwidth report via method<ip>()
 * \param opt if equal to RECEIVE_SIZE_UNKNOWN the test returns before any communication
 */
1174template<unsigned int ip> void test_random_multiple(unsigned int opt)
1175{
1176 Vcluster<> & vcl = create_vcluster();
1177
1178 // send/recv messages
1179
1180 global_rank = vcl.getProcessUnitID();
1181 size_t n_proc = vcl.getProcessingUnits();
1182
1183 for (size_t s = 0 ; s < N_TRY ; s++)
1184 {
1185 if (opt == RECEIVE_SIZE_UNKNOWN)
1186 {return;}
1187
 // NOTE(review): std::srand() only seeds std::rand(). The default-constructed engine `eg`
 // is not affected by it, so every processor (and every try s) draws the same sequence
 // from d(eg). If a per-processor random pattern is intended, seed `eg` with the rank.
1188 std::srand(create_vcluster().getProcessUnitID());
1189 std::default_random_engine eg;
 // Inclusive range [0, n_proc/8]. A destination is selected when d(eg) == 0, i.e. with
 // probability 1/(n_proc/8 + 1), which means every processor when n_proc < 8.
1190 std::uniform_int_distribution<int> d(0,n_proc/8);
1191
 // Per-queue receive descriptors, passed as the user pointer of the NBX exchanges below
1192 rcv_rm rcv[NQUEUE];
1193
1194 // Check random pattern (maximum 16 processors)
 // j is the length each message buffer is resized to. It is also published in
 // global_step, presumably so the receive-allocation callback can check the incoming
 // size (as msg_alloc/msg_alloc2 at the top of the file do).
1195
1196 for (size_t j = 32 ; j < N_LOOP && n_proc < 16 ; j*=2)
1197 {
1198 global_step = j;
1199 // original send
1200 openfpm::vector<size_t> o_send[NQUEUE];
1201 // send message
1203 // recv message
 // NOTE(review): source lines 1202, 1204 and 1208 are absent from this listing. They
 // presumably declare the per-queue `message` and `recv_message` buffers and the
 // destination list `prc` used below.
1205// recv_message.reserve(n_proc);
1206 openfpm::vector<size_t> prc_recv[NQUEUE];
1207
1209
1210 for (size_t i = 0 ; i < n_proc ; i++)
1211 {
1212 // randomly with which processor communicate
1213 if (d(eg) == 0)
1214 {
 // One entry in prc per selected destination. For every queue, one message
 // is built and the destination is recorded in o_send[k] for the final reply check.
1215 prc.add(i);
1216 for (size_t k = 0 ; k < NQUEUE ; k++)
1217 {
1218 o_send[k].add(i);
1219 message[k].add();
1220 message[k].last().fill(0);
 // Payload: the text "H<k> from <sender> to <receiver>", then resized to j elements
1221 std::ostringstream msg;
1222 msg << "H" << k << " from " << vcl.getProcessUnitID() << " to " << i;
1223 std::string str(msg.str());
1224 message[k].last().resize(str.size());
1225 std::copy(str.c_str(),&(str.c_str())[msg.str().size()],&(message[k].last().get(0)));
1226 message[k].last().resize(j);
1227 }
1228 }
1229 }
1230
 // Reset the file-scope counter `id` before the exchange (presumably used by msg_alloc4,
 // which is not visible here)
1231 id = 0;
1232
1233
1234#ifdef VERBOSE_TEST
1235 timer t;
1236 t.start();
1237#endif
1238
 // Post one asynchronous NBX exchange per queue, all sharing the destination list `prc`;
 // the received sender ids/payloads go to prc_recv[k]/recv_message[k] through rcv[k]
1239 for (size_t k = 0 ; k < NQUEUE ; k++)
1240 {
1241 rcv[k].prc_recv = &prc_recv[k];
1242 rcv[k].recv_message = &recv_message[k];
1243
1244 vcl.sendrecvMultipleMessagesNBXAsync(prc,message[k],msg_alloc4,&rcv[k]);
1245 }
1246
1247#ifdef VERBOSE_TEST
1248 t.stop();
1249 double clk = t.getwct();
1250 double clk_max = clk;
1251
 // NOTE(review): each destination receives NQUEUE messages of j elements, but this
 // counts prc.size() * j only. Also, `recv_message` is indexed per queue elsewhere; if it
 // is a C array (declaration not visible), `recv_message.size()` will not compile.
 // Confirm when building with VERBOSE_TEST.
1252 size_t size_send_recv = (prc.size() + recv_message.size()) * j;
1253 vcl.sum(size_send_recv);
1254 vcl.sum(clk);
1255 vcl.max(clk_max);
1256 vcl.execute();
1257 clk /= vcl.getProcessingUnits();
1258
1259 if (vcl.getProcessUnitID() == 0)
1260 std::cout << "(Random Pattern: " << method<ip>() << ") Buffer size: " << j << " Bandwidth (Average): " << size_send_recv / vcl.getProcessingUnits() / clk / 1e6 << " MB/s " << " Bandwidth (Total): " << size_send_recv / clk / 1e6 << " MB/s Clock: " << clk << " Clock MAX: " << clk_max << "\n";
1261#endif
1262
 // NOTE(review): source lines 1263, 1265, 1267, 1269 and 1272 are absent from this
 // listing. They presumably progress (progressCommunication) and then wait
 // (sendrecvMultipleMessagesNBXWait) for the asynchronous exchanges. Confirm against the
 // source.
1264 usleep(1000);
1266 usleep(1000);
1268 usleep(1000);
1270 usleep(1000);
1271
1273
 // For every queue: check the received payloads, then reply to each sender
1274 for (size_t k = 0 ; k < NQUEUE ; k++)
1275 {
1276 // Check the message
1277
1278 for (size_t i = 0 ; i < recv_message[k].size() ; i++)
1279 {
1280 std::ostringstream msg;
1281 msg << "H" << k << " from " << prc_recv[k].get(i) << " to " << vcl.getProcessUnitID();
1282 std::string str(msg.str());
1283 BOOST_REQUIRE_EQUAL(std::equal(str.c_str(),str.c_str() + str.size() ,&(recv_message[k].get(i).get(0))),true);
1284 }
1285
1286 // Reply back
1287
1288 // Create the message
1289
 // The destination list `prc` is rebuilt from the senders of queue k
1290 prc.clear();
1291 message[k].clear();
1292 for (size_t i = 0 ; i < prc_recv[k].size() ; i++)
1293 {
1294 prc.add(prc_recv[k].get(i));
1295 message[k].add();
1296 std::ostringstream msg;
1297 msg << "H" << k << " from " << vcl.getProcessUnitID() << " to " << prc_recv[k].get(i);
1298 std::string str(msg.str());
1299 message[k].last().resize(str.size());
1300 std::copy(str.c_str(),&(str.c_str())[msg.str().size()],&(message[k].last().get(0)));
1301 message[k].last().resize(j);
1302 }
1303
1304 id = 0;
1305 prc_recv[k].clear();
1306 recv_message[k].clear();
1307
 // Fresh descriptor for the reply exchange of queue k. It must stay alive until that
 // exchange completes, which presumably relies on the wait call absent from this listing.
1308 rcv_rm rr;
1309 rr.prc_recv = &prc_recv[k];
1310 rr.recv_message = &recv_message[k];
1311
1312 vcl.sendrecvMultipleMessagesNBXAsync(prc,message[k],msg_alloc4,&rr);
1313
 // NOTE(review): source lines 1314, 1316, 1318, 1320 and 1323 are absent from this
 // listing (presumably progress/wait calls for the reply exchange).
1315 usleep(1000);
1317 usleep(1000);
1319 usleep(1000);
1321 usleep(1000);
1322
1324
1325 // Check if the received hey message match the original send
1326
1327 BOOST_REQUIRE_EQUAL(o_send[k].size(),prc_recv[k].size());
1328
 // NOTE(review): the inner `j` below shadows the buffer-length `j` of the enclosing
 // loop. The payload check inside it uses index i (prc_recv[k].get(i),
 // recv_message[k].get(i)) rather than the matched index j, so it re-checks reply i
 // instead of the reply that matched o_send[k].get(i).
1329 for (size_t i = 0 ; i < o_send[k].size() ; i++)
1330 {
1331 size_t j = 0;
1332 for ( ; j < prc_recv[k].size() ; j++)
1333 {
1334 if (o_send[k].get(i) == prc_recv[k].get(j))
1335 {
1336 // found the message check it
1337
1338 std::ostringstream msg;
1339 msg << "H" << k << " from " << prc_recv[k].get(i) << " to " << vcl.getProcessUnitID();
1340 std::string str(msg.str());
1341 BOOST_REQUIRE_EQUAL(std::equal(str.c_str(),str.c_str() + str.size() ,&(recv_message[k].get(i).get(0))),true);
1342 break;
1343 }
1344 }
1345 // Check that we find always a match
1346 BOOST_REQUIRE_EQUAL(j != prc_recv[k].size(),true);
1347 }
1348 }
1349 }
1350 }
1351}
1352
/*! \brief Test point-to-point send/recv of vectors of a complex type (Point_test<float>)
 *
 * Every processor sends the same vector of n elements (built by allocate_openfpm_fill
 * from its own rank) to the 8 processors rank + i*P_STRIDE (mod number of processors),
 * using tag i. It receives with tag i from rank - i*P_STRIDE (mod number of processors),
 * which mirrors the send pattern. After execute(), every field of every received element
 * must equal the sender's rank.
 *
 * \param n number of elements in each vector
 * \param vcl VCluster used for the communication
 */
1359void test_send_recv_complex(const size_t n, Vcluster<> & vcl)
1360{
1362
1363 // Point test typedef
1364 typedef Point_test<float> p;
1365
1366 openfpm::vector<Point_test<float>> v_send = allocate_openfpm_fill(n,vcl.getProcessUnitID());
1367
1368 // Send to 8 processors
 // (with few processors several i map to the same peer; the tag i tells them apart)
1369 for (size_t i = 0 ; i < 8 ; i++)
1370 vcl.send( mod(vcl.getProcessUnitID() + i * P_STRIDE, vcl.getProcessingUnits()) ,i,v_send);
1371
 // NOTE(review): the declaration of `pt_buf` (source line 1372) is absent from this
 // listing. It holds one receive buffer per tag i.
1373 pt_buf.resize(8);
1374
1375 // Recv from 8 processors
 // NOTE(review): vcl.getProcessUnitID() - i * P_STRIDE is evaluated in size_t and wraps
 // around. mod(int,int) receives it converted to int, which yields the intended negative
 // value only because that conversion is modular (implementation-defined before C++20).
1376 for (size_t i = 0 ; i < 8 ; i++)
1377 {
1378 pt_buf.get(i).resize(n);
1379 vcl.recv( mod( (vcl.getProcessUnitID() - i * P_STRIDE), vcl.getProcessingUnits()) ,i,pt_buf.get(i));
1380 }
1381
 // Complete all the queued sends and receives
1382 vcl.execute();
1383
1385
1386 // Check the received buffers (careful at negative modulo)
1387 for (size_t i = 0 ; i < 8 ; i++)
1388 {
1389 for (size_t j = 0 ; j < n ; j++)
1390 {
1391 Point_test<float> pt = pt_buf.get(i).get(j);
1392
 // Rank of the processor buffer i was received from
1393 size_t p_recv = mod( (vcl.getProcessUnitID() - i * P_STRIDE), vcl.getProcessingUnits());
1394
1395 BOOST_REQUIRE_EQUAL(pt.template get<p::x>(),p_recv);
1396 BOOST_REQUIRE_EQUAL(pt.template get<p::y>(),p_recv);
1397 BOOST_REQUIRE_EQUAL(pt.template get<p::z>(),p_recv);
1398 BOOST_REQUIRE_EQUAL(pt.template get<p::s>(),p_recv);
1399 BOOST_REQUIRE_EQUAL(pt.template get<p::v>()[0],p_recv);
1400 BOOST_REQUIRE_EQUAL(pt.template get<p::v>()[1],p_recv);
1401 BOOST_REQUIRE_EQUAL(pt.template get<p::v>()[2],p_recv);
1402 BOOST_REQUIRE_EQUAL(pt.template get<p::t>()[0][0],p_recv);
1403 BOOST_REQUIRE_EQUAL(pt.template get<p::t>()[0][1],p_recv);
1404 BOOST_REQUIRE_EQUAL(pt.template get<p::t>()[0][2],p_recv);
1405 BOOST_REQUIRE_EQUAL(pt.template get<p::t>()[1][0],p_recv);
1406 BOOST_REQUIRE_EQUAL(pt.template get<p::t>()[1][1],p_recv);
1407 BOOST_REQUIRE_EQUAL(pt.template get<p::t>()[1][2],p_recv);
1408 BOOST_REQUIRE_EQUAL(pt.template get<p::t>()[2][0],p_recv);
1409 BOOST_REQUIRE_EQUAL(pt.template get<p::t>()[2][1],p_recv);
1410 BOOST_REQUIRE_EQUAL(pt.template get<p::t>()[2][2],p_recv);
1411 }
1412 }
1413}
1414
/*! \brief Test point-to-point send/recv of vectors of a primitive type T
 *
 * Every processor sends its vector of n elements (built by allocate_openfpm_primitive from
 * its own rank) to the 8 processors rank + i*P_STRIDE (mod number of processors) with tag
 * i. It receives with tag i from rank - i*P_STRIDE (mod number of processors).
 *
 * The exchange is done twice:
 *  - first passing the openfpm::vector<T> objects directly to send/recv;
 *  - then passing the raw buffer pointer and its size in bytes.
 *
 * Every received element must equal the sender's rank converted to T.
 *
 * \tparam T primitive element type
 * \param n number of elements in each vector
 * \param vcl VCluster used for the communication
 */
1422template<typename T> void test_send_recv_primitives(size_t n, Vcluster<> & vcl)
1423{
1424 openfpm::vector<T> v_send = allocate_openfpm_primitive<T>(n,vcl.getProcessUnitID());
1425
 // First pass: send/recv overloads taking the openfpm::vector<T> itself
1426 {
1428
1429 // Send to 8 processors
1430 for (size_t i = 0 ; i < 8 ; i++)
1431 vcl.send( mod(vcl.getProcessUnitID() + i * P_STRIDE, vcl.getProcessingUnits()) ,i,v_send);
1432
 // NOTE(review): the declaration of this scope's `pt_buf` (source line 1433) is absent
 // from this listing. It holds one receive buffer per tag i.
1434 pt_buf.resize(8);
1435
1436 // Recv from 8 processors
 // NOTE(review): the size_t subtraction wraps and relies on the modular size_t -> int
 // conversion into mod(int,int) (implementation-defined before C++20).
1437 for (size_t i = 0 ; i < 8 ; i++)
1438 {
1439 pt_buf.get(i).resize(n);
1440 vcl.recv( mod( (vcl.getProcessUnitID() - i * P_STRIDE), vcl.getProcessingUnits()) ,i,pt_buf.get(i));
1441 }
1442
1443 vcl.execute();
1444
1446
1447 // Check the received buffers (careful at negative modulo)
1448 for (size_t i = 0 ; i < 8 ; i++)
1449 {
1450 for (size_t j = 0 ; j < n ; j++)
1451 {
1452 T pt = pt_buf.get(i).get(j);
1453
 // Rank of the processor buffer i was received from, converted to T
1454 T p_recv = mod( (vcl.getProcessUnitID() - i * P_STRIDE), vcl.getProcessingUnits());
1455
1456 BOOST_REQUIRE_EQUAL(pt,p_recv);
1457 }
1458 }
1459
1460 }
1461
 // Second pass: the same exchange through raw buffers (pointer + size in bytes)
1462 {
1464
1465 // Send to 8 processors
1466 for (size_t i = 0 ; i < 8 ; i++)
1467 vcl.send( mod(vcl.getProcessUnitID() + i * P_STRIDE, vcl.getProcessingUnits()) ,i,v_send.getPointer(),v_send.size()*sizeof(T));
1468
 // NOTE(review): the declaration of this scope's `pt_buf` (source line 1469) is absent
 // from this listing.
1470 pt_buf.resize(8);
1471
1472 // Recv from 8 processors
1473 for (size_t i = 0 ; i < 8 ; i++)
1474 {
1475 pt_buf.get(i).resize(n);
1476 vcl.recv( mod( (vcl.getProcessUnitID() - i * P_STRIDE), vcl.getProcessingUnits()) ,i,pt_buf.get(i).getPointer(),pt_buf.get(i).size()*sizeof(T));
1477 }
1478
1479 vcl.execute();
1480
1482
1483 // Check the received buffers (careful at negative modulo)
1484 for (size_t i = 0 ; i < 8 ; i++)
1485 {
1486 for (size_t j = 0 ; j < n ; j++)
1487 {
1488 T pt = pt_buf.get(i).get(j);
1489
1490 T p_recv = mod( (vcl.getProcessUnitID() - i * P_STRIDE), vcl.getProcessingUnits());
1491
1492 BOOST_REQUIRE_EQUAL(pt,p_recv);
1493 }
1494 }
1495
1496 }
1497}
1498
/*! \brief Test allGather of a single primitive value per processor
 *
 * Each processor contributes its rank (converted to T). After execute(), position i of
 * the gathered vector `clt` must hold i, i.e. the value contributed by processor i.
 *
 * \tparam T primitive type of the gathered value
 * \param vcl VCluster used for the communication
 */
1499template<typename T> void test_single_all_gather_primitives(Vcluster<> & vcl)
1500{
 // NOTE(review): the declaration of the output vector `clt` (source line 1503) is absent
 // from this listing.
1502
1504 T data = vcl.getProcessUnitID();
1505
1506 vcl.allGather(data,clt);
1507 vcl.execute();
1508
1509 for (size_t i = 0 ; i < vcl.getProcessingUnits() ; i++)
1510 BOOST_REQUIRE_EQUAL(i,(size_t)clt.get(i));
1511
1513
1514}
1515
1516#endif /* VCLUSTER_UNIT_TEST_UTIL_HPP_ */
Test structure used for several tests.
void progressCommunication()
In case of asynchronous communications like sendrecvMultipleMessagesNBXAsync, this function progresses th...
void execute()
Execute all the requests.
void sum(T &num)
Sum the numbers across all processors and get the result.
void sendrecvMultipleMessagesNBX(openfpm::vector< size_t > &prc, openfpm::vector< T > &data, openfpm::vector< size_t > &prc_recv, openfpm::vector< size_t > &recv_sz, void *(*msg_alloc)(size_t, size_t, size_t, size_t, size_t, size_t, void *), void *ptr_arg, long int opt=NONE)
Send and receive multiple messages.
size_t getProcessUnitID()
Get the process unit id.
size_t getProcessingUnits()
Get the total number of processors.
bool recv(size_t proc, size_t tag, void *v, size_t sz)
Recv data from a processor.
void sendrecvMultipleMessagesNBXWait()
Send and receive multiple messages: wait for the NBX communication to complete.
bool allGather(T &send, openfpm::vector< T, Mem, gr > &v)
Gather the data from all processors.
void max(T &num)
Get the maximum number across all processors (or reduction with infinity norm)
void sendrecvMultipleMessagesNBXAsync(openfpm::vector< size_t > &prc, openfpm::vector< T > &data, openfpm::vector< size_t > &prc_recv, openfpm::vector< size_t > &recv_sz, void *(*msg_alloc)(size_t, size_t, size_t, size_t, size_t, size_t, void *), void *ptr_arg, long int opt=NONE)
Send and receive multiple messages asynchronous version.
bool send(size_t proc, size_t tag, const void *mem, size_t sz)
Send data to a processor.
Implementation of VCluster class.
Definition VCluster.hpp:59
Implementation of a 1-D std::vector-like structure.
size_t size()
Stub size.
Class for cpu time benchmarking.
Definition timer.hpp:28
void stop()
Stop the timer.
Definition timer.hpp:119
void start()
Start the timer.
Definition timer.hpp:90
double getwct()
Return the elapsed real time.
Definition timer.hpp:130
KeyT const ValueT ValueT OffsetIteratorT OffsetIteratorT int
[in] The number of segments that comprise the sorting data