OpenFPM_pdata  4.1.0
Project that contain the implementation of distributed structures
VCluster_unit_test_util.hpp
1 /*
2  * VCluster_unit_test_util.hpp
3  *
4  * Created on: May 30, 2015
5  * Author: i-bird
6  */
7 
8 #ifndef VCLUSTER_UNIT_TEST_UTIL_HPP_
9 #define VCLUSTER_UNIT_TEST_UTIL_HPP_
10 
11 #include "Point_test.hpp"
12 #include "VCluster_base.hpp"
13 #include "Vector/vector_test_util.hpp"
14 #include "VCluster/VCluster.hpp"
15 
16 constexpr int RECEIVE_UNKNOWN = 1;
17 constexpr int RECEIVE_SIZE_UNKNOWN = 2;
18 
19 constexpr int NBX = 1;
20 constexpr int NBX_ASYNC = 2;
21 constexpr int KNOWN_PRC = 3;
22 
23 constexpr int N_TRY = 2;
24 constexpr int N_LOOP = 67108864;
25 constexpr int BUFF_STEP = 524288;
26 constexpr int P_STRIDE = 17;
27 
28 bool totp_check;
29 size_t global_step = 0;
30 size_t global_rank;
31 
32 struct rcv_rm
33 {
34  openfpm::vector<size_t> * prc_recv;
36 };
37 
44 int mod(int x, int m) {
45  return (x%m + m)%m;
46 }
47 
48 // Alloc the buffer to receive the messages
49 
51 
52 void * msg_alloc(size_t msg_i ,size_t total_msg, size_t total_p, size_t i,size_t ri, size_t tag, void * ptr)
53 {
54  // convert the void pointer argument into a pointer to receiving buffers
56 
59 
60  if (create_vcluster().getProcessingUnits() <= 8)
61  {if (totp_check) BOOST_REQUIRE_EQUAL(total_p,create_vcluster().getProcessingUnits()-1);}
62  else
63  {if (totp_check) BOOST_REQUIRE_EQUAL(total_p,(size_t)8);}
64 
65  BOOST_REQUIRE_EQUAL(msg_i, global_step);
66 
68 
69  // Create the memory to receive the message
70  // msg_i contain the size of the message to receive
71  // i contain the processor id
72  v->get(i).resize(msg_i);
73 
74  // return the pointer of the allocated memory
75  return &(v->get(i).get(0));
76 }
77 
79 
80 // Alloc the buffer to receive the messages
81 
82 size_t id = 0;
84 
85 void * msg_alloc2(size_t msg_i ,size_t total_msg, size_t total_p, size_t i, size_t ri, void * ptr)
86 {
88 
89  v->resize(total_p);
90  prc_recv.resize(total_p);
91 
92  BOOST_REQUIRE_EQUAL(msg_i, global_step);
93 
94  id++;
95  v->get(id-1).resize(msg_i);
96  prc_recv.get(id-1) = i;
97  return &(v->get(id-1).get(0));
98 }
99 
100 void * msg_alloc3(size_t msg_i ,size_t total_msg, size_t total_p, size_t i, size_t ri, size_t tag, void * ptr)
101 {
103 
104  v->add();
105 
106  prc_recv.add();
107 
108  BOOST_REQUIRE_EQUAL(msg_i, global_step);
109 
110  v->last().resize(msg_i);
111  prc_recv.last() = i;
112  return &(v->last().get(0));
113 }
114 
115 void * msg_alloc4(size_t msg_i ,size_t total_msg, size_t total_p, size_t i, size_t ri, size_t tag, void * ptr)
116 {
117  rcv_rm * v = static_cast<rcv_rm *>(ptr);
118 
119  v->recv_message->add();
120 
121  v->prc_recv->add();
122 
123  BOOST_REQUIRE_EQUAL(msg_i, global_step);
124 
125  v->recv_message->last().resize(msg_i);
126  v->prc_recv->last() = i;
127  return &(v->recv_message->last().get(0));
128 }
129 
130 template<unsigned int ip, typename T> void commFunc(Vcluster<> & vcl,openfpm::vector< size_t > & prc, openfpm::vector< T > & data, void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,size_t,void *), void * ptr_arg)
131 {
132  if (ip == NBX)
133  {vcl.sendrecvMultipleMessagesNBX(prc,data,msg_alloc,ptr_arg);}
134  else
135  {
136  vcl.sendrecvMultipleMessagesNBXAsync(prc,data,msg_alloc,ptr_arg);
137 
138  vcl.progressCommunication();
139  usleep(1000);
140  vcl.progressCommunication();
141  usleep(1000);
142  vcl.progressCommunication();
143  usleep(1000);
144  vcl.progressCommunication();
145  usleep(1000);
146 
148  }
149 }
150 
151 
152 template<unsigned int ip>
153 void commFunc_low(Vcluster<> & vcl,openfpm::vector< size_t > & prc, openfpm::vector<size_t> & sz_send ,
155  void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,size_t,void *),
156  void * ptr_arg)
157 {
158  // Send and receive
159  vcl.sendrecvMultipleMessagesNBX(prc.size(),&sz_send.get(0),&prc.get(0),
160  &ptr.get(0),prc_recv.size(),&prc_recv.get(0),msg_alloc,ptr_arg);
161 }
162 
163 template<unsigned int ip, typename T>
164 void commFunc_null_odd(Vcluster<> & vcl,openfpm::vector< size_t > & prc, openfpm::vector< T > & data, void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,size_t,void *), void * ptr_arg)
165 {
166  if (vcl.getProcessUnitID() % 2 == 0)
167  vcl.sendrecvMultipleMessagesNBX(prc,data,msg_alloc,ptr_arg);
168  else
169  {
170  // No send check if passing null to sendrecv work
171  vcl.sendrecvMultipleMessagesNBX(prc.size(),(size_t *)NULL,(size_t *)NULL,(void **)NULL,msg_alloc,ptr_arg,NONE);
172  }
173 }
174 
175 template<unsigned int ip>
176 void commFunc_kn(Vcluster<> & vcl,
178  openfpm::vector<size_t> & recv_sz,
179  void * ptr)
180 {
181  if (ip == NBX)
182  {
183  vcl.sendrecvMultipleMessagesNBX(prc,message,prc_recv,recv_sz,msg_alloc,ptr);
184  }
185  else
186  {
187  vcl.sendrecvMultipleMessagesNBXAsync(prc,message,prc_recv,recv_sz,msg_alloc,ptr);
188 
189  vcl.progressCommunication();
190  usleep(1000);
191  vcl.progressCommunication();
192  usleep(1000);
193  vcl.progressCommunication();
194  usleep(1000);
195  vcl.progressCommunication();
196  usleep(1000);
197 
199  }
200 }
201 
202 template<unsigned int ip>
203 void commFunc_kn_prc(Vcluster<> & vcl,
206  openfpm::vector<size_t> & prc_recv,
207  openfpm::vector<size_t> & recv_sz,
208  void * ptr_arg)
209 {
212 
213  ptr.resize(message.size());
214  sz.resize(message.size());
215 
216  for (size_t i = 0 ; i < ptr.size() ; i++)
217  {
218  ptr.get(i) = message.get(i).getPointer();
219  sz.get(i) = message.get(i).size();
220  }
221 
222  if (ip == NBX)
223  {
224  vcl.sendrecvMultipleMessagesNBX(ptr.size(),(size_t *)sz.getPointer(),(size_t *)prc.getPointer(),(void **)ptr.getPointer(),
225  prc_recv.size(),(size_t *)prc_recv.getPointer(),msg_alloc,ptr_arg);
226  }
227  else
228  {
229  vcl.sendrecvMultipleMessagesNBXAsync(ptr.size(),(size_t *)sz.getPointer(),(size_t *)prc.getPointer(),(void **)ptr.getPointer(),
230  prc_recv.size(),(size_t *)prc_recv.getPointer(),msg_alloc,ptr_arg);
231 
232  vcl.progressCommunication();
233  usleep(1000);
234  vcl.progressCommunication();
235  usleep(1000);
236  vcl.progressCommunication();
237  usleep(1000);
238  vcl.progressCommunication();
239  usleep(1000);
240 
242  }
243 }
244 
245 template <unsigned int ip> std::string method()
246 {
247  return std::string("NBX");
248 }
249 
250 template<unsigned int ip> void test_no_send_some_peer()
251 {
252  Vcluster<> & vcl = create_vcluster();
253 
254  size_t n_proc = vcl.getProcessingUnits();
255 
256  // Check long communication with some peer not comunication
257 
258  size_t j = 4567;
259 
260  global_step = j;
261  // Processor step
262  long int ps = n_proc / (8 + 1);
263 
264  // send message
266  // recv message
268 
270 
271  // only even communicate
272 
273  if (vcl.getProcessUnitID() % 2 == 0)
274  {
275  for (size_t i = 0 ; i < 8 && i < n_proc ; i++)
276  {
277  size_t p_id = ((i+1) * ps + vcl.getProcessUnitID()) % n_proc;
278  if (p_id != vcl.getProcessUnitID())
279  {
280  prc.add(p_id);
281  message.add();
282  std::ostringstream msg;
283  msg << "Hello from " << vcl.getProcessUnitID() << " to " << p_id;
284  std::string str(msg.str());
285  message.last().resize(j);
286  memset(message.last().getPointer(),0,j);
287  std::copy(str.c_str(),&(str.c_str())[msg.str().size()],&(message.last().get(0)));
288  }
289  }
290  }
291 
292  recv_message.resize(n_proc);
293 
294 #ifdef VERBOSE_TEST
295  timer t;
296  t.start();
297 #endif
298 
299  commFunc_null_odd<ip>(vcl,prc,message,msg_alloc,&recv_message);
300 
301 #ifdef VERBOSE_TEST
302  t.stop();
303  double clk = t.getwct();
304  double clk_max = clk;
305 
306  size_t size_send_recv = 2 * j * (prc.size());
307  vcl.sum(size_send_recv);
308  vcl.max(clk_max);
309  vcl.execute();
310 
311  if (vcl.getProcessUnitID() == 0)
312  std::cout << "(Long pattern: " << method<ip>() << ")Buffer size: " << j << " Bandwidth (Average): " << size_send_recv / vcl.getProcessingUnits() / clk / 1e6 << " MB/s " << " Bandwidth (Total): " << size_send_recv / clk / 1e6 << " MB/s Clock: " << clk << " Clock MAX: " << clk_max <<"\n";
313 #endif
314 
315  // Check the message
316  for (long int i = 0 ; i < 8 && i < (long int)n_proc ; i++)
317  {
318  long int p_id = (- (i+1) * ps + (long int)vcl.getProcessUnitID());
319  if (p_id < 0)
320  p_id += n_proc;
321  else
322  p_id = p_id % n_proc;
323 
324  if (p_id != (long int)vcl.getProcessUnitID())
325  {
326  // only even processor communicate
327  if (p_id % 2 == 1)
328  continue;
329 
330  std::ostringstream msg;
331  msg << "Hello from " << p_id << " to " << vcl.getProcessUnitID();
332  std::string str(msg.str());
333  BOOST_REQUIRE_EQUAL(std::equal(str.c_str(),str.c_str() + str.size() ,&(recv_message.get(p_id).get(0))),true);
334  }
335  else
336  {
337  BOOST_REQUIRE_EQUAL((size_t)0,recv_message.get(p_id).size());
338  }
339  }
340 }
341 
342 template<unsigned int ip> void test_known(size_t opt)
343 {
344  Vcluster<> & vcl = create_vcluster();
345 
346  // send/recv messages
347 
348  global_rank = vcl.getProcessUnitID();
349  size_t n_proc = vcl.getProcessingUnits();
350 
351  // Checking short communication pattern
352 
353  for (size_t s = 0 ; s < N_TRY ; s++)
354  {
355  for (size_t j = 32 ; j < N_LOOP ; j*=2)
356  {
357  global_step = j;
358  // send message
360  // recv message
362  recv_message.reserve(n_proc);
363 
364  openfpm::vector<size_t> prc_recv;
365  openfpm::vector<size_t> recv_sz;
366 
368 
369  for (size_t i = 0 ; i < 8 && i < n_proc ; i++)
370  {
371  size_t p_id = (i + 1 + vcl.getProcessUnitID()) % n_proc;
372  if (p_id != vcl.getProcessUnitID())
373  {
374  prc.add(p_id);
375  message.add();
376  std::ostringstream msg;
377  msg << "Hello from " << vcl.getProcessUnitID() << " to " << p_id;
378  std::string str(msg.str());
379  message.last().resize(j);
380  memset(message.last().getPointer(),0,j);
381  std::copy(str.c_str(),&(str.c_str())[msg.str().size()],&(message.last().get(0)));
382  }
383  }
384 
385  recv_message.resize(n_proc);
386  // The pattern is not really random preallocate the receive buffer
387  for (size_t i = 0 ; i < 8 && i < n_proc ; i++)
388  {
389  long int p_id = vcl.getProcessUnitID() - i - 1;
390  if (p_id < 0)
391  p_id += n_proc;
392  else
393  p_id = p_id % n_proc;
394 
395  if (p_id != (long int)vcl.getProcessUnitID())
396  {
397  prc_recv.add(p_id);
398  recv_message.get(p_id).resize(j);
399  recv_sz.add(j);
400  }
401  }
402 
403 #ifdef VERBOSE_TEST
404  timer t;
405  t.start();
406 #endif
407 
408  if (opt == KNOWN_PRC)
409  {commFunc_kn_prc<ip>(vcl,prc,message,prc_recv,recv_sz,&recv_message);}
410  else
411  {commFunc_kn<ip>(vcl,prc,message,prc_recv,recv_sz,&recv_message);}
412 
413 #ifdef VERBOSE_TEST
414  t.stop();
415 
416  double clk = t.getwct();
417  double clk_max = clk;
418 
419  size_t size_send_recv = 2 * j * (prc.size());
420  vcl.sum(size_send_recv);
421  vcl.max(clk_max);
422  vcl.execute();
423 
424  if (vcl.getProcessUnitID() == 0)
425  std::cout << "(Short pattern: " << method<ip>() << ")Buffer size: " << j << " Bandwidth (Average): " << size_send_recv / vcl.getProcessingUnits() / clk / 1e6 << " MB/s " << " Bandwidth (Total): " << size_send_recv / clk / 1e6 << " MB/s Clock: " << clk << " Clock MAX: " << clk_max <<"\n";
426 #endif
427 
428  // Check the message
429  for (size_t i = 0 ; i < 8 && i < n_proc ; i++)
430  {
431  long int p_id = vcl.getProcessUnitID() - i - 1;
432  if (p_id < 0)
433  p_id += n_proc;
434  else
435  p_id = p_id % n_proc;
436 
437  if (p_id != (long int)vcl.getProcessUnitID())
438  {
439  std::ostringstream msg;
440  msg << "Hello from " << p_id << " to " << vcl.getProcessUnitID();
441  std::string str(msg.str());
442  BOOST_REQUIRE_EQUAL(std::equal(str.c_str(),str.c_str() + str.size() ,&(recv_message.get(p_id).get(0))),true);
443  }
444  else
445  {
446  BOOST_REQUIRE_EQUAL((size_t)0,recv_message.get(p_id).size());
447  }
448  }
449  }
450  }
451 }
452 
453 template<unsigned int ip> void test_known_multiple(size_t opt)
454 {
455  Vcluster<> & vcl = create_vcluster();
456 
457  // send/recv messages
458 
459  global_rank = vcl.getProcessUnitID();
460  size_t n_proc = vcl.getProcessingUnits();
461 
462  // Checking short communication pattern
463 
464  for (size_t s = 0 ; s < N_TRY ; s++)
465  {
466  for (size_t j = 32 ; j < N_LOOP ; j*=2)
467  {
468  global_step = j;
469  // send message
471  // recv message
473 
474  openfpm::vector<size_t> prc_recv[NQUEUE];
475  openfpm::vector<size_t> recv_sz[NQUEUE];
476 
477  openfpm::vector<void *> ptr[NQUEUE];
478  openfpm::vector<size_t> sz[NQUEUE];
479 
480 
482 
483  for (size_t i = 0 ; i < 8 && i < n_proc ; i++)
484  {
485  size_t p_id = (i + 1 + vcl.getProcessUnitID()) % n_proc;
486  if (p_id != vcl.getProcessUnitID())
487  {
488  prc.add(p_id);
489  for (size_t k = 0 ; k < NQUEUE ; k++)
490  {
491  message[k].add();
492  std::ostringstream msg;
493  msg << "Hello " << k << " from " << vcl.getProcessUnitID() << " to " << p_id;
494  std::string str(msg.str());
495  message[k].last().resize(j);
496  memset(message[k].last().getPointer(),0,j);
497  std::copy(str.c_str(),&(str.c_str())[msg.str().size()],&(message[k].last().get(0)));
498  }
499  }
500  }
501 
502  for (size_t k = 0 ; k < NQUEUE ; k++)
503  {
504  recv_message[k].resize(n_proc);
505  // The pattern is not really random preallocate the receive buffer
506  for (size_t i = 0 ; i < 8 && i < n_proc ; i++)
507  {
508  long int p_id = vcl.getProcessUnitID() - i - 1;
509  if (p_id < 0)
510  p_id += n_proc;
511  else
512  p_id = p_id % n_proc;
513 
514  if (p_id != (long int)vcl.getProcessUnitID())
515  {
516  prc_recv[k].add(p_id);
517  recv_message[k].get(p_id).resize(j);
518  recv_sz[k].add(j);
519  }
520  }
521  }
522 
523 #ifdef VERBOSE_TEST
524  timer t;
525  t.start();
526 #endif
527 
528  for (size_t k = 0 ; k < NQUEUE ; k++)
529  {
530  if (opt == KNOWN_PRC)
531  {
532  ptr[k].resize(message[k].size());
533  sz[k].resize(message[k].size());
534 
535  for (size_t i = 0 ; i < ptr[k].size() ; i++)
536  {
537  ptr[k].get(i) = message[k].get(i).getPointer();
538  sz[k].get(i) = message[k].get(i).size();
539  }
540 
541  vcl.sendrecvMultipleMessagesNBXAsync(ptr[k].size(),(size_t *)sz[k].getPointer(),(size_t *)prc.getPointer(),(void **)ptr[k].getPointer(),
542  prc_recv[k].size(),(size_t *)prc_recv[k].getPointer(),msg_alloc,&recv_message[k]);
543 
544  }
545  else
546  {
547  vcl.sendrecvMultipleMessagesNBXAsync(prc,message[k],prc_recv[k],recv_sz[k],msg_alloc,&recv_message[k]);
548  }
549  }
550 
551 
552  vcl.progressCommunication();
553  usleep(1000);
554  vcl.progressCommunication();
555  usleep(1000);
556  vcl.progressCommunication();
557  usleep(1000);
558  vcl.progressCommunication();
559  usleep(1000);
560 
562 
563 #ifdef VERBOSE_TEST
564  t.stop();
565 
566  double clk = t.getwct();
567  double clk_max = clk;
568 
569  size_t size_send_recv = 2 * j * (prc.size());
570  vcl.sum(size_send_recv);
571  vcl.max(clk_max);
572  vcl.execute();
573 
574  if (vcl.getProcessUnitID() == 0)
575  std::cout << "(Short pattern: " << method<ip>() << ")Buffer size: " << j << " Bandwidth (Average): " << size_send_recv / vcl.getProcessingUnits() / clk / 1e6 << " MB/s " << " Bandwidth (Total): " << size_send_recv / clk / 1e6 << " MB/s Clock: " << clk << " Clock MAX: " << clk_max <<"\n";
576 #endif
577 
578  for (size_t k = 0 ; k < NQUEUE ; k++)
579  {
580  // Check the message
581  for (size_t i = 0 ; i < 8 && i < n_proc ; i++)
582  {
583  long int p_id = vcl.getProcessUnitID() - i - 1;
584  if (p_id < 0)
585  p_id += n_proc;
586  else
587  p_id = p_id % n_proc;
588 
589  if (p_id != (long int)vcl.getProcessUnitID())
590  {
591  std::ostringstream msg;
592  msg << "Hello " << k << " from " << p_id << " to " << vcl.getProcessUnitID();
593  std::string str(msg.str());
594  BOOST_REQUIRE_EQUAL(std::equal(str.c_str(),str.c_str() + str.size() ,&(recv_message[k].get(p_id).get(0))),true);
595  }
596  else
597  {
598  BOOST_REQUIRE_EQUAL((size_t)0,recv_message[k].get(p_id).size());
599  }
600  }
601  }
602  }
603  }
604 }
605 
606 template<unsigned int ip> void test_short(unsigned int opt)
607 {
608  Vcluster<> & vcl = create_vcluster();
609 
610  // send/recv messages
611 
612  global_rank = vcl.getProcessUnitID();
613  size_t n_proc = vcl.getProcessingUnits();
614 
615  // Checking short communication pattern
616 
617  for (size_t s = 0 ; s < N_TRY ; s++)
618  {
619  for (size_t j = 32 ; j < N_LOOP ; j*=2)
620  {
621  global_step = j;
622 
624 
625  // We send one message for each processor (one message is an openfpm::vector<unsigned char>)
626  // or an array of bytes
628 
629  // receving messages. Each receiving message is an openfpm::vector<unsigned char>
630  // or an array if bytes
632 
633  // each processor communicate based on a list of processor
635 
636  // We construct the processor list in particular in this case
637  // each processor communicate with the 8 next (in id) processors
638  for (size_t i = 0 ; i < 8 && i < n_proc ; i++)
639  {
640  size_t p_id = (i + 1 + vcl.getProcessUnitID()) % n_proc;
641 
642  // avoid to communicate with yourself
643  if (p_id != vcl.getProcessUnitID())
644  {
645  // Create an hello message
646  prc.add(p_id);
647  message.add();
648  std::ostringstream msg;
649  msg << "Hello from " << vcl.getProcessUnitID() << " to " << p_id;
650  std::string str(msg.str());
651  message.last().resize(j);
652  memset(message.last().getPointer(),0,j);
653  std::copy(str.c_str(),&(str.c_str())[msg.str().size()],&(message.last().get(0)));
654  }
655  }
656 
657  // For simplicity we create in advance a receiving buffer for all processors
658  recv_message.resize(n_proc);
659 
660  // The pattern is not really random preallocate the receive buffer
661  for (size_t i = 0 ; i < 8 && i < n_proc ; i++)
662  {
663  long int p_id = vcl.getProcessUnitID() - i - 1;
664  if (p_id < 0)
665  p_id += n_proc;
666  else
667  p_id = p_id % n_proc;
668 
669  if (p_id != (long int)vcl.getProcessUnitID())
670  recv_message.get(p_id).resize(j);
671  }
672 
673  if (opt == RECEIVE_UNKNOWN)
674  {
675  // Send and receive
676  commFunc<ip>(vcl,prc,message,msg_alloc,&recv_message);
677 
678  }
680  else if (opt == RECEIVE_SIZE_UNKNOWN)
681  {
682  openfpm::vector<size_t> sz_send;
684 
685  openfpm::vector<size_t> prc_recv;
686 
687  sz_send.resize(prc.size());
688  ptr.resize(prc.size());
689 
690  for (size_t i = 0 ; i < prc.size() ; i++)
691  {
692  sz_send.get(i) = message.get(i).size();
693  ptr.get(i) = &message.get(i).get(0);
694  }
695 
696  // Calculate the receiving part
697 
698  // Check the message
699  for (size_t i = 0 ; i < 8 && i < n_proc ; i++)
700  {
701  long int p_id = vcl.getProcessUnitID() - i - 1;
702  if (p_id < 0)
703  {p_id += n_proc;}
704  else
705  {p_id = p_id % n_proc;}
706 
707  if (p_id != (long int)vcl.getProcessUnitID())
708  {
709  prc_recv.add(p_id);
710  }
711  }
712 
714 
715  commFunc_low<ip>(vcl,prc,sz_send,ptr,prc_recv,msg_alloc,&recv_message);
716  }
717 
718 #ifdef VERBOSE_TEST
719  timer t;
720  t.start();
721 #endif
722 
723 
724 #ifdef VERBOSE_TEST
725  t.stop();
726 
727  double clk = t.getwct();
728  double clk_max = clk;
729 
730  size_t size_send_recv = 2 * j * (prc.size());
731  vcl.sum(size_send_recv);
732  vcl.max(clk_max);
733  vcl.execute();
734 
735  if (vcl.getProcessUnitID() == 0)
736  {std::cout << "(Short pattern: " << method<ip>() << ")Buffer size: " << j << " Bandwidth (Average): " << size_send_recv / vcl.getProcessingUnits() / clk / 1e6 << " MB/s " << " Bandwidth (Total): " << size_send_recv / clk / 1e6 << " MB/s Clock: " << clk << " Clock MAX: " << clk_max <<"\n";}
737 #endif
738 
739  // Check the message
740  for (size_t i = 0 ; i < 8 && i < n_proc ; i++)
741  {
742  long int p_id = vcl.getProcessUnitID() - i - 1;
743  if (p_id < 0)
744  p_id += n_proc;
745  else
746  p_id = p_id % n_proc;
747 
748  if (p_id != (long int)vcl.getProcessUnitID())
749  {
750  std::ostringstream msg;
751  msg << "Hello from " << p_id << " to " << vcl.getProcessUnitID();
752  std::string str(msg.str());
753  BOOST_REQUIRE_EQUAL(std::equal(str.c_str(),str.c_str() + str.size() ,&(recv_message.get(p_id).get(0))),true);
754  }
755  else
756  {
757  BOOST_REQUIRE_EQUAL((size_t)0,recv_message.get(p_id).size());
758  }
759  }
760  }
761  }
762 }
763 
764 template<unsigned int ip> void test_short_multiple(unsigned int opt)
765 {
766  Vcluster<> & vcl = create_vcluster();
767 
768  // send/recv messages
769 
770  global_rank = vcl.getProcessUnitID();
771  size_t n_proc = vcl.getProcessingUnits();
772 
773  // Checking short communication pattern
774 
775  for (size_t s = 0 ; s < N_TRY ; s++)
776  {
777  for (size_t j = 32 ; j < N_LOOP ; j*=2)
778  {
779  global_step = j;
780 
782 
783  // We send one message for each processor (one message is an openfpm::vector<unsigned char>)
784  // or an array of bytes
786 
787  // receving messages. Each receiving message is an openfpm::vector<unsigned char>
788  // or an array if bytes
789 
791 
792  for (size_t i = 0 ; i < NQUEUE ; i++)
793  {recv_message[i].resize(n_proc);}
794 
795  // each processor communicate based on a list of processor
797 
798  // We construct the processor list in particular in this case
799  // each processor communicate with the 8 next (in id) processors
800  for (size_t i = 0 ; i < 8 && i < n_proc ; i++)
801  {
802  size_t p_id = (i + 1 + vcl.getProcessUnitID()) % n_proc;
803 
804  // avoid to communicate with yourself
805  if (p_id != vcl.getProcessUnitID())
806  {
807  // Create an hello message
808  prc.add(p_id);
809  for (size_t k = 0 ; k < NQUEUE ; k++)
810  {
811  message[k].add();
812  std::ostringstream msg;
813  msg << "H" << k << " from " << vcl.getProcessUnitID() << " to " << p_id;
814  std::string str(msg.str());
815  message[k].last().resize(j);
816  memset(message[k].last().getPointer(),0,j);
817  std::copy(str.c_str(),&(str.c_str())[msg.str().size()],&(message[k].last().get(0)));
818  }
819  }
820  }
821 
822  openfpm::vector<size_t> sz_send[NQUEUE];
823  openfpm::vector<void *> ptr[NQUEUE];
824 
825  openfpm::vector<size_t> prc_recv[NQUEUE];
826 
827  // For simplicity we create in advance a receiving buffer for all processors
828  for (size_t k = 0 ; k < NQUEUE ; k++)
829  {
830  recv_message[k].resize(n_proc);
831 
832  // The pattern is not really random preallocate the receive buffer
833  for (size_t i = 0 ; i < 8 && i < n_proc ; i++)
834  {
835  long int p_id = vcl.getProcessUnitID() - i - 1;
836  if (p_id < 0)
837  p_id += n_proc;
838  else
839  p_id = p_id % n_proc;
840 
841  if (p_id != (long int)vcl.getProcessUnitID())
842  recv_message[k].get(p_id).resize(j);
843  }
844 
845  if (opt == RECEIVE_UNKNOWN)
846  {
847  vcl.sendrecvMultipleMessagesNBXAsync(prc,message[k],msg_alloc,&recv_message[k]);
848  }
850  else if (opt == RECEIVE_SIZE_UNKNOWN)
851  {
852  sz_send[k].resize(prc.size());
853  ptr[k].resize(prc.size());
854 
855  for (size_t i = 0 ; i < prc.size() ; i++)
856  {
857  sz_send[k].get(i) = message[k].get(i).size();
858  ptr[k].get(i) = &message[k].get(i).get(0);
859  }
860 
861  // Calculate the receiving part
862 
863  // Check the message
864  for (size_t i = 0 ; i < 8 && i < n_proc ; i++)
865  {
866  long int p_id = vcl.getProcessUnitID() - i - 1;
867  if (p_id < 0)
868  {p_id += n_proc;}
869  else
870  {p_id = p_id % n_proc;}
871 
872  if (p_id != (long int)vcl.getProcessUnitID())
873  {
874  prc_recv[k].add(p_id);
875  }
876  }
877 
879 
880  vcl.sendrecvMultipleMessagesNBXAsync(prc.size(),&sz_send[k].get(0),&prc.get(0),
881  &ptr[k].get(0),prc_recv[k].size(),&prc_recv[k].get(0),msg_alloc,&recv_message[k]);
882  }
883 
884  }
885 
886  vcl.progressCommunication();
887  usleep(1000);
888  vcl.progressCommunication();
889  usleep(1000);
890  vcl.progressCommunication();
891  usleep(1000);
892  vcl.progressCommunication();
893  usleep(1000);
894 
896 
897 #ifdef VERBOSE_TEST
898  timer t;
899  t.start();
900 #endif
901 
902 
903 #ifdef VERBOSE_TEST
904  t.stop();
905 
906  double clk = t.getwct();
907  double clk_max = clk;
908 
909  size_t size_send_recv = 2 * j * (prc.size());
910  vcl.sum(size_send_recv);
911  vcl.max(clk_max);
912  vcl.execute();
913 
914  if (vcl.getProcessUnitID() == 0)
915  {std::cout << "(Short pattern: " << method<ip>() << ")Buffer size: " << j << " Bandwidth (Average): " << size_send_recv / vcl.getProcessingUnits() / clk / 1e6 << " MB/s " << " Bandwidth (Total): " << size_send_recv / clk / 1e6 << " MB/s Clock: " << clk << " Clock MAX: " << clk_max <<"\n";}
916 #endif
917 
918  for (size_t k = 0 ; k < NQUEUE ; k++)
919  {
920  // Check the message
921  for (size_t i = 0 ; i < 8 && i < n_proc ; i++)
922  {
923  long int p_id = vcl.getProcessUnitID() - i - 1;
924  if (p_id < 0)
925  p_id += n_proc;
926  else
927  p_id = p_id % n_proc;
928 
929  if (p_id != (long int)vcl.getProcessUnitID())
930  {
931  std::ostringstream msg;
932  msg << "H" << k << " from " << p_id << " to " << vcl.getProcessUnitID();
933  std::string str(msg.str());
934  BOOST_REQUIRE_EQUAL(std::equal(str.c_str(),str.c_str() + str.size() ,&(recv_message[k].get(p_id).get(0))),true);
935  }
936  else
937  {
938  BOOST_REQUIRE_EQUAL((size_t)0,recv_message[k].get(p_id).size());
939  }
940  }
941  }
942  }
943  }
944 }
945 
946 template<unsigned int ip> void test_random(unsigned int opt)
947 {
948  Vcluster<> & vcl = create_vcluster();
949 
950  // send/recv messages
951 
952  global_rank = vcl.getProcessUnitID();
953  size_t n_proc = vcl.getProcessingUnits();
954 
955  for (size_t s = 0 ; s < N_TRY ; s++)
956  {
957  if (opt == RECEIVE_SIZE_UNKNOWN)
958  {return;}
959 
960  std::srand(create_vcluster().getProcessUnitID());
961  std::default_random_engine eg;
962  std::uniform_int_distribution<int> d(0,n_proc/8);
963 
964  // Check random pattern (maximum 16 processors)
965 
966  for (size_t j = 32 ; j < N_LOOP && n_proc < 16 ; j*=2)
967  {
968  global_step = j;
969  // original send
971  // send message
973  // recv message
975 // recv_message.reserve(n_proc);
976 
978 
979  for (size_t i = 0 ; i < n_proc ; i++)
980  {
981  // randomly with which processor communicate
982  if (d(eg) == 0)
983  {
984  prc.add(i);
985  o_send.add(i);
986  message.add();
987  message.last().fill(0);
988  std::ostringstream msg;
989  msg << "Hello from " << vcl.getProcessUnitID() << " to " << i;
990  std::string str(msg.str());
991  message.last().resize(str.size());
992  std::copy(str.c_str(),&(str.c_str())[msg.str().size()],&(message.last().get(0)));
993  message.last().resize(j);
994  }
995  }
996 
997  id = 0;
998  prc_recv.clear();
999 
1000 
1001 #ifdef VERBOSE_TEST
1002  timer t;
1003  t.start();
1004 #endif
1005 
1006  commFunc<ip>(vcl,prc,message,msg_alloc3,&recv_message);
1007 
1008 #ifdef VERBOSE_TEST
1009  t.stop();
1010  double clk = t.getwct();
1011  double clk_max = clk;
1012 
1013  size_t size_send_recv = (prc.size() + recv_message.size()) * j;
1014  vcl.sum(size_send_recv);
1015  vcl.sum(clk);
1016  vcl.max(clk_max);
1017  vcl.execute();
1018  clk /= vcl.getProcessingUnits();
1019 
1020  if (vcl.getProcessUnitID() == 0)
1021  std::cout << "(Random Pattern: " << method<ip>() << ") Buffer size: " << j << " Bandwidth (Average): " << size_send_recv / vcl.getProcessingUnits() / clk / 1e6 << " MB/s " << " Bandwidth (Total): " << size_send_recv / clk / 1e6 << " MB/s Clock: " << clk << " Clock MAX: " << clk_max << "\n";
1022 #endif
1023 
1024  // Check the message
1025 
1026  for (size_t i = 0 ; i < recv_message.size() ; i++)
1027  {
1028  std::ostringstream msg;
1029  msg << "Hello from " << prc_recv.get(i) << " to " << vcl.getProcessUnitID();
1030  std::string str(msg.str());
1031  BOOST_REQUIRE_EQUAL(std::equal(str.c_str(),str.c_str() + str.size() ,&(recv_message.get(i).get(0))),true);
1032  }
1033 
1034  // Reply back
1035 
1036  // Create the message
1037 
1038  prc.clear();
1039  message.clear();
1040  for (size_t i = 0 ; i < prc_recv.size() ; i++)
1041  {
1042  prc.add(prc_recv.get(i));
1043  message.add();
1044  std::ostringstream msg;
1045  msg << "Hey from " << vcl.getProcessUnitID() << " to " << prc_recv.get(i);
1046  std::string str(msg.str());
1047  message.last().resize(str.size());
1048  std::copy(str.c_str(),&(str.c_str())[msg.str().size()],&(message.last().get(0)));
1049  message.last().resize(j);
1050  }
1051 
1052  id = 0;
1053  prc_recv.clear();
1054  recv_message.clear();
1055 
1056  commFunc<ip>(vcl,prc,message,msg_alloc3,&recv_message);
1057 
1058  // Check if the received hey message match the original send
1059 
1060  BOOST_REQUIRE_EQUAL(o_send.size(),prc_recv.size());
1061 
1062  for (size_t i = 0 ; i < o_send.size() ; i++)
1063  {
1064  size_t j = 0;
1065  for ( ; j < prc_recv.size() ; j++)
1066  {
1067  if (o_send.get(i) == prc_recv.get(j))
1068  {
1069  // found the message check it
1070 
1071  std::ostringstream msg;
1072  msg << "Hey from " << prc_recv.get(i) << " to " << vcl.getProcessUnitID();
1073  std::string str(msg.str());
1074  BOOST_REQUIRE_EQUAL(std::equal(str.c_str(),str.c_str() + str.size() ,&(recv_message.get(i).get(0))),true);
1075  break;
1076  }
1077  }
1078  // Check that we find always a match
1079  BOOST_REQUIRE_EQUAL(j != prc_recv.size(),true);
1080  }
1081  }
1082 
1083  // Check long communication pattern
1084 
1085  for (size_t j = 32 ; j < N_LOOP ; j*=2)
1086  {
1087  global_step = j;
1088  // Processor step
1089  long int ps = n_proc / (8 + 1);
1090 
1091  // send message
1093  // recv message
1094  openfpm::vector<openfpm::vector<unsigned char>> recv_message(n_proc);
1095 
1097 
1098  for (size_t i = 0 ; i < 8 && i < n_proc ; i++)
1099  {
1100  size_t p_id = ((i+1) * ps + vcl.getProcessUnitID()) % n_proc;
1101  if (p_id != vcl.getProcessUnitID())
1102  {
1103  prc.add(p_id);
1104  message.add();
1105  std::ostringstream msg;
1106  msg << "Hello from " << vcl.getProcessUnitID() << " to " << p_id;
1107  std::string str(msg.str());
1108  message.last().resize(j);
1109  memset(message.last().getPointer(),0,j);
1110  std::copy(str.c_str(),&(str.c_str())[msg.str().size()],&(message.last().get(0)));
1111  }
1112  }
1113 
1114  recv_message.resize(n_proc);
1115  // The pattern is not really random preallocate the receive buffer
1116  for (size_t i = 0 ; i < 8 && i < n_proc ; i++)
1117  {
1118  long int p_id = (- (i+1) * ps + (long int)vcl.getProcessUnitID());
1119  if (p_id < 0)
1120  p_id += n_proc;
1121  else
1122  p_id = p_id % n_proc;
1123 
1124  if (p_id != (long int)vcl.getProcessUnitID())
1125  recv_message.get(p_id).resize(j);
1126  }
1127 
1128 #ifdef VERBOSE_TEST
1129  timer t;
1130  t.start();
1131 #endif
1132 
1133  commFunc<ip>(vcl,prc,message,msg_alloc,&recv_message);
1134 
1135 #ifdef VERBOSE_TEST
1136  t.stop();
1137  double clk = t.getwct();
1138  double clk_max = clk;
1139 
1140  size_t size_send_recv = 2 * j * (prc.size());
1141  vcl.sum(size_send_recv);
1142  vcl.max(clk_max);
1143  vcl.execute();
1144 
1145  if (vcl.getProcessUnitID() == 0)
1146  std::cout << "(Long pattern: " << method<ip>() << ")Buffer size: " << j << " Bandwidth (Average): " << size_send_recv / vcl.getProcessingUnits() / clk / 1e6 << " MB/s " << " Bandwidth (Total): " << size_send_recv / clk / 1e6 << " MB/s Clock: " << clk << " Clock MAX: " << clk_max <<"\n";
1147 #endif
1148 
1149  // Check the message
1150  for (long int i = 0 ; i < 8 && i < (long int)n_proc ; i++)
1151  {
1152  long int p_id = (- (i+1) * ps + (long int)vcl.getProcessUnitID());
1153  if (p_id < 0)
1154  p_id += n_proc;
1155  else
1156  p_id = p_id % n_proc;
1157 
1158  if (p_id != (long int)vcl.getProcessUnitID())
1159  {
1160  std::ostringstream msg;
1161  msg << "Hello from " << p_id << " to " << vcl.getProcessUnitID();
1162  std::string str(msg.str());
1163  BOOST_REQUIRE_EQUAL(std::equal(str.c_str(),str.c_str() + str.size() ,&(recv_message.get(p_id).get(0))),true);
1164  }
1165  else
1166  {
1167  BOOST_REQUIRE_EQUAL((size_t)0,recv_message.get(p_id).size());
1168  }
1169  }
1170  }
1171  }
1172 }
1173 
1174 template<unsigned int ip> void test_random_multiple(unsigned int opt)
1175 {
1176  Vcluster<> & vcl = create_vcluster();
1177 
1178  // send/recv messages
1179 
1180  global_rank = vcl.getProcessUnitID();
1181  size_t n_proc = vcl.getProcessingUnits();
1182 
1183  for (size_t s = 0 ; s < N_TRY ; s++)
1184  {
1185  if (opt == RECEIVE_SIZE_UNKNOWN)
1186  {return;}
1187 
1188  std::srand(create_vcluster().getProcessUnitID());
1189  std::default_random_engine eg;
1190  std::uniform_int_distribution<int> d(0,n_proc/8);
1191 
1192  rcv_rm rcv[NQUEUE];
1193 
1194  // Check random pattern (maximum 16 processors)
1195 
1196  for (size_t j = 32 ; j < N_LOOP && n_proc < 16 ; j*=2)
1197  {
1198  global_step = j;
1199  // original send
1200  openfpm::vector<size_t> o_send[NQUEUE];
1201  // send message
1203  // recv message
1204  openfpm::vector<openfpm::vector<unsigned char>> recv_message[NQUEUE];
1205 // recv_message.reserve(n_proc);
1206  openfpm::vector<size_t> prc_recv[NQUEUE];
1207 
1209 
1210  for (size_t i = 0 ; i < n_proc ; i++)
1211  {
1212  // randomly with which processor communicate
1213  if (d(eg) == 0)
1214  {
1215  prc.add(i);
1216  for (size_t k = 0 ; k < NQUEUE ; k++)
1217  {
1218  o_send[k].add(i);
1219  message[k].add();
1220  message[k].last().fill(0);
1221  std::ostringstream msg;
1222  msg << "H" << k << " from " << vcl.getProcessUnitID() << " to " << i;
1223  std::string str(msg.str());
1224  message[k].last().resize(str.size());
1225  std::copy(str.c_str(),&(str.c_str())[msg.str().size()],&(message[k].last().get(0)));
1226  message[k].last().resize(j);
1227  }
1228  }
1229  }
1230 
1231  id = 0;
1232 
1233 
1234 #ifdef VERBOSE_TEST
1235  timer t;
1236  t.start();
1237 #endif
1238 
1239  for (size_t k = 0 ; k < NQUEUE ; k++)
1240  {
1241  rcv[k].prc_recv = &prc_recv[k];
1242  rcv[k].recv_message = &recv_message[k];
1243 
1244  vcl.sendrecvMultipleMessagesNBXAsync(prc,message[k],msg_alloc4,&rcv[k]);
1245  }
1246 
1247 #ifdef VERBOSE_TEST
1248  t.stop();
1249  double clk = t.getwct();
1250  double clk_max = clk;
1251 
1252  size_t size_send_recv = (prc.size() + recv_message.size()) * j;
1253  vcl.sum(size_send_recv);
1254  vcl.sum(clk);
1255  vcl.max(clk_max);
1256  vcl.execute();
1257  clk /= vcl.getProcessingUnits();
1258 
1259  if (vcl.getProcessUnitID() == 0)
1260  std::cout << "(Random Pattern: " << method<ip>() << ") Buffer size: " << j << " Bandwidth (Average): " << size_send_recv / vcl.getProcessingUnits() / clk / 1e6 << " MB/s " << " Bandwidth (Total): " << size_send_recv / clk / 1e6 << " MB/s Clock: " << clk << " Clock MAX: " << clk_max << "\n";
1261 #endif
1262 
1263  vcl.progressCommunication();
1264  usleep(1000);
1265  vcl.progressCommunication();
1266  usleep(1000);
1267  vcl.progressCommunication();
1268  usleep(1000);
1269  vcl.progressCommunication();
1270  usleep(1000);
1271 
1273 
1274  for (size_t k = 0 ; k < NQUEUE ; k++)
1275  {
1276  // Check the message
1277 
1278  for (size_t i = 0 ; i < recv_message[k].size() ; i++)
1279  {
1280  std::ostringstream msg;
1281  msg << "H" << k << " from " << prc_recv[k].get(i) << " to " << vcl.getProcessUnitID();
1282  std::string str(msg.str());
1283  BOOST_REQUIRE_EQUAL(std::equal(str.c_str(),str.c_str() + str.size() ,&(recv_message[k].get(i).get(0))),true);
1284  }
1285 
1286  // Reply back
1287 
1288  // Create the message
1289 
1290  prc.clear();
1291  message[k].clear();
1292  for (size_t i = 0 ; i < prc_recv[k].size() ; i++)
1293  {
1294  prc.add(prc_recv[k].get(i));
1295  message[k].add();
1296  std::ostringstream msg;
1297  msg << "H" << k << " from " << vcl.getProcessUnitID() << " to " << prc_recv[k].get(i);
1298  std::string str(msg.str());
1299  message[k].last().resize(str.size());
1300  std::copy(str.c_str(),&(str.c_str())[msg.str().size()],&(message[k].last().get(0)));
1301  message[k].last().resize(j);
1302  }
1303 
1304  id = 0;
1305  prc_recv[k].clear();
1306  recv_message[k].clear();
1307 
1308  rcv_rm rr;
1309  rr.prc_recv = &prc_recv[k];
1310  rr.recv_message = &recv_message[k];
1311 
1312  vcl.sendrecvMultipleMessagesNBXAsync(prc,message[k],msg_alloc4,&rr);
1313 
1314  vcl.progressCommunication();
1315  usleep(1000);
1316  vcl.progressCommunication();
1317  usleep(1000);
1318  vcl.progressCommunication();
1319  usleep(1000);
1320  vcl.progressCommunication();
1321  usleep(1000);
1322 
1324 
1325  // Check if the received hey message match the original send
1326 
1327  BOOST_REQUIRE_EQUAL(o_send[k].size(),prc_recv[k].size());
1328 
1329  for (size_t i = 0 ; i < o_send[k].size() ; i++)
1330  {
1331  size_t j = 0;
1332  for ( ; j < prc_recv[k].size() ; j++)
1333  {
1334  if (o_send[k].get(i) == prc_recv[k].get(j))
1335  {
1336  // found the message check it
1337 
1338  std::ostringstream msg;
1339  msg << "H" << k << " from " << prc_recv[k].get(i) << " to " << vcl.getProcessUnitID();
1340  std::string str(msg.str());
1341  BOOST_REQUIRE_EQUAL(std::equal(str.c_str(),str.c_str() + str.size() ,&(recv_message[k].get(i).get(0))),true);
1342  break;
1343  }
1344  }
1345  // Check that we find always a match
1346  BOOST_REQUIRE_EQUAL(j != prc_recv[k].size(),true);
1347  }
1348  }
1349  }
1350  }
1351 }
1352 
1359 void test_send_recv_complex(const size_t n, Vcluster<> & vcl)
1360 {
1362 
1363  // Point test typedef
1364  typedef Point_test<float> p;
1365 
1366  openfpm::vector<Point_test<float>> v_send = allocate_openfpm_fill(n,vcl.getProcessUnitID());
1367 
1368  // Send to 8 processors
1369  for (size_t i = 0 ; i < 8 ; i++)
1370  vcl.send( mod(vcl.getProcessUnitID() + i * P_STRIDE, vcl.getProcessingUnits()) ,i,v_send);
1371 
1373  pt_buf.resize(8);
1374 
1375  // Recv from 8 processors
1376  for (size_t i = 0 ; i < 8 ; i++)
1377  {
1378  pt_buf.get(i).resize(n);
1379  vcl.recv( mod( (vcl.getProcessUnitID() - i * P_STRIDE), vcl.getProcessingUnits()) ,i,pt_buf.get(i));
1380  }
1381 
1382  vcl.execute();
1383 
1385 
1386  // Check the received buffers (careful at negative modulo)
1387  for (size_t i = 0 ; i < 8 ; i++)
1388  {
1389  for (size_t j = 0 ; j < n ; j++)
1390  {
1391  Point_test<float> pt = pt_buf.get(i).get(j);
1392 
1393  size_t p_recv = mod( (vcl.getProcessUnitID() - i * P_STRIDE), vcl.getProcessingUnits());
1394 
1395  BOOST_REQUIRE_EQUAL(pt.template get<p::x>(),p_recv);
1396  BOOST_REQUIRE_EQUAL(pt.template get<p::y>(),p_recv);
1397  BOOST_REQUIRE_EQUAL(pt.template get<p::z>(),p_recv);
1398  BOOST_REQUIRE_EQUAL(pt.template get<p::s>(),p_recv);
1399  BOOST_REQUIRE_EQUAL(pt.template get<p::v>()[0],p_recv);
1400  BOOST_REQUIRE_EQUAL(pt.template get<p::v>()[1],p_recv);
1401  BOOST_REQUIRE_EQUAL(pt.template get<p::v>()[2],p_recv);
1402  BOOST_REQUIRE_EQUAL(pt.template get<p::t>()[0][0],p_recv);
1403  BOOST_REQUIRE_EQUAL(pt.template get<p::t>()[0][1],p_recv);
1404  BOOST_REQUIRE_EQUAL(pt.template get<p::t>()[0][2],p_recv);
1405  BOOST_REQUIRE_EQUAL(pt.template get<p::t>()[1][0],p_recv);
1406  BOOST_REQUIRE_EQUAL(pt.template get<p::t>()[1][1],p_recv);
1407  BOOST_REQUIRE_EQUAL(pt.template get<p::t>()[1][2],p_recv);
1408  BOOST_REQUIRE_EQUAL(pt.template get<p::t>()[2][0],p_recv);
1409  BOOST_REQUIRE_EQUAL(pt.template get<p::t>()[2][1],p_recv);
1410  BOOST_REQUIRE_EQUAL(pt.template get<p::t>()[2][2],p_recv);
1411  }
1412  }
1413 }
1414 
1422 template<typename T> void test_send_recv_primitives(size_t n, Vcluster<> & vcl)
1423 {
1424  openfpm::vector<T> v_send = allocate_openfpm_primitive<T>(n,vcl.getProcessUnitID());
1425 
1426  {
1428 
1429  // Send to 8 processors
1430  for (size_t i = 0 ; i < 8 ; i++)
1431  vcl.send( mod(vcl.getProcessUnitID() + i * P_STRIDE, vcl.getProcessingUnits()) ,i,v_send);
1432 
1434  pt_buf.resize(8);
1435 
1436  // Recv from 8 processors
1437  for (size_t i = 0 ; i < 8 ; i++)
1438  {
1439  pt_buf.get(i).resize(n);
1440  vcl.recv( mod( (vcl.getProcessUnitID() - i * P_STRIDE), vcl.getProcessingUnits()) ,i,pt_buf.get(i));
1441  }
1442 
1443  vcl.execute();
1444 
1446 
1447  // Check the received buffers (careful at negative modulo)
1448  for (size_t i = 0 ; i < 8 ; i++)
1449  {
1450  for (size_t j = 0 ; j < n ; j++)
1451  {
1452  T pt = pt_buf.get(i).get(j);
1453 
1454  T p_recv = mod( (vcl.getProcessUnitID() - i * P_STRIDE), vcl.getProcessingUnits());
1455 
1456  BOOST_REQUIRE_EQUAL(pt,p_recv);
1457  }
1458  }
1459 
1460  }
1461 
1462  {
1464 
1465  // Send to 8 processors
1466  for (size_t i = 0 ; i < 8 ; i++)
1467  vcl.send( mod(vcl.getProcessUnitID() + i * P_STRIDE, vcl.getProcessingUnits()) ,i,v_send.getPointer(),v_send.size()*sizeof(T));
1468 
1470  pt_buf.resize(8);
1471 
1472  // Recv from 8 processors
1473  for (size_t i = 0 ; i < 8 ; i++)
1474  {
1475  pt_buf.get(i).resize(n);
1476  vcl.recv( mod( (vcl.getProcessUnitID() - i * P_STRIDE), vcl.getProcessingUnits()) ,i,pt_buf.get(i).getPointer(),pt_buf.get(i).size()*sizeof(T));
1477  }
1478 
1479  vcl.execute();
1480 
1482 
1483  // Check the received buffers (careful at negative modulo)
1484  for (size_t i = 0 ; i < 8 ; i++)
1485  {
1486  for (size_t j = 0 ; j < n ; j++)
1487  {
1488  T pt = pt_buf.get(i).get(j);
1489 
1490  T p_recv = mod( (vcl.getProcessUnitID() - i * P_STRIDE), vcl.getProcessingUnits());
1491 
1492  BOOST_REQUIRE_EQUAL(pt,p_recv);
1493  }
1494  }
1495 
1496  }
1497 }
1498 
1499 template<typename T> void test_single_all_gather_primitives(Vcluster<> & vcl)
1500 {
1502 
1503  openfpm::vector<T> clt;
1504  T data = vcl.getProcessUnitID();
1505 
1506  vcl.allGather(data,clt);
1507  vcl.execute();
1508 
1509  for (size_t i = 0 ; i < vcl.getProcessingUnits() ; i++)
1510  BOOST_REQUIRE_EQUAL(i,(size_t)clt.get(i));
1511 
1513 
1514 }
1515 
1516 #endif /* VCLUSTER_UNIT_TEST_UTIL_HPP_ */
void progressCommunication()
In case of Asynchonous communications like sendrecvMultipleMessagesNBXAsync this function progress th...
size_t getProcessUnitID()
Get the process unit id.
void sendrecvMultipleMessagesNBX(openfpm::vector< size_t > &prc, openfpm::vector< T > &data, openfpm::vector< size_t > &prc_recv, openfpm::vector< size_t > &recv_sz, void *(*msg_alloc)(size_t, size_t, size_t, size_t, size_t, size_t, void *), void *ptr_arg, long int opt=NONE)
Send and receive multiple messages.
bool allGather(T &send, openfpm::vector< T, Mem, gr > &v)
Gather the data from all processors.
size_t size()
Stub size.
Definition: map_vector.hpp:211
bool send(size_t proc, size_t tag, const void *mem, size_t sz)
Send data to a processor.
double getwct()
Return the elapsed real time.
Definition: timer.hpp:130
Implementation of VCluster class.
Definition: VCluster.hpp:58
void execute()
Execute all the requests.
KeyT const ValueT ValueT OffsetIteratorT OffsetIteratorT int
[in] The number of segments that comprise the sorting data
void start()
Start the timer.
Definition: timer.hpp:90
bool recv(size_t proc, size_t tag, void *v, size_t sz)
Recv data from a processor.
size_t getProcessingUnits()
Get the total number of processors.
void sum(T &num)
Sum the numbers across all processors and get the result.
void sendrecvMultipleMessagesNBXAsync(openfpm::vector< size_t > &prc, openfpm::vector< T > &data, openfpm::vector< size_t > &prc_recv, openfpm::vector< size_t > &recv_sz, void *(*msg_alloc)(size_t, size_t, size_t, size_t, size_t, size_t, void *), void *ptr_arg, long int opt=NONE)
Send and receive multiple messages asynchronous version.
void sendrecvMultipleMessagesNBXWait()
Send and receive multiple messages wait NBX communication to complete.
void max(T &num)
Get the maximum number across all processors (or reduction with infinity norm)
Test structure used for several test.
Definition: Point_test.hpp:105
Class for cpu time benchmarking.
Definition: timer.hpp:27
void stop()
Stop the timer.
Definition: timer.hpp:119