OpenFPM  5.2.0
Project that contain the implementation of distributed structures
VCluster_unit_test_util.hpp
1 /*
2  * VCluster_unit_test_util.hpp
3  *
4  * Created on: May 30, 2015
5  * Author: i-bird
6  */
7 
8 #ifndef VCLUSTER_UNIT_TEST_UTIL_HPP_
9 #define VCLUSTER_UNIT_TEST_UTIL_HPP_
10 
11 #include "Point_test.hpp"
12 #include "VCluster_base.hpp"
13 #include "Vector/vector_test_util.hpp"
14 #include "VCluster/VCluster.hpp"
15 
16 constexpr int RECEIVE_UNKNOWN = 1;
17 constexpr int RECEIVE_SIZE_UNKNOWN = 2;
18 
19 constexpr int NBX = 1;
20 constexpr int NBX_ASYNC = 2;
21 constexpr int KNOWN_PRC = 3;
22 
23 constexpr int N_TRY = 2;
24 constexpr int N_LOOP = 67108864;
25 constexpr int BUFF_STEP = 524288;
26 constexpr int P_STRIDE = 17;
27 
28 bool totp_check;
29 size_t global_step = 0;
30 size_t global_rank;
31 
32 struct rcv_rm
33 {
34  openfpm::vector<size_t> * prc_recv;
36 };
37 
44 int mod(int x, int m) {
45  return (x%m + m)%m;
46 }
47 
48 // Alloc the buffer to receive the messages
49 
51 
52 void * msg_alloc(size_t msg_i ,size_t total_msg, size_t total_p, size_t i,size_t ri, size_t tag, void * ptr)
53 {
54  // convert the void pointer argument into a pointer to receiving buffers
56 
59 
60  if (create_vcluster().getProcessingUnits() <= 8)
61  {if (totp_check) BOOST_REQUIRE_EQUAL(total_p,create_vcluster().getProcessingUnits()-1);}
62  else
63  {if (totp_check) BOOST_REQUIRE_EQUAL(total_p,(size_t)8);}
64 
65  BOOST_REQUIRE_EQUAL(msg_i, global_step);
66 
68 
69  // Create the memory to receive the message
70  // msg_i contain the size of the message to receive
71  // i contain the processor id
72  v->get(i).resize(msg_i);
73 
74  // return the pointer of the allocated memory
75  return &(v->get(i).get(0));
76 }
77 
79 
80 // Alloc the buffer to receive the messages
81 
82 size_t id = 0;
84 
85 void * msg_alloc2(size_t msg_i ,size_t total_msg, size_t total_p, size_t i, size_t ri, void * ptr)
86 {
88 
89  v->resize(total_p);
90  prc_recv.resize(total_p);
91 
92  BOOST_REQUIRE_EQUAL(msg_i, global_step);
93 
94  id++;
95  v->get(id-1).resize(msg_i);
96  prc_recv.get(id-1) = i;
97  return &(v->get(id-1).get(0));
98 }
99 
100 void * msg_alloc3(size_t msg_i ,size_t total_msg, size_t total_p, size_t i, size_t ri, size_t tag, void * ptr)
101 {
103 
104  v->add();
105 
106  prc_recv.add();
107 
108  BOOST_REQUIRE_EQUAL(msg_i, global_step);
109 
110  v->last().resize(msg_i);
111  prc_recv.last() = i;
112  return &(v->last().get(0));
113 }
114 
115 void * msg_alloc4(size_t msg_i ,size_t total_msg, size_t total_p, size_t i, size_t ri, size_t tag, void * ptr)
116 {
117  rcv_rm * v = static_cast<rcv_rm *>(ptr);
118 
119  v->recv_message->add();
120 
121  v->prc_recv->add();
122 
123  BOOST_REQUIRE_EQUAL(msg_i, global_step);
124 
125  v->recv_message->last().resize(msg_i);
126  v->prc_recv->last() = i;
127  return &(v->recv_message->last().get(0));
128 }
129 
130 template<unsigned int ip, typename T> void commFunc(Vcluster<> & vcl,openfpm::vector< size_t > & prc, openfpm::vector< T > & data, void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,size_t,void *), void * ptr_arg)
131 {
132  if (ip == NBX)
133  {vcl.sendrecvMultipleMessagesNBX(prc,data,msg_alloc,ptr_arg);}
134  else
135  {
136  vcl.sendrecvMultipleMessagesNBXAsync(prc,data,msg_alloc,ptr_arg);
137 
138  vcl.progressCommunication();
139  usleep(1000);
140  vcl.progressCommunication();
141  usleep(1000);
142  vcl.progressCommunication();
143  usleep(1000);
144  vcl.progressCommunication();
145  usleep(1000);
146 
148  }
149 }
150 
151 
152 template<unsigned int ip>
153 void commFunc_low(Vcluster<> & vcl,openfpm::vector< size_t > & prc, openfpm::vector<size_t> & sz_send ,
155  void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,size_t,void *),
156  void * ptr_arg)
157 {
158  // Send and receive
159  vcl.sendrecvMultipleMessagesNBX(prc.size(),&sz_send.get(0),&prc.get(0),
160  &ptr.get(0),prc_recv.size(),&prc_recv.get(0),msg_alloc,ptr_arg);
161 }
162 
163 template<unsigned int ip, typename T>
164 void commFunc_null_odd(Vcluster<> & vcl,openfpm::vector< size_t > & prc, openfpm::vector< T > & data, void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,size_t,void *), void * ptr_arg)
165 {
166  if (vcl.getProcessUnitID() % 2 == 0)
167  vcl.sendrecvMultipleMessagesNBX(prc,data,msg_alloc,ptr_arg);
168  else
169  {
170  // No send check if passing null to sendrecv work
171  vcl.sendrecvMultipleMessagesNBX(prc.size(),(size_t *)NULL,(size_t *)NULL,(void **)NULL,msg_alloc,ptr_arg,NONE);
172  }
173 }
174 
175 template<unsigned int ip>
176 void commFunc_kn(Vcluster<> & vcl,
178  openfpm::vector<size_t> & recv_sz,
179  void * ptr)
180 {
181  if (ip == NBX)
182  {
183  vcl.sendrecvMultipleMessagesNBX(prc,message,prc_recv,recv_sz,msg_alloc,ptr);
184  }
185  else
186  {
187  vcl.sendrecvMultipleMessagesNBXAsync(prc,message,prc_recv,recv_sz,msg_alloc,ptr);
188 
189  vcl.progressCommunication();
190  usleep(1000);
191  vcl.progressCommunication();
192  usleep(1000);
193  vcl.progressCommunication();
194  usleep(1000);
195  vcl.progressCommunication();
196  usleep(1000);
197 
199  }
200 }
201 
202 template<unsigned int ip>
203 void commFunc_kn_prc(Vcluster<> & vcl,
206  openfpm::vector<size_t> & prc_recv,
207  openfpm::vector<size_t> & recv_sz,
208  void * ptr_arg)
209 {
212 
213  ptr.resize(message.size());
214  sz.resize(message.size());
215 
216  for (size_t i = 0 ; i < ptr.size() ; i++)
217  {
218  ptr.get(i) = message.get(i).getPointer();
219  sz.get(i) = message.get(i).size();
220  }
221 
222  if (ip == NBX)
223  {
224  vcl.sendrecvMultipleMessagesNBX(ptr.size(),(size_t *)sz.getPointer(),(size_t *)prc.getPointer(),(void **)ptr.getPointer(),
225  prc_recv.size(),(size_t *)prc_recv.getPointer(),msg_alloc,ptr_arg);
226  }
227  else
228  {
229  vcl.sendrecvMultipleMessagesNBXAsync(ptr.size(),(size_t *)sz.getPointer(),(size_t *)prc.getPointer(),(void **)ptr.getPointer(),
230  prc_recv.size(),(size_t *)prc_recv.getPointer(),msg_alloc,ptr_arg);
231 
232  vcl.progressCommunication();
233  usleep(1000);
234  vcl.progressCommunication();
235  usleep(1000);
236  vcl.progressCommunication();
237  usleep(1000);
238  vcl.progressCommunication();
239  usleep(1000);
240 
242  }
243 }
244 
245 template <unsigned int ip> std::string method()
246 {
247  return std::string("NBX");
248 }
249 
250 template<unsigned int ip> void test_no_send_some_peer()
251 {
252  Vcluster<> & vcl = create_vcluster();
253 
254  size_t n_proc = vcl.getProcessingUnits();
255 
256  // Check long communication with some peer not comunication
257 
258  size_t j = 4567;
259 
260  global_step = j;
261  // Processor step
262  long int ps = n_proc / (8 + 1);
263 
264  // send message
266  // recv message
268 
270 
271  // only even communicate
272 
273  if (vcl.getProcessUnitID() % 2 == 0)
274  {
275  for (size_t i = 0 ; i < 8 && i < n_proc ; i++)
276  {
277  size_t p_id = ((i+1) * ps + vcl.getProcessUnitID()) % n_proc;
278  if (p_id != vcl.getProcessUnitID())
279  {
280  prc.add(p_id);
281  message.add();
282  std::ostringstream msg;
283  msg << "Hello from " << vcl.getProcessUnitID() << " to " << p_id;
284  std::string str(msg.str());
285  message.last().resize(j);
286  memset(message.last().getPointer(),0,j);
287  std::copy(str.c_str(),&(str.c_str())[msg.str().size()],&(message.last().get(0)));
288  }
289  }
290  }
291 
292  recv_message.resize(n_proc);
293 
294 #ifdef VERBOSE_TEST
295  timer t;
296  t.start();
297 #endif
298 
299  commFunc_null_odd<ip>(vcl,prc,message,msg_alloc,&recv_message);
300 
301 #ifdef VERBOSE_TEST
302  t.stop();
303  double clk = t.getwct();
304  double clk_max = clk;
305 
306  size_t size_send_recv = 2 * j * (prc.size());
307  vcl.sum(size_send_recv);
308  vcl.max(clk_max);
309  vcl.execute();
310 
311  if (vcl.getProcessUnitID() == 0)
312  std::cout << "(Long pattern: " << method<ip>() << ")Buffer size: " << j << " Bandwidth (Average): " << size_send_recv / vcl.getProcessingUnits() / clk / 1e6 << " MB/s " << " Bandwidth (Total): " << size_send_recv / clk / 1e6 << " MB/s Clock: " << clk << " Clock MAX: " << clk_max <<"\n";
313 #endif
314 
315  // Check the message
316  for (long int i = 0 ; i < 8 && i < (long int)n_proc ; i++)
317  {
318  long int p_id = (- (i+1) * ps + (long int)vcl.getProcessUnitID());
319  if (p_id < 0)
320  p_id += n_proc;
321  else
322  p_id = p_id % n_proc;
323 
324  if (p_id != (long int)vcl.getProcessUnitID())
325  {
326  // only even processor communicate
327  if (p_id % 2 == 1)
328  continue;
329 
330  std::ostringstream msg;
331  msg << "Hello from " << p_id << " to " << vcl.getProcessUnitID();
332  std::string str(msg.str());
333  BOOST_REQUIRE_EQUAL(std::equal(str.c_str(),str.c_str() + str.size() ,&(recv_message.get(p_id).get(0))),true);
334  }
335  else
336  {
337  BOOST_REQUIRE_EQUAL((size_t)0,recv_message.get(p_id).size());
338  }
339  }
340 }
341 
342 
343 template<unsigned int ip> void test_known_vcl(size_t opt, Vcluster<> &vcl)
344 {
345  // send/recv messages
346 
347  global_rank = vcl.getProcessUnitID();
348  size_t n_proc = vcl.getProcessingUnits();
349 
350  // Checking short communication pattern
351 
352  for (size_t s = 0 ; s < N_TRY ; s++)
353  {
354  for (size_t j = 32 ; j < N_LOOP ; j*=2)
355  {
356  global_step = j;
357  // send message
359  // recv message
361  recv_message.reserve(n_proc);
362 
363  openfpm::vector<size_t> prc_recv;
364  openfpm::vector<size_t> recv_sz;
365 
367 
368  for (size_t i = 0 ; i < 8 && i < n_proc ; i++)
369  {
370  size_t p_id = (i + 1 + vcl.getProcessUnitID()) % n_proc;
371  if (p_id != vcl.getProcessUnitID())
372  {
373  prc.add(p_id);
374  message.add();
375  std::ostringstream msg;
376  msg << "Hello from " << vcl.getProcessUnitID() << " to " << p_id;
377  std::string str(msg.str());
378  message.last().resize(j);
379  memset(message.last().getPointer(),0,j);
380  std::copy(str.c_str(),&(str.c_str())[msg.str().size()],&(message.last().get(0)));
381  }
382  }
383 
384  recv_message.resize(n_proc);
385  // The pattern is not really random preallocate the receive buffer
386  for (size_t i = 0 ; i < 8 && i < n_proc ; i++)
387  {
388  long int p_id = vcl.getProcessUnitID() - i - 1;
389  if (p_id < 0)
390  p_id += n_proc;
391  else
392  p_id = p_id % n_proc;
393 
394  if (p_id != (long int)vcl.getProcessUnitID())
395  {
396  prc_recv.add(p_id);
397  recv_message.get(p_id).resize(j);
398  recv_sz.add(j);
399  }
400  }
401 
402 #ifdef VERBOSE_TEST
403  timer t;
404  t.start();
405 #endif
406 
407  if (opt == KNOWN_PRC)
408  {commFunc_kn_prc<ip>(vcl,prc,message,prc_recv,recv_sz,&recv_message);}
409  else
410  {commFunc_kn<ip>(vcl,prc,message,prc_recv,recv_sz,&recv_message);}
411 
412 #ifdef VERBOSE_TEST
413  t.stop();
414 
415  double clk = t.getwct();
416  double clk_max = clk;
417 
418  size_t size_send_recv = 2 * j * (prc.size());
419  vcl.sum(size_send_recv);
420  vcl.max(clk_max);
421  vcl.execute();
422 
423  if (vcl.getProcessUnitID() == 0)
424  std::cout << "(Short pattern: " << method<ip>() << ")Buffer size: " << j << " Bandwidth (Average): " << size_send_recv / vcl.getProcessingUnits() / clk / 1e6 << " MB/s " << " Bandwidth (Total): " << size_send_recv / clk / 1e6 << " MB/s Clock: " << clk << " Clock MAX: " << clk_max <<"\n";
425 #endif
426 
427  // Check the message
428  for (size_t i = 0 ; i < 8 && i < n_proc ; i++)
429  {
430  long int p_id = vcl.getProcessUnitID() - i - 1;
431  if (p_id < 0)
432  p_id += n_proc;
433  else
434  p_id = p_id % n_proc;
435 
436  if (p_id != (long int)vcl.getProcessUnitID())
437  {
438  std::ostringstream msg;
439  msg << "Hello from " << p_id << " to " << vcl.getProcessUnitID();
440  std::string str(msg.str());
441  BOOST_REQUIRE_EQUAL(std::equal(str.c_str(),str.c_str() + str.size() ,&(recv_message.get(p_id).get(0))),true);
442  }
443  else
444  {
445  BOOST_REQUIRE_EQUAL((size_t)0,recv_message.get(p_id).size());
446  }
447  }
448  }
449  }
450 }
451 
452 template<unsigned int ip> void test_known(size_t opt)
453 {
454  Vcluster<> & vcl = create_vcluster();
455  test_known_vcl<ip>(opt, vcl);
456 }
457 
458 template<unsigned int ip> void test_known_multiple(size_t opt)
459 {
460  Vcluster<> & vcl = create_vcluster();
461 
462  // send/recv messages
463 
464  global_rank = vcl.getProcessUnitID();
465  size_t n_proc = vcl.getProcessingUnits();
466 
467  // Checking short communication pattern
468 
469  for (size_t s = 0 ; s < N_TRY ; s++)
470  {
471  for (size_t j = 32 ; j < N_LOOP ; j*=2)
472  {
473  global_step = j;
474  // send message
476  // recv message
478 
479  openfpm::vector<size_t> prc_recv[NQUEUE];
480  openfpm::vector<size_t> recv_sz[NQUEUE];
481 
482  openfpm::vector<void *> ptr[NQUEUE];
483  openfpm::vector<size_t> sz[NQUEUE];
484 
485 
487 
488  for (size_t i = 0 ; i < 8 && i < n_proc ; i++)
489  {
490  size_t p_id = (i + 1 + vcl.getProcessUnitID()) % n_proc;
491  if (p_id != vcl.getProcessUnitID())
492  {
493  prc.add(p_id);
494  for (size_t k = 0 ; k < NQUEUE ; k++)
495  {
496  message[k].add();
497  std::ostringstream msg;
498  msg << "Hello " << k << " from " << vcl.getProcessUnitID() << " to " << p_id;
499  std::string str(msg.str());
500  message[k].last().resize(j);
501  memset(message[k].last().getPointer(),0,j);
502  std::copy(str.c_str(),&(str.c_str())[msg.str().size()],&(message[k].last().get(0)));
503  }
504  }
505  }
506 
507  for (size_t k = 0 ; k < NQUEUE ; k++)
508  {
509  recv_message[k].resize(n_proc);
510  // The pattern is not really random preallocate the receive buffer
511  for (size_t i = 0 ; i < 8 && i < n_proc ; i++)
512  {
513  long int p_id = vcl.getProcessUnitID() - i - 1;
514  if (p_id < 0)
515  p_id += n_proc;
516  else
517  p_id = p_id % n_proc;
518 
519  if (p_id != (long int)vcl.getProcessUnitID())
520  {
521  prc_recv[k].add(p_id);
522  recv_message[k].get(p_id).resize(j);
523  recv_sz[k].add(j);
524  }
525  }
526  }
527 
528 #ifdef VERBOSE_TEST
529  timer t;
530  t.start();
531 #endif
532 
533  for (size_t k = 0 ; k < NQUEUE ; k++)
534  {
535  if (opt == KNOWN_PRC)
536  {
537  ptr[k].resize(message[k].size());
538  sz[k].resize(message[k].size());
539 
540  for (size_t i = 0 ; i < ptr[k].size() ; i++)
541  {
542  ptr[k].get(i) = message[k].get(i).getPointer();
543  sz[k].get(i) = message[k].get(i).size();
544  }
545 
546  vcl.sendrecvMultipleMessagesNBXAsync(ptr[k].size(),(size_t *)sz[k].getPointer(),(size_t *)prc.getPointer(),(void **)ptr[k].getPointer(),
547  prc_recv[k].size(),(size_t *)prc_recv[k].getPointer(),msg_alloc,&recv_message[k]);
548 
549  }
550  else
551  {
552  vcl.sendrecvMultipleMessagesNBXAsync(prc,message[k],prc_recv[k],recv_sz[k],msg_alloc,&recv_message[k]);
553  }
554  }
555 
556 
557  vcl.progressCommunication();
558  usleep(1000);
559  vcl.progressCommunication();
560  usleep(1000);
561  vcl.progressCommunication();
562  usleep(1000);
563  vcl.progressCommunication();
564  usleep(1000);
565 
567 
568 #ifdef VERBOSE_TEST
569  t.stop();
570 
571  double clk = t.getwct();
572  double clk_max = clk;
573 
574  size_t size_send_recv = 2 * j * (prc.size());
575  vcl.sum(size_send_recv);
576  vcl.max(clk_max);
577  vcl.execute();
578 
579  if (vcl.getProcessUnitID() == 0)
580  std::cout << "(Short pattern: " << method<ip>() << ")Buffer size: " << j << " Bandwidth (Average): " << size_send_recv / vcl.getProcessingUnits() / clk / 1e6 << " MB/s " << " Bandwidth (Total): " << size_send_recv / clk / 1e6 << " MB/s Clock: " << clk << " Clock MAX: " << clk_max <<"\n";
581 #endif
582 
583  for (size_t k = 0 ; k < NQUEUE ; k++)
584  {
585  // Check the message
586  for (size_t i = 0 ; i < 8 && i < n_proc ; i++)
587  {
588  long int p_id = vcl.getProcessUnitID() - i - 1;
589  if (p_id < 0)
590  p_id += n_proc;
591  else
592  p_id = p_id % n_proc;
593 
594  if (p_id != (long int)vcl.getProcessUnitID())
595  {
596  std::ostringstream msg;
597  msg << "Hello " << k << " from " << p_id << " to " << vcl.getProcessUnitID();
598  std::string str(msg.str());
599  BOOST_REQUIRE_EQUAL(std::equal(str.c_str(),str.c_str() + str.size() ,&(recv_message[k].get(p_id).get(0))),true);
600  }
601  else
602  {
603  BOOST_REQUIRE_EQUAL((size_t)0,recv_message[k].get(p_id).size());
604  }
605  }
606  }
607  }
608  }
609 }
610 
611 template<unsigned int ip> void test_short(unsigned int opt)
612 {
613  Vcluster<> & vcl = create_vcluster();
614 
615  // send/recv messages
616 
617  global_rank = vcl.getProcessUnitID();
618  size_t n_proc = vcl.getProcessingUnits();
619 
620  // Checking short communication pattern
621 
622  for (size_t s = 0 ; s < N_TRY ; s++)
623  {
624  for (size_t j = 32 ; j < N_LOOP ; j*=2)
625  {
626  global_step = j;
627 
629 
630  // We send one message for each processor (one message is an openfpm::vector<unsigned char>)
631  // or an array of bytes
633 
634  // receving messages. Each receiving message is an openfpm::vector<unsigned char>
635  // or an array if bytes
637 
638  // each processor communicate based on a list of processor
640 
641  // We construct the processor list in particular in this case
642  // each processor communicate with the 8 next (in id) processors
643  for (size_t i = 0 ; i < 8 && i < n_proc ; i++)
644  {
645  size_t p_id = (i + 1 + vcl.getProcessUnitID()) % n_proc;
646 
647  // avoid to communicate with yourself
648  if (p_id != vcl.getProcessUnitID())
649  {
650  // Create an hello message
651  prc.add(p_id);
652  message.add();
653  std::ostringstream msg;
654  msg << "Hello from " << vcl.getProcessUnitID() << " to " << p_id;
655  std::string str(msg.str());
656  message.last().resize(j);
657  memset(message.last().getPointer(),0,j);
658  std::copy(str.c_str(),&(str.c_str())[msg.str().size()],&(message.last().get(0)));
659  }
660  }
661 
662  // For simplicity we create in advance a receiving buffer for all processors
663  recv_message.resize(n_proc);
664 
665  // The pattern is not really random preallocate the receive buffer
666  for (size_t i = 0 ; i < 8 && i < n_proc ; i++)
667  {
668  long int p_id = vcl.getProcessUnitID() - i - 1;
669  if (p_id < 0)
670  p_id += n_proc;
671  else
672  p_id = p_id % n_proc;
673 
674  if (p_id != (long int)vcl.getProcessUnitID())
675  recv_message.get(p_id).resize(j);
676  }
677 
678  if (opt == RECEIVE_UNKNOWN)
679  {
680  // Send and receive
681  commFunc<ip>(vcl,prc,message,msg_alloc,&recv_message);
682 
683  }
685  else if (opt == RECEIVE_SIZE_UNKNOWN)
686  {
687  openfpm::vector<size_t> sz_send;
689 
690  openfpm::vector<size_t> prc_recv;
691 
692  sz_send.resize(prc.size());
693  ptr.resize(prc.size());
694 
695  for (size_t i = 0 ; i < prc.size() ; i++)
696  {
697  sz_send.get(i) = message.get(i).size();
698  ptr.get(i) = &message.get(i).get(0);
699  }
700 
701  // Calculate the receiving part
702 
703  // Check the message
704  for (size_t i = 0 ; i < 8 && i < n_proc ; i++)
705  {
706  long int p_id = vcl.getProcessUnitID() - i - 1;
707  if (p_id < 0)
708  {p_id += n_proc;}
709  else
710  {p_id = p_id % n_proc;}
711 
712  if (p_id != (long int)vcl.getProcessUnitID())
713  {
714  prc_recv.add(p_id);
715  }
716  }
717 
719 
720  commFunc_low<ip>(vcl,prc,sz_send,ptr,prc_recv,msg_alloc,&recv_message);
721  }
722 
723 #ifdef VERBOSE_TEST
724  timer t;
725  t.start();
726 #endif
727 
728 
729 #ifdef VERBOSE_TEST
730  t.stop();
731 
732  double clk = t.getwct();
733  double clk_max = clk;
734 
735  size_t size_send_recv = 2 * j * (prc.size());
736  vcl.sum(size_send_recv);
737  vcl.max(clk_max);
738  vcl.execute();
739 
740  if (vcl.getProcessUnitID() == 0)
741  {std::cout << "(Short pattern: " << method<ip>() << ")Buffer size: " << j << " Bandwidth (Average): " << size_send_recv / vcl.getProcessingUnits() / clk / 1e6 << " MB/s " << " Bandwidth (Total): " << size_send_recv / clk / 1e6 << " MB/s Clock: " << clk << " Clock MAX: " << clk_max <<"\n";}
742 #endif
743 
744  // Check the message
745  for (size_t i = 0 ; i < 8 && i < n_proc ; i++)
746  {
747  long int p_id = vcl.getProcessUnitID() - i - 1;
748  if (p_id < 0)
749  p_id += n_proc;
750  else
751  p_id = p_id % n_proc;
752 
753  if (p_id != (long int)vcl.getProcessUnitID())
754  {
755  std::ostringstream msg;
756  msg << "Hello from " << p_id << " to " << vcl.getProcessUnitID();
757  std::string str(msg.str());
758  BOOST_REQUIRE_EQUAL(std::equal(str.c_str(),str.c_str() + str.size() ,&(recv_message.get(p_id).get(0))),true);
759  }
760  else
761  {
762  BOOST_REQUIRE_EQUAL((size_t)0,recv_message.get(p_id).size());
763  }
764  }
765  }
766  }
767 }
768 
769 template<unsigned int ip> void test_short_multiple(unsigned int opt)
770 {
771  Vcluster<> & vcl = create_vcluster();
772 
773  // send/recv messages
774 
775  global_rank = vcl.getProcessUnitID();
776  size_t n_proc = vcl.getProcessingUnits();
777 
778  // Checking short communication pattern
779 
780  for (size_t s = 0 ; s < N_TRY ; s++)
781  {
782  for (size_t j = 32 ; j < N_LOOP ; j*=2)
783  {
784  global_step = j;
785 
787 
788  // We send one message for each processor (one message is an openfpm::vector<unsigned char>)
789  // or an array of bytes
791 
792  // receving messages. Each receiving message is an openfpm::vector<unsigned char>
793  // or an array if bytes
794 
796 
797  for (size_t i = 0 ; i < NQUEUE ; i++)
798  {recv_message[i].resize(n_proc);}
799 
800  // each processor communicate based on a list of processor
802 
803  // We construct the processor list in particular in this case
804  // each processor communicate with the 8 next (in id) processors
805  for (size_t i = 0 ; i < 8 && i < n_proc ; i++)
806  {
807  size_t p_id = (i + 1 + vcl.getProcessUnitID()) % n_proc;
808 
809  // avoid to communicate with yourself
810  if (p_id != vcl.getProcessUnitID())
811  {
812  // Create an hello message
813  prc.add(p_id);
814  for (size_t k = 0 ; k < NQUEUE ; k++)
815  {
816  message[k].add();
817  std::ostringstream msg;
818  msg << "H" << k << " from " << vcl.getProcessUnitID() << " to " << p_id;
819  std::string str(msg.str());
820  message[k].last().resize(j);
821  memset(message[k].last().getPointer(),0,j);
822  std::copy(str.c_str(),&(str.c_str())[msg.str().size()],&(message[k].last().get(0)));
823  }
824  }
825  }
826 
827  openfpm::vector<size_t> sz_send[NQUEUE];
828  openfpm::vector<void *> ptr[NQUEUE];
829 
830  openfpm::vector<size_t> prc_recv[NQUEUE];
831 
832  // For simplicity we create in advance a receiving buffer for all processors
833  for (size_t k = 0 ; k < NQUEUE ; k++)
834  {
835  recv_message[k].resize(n_proc);
836 
837  // The pattern is not really random preallocate the receive buffer
838  for (size_t i = 0 ; i < 8 && i < n_proc ; i++)
839  {
840  long int p_id = vcl.getProcessUnitID() - i - 1;
841  if (p_id < 0)
842  p_id += n_proc;
843  else
844  p_id = p_id % n_proc;
845 
846  if (p_id != (long int)vcl.getProcessUnitID())
847  recv_message[k].get(p_id).resize(j);
848  }
849 
850  if (opt == RECEIVE_UNKNOWN)
851  {
852  vcl.sendrecvMultipleMessagesNBXAsync(prc,message[k],msg_alloc,&recv_message[k]);
853  }
855  else if (opt == RECEIVE_SIZE_UNKNOWN)
856  {
857  sz_send[k].resize(prc.size());
858  ptr[k].resize(prc.size());
859 
860  for (size_t i = 0 ; i < prc.size() ; i++)
861  {
862  sz_send[k].get(i) = message[k].get(i).size();
863  ptr[k].get(i) = &message[k].get(i).get(0);
864  }
865 
866  // Calculate the receiving part
867 
868  // Check the message
869  for (size_t i = 0 ; i < 8 && i < n_proc ; i++)
870  {
871  long int p_id = vcl.getProcessUnitID() - i - 1;
872  if (p_id < 0)
873  {p_id += n_proc;}
874  else
875  {p_id = p_id % n_proc;}
876 
877  if (p_id != (long int)vcl.getProcessUnitID())
878  {
879  prc_recv[k].add(p_id);
880  }
881  }
882 
884 
885  vcl.sendrecvMultipleMessagesNBXAsync(prc.size(),&sz_send[k].get(0),&prc.get(0),
886  &ptr[k].get(0),prc_recv[k].size(),&prc_recv[k].get(0),msg_alloc,&recv_message[k]);
887  }
888 
889  }
890 
891  vcl.progressCommunication();
892  usleep(1000);
893  vcl.progressCommunication();
894  usleep(1000);
895  vcl.progressCommunication();
896  usleep(1000);
897  vcl.progressCommunication();
898  usleep(1000);
899 
901 
902 #ifdef VERBOSE_TEST
903  timer t;
904  t.start();
905 #endif
906 
907 
908 #ifdef VERBOSE_TEST
909  t.stop();
910 
911  double clk = t.getwct();
912  double clk_max = clk;
913 
914  size_t size_send_recv = 2 * j * (prc.size());
915  vcl.sum(size_send_recv);
916  vcl.max(clk_max);
917  vcl.execute();
918 
919  if (vcl.getProcessUnitID() == 0)
920  {std::cout << "(Short pattern: " << method<ip>() << ")Buffer size: " << j << " Bandwidth (Average): " << size_send_recv / vcl.getProcessingUnits() / clk / 1e6 << " MB/s " << " Bandwidth (Total): " << size_send_recv / clk / 1e6 << " MB/s Clock: " << clk << " Clock MAX: " << clk_max <<"\n";}
921 #endif
922 
923  for (size_t k = 0 ; k < NQUEUE ; k++)
924  {
925  // Check the message
926  for (size_t i = 0 ; i < 8 && i < n_proc ; i++)
927  {
928  long int p_id = vcl.getProcessUnitID() - i - 1;
929  if (p_id < 0)
930  p_id += n_proc;
931  else
932  p_id = p_id % n_proc;
933 
934  if (p_id != (long int)vcl.getProcessUnitID())
935  {
936  std::ostringstream msg;
937  msg << "H" << k << " from " << p_id << " to " << vcl.getProcessUnitID();
938  std::string str(msg.str());
939  BOOST_REQUIRE_EQUAL(std::equal(str.c_str(),str.c_str() + str.size() ,&(recv_message[k].get(p_id).get(0))),true);
940  }
941  else
942  {
943  BOOST_REQUIRE_EQUAL((size_t)0,recv_message[k].get(p_id).size());
944  }
945  }
946  }
947  }
948  }
949 }
950 
951 template<unsigned int ip> void test_random(unsigned int opt)
952 {
953  Vcluster<> & vcl = create_vcluster();
954 
955  // send/recv messages
956 
957  global_rank = vcl.getProcessUnitID();
958  size_t n_proc = vcl.getProcessingUnits();
959 
960  for (size_t s = 0 ; s < N_TRY ; s++)
961  {
962  if (opt == RECEIVE_SIZE_UNKNOWN)
963  {return;}
964 
965  std::srand(create_vcluster().getProcessUnitID());
966  std::default_random_engine eg;
967  std::uniform_int_distribution<int> d(0,n_proc/8);
968 
969  // Check random pattern (maximum 16 processors)
970 
971  for (size_t j = 32 ; j < N_LOOP && n_proc < 16 ; j*=2)
972  {
973  global_step = j;
974  // original send
976  // send message
978  // recv message
980 // recv_message.reserve(n_proc);
981 
983 
984  for (size_t i = 0 ; i < n_proc ; i++)
985  {
986  // randomly with which processor communicate
987  if (d(eg) == 0)
988  {
989  prc.add(i);
990  o_send.add(i);
991  message.add();
992  message.last().fill(0);
993  std::ostringstream msg;
994  msg << "Hello from " << vcl.getProcessUnitID() << " to " << i;
995  std::string str(msg.str());
996  message.last().resize(str.size());
997  std::copy(str.c_str(),&(str.c_str())[msg.str().size()],&(message.last().get(0)));
998  message.last().resize(j);
999  }
1000  }
1001 
1002  id = 0;
1003  prc_recv.clear();
1004 
1005 
1006 #ifdef VERBOSE_TEST
1007  timer t;
1008  t.start();
1009 #endif
1010 
1011  commFunc<ip>(vcl,prc,message,msg_alloc3,&recv_message);
1012 
1013 #ifdef VERBOSE_TEST
1014  t.stop();
1015  double clk = t.getwct();
1016  double clk_max = clk;
1017 
1018  size_t size_send_recv = (prc.size() + recv_message.size()) * j;
1019  vcl.sum(size_send_recv);
1020  vcl.sum(clk);
1021  vcl.max(clk_max);
1022  vcl.execute();
1023  clk /= vcl.getProcessingUnits();
1024 
1025  if (vcl.getProcessUnitID() == 0)
1026  std::cout << "(Random Pattern: " << method<ip>() << ") Buffer size: " << j << " Bandwidth (Average): " << size_send_recv / vcl.getProcessingUnits() / clk / 1e6 << " MB/s " << " Bandwidth (Total): " << size_send_recv / clk / 1e6 << " MB/s Clock: " << clk << " Clock MAX: " << clk_max << "\n";
1027 #endif
1028 
1029  // Check the message
1030 
1031  for (size_t i = 0 ; i < recv_message.size() ; i++)
1032  {
1033  std::ostringstream msg;
1034  msg << "Hello from " << prc_recv.get(i) << " to " << vcl.getProcessUnitID();
1035  std::string str(msg.str());
1036  BOOST_REQUIRE_EQUAL(std::equal(str.c_str(),str.c_str() + str.size() ,&(recv_message.get(i).get(0))),true);
1037  }
1038 
1039  // Reply back
1040 
1041  // Create the message
1042 
1043  prc.clear();
1044  message.clear();
1045  for (size_t i = 0 ; i < prc_recv.size() ; i++)
1046  {
1047  prc.add(prc_recv.get(i));
1048  message.add();
1049  std::ostringstream msg;
1050  msg << "Hey from " << vcl.getProcessUnitID() << " to " << prc_recv.get(i);
1051  std::string str(msg.str());
1052  message.last().resize(str.size());
1053  std::copy(str.c_str(),&(str.c_str())[msg.str().size()],&(message.last().get(0)));
1054  message.last().resize(j);
1055  }
1056 
1057  id = 0;
1058  prc_recv.clear();
1059  recv_message.clear();
1060 
1061  commFunc<ip>(vcl,prc,message,msg_alloc3,&recv_message);
1062 
1063  // Check if the received hey message match the original send
1064 
1065  BOOST_REQUIRE_EQUAL(o_send.size(),prc_recv.size());
1066 
1067  for (size_t i = 0 ; i < o_send.size() ; i++)
1068  {
1069  size_t j = 0;
1070  for ( ; j < prc_recv.size() ; j++)
1071  {
1072  if (o_send.get(i) == prc_recv.get(j))
1073  {
1074  // found the message check it
1075 
1076  std::ostringstream msg;
1077  msg << "Hey from " << prc_recv.get(i) << " to " << vcl.getProcessUnitID();
1078  std::string str(msg.str());
1079  BOOST_REQUIRE_EQUAL(std::equal(str.c_str(),str.c_str() + str.size() ,&(recv_message.get(i).get(0))),true);
1080  break;
1081  }
1082  }
1083  // Check that we find always a match
1084  BOOST_REQUIRE_EQUAL(j != prc_recv.size(),true);
1085  }
1086  }
1087 
1088  // Check long communication pattern
1089 
1090  for (size_t j = 32 ; j < N_LOOP ; j*=2)
1091  {
1092  global_step = j;
1093  // Processor step
1094  long int ps = n_proc / (8 + 1);
1095 
1096  // send message
1098  // recv message
1099  openfpm::vector<openfpm::vector<unsigned char>> recv_message(n_proc);
1100 
1102 
1103  for (size_t i = 0 ; i < 8 && i < n_proc ; i++)
1104  {
1105  size_t p_id = ((i+1) * ps + vcl.getProcessUnitID()) % n_proc;
1106  if (p_id != vcl.getProcessUnitID())
1107  {
1108  prc.add(p_id);
1109  message.add();
1110  std::ostringstream msg;
1111  msg << "Hello from " << vcl.getProcessUnitID() << " to " << p_id;
1112  std::string str(msg.str());
1113  message.last().resize(j);
1114  memset(message.last().getPointer(),0,j);
1115  std::copy(str.c_str(),&(str.c_str())[msg.str().size()],&(message.last().get(0)));
1116  }
1117  }
1118 
1119  recv_message.resize(n_proc);
1120  // The pattern is not really random preallocate the receive buffer
1121  for (size_t i = 0 ; i < 8 && i < n_proc ; i++)
1122  {
1123  long int p_id = (- (i+1) * ps + (long int)vcl.getProcessUnitID());
1124  if (p_id < 0)
1125  p_id += n_proc;
1126  else
1127  p_id = p_id % n_proc;
1128 
1129  if (p_id != (long int)vcl.getProcessUnitID())
1130  recv_message.get(p_id).resize(j);
1131  }
1132 
1133 #ifdef VERBOSE_TEST
1134  timer t;
1135  t.start();
1136 #endif
1137 
1138  commFunc<ip>(vcl,prc,message,msg_alloc,&recv_message);
1139 
1140 #ifdef VERBOSE_TEST
1141  t.stop();
1142  double clk = t.getwct();
1143  double clk_max = clk;
1144 
1145  size_t size_send_recv = 2 * j * (prc.size());
1146  vcl.sum(size_send_recv);
1147  vcl.max(clk_max);
1148  vcl.execute();
1149 
1150  if (vcl.getProcessUnitID() == 0)
1151  std::cout << "(Long pattern: " << method<ip>() << ")Buffer size: " << j << " Bandwidth (Average): " << size_send_recv / vcl.getProcessingUnits() / clk / 1e6 << " MB/s " << " Bandwidth (Total): " << size_send_recv / clk / 1e6 << " MB/s Clock: " << clk << " Clock MAX: " << clk_max <<"\n";
1152 #endif
1153 
1154  // Check the message
1155  for (long int i = 0 ; i < 8 && i < (long int)n_proc ; i++)
1156  {
1157  long int p_id = (- (i+1) * ps + (long int)vcl.getProcessUnitID());
1158  if (p_id < 0)
1159  p_id += n_proc;
1160  else
1161  p_id = p_id % n_proc;
1162 
1163  if (p_id != (long int)vcl.getProcessUnitID())
1164  {
1165  std::ostringstream msg;
1166  msg << "Hello from " << p_id << " to " << vcl.getProcessUnitID();
1167  std::string str(msg.str());
1168  BOOST_REQUIRE_EQUAL(std::equal(str.c_str(),str.c_str() + str.size() ,&(recv_message.get(p_id).get(0))),true);
1169  }
1170  else
1171  {
1172  BOOST_REQUIRE_EQUAL((size_t)0,recv_message.get(p_id).size());
1173  }
1174  }
1175  }
1176  }
1177 }
1178 
1179 template<unsigned int ip> void test_random_multiple(unsigned int opt)
1180 {
1181  Vcluster<> & vcl = create_vcluster();
1182 
1183  // send/recv messages
1184 
1185  global_rank = vcl.getProcessUnitID();
1186  size_t n_proc = vcl.getProcessingUnits();
1187 
1188  for (size_t s = 0 ; s < N_TRY ; s++)
1189  {
1190  if (opt == RECEIVE_SIZE_UNKNOWN)
1191  {return;}
1192 
1193  std::srand(create_vcluster().getProcessUnitID());
1194  std::default_random_engine eg;
1195  std::uniform_int_distribution<int> d(0,n_proc/8);
1196 
1197  rcv_rm rcv[NQUEUE];
1198 
1199  // Check random pattern (maximum 16 processors)
1200 
1201  for (size_t j = 32 ; j < N_LOOP && n_proc < 16 ; j*=2)
1202  {
1203  global_step = j;
1204  // original send
1205  openfpm::vector<size_t> o_send[NQUEUE];
1206  // send message
1208  // recv message
1209  openfpm::vector<openfpm::vector<unsigned char>> recv_message[NQUEUE];
1210 // recv_message.reserve(n_proc);
1211  openfpm::vector<size_t> prc_recv[NQUEUE];
1212 
1214 
1215  for (size_t i = 0 ; i < n_proc ; i++)
1216  {
1217  // randomly with which processor communicate
1218  if (d(eg) == 0)
1219  {
1220  prc.add(i);
1221  for (size_t k = 0 ; k < NQUEUE ; k++)
1222  {
1223  o_send[k].add(i);
1224  message[k].add();
1225  message[k].last().fill(0);
1226  std::ostringstream msg;
1227  msg << "H" << k << " from " << vcl.getProcessUnitID() << " to " << i;
1228  std::string str(msg.str());
1229  message[k].last().resize(str.size());
1230  std::copy(str.c_str(),&(str.c_str())[msg.str().size()],&(message[k].last().get(0)));
1231  message[k].last().resize(j);
1232  }
1233  }
1234  }
1235 
1236  id = 0;
1237 
1238 
1239 #ifdef VERBOSE_TEST
1240  timer t;
1241  t.start();
1242 #endif
1243 
1244  for (size_t k = 0 ; k < NQUEUE ; k++)
1245  {
1246  rcv[k].prc_recv = &prc_recv[k];
1247  rcv[k].recv_message = &recv_message[k];
1248 
1249  vcl.sendrecvMultipleMessagesNBXAsync(prc,message[k],msg_alloc4,&rcv[k]);
1250  }
1251 
1252 #ifdef VERBOSE_TEST
1253  t.stop();
1254  double clk = t.getwct();
1255  double clk_max = clk;
1256 
1257  size_t size_send_recv = (prc.size() + recv_message.size()) * j;
1258  vcl.sum(size_send_recv);
1259  vcl.sum(clk);
1260  vcl.max(clk_max);
1261  vcl.execute();
1262  clk /= vcl.getProcessingUnits();
1263 
1264  if (vcl.getProcessUnitID() == 0)
1265  std::cout << "(Random Pattern: " << method<ip>() << ") Buffer size: " << j << " Bandwidth (Average): " << size_send_recv / vcl.getProcessingUnits() / clk / 1e6 << " MB/s " << " Bandwidth (Total): " << size_send_recv / clk / 1e6 << " MB/s Clock: " << clk << " Clock MAX: " << clk_max << "\n";
1266 #endif
1267 
1268  vcl.progressCommunication();
1269  usleep(1000);
1270  vcl.progressCommunication();
1271  usleep(1000);
1272  vcl.progressCommunication();
1273  usleep(1000);
1274  vcl.progressCommunication();
1275  usleep(1000);
1276 
1278 
1279  for (size_t k = 0 ; k < NQUEUE ; k++)
1280  {
1281  // Check the message
1282 
1283  for (size_t i = 0 ; i < recv_message[k].size() ; i++)
1284  {
1285  std::ostringstream msg;
1286  msg << "H" << k << " from " << prc_recv[k].get(i) << " to " << vcl.getProcessUnitID();
1287  std::string str(msg.str());
1288  BOOST_REQUIRE_EQUAL(std::equal(str.c_str(),str.c_str() + str.size() ,&(recv_message[k].get(i).get(0))),true);
1289  }
1290 
1291  // Reply back
1292 
1293  // Create the message
1294 
1295  prc.clear();
1296  message[k].clear();
1297  for (size_t i = 0 ; i < prc_recv[k].size() ; i++)
1298  {
1299  prc.add(prc_recv[k].get(i));
1300  message[k].add();
1301  std::ostringstream msg;
1302  msg << "H" << k << " from " << vcl.getProcessUnitID() << " to " << prc_recv[k].get(i);
1303  std::string str(msg.str());
1304  message[k].last().resize(str.size());
1305  std::copy(str.c_str(),&(str.c_str())[msg.str().size()],&(message[k].last().get(0)));
1306  message[k].last().resize(j);
1307  }
1308 
1309  id = 0;
1310  prc_recv[k].clear();
1311  recv_message[k].clear();
1312 
1313  rcv_rm rr;
1314  rr.prc_recv = &prc_recv[k];
1315  rr.recv_message = &recv_message[k];
1316 
1317  vcl.sendrecvMultipleMessagesNBXAsync(prc,message[k],msg_alloc4,&rr);
1318 
1319  vcl.progressCommunication();
1320  usleep(1000);
1321  vcl.progressCommunication();
1322  usleep(1000);
1323  vcl.progressCommunication();
1324  usleep(1000);
1325  vcl.progressCommunication();
1326  usleep(1000);
1327 
1329 
1330  // Check if the received hey message match the original send
1331 
1332  BOOST_REQUIRE_EQUAL(o_send[k].size(),prc_recv[k].size());
1333 
1334  for (size_t i = 0 ; i < o_send[k].size() ; i++)
1335  {
1336  size_t j = 0;
1337  for ( ; j < prc_recv[k].size() ; j++)
1338  {
1339  if (o_send[k].get(i) == prc_recv[k].get(j))
1340  {
1341  // found the message check it
1342 
1343  std::ostringstream msg;
1344  msg << "H" << k << " from " << prc_recv[k].get(i) << " to " << vcl.getProcessUnitID();
1345  std::string str(msg.str());
1346  BOOST_REQUIRE_EQUAL(std::equal(str.c_str(),str.c_str() + str.size() ,&(recv_message[k].get(i).get(0))),true);
1347  break;
1348  }
1349  }
1350  // Check that we find always a match
1351  BOOST_REQUIRE_EQUAL(j != prc_recv[k].size(),true);
1352  }
1353  }
1354  }
1355  }
1356 }
1357 
1364 void test_send_recv_complex(const size_t n, Vcluster<> & vcl)
1365 {
1367 
1368  // Point test typedef
1369  typedef Point_test<float> p;
1370 
1371  openfpm::vector<Point_test<float>> v_send = allocate_openfpm_fill(n,vcl.getProcessUnitID());
1372 
1373  // Send to 8 processors
1374  for (size_t i = 0 ; i < 8 ; i++)
1375  vcl.send( mod(vcl.getProcessUnitID() + i * P_STRIDE, vcl.getProcessingUnits()) ,i,v_send);
1376 
1378  pt_buf.resize(8);
1379 
1380  // Recv from 8 processors
1381  for (size_t i = 0 ; i < 8 ; i++)
1382  {
1383  pt_buf.get(i).resize(n);
1384  vcl.recv( mod( (vcl.getProcessUnitID() - i * P_STRIDE), vcl.getProcessingUnits()) ,i,pt_buf.get(i));
1385  }
1386 
1387  vcl.execute();
1388 
1390 
1391  // Check the received buffers (careful at negative modulo)
1392  for (size_t i = 0 ; i < 8 ; i++)
1393  {
1394  for (size_t j = 0 ; j < n ; j++)
1395  {
1396  Point_test<float> pt = pt_buf.get(i).get(j);
1397 
1398  size_t p_recv = mod( (vcl.getProcessUnitID() - i * P_STRIDE), vcl.getProcessingUnits());
1399 
1400  BOOST_REQUIRE_EQUAL(pt.template get<p::x>(),p_recv);
1401  BOOST_REQUIRE_EQUAL(pt.template get<p::y>(),p_recv);
1402  BOOST_REQUIRE_EQUAL(pt.template get<p::z>(),p_recv);
1403  BOOST_REQUIRE_EQUAL(pt.template get<p::s>(),p_recv);
1404  BOOST_REQUIRE_EQUAL(pt.template get<p::v>()[0],p_recv);
1405  BOOST_REQUIRE_EQUAL(pt.template get<p::v>()[1],p_recv);
1406  BOOST_REQUIRE_EQUAL(pt.template get<p::v>()[2],p_recv);
1407  BOOST_REQUIRE_EQUAL(pt.template get<p::t>()[0][0],p_recv);
1408  BOOST_REQUIRE_EQUAL(pt.template get<p::t>()[0][1],p_recv);
1409  BOOST_REQUIRE_EQUAL(pt.template get<p::t>()[0][2],p_recv);
1410  BOOST_REQUIRE_EQUAL(pt.template get<p::t>()[1][0],p_recv);
1411  BOOST_REQUIRE_EQUAL(pt.template get<p::t>()[1][1],p_recv);
1412  BOOST_REQUIRE_EQUAL(pt.template get<p::t>()[1][2],p_recv);
1413  BOOST_REQUIRE_EQUAL(pt.template get<p::t>()[2][0],p_recv);
1414  BOOST_REQUIRE_EQUAL(pt.template get<p::t>()[2][1],p_recv);
1415  BOOST_REQUIRE_EQUAL(pt.template get<p::t>()[2][2],p_recv);
1416  }
1417  }
1418 }
1419 
1427 template<typename T> void test_send_recv_primitives(size_t n, Vcluster<> & vcl)
1428 {
1429  openfpm::vector<T> v_send = allocate_openfpm_primitive<T>(n,vcl.getProcessUnitID());
1430 
1431  {
1433 
1434  // Send to 8 processors
1435  for (size_t i = 0 ; i < 8 ; i++)
1436  vcl.send( mod(vcl.getProcessUnitID() + i * P_STRIDE, vcl.getProcessingUnits()) ,i,v_send);
1437 
1439  pt_buf.resize(8);
1440 
1441  // Recv from 8 processors
1442  for (size_t i = 0 ; i < 8 ; i++)
1443  {
1444  pt_buf.get(i).resize(n);
1445  vcl.recv( mod( (vcl.getProcessUnitID() - i * P_STRIDE), vcl.getProcessingUnits()) ,i,pt_buf.get(i));
1446  }
1447 
1448  vcl.execute();
1449 
1451 
1452  // Check the received buffers (careful at negative modulo)
1453  for (size_t i = 0 ; i < 8 ; i++)
1454  {
1455  for (size_t j = 0 ; j < n ; j++)
1456  {
1457  T pt = pt_buf.get(i).get(j);
1458 
1459  T p_recv = mod( (vcl.getProcessUnitID() - i * P_STRIDE), vcl.getProcessingUnits());
1460 
1461  BOOST_REQUIRE_EQUAL(pt,p_recv);
1462  }
1463  }
1464 
1465  }
1466 
1467  {
1469 
1470  // Send to 8 processors
1471  for (size_t i = 0 ; i < 8 ; i++)
1472  vcl.send( mod(vcl.getProcessUnitID() + i * P_STRIDE, vcl.getProcessingUnits()) ,i,v_send.getPointer(),v_send.size()*sizeof(T));
1473 
1475  pt_buf.resize(8);
1476 
1477  // Recv from 8 processors
1478  for (size_t i = 0 ; i < 8 ; i++)
1479  {
1480  pt_buf.get(i).resize(n);
1481  vcl.recv( mod( (vcl.getProcessUnitID() - i * P_STRIDE), vcl.getProcessingUnits()) ,i,pt_buf.get(i).getPointer(),pt_buf.get(i).size()*sizeof(T));
1482  }
1483 
1484  vcl.execute();
1485 
1487 
1488  // Check the received buffers (careful at negative modulo)
1489  for (size_t i = 0 ; i < 8 ; i++)
1490  {
1491  for (size_t j = 0 ; j < n ; j++)
1492  {
1493  T pt = pt_buf.get(i).get(j);
1494 
1495  T p_recv = mod( (vcl.getProcessUnitID() - i * P_STRIDE), vcl.getProcessingUnits());
1496 
1497  BOOST_REQUIRE_EQUAL(pt,p_recv);
1498  }
1499  }
1500 
1501  }
1502 }
1503 
1504 template<typename T> void test_single_all_gather_primitives(Vcluster<> & vcl)
1505 {
1507 
1508  openfpm::vector<T> clt;
1509  T data = vcl.getProcessUnitID();
1510 
1511  vcl.allGather(data,clt);
1512  vcl.execute();
1513 
1514  for (size_t i = 0 ; i < vcl.getProcessingUnits() ; i++)
1515  BOOST_REQUIRE_EQUAL(i,(size_t)clt.get(i));
1516 
1518 
1519 }
1520 
1521 #endif /* VCLUSTER_UNIT_TEST_UTIL_HPP_ */
Test structure used for several test.
Definition: Point_test.hpp:106
void progressCommunication()
In case of Asynchonous communications like sendrecvMultipleMessagesNBXAsync this function progress th...
void execute()
Execute all the requests.
void sum(T &num)
Sum the numbers across all processors and get the result.
void sendrecvMultipleMessagesNBX(openfpm::vector< size_t > &prc, openfpm::vector< T > &data, openfpm::vector< size_t > &prc_recv, openfpm::vector< size_t > &recv_sz, void *(*msg_alloc)(size_t, size_t, size_t, size_t, size_t, size_t, void *), void *ptr_arg, long int opt=NONE)
Send and receive multiple messages.
size_t getProcessUnitID()
Get the process unit id.
size_t getProcessingUnits()
Get the total number of processors.
bool recv(size_t proc, size_t tag, void *v, size_t sz)
Recv data from a processor.
void sendrecvMultipleMessagesNBXWait()
Send and receive multiple messages wait NBX communication to complete.
bool allGather(T &send, openfpm::vector< T, Mem, gr > &v)
Gather the data from all processors.
void max(T &num)
Get the maximum number across all processors (or reduction with infinity norm)
void sendrecvMultipleMessagesNBXAsync(openfpm::vector< size_t > &prc, openfpm::vector< T > &data, openfpm::vector< size_t > &prc_recv, openfpm::vector< size_t > &recv_sz, void *(*msg_alloc)(size_t, size_t, size_t, size_t, size_t, size_t, void *), void *ptr_arg, long int opt=NONE)
Send and receive multiple messages asynchronous version.
bool send(size_t proc, size_t tag, const void *mem, size_t sz)
Send data to a processor.
Implementation of VCluster class.
Definition: VCluster.hpp:59
size_t size()
Stub size.
Definition: map_vector.hpp:212
Class for cpu time benchmarking.
Definition: timer.hpp:28
KeyT const ValueT ValueT OffsetIteratorT OffsetIteratorT int
[in] The number of segments that comprise the sorting data