OpenFPM_pdata  1.1.0
Project that contains the implementation of distributed structures
 All Data Structures Namespaces Functions Variables Typedefs Enumerations Friends Pages
VCluster_unit_test_util.hpp
1 /*
2  * VCluster_unit_test_util.hpp
3  *
4  * Created on: May 30, 2015
5  * Author: i-bird
6  */
7 
8 #ifndef VCLUSTER_UNIT_TEST_UTIL_HPP_
9 #define VCLUSTER_UNIT_TEST_UTIL_HPP_
10 
11 #include "Point_test.hpp"
12 #include "VCluster_base.hpp"
13 #include "Vector/vector_test_util.hpp"
14 
// Option flags for test(): what the receiver knows in advance about incoming traffic
#define RECEIVE_UNKNOWN 1
#define RECEIVE_SIZE_UNKNOWN 2

// Communication-method id (only the NBX protocol is exercised in this file)
#define NBX 1

// Number of repetitions of each communication pattern
#define N_TRY 2
// Exclusive upper bound (in bytes) of the message-size sweep (64 MiB)
#define N_LOOP 67108864
// Buffer growth step used by related benchmarks
#define BUFF_STEP 524288
// Stride used to pick peer processor ids in the send/recv tests
#define P_STRIDE 17
24 
// When true, msg_alloc() cross-checks total_p against the expected peer count
bool totp_check;
// Message size currently under test; allocator callbacks verify each received size against it
size_t global_step = 0;
// Rank of this process, cached by the test drivers
size_t global_rank;
28 
/*! \brief Mathematical modulo that wraps into the non-negative range
 *
 * Unlike the plain C++ % operator, for m > 0 the result is always
 * in [0,m) even when x is negative.
 *
 * \param x value to reduce
 * \param m modulus
 *
 * \return x modulo m, wrapped non-negative
 */
int mod(int x, int m)
{
	const int wrapped = ((x % m) + m) % m;
	return wrapped;
}
38 
// Alloc the buffer to receive the messages

/*! \brief Allocator callback for sendrecvMultipleMessagesNBX
 *
 * Resizes the per-sender receive buffer and returns the address where
 * the incoming message must be written.
 *
 * \param msg_i size (in bytes) of the incoming message
 * \param total_msg total size of all incoming messages
 * \param total_p total number of processors communicating with this one
 * \param i processor id of the sender
 * \param ri request id
 * \param ptr user argument: the vector of receive buffers (one slot per processor)
 *
 * \return pointer to the allocated receive area for sender i
 */
void * msg_alloc(size_t msg_i ,size_t total_msg, size_t total_p, size_t i,size_t ri, void * ptr)
{
	// convert the void pointer argument into a pointer to receiving buffers
	// NOTE(review): the statement declaring `v` (presumably a cast of `ptr`
	// to the vector-of-buffers type) is missing from this extract — confirm
	// against the repository

	// With <= 8 ranks every rank talks to all the others; otherwise each
	// rank talks to exactly 8 peers (see the peer-selection loops in test())
	if (create_vcluster().getProcessingUnits() <= 8)
	{if (totp_check) BOOST_REQUIRE_EQUAL(total_p,create_vcluster().getProcessingUnits()-1);}
	else
	{if (totp_check) BOOST_REQUIRE_EQUAL(total_p,(size_t)8);}

	// every message in these tests carries exactly global_step bytes
	BOOST_REQUIRE_EQUAL(msg_i, global_step);

	// Create the memory to receive the message
	// msg_i contains the size of the message to receive
	// i contains the processor id of the sender
	v->get(i).resize(msg_i);

	// return the pointer of the allocated memory
	return &(v->get(i).get(0));
}
68 
70 
// Alloc the buffer to receive the messages

// Number of buffers handed out so far by msg_alloc2/msg_alloc3;
// the test drivers reset it to 0 before each exchange
size_t id = 0;

/*! \brief Allocator callback that fills receive buffers in arrival order
 *
 * Unlike msg_alloc(), buffers are indexed by arrival order (the global
 * counter `id`) rather than by sender rank; the sender rank is recorded
 * in prc_recv at the matching position.
 *
 * \param msg_i size (in bytes) of the incoming message
 * \param total_msg total size of all incoming messages
 * \param total_p total number of processors communicating with this one
 * \param i processor id of the sender
 * \param ri request id
 * \param ptr user argument: the vector of receive buffers
 *
 * \return pointer to the allocated receive area
 *
 * NOTE(review): the declarations/casts of `v` and (global) `prc_recv`
 * are missing from this extract — confirm against the repository.
 */
void * msg_alloc2(size_t msg_i ,size_t total_msg, size_t total_p, size_t i, size_t ri, void * ptr)
{
	// make room for one buffer (and one recorded rank) per communicating peer
	v->resize(total_p);
	prc_recv.resize(total_p);

	BOOST_REQUIRE_EQUAL(msg_i, global_step);

	// hand out slot id-1 in arrival order and remember who sent it
	id++;
	v->get(id-1).resize(msg_i);
	prc_recv.get(id-1) = i;
	return &(v->get(id-1).get(0));
}
90 
/*! \brief Allocator callback that grows the receive structures on demand
 *
 * Like msg_alloc2() but does not need total_p: each call appends one
 * buffer and one sender-rank entry, so it also works when the number
 * of peers is not known up front (random pattern in test()).
 *
 * \param msg_i size (in bytes) of the incoming message
 * \param total_msg total size of all incoming messages
 * \param total_p total number of processors communicating with this one
 * \param i processor id of the sender
 * \param ri request id
 * \param ptr user argument: the vector of receive buffers
 *
 * \return pointer to the allocated receive area
 *
 * NOTE(review): the declarations/casts of `v` and (global) `prc_recv`
 * are missing from this extract — confirm against the repository.
 */
void * msg_alloc3(size_t msg_i ,size_t total_msg, size_t total_p, size_t i, size_t ri, void * ptr)
{
	// append one receive buffer ...
	v->add();

	// ... and one slot recording the sender rank
	prc_recv.add();

	BOOST_REQUIRE_EQUAL(msg_i, global_step);

	v->last().resize(msg_i);
	prc_recv.last() = i;
	return &(v->last().get(0));
}
105 
106 template<unsigned int ip, typename T> void commFunc(Vcluster & vcl,openfpm::vector< size_t > & prc, openfpm::vector< T > & data, void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,void *), void * ptr_arg)
107 {
108  vcl.sendrecvMultipleMessagesNBX(prc,data,msg_alloc,ptr_arg);
109 }
110 
111 template<unsigned int ip, typename T> void commFunc_null_odd(Vcluster & vcl,openfpm::vector< size_t > & prc, openfpm::vector< T > & data, void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,void *), void * ptr_arg)
112 {
113  if (vcl.getProcessUnitID() % 2 == 0)
114  vcl.sendrecvMultipleMessagesNBX(prc,data,msg_alloc,ptr_arg);
115  else
116  {
117  // No send check if passing null to sendrecv work
118  vcl.sendrecvMultipleMessagesNBX(prc.size(),(size_t *)NULL,(size_t *)NULL,(void **)NULL,msg_alloc,ptr_arg,NONE);
119  }
120 }
121 
/*! \brief Human-readable name of the communication protocol under test
 *
 * \return the string "NBX"
 */
template <unsigned int ip> std::string method()
{
	static const char protocol_name[] = "NBX";
	return std::string(protocol_name);
}
126 
/*! \brief Long-pattern exchange where odd ranks do not send at all
 *
 * Even ranks send a fixed-size (4567 byte) hello message to up to 8 peers
 * spread across the rank range; odd ranks pass NULL send lists (see
 * commFunc_null_odd). Each rank then verifies that it received messages
 * only from even peers and nothing from itself.
 *
 * NOTE(review): the declarations of the send/receive buffers
 * (prc, message, recv_message) are missing from this extract —
 * confirm against the repository.
 */
template<unsigned int ip> void test_no_send_some_peer()
{
	Vcluster & vcl = create_vcluster();

	size_t n_proc = vcl.getProcessingUnits();

	// Check long communication with some peers not communicating

	size_t j = 4567;

	global_step = j;
	// Processor step: spreads the (up to) 8 peers across the whole rank range
	long int ps = n_proc / (8 + 1);

	// send message
	// recv message

	// only even ranks communicate

	if (vcl.getProcessUnitID() % 2 == 0)
	{
		for (size_t i = 0 ; i < 8 && i < n_proc ; i++)
		{
			size_t p_id = ((i+1) * ps + vcl.getProcessUnitID()) % n_proc;
			if (p_id != vcl.getProcessUnitID())
			{
				// build a zero-padded "Hello from A to B" message of j bytes
				prc.add(p_id);
				message.add();
				std::ostringstream msg;
				msg << "Hello from " << vcl.getProcessUnitID() << " to " << p_id;
				std::string str(msg.str());
				message.last().resize(j);
				memset(message.last().getPointer(),0,j);
				std::copy(str.c_str(),&(str.c_str())[msg.str().size()],&(message.last().get(0)));
			}
		}
	}

	recv_message.resize(n_proc);

#ifdef VERBOSE_TEST
	timer t;
	t.start();
#endif

	commFunc_null_odd<ip>(vcl,prc,message,msg_alloc,&recv_message);

#ifdef VERBOSE_TEST
	t.stop();
	double clk = t.getwct();
	double clk_max = clk;

	size_t size_send_recv = 2 * j * (prc.size());
	vcl.sum(size_send_recv);
	vcl.max(clk_max);
	vcl.execute();

	if (vcl.getProcessUnitID() == 0)
		std::cout << "(Long pattern: " << method<ip>() << ")Buffer size: " << j << " Bandwidth (Average): " << size_send_recv / vcl.getProcessingUnits() / clk / 1e6 << " MB/s " << " Bandwidth (Total): " << size_send_recv / clk / 1e6 << " MB/s Clock: " << clk << " Clock MAX: " << clk_max <<"\n";
#endif

	// Check the messages: each expected sender is the mirror of the send
	// pattern (careful at negative modulo)
	for (long int i = 0 ; i < 8 && i < (long int)n_proc ; i++)
	{
		long int p_id = (- (i+1) * ps + (long int)vcl.getProcessUnitID());
		if (p_id < 0)
			p_id += n_proc;
		else
			p_id = p_id % n_proc;

		if (p_id != (long int)vcl.getProcessUnitID())
		{
			// only even processors communicate; odd peers sent nothing
			if (p_id % 2 == 1)
				continue;

			std::ostringstream msg;
			msg << "Hello from " << p_id << " to " << vcl.getProcessUnitID();
			std::string str(msg.str());
			BOOST_REQUIRE_EQUAL(std::equal(str.c_str(),str.c_str() + str.size() ,&(recv_message.get(p_id).get(0))),true);
		}
		else
		{
			// nothing may arrive from ourselves
			BOOST_REQUIRE_EQUAL((size_t)0,recv_message.get(p_id).size());
		}
	}
}
218 
/*! \brief Short-pattern exchange with fully known receive sizes
 *
 * Each rank sends a j-byte hello message to the next 8 ranks and, since
 * the pattern is deterministic, precomputes the list of senders and
 * message sizes so the "known" overload of sendrecvMultipleMessagesNBX
 * can be exercised. The sweep repeats N_TRY times for message sizes
 * j = 32, 64, ... up to N_LOOP.
 *
 * NOTE(review): the declarations of prc, message and recv_message are
 * missing from this extract — confirm against the repository.
 */
template<unsigned int ip> void test_known()
{
	Vcluster & vcl = create_vcluster();

	// send/recv messages

	global_rank = vcl.getProcessUnitID();
	size_t n_proc = vcl.getProcessingUnits();

	// Checking short communication pattern

	for (size_t s = 0 ; s < N_TRY ; s++)
	{
		for (size_t j = 32 ; j < N_LOOP ; j*=2)
		{
			global_step = j;
			// send message
			// recv message
			recv_message.reserve(n_proc);

			// who we will receive from, and how many bytes each
			openfpm::vector<size_t> prc_recv;
			openfpm::vector<size_t> recv_sz;

			// send to the 8 next ranks (skipping ourselves)
			for (size_t i = 0 ; i < 8 && i < n_proc ; i++)
			{
				size_t p_id = (i + 1 + vcl.getProcessUnitID()) % n_proc;
				if (p_id != vcl.getProcessUnitID())
				{
					prc.add(p_id);
					message.add();
					std::ostringstream msg;
					msg << "Hello from " << vcl.getProcessUnitID() << " to " << p_id;
					std::string str(msg.str());
					message.last().resize(j);
					memset(message.last().getPointer(),0,j);
					std::copy(str.c_str(),&(str.c_str())[msg.str().size()],&(message.last().get(0)));
				}
			}

			recv_message.resize(n_proc);
			// The pattern is not really random: preallocate the receive buffers
			// (we receive from the 8 previous ranks, mirror of the send pattern)
			for (size_t i = 0 ; i < 8 && i < n_proc ; i++)
			{
				long int p_id = vcl.getProcessUnitID() - i - 1;
				if (p_id < 0)
					p_id += n_proc;
				else
					p_id = p_id % n_proc;

				if (p_id != (long int)vcl.getProcessUnitID())
				{
					prc_recv.add(p_id);
					recv_message.get(p_id).resize(j);
					recv_sz.add(j);
				}
			}

#ifdef VERBOSE_TEST
			timer t;
			t.start();
#endif

			// "known" overload: sender list and sizes supplied up front
			vcl.sendrecvMultipleMessagesNBX(prc,message,prc_recv,recv_sz,msg_alloc,&recv_message);

#ifdef VERBOSE_TEST
			t.stop();

			double clk = t.getwct();
			double clk_max = clk;

			size_t size_send_recv = 2 * j * (prc.size());
			vcl.sum(size_send_recv);
			vcl.max(clk_max);
			vcl.execute();

			if (vcl.getProcessUnitID() == 0)
				std::cout << "(Short pattern: " << method<ip>() << ")Buffer size: " << j << " Bandwidth (Average): " << size_send_recv / vcl.getProcessingUnits() / clk / 1e6 << " MB/s " << " Bandwidth (Total): " << size_send_recv / clk / 1e6 << " MB/s Clock: " << clk << " Clock MAX: " << clk_max <<"\n";
#endif

			// Check the messages received from the 8 previous ranks
			for (size_t i = 0 ; i < 8 && i < n_proc ; i++)
			{
				long int p_id = vcl.getProcessUnitID() - i - 1;
				if (p_id < 0)
					p_id += n_proc;
				else
					p_id = p_id % n_proc;

				if (p_id != (long int)vcl.getProcessUnitID())
				{
					std::ostringstream msg;
					msg << "Hello from " << p_id << " to " << vcl.getProcessUnitID();
					std::string str(msg.str());
					BOOST_REQUIRE_EQUAL(std::equal(str.c_str(),str.c_str() + str.size() ,&(recv_message.get(p_id).get(0))),true);
				}
				else
				{
					BOOST_REQUIRE_EQUAL((size_t)0,recv_message.get(p_id).size());
				}
			}
		}
	}
}
326 
/*! \brief Main NBX send/receive stress test
 *
 * Exercises sendrecvMultipleMessagesNBX with three patterns, each swept
 * over message sizes j = 32, 64, ... < N_LOOP and repeated N_TRY times:
 *  - short pattern: each rank sends to the next 8 ranks;
 *  - random pattern (only when n_proc < 16): random peers plus a reply
 *    round that must mirror the original senders;
 *  - long pattern: peers spread across the whole rank range.
 *
 * \param opt RECEIVE_UNKNOWN (receiver knows nothing) or
 *            RECEIVE_SIZE_UNKNOWN (receiver knows the senders but not
 *            the sizes); in the latter case only the short pattern runs
 *
 * NOTE(review): the declarations of prc, message, recv_message, o_send
 * and ptr are missing from this extract — confirm against the repository.
 */
template<unsigned int ip> void test(unsigned int opt)
{
	Vcluster & vcl = create_vcluster();

	// send/recv messages

	global_rank = vcl.getProcessUnitID();
	size_t n_proc = vcl.getProcessingUnits();

	// Checking short communication pattern

	for (size_t s = 0 ; s < N_TRY ; s++)
	{
		for (size_t j = 32 ; j < N_LOOP ; j*=2)
		{
			global_step = j;

			// We send one message for each processor (one message is an openfpm::vector<unsigned char>)
			// or an array of bytes

			// receiving messages. Each receiving message is an openfpm::vector<unsigned char>
			// or an array of bytes

			// each processor communicates based on a list of processors

			// We construct the processor list; in this case each processor
			// communicates with the 8 next (in id) processors
			for (size_t i = 0 ; i < 8 && i < n_proc ; i++)
			{
				size_t p_id = (i + 1 + vcl.getProcessUnitID()) % n_proc;

				// avoid communicating with yourself
				if (p_id != vcl.getProcessUnitID())
				{
					// Create a hello message, zero-padded to j bytes
					prc.add(p_id);
					message.add();
					std::ostringstream msg;
					msg << "Hello from " << vcl.getProcessUnitID() << " to " << p_id;
					std::string str(msg.str());
					message.last().resize(j);
					memset(message.last().getPointer(),0,j);
					std::copy(str.c_str(),&(str.c_str())[msg.str().size()],&(message.last().get(0)));
				}
			}

			// For simplicity we create in advance a receiving buffer for all processors
			recv_message.resize(n_proc);

			// The pattern is not really random: preallocate the receive buffers
			for (size_t i = 0 ; i < 8 && i < n_proc ; i++)
			{
				long int p_id = vcl.getProcessUnitID() - i - 1;
				if (p_id < 0)
					p_id += n_proc;
				else
					p_id = p_id % n_proc;

				if (p_id != (long int)vcl.getProcessUnitID())
					recv_message.get(p_id).resize(j);
			}

			if (opt == RECEIVE_UNKNOWN)
			{
				// Send and receive: receiver discovers senders and sizes via msg_alloc
				vcl.sendrecvMultipleMessagesNBX(prc,message,msg_alloc,&recv_message);
			}
			else if (opt == RECEIVE_SIZE_UNKNOWN)
			{
				// raw-pointer overload: explicit size and pointer arrays
				openfpm::vector<size_t> sz_send;

				openfpm::vector<size_t> prc_recv;

				sz_send.resize(prc.size());
				ptr.resize(prc.size());

				for (size_t i = 0 ; i < prc.size() ; i++)
				{
					sz_send.get(i) = message.get(i).size();
					ptr.get(i) = &message.get(i).get(0);
				}

				// Calculate the receiving part

				// Build the expected sender list (mirror of the send pattern)
				for (size_t i = 0 ; i < 8 && i < n_proc ; i++)
				{
					long int p_id = vcl.getProcessUnitID() - i - 1;
					if (p_id < 0)
					{p_id += n_proc;}
					else
					{p_id = p_id % n_proc;}

					if (p_id != (long int)vcl.getProcessUnitID())
					{
						prc_recv.add(p_id);
					}
				}

				// Send and receive
				vcl.sendrecvMultipleMessagesNBX(prc.size(),&sz_send.get(0),&prc.get(0),
						&ptr.get(0),prc_recv.size(),&prc_recv.get(0),msg_alloc,&recv_message);
			}

#ifdef VERBOSE_TEST
			timer t;
			t.start();
#endif

			// NOTE(review): this timer brackets no communication call — the
			// timed statement appears stripped from this extract or the
			// VERBOSE block is dead code; confirm against the repository.

#ifdef VERBOSE_TEST
			t.stop();

			double clk = t.getwct();
			double clk_max = clk;

			size_t size_send_recv = 2 * j * (prc.size());
			vcl.sum(size_send_recv);
			vcl.max(clk_max);
			vcl.execute();

			if (vcl.getProcessUnitID() == 0)
				std::cout << "(Short pattern: " << method<ip>() << ")Buffer size: " << j << " Bandwidth (Average): " << size_send_recv / vcl.getProcessingUnits() / clk / 1e6 << " MB/s " << " Bandwidth (Total): " << size_send_recv / clk / 1e6 << " MB/s Clock: " << clk << " Clock MAX: " << clk_max <<"\n";
#endif

			// Check the messages received from the 8 previous ranks
			for (size_t i = 0 ; i < 8 && i < n_proc ; i++)
			{
				long int p_id = vcl.getProcessUnitID() - i - 1;
				if (p_id < 0)
					p_id += n_proc;
				else
					p_id = p_id % n_proc;

				if (p_id != (long int)vcl.getProcessUnitID())
				{
					std::ostringstream msg;
					msg << "Hello from " << p_id << " to " << vcl.getProcessUnitID();
					std::string str(msg.str());
					BOOST_REQUIRE_EQUAL(std::equal(str.c_str(),str.c_str() + str.size() ,&(recv_message.get(p_id).get(0))),true);
				}
				else
				{
					BOOST_REQUIRE_EQUAL((size_t)0,recv_message.get(p_id).size());
				}
			}
		}

		// the remaining patterns only exercise the "unknown" receive path
		if (opt == RECEIVE_SIZE_UNKNOWN)
		{return;}

		std::srand(create_vcluster().getProcessUnitID());
		std::default_random_engine eg;
		std::uniform_int_distribution<int> d(0,n_proc/8);

		// Check random pattern (maximum 16 processors)

		for (size_t j = 32 ; j < N_LOOP && n_proc < 16 ; j*=2)
		{
			global_step = j;
			// original send
			// send message
			// recv message
//			recv_message.reserve(n_proc);

			// pick random peers; o_send records who we originally sent to
			for (size_t i = 0 ; i < n_proc ; i++)
			{
				// randomly decide with which processor to communicate
				if (d(eg) == 0)
				{
					prc.add(i);
					o_send.add(i);
					message.add();
					message.last().fill(0);
					std::ostringstream msg;
					msg << "Hello from " << vcl.getProcessUnitID() << " to " << i;
					std::string str(msg.str());
					message.last().resize(str.size());
					std::copy(str.c_str(),&(str.c_str())[msg.str().size()],&(message.last().get(0)));
					message.last().resize(j);
				}
			}

			// reset allocation state for msg_alloc3 (arrival-order allocator)
			id = 0;
			prc_recv.clear();


#ifdef VERBOSE_TEST
			timer t;
			t.start();
#endif

			commFunc<ip>(vcl,prc,message,msg_alloc3,&recv_message);

#ifdef VERBOSE_TEST
			t.stop();
			double clk = t.getwct();
			double clk_max = clk;

			size_t size_send_recv = (prc.size() + recv_message.size()) * j;
			vcl.sum(size_send_recv);
			vcl.sum(clk);
			vcl.max(clk_max);
			vcl.execute();
			clk /= vcl.getProcessingUnits();

			if (vcl.getProcessUnitID() == 0)
				std::cout << "(Random Pattern: " << method<ip>() << ") Buffer size: " << j << " Bandwidth (Average): " << size_send_recv / vcl.getProcessingUnits() / clk / 1e6 << " MB/s " << " Bandwidth (Total): " << size_send_recv / clk / 1e6 << " MB/s Clock: " << clk << " Clock MAX: " << clk_max << "\n";
#endif

			// Check the messages (buffers are in arrival order, prc_recv maps them to ranks)

			for (size_t i = 0 ; i < recv_message.size() ; i++)
			{
				std::ostringstream msg;
				msg << "Hello from " << prc_recv.get(i) << " to " << vcl.getProcessUnitID();
				std::string str(msg.str());
				BOOST_REQUIRE_EQUAL(std::equal(str.c_str(),str.c_str() + str.size() ,&(recv_message.get(i).get(0))),true);
			}

			// Reply back

			// Create the reply ("Hey") messages, one per received hello

			prc.clear();
			message.clear();
			for (size_t i = 0 ; i < prc_recv.size() ; i++)
			{
				prc.add(prc_recv.get(i));
				message.add();
				std::ostringstream msg;
				msg << "Hey from " << vcl.getProcessUnitID() << " to " << prc_recv.get(i);
				std::string str(msg.str());
				message.last().resize(str.size());
				std::copy(str.c_str(),&(str.c_str())[msg.str().size()],&(message.last().get(0)));
				message.last().resize(j);
			}

			id = 0;
			prc_recv.clear();
			recv_message.clear();

			commFunc<ip>(vcl,prc,message,msg_alloc3,&recv_message);

			// Check if the received hey messages match the original send

			BOOST_REQUIRE_EQUAL(o_send.size(),prc_recv.size());

			for (size_t i = 0 ; i < o_send.size() ; i++)
			{
				// NOTE(review): this inner `j` shadows the message-size `j`
				// of the enclosing loop
				size_t j = 0;
				for ( ; j < prc_recv.size() ; j++)
				{
					if (o_send.get(i) == prc_recv.get(j))
					{
						// found the message, check it
						// NOTE(review): the match was found at index j but the
						// expected sender and the buffer are read at index i —
						// this only works if the two lists are in the same
						// order; confirm whether prc_recv.get(j) /
						// recv_message.get(j) was intended

						std::ostringstream msg;
						msg << "Hey from " << prc_recv.get(i) << " to " << vcl.getProcessUnitID();
						std::string str(msg.str());
						BOOST_REQUIRE_EQUAL(std::equal(str.c_str(),str.c_str() + str.size() ,&(recv_message.get(i).get(0))),true);
						break;
					}
				}
				// Check that we always find a match
				BOOST_REQUIRE_EQUAL(j != prc_recv.size(),true);
			}
		}

		// Check long communication pattern

		for (size_t j = 32 ; j < N_LOOP ; j*=2)
		{
			global_step = j;
			// Processor step: spreads the (up to) 8 peers across the rank range
			long int ps = n_proc / (8 + 1);

			// send message
			// recv message

			for (size_t i = 0 ; i < 8 && i < n_proc ; i++)
			{
				size_t p_id = ((i+1) * ps + vcl.getProcessUnitID()) % n_proc;
				if (p_id != vcl.getProcessUnitID())
				{
					prc.add(p_id);
					message.add();
					std::ostringstream msg;
					msg << "Hello from " << vcl.getProcessUnitID() << " to " << p_id;
					std::string str(msg.str());
					message.last().resize(j);
					memset(message.last().getPointer(),0,j);
					std::copy(str.c_str(),&(str.c_str())[msg.str().size()],&(message.last().get(0)));
				}
			}

			recv_message.resize(n_proc);
			// The pattern is not really random: preallocate the receive buffers
			for (size_t i = 0 ; i < 8 && i < n_proc ; i++)
			{
				long int p_id = (- (i+1) * ps + (long int)vcl.getProcessUnitID());
				if (p_id < 0)
					p_id += n_proc;
				else
					p_id = p_id % n_proc;

				if (p_id != (long int)vcl.getProcessUnitID())
					recv_message.get(p_id).resize(j);
			}

#ifdef VERBOSE_TEST
			timer t;
			t.start();
#endif

			commFunc<ip>(vcl,prc,message,msg_alloc,&recv_message);

#ifdef VERBOSE_TEST
			t.stop();
			double clk = t.getwct();
			double clk_max = clk;

			size_t size_send_recv = 2 * j * (prc.size());
			vcl.sum(size_send_recv);
			vcl.max(clk_max);
			vcl.execute();

			if (vcl.getProcessUnitID() == 0)
				std::cout << "(Long pattern: " << method<ip>() << ")Buffer size: " << j << " Bandwidth (Average): " << size_send_recv / vcl.getProcessingUnits() / clk / 1e6 << " MB/s " << " Bandwidth (Total): " << size_send_recv / clk / 1e6 << " MB/s Clock: " << clk << " Clock MAX: " << clk_max <<"\n";
#endif

			// Check the messages (careful at negative modulo)
			for (long int i = 0 ; i < 8 && i < (long int)n_proc ; i++)
			{
				long int p_id = (- (i+1) * ps + (long int)vcl.getProcessUnitID());
				if (p_id < 0)
					p_id += n_proc;
				else
					p_id = p_id % n_proc;

				if (p_id != (long int)vcl.getProcessUnitID())
				{
					std::ostringstream msg;
					msg << "Hello from " << p_id << " to " << vcl.getProcessUnitID();
					std::string str(msg.str());
					BOOST_REQUIRE_EQUAL(std::equal(str.c_str(),str.c_str() + str.size() ,&(recv_message.get(p_id).get(0))),true);
				}
				else
				{
					BOOST_REQUIRE_EQUAL((size_t)0,recv_message.get(p_id).size());
				}
			}
		}
	}
}
700 
/*! \brief Point-to-point send/recv test with an aggregate (Point_test) payload
 *
 * Each rank sends a vector of n Point_test<float> elements (every field
 * filled with its own rank id) to 8 peers chosen with stride P_STRIDE,
 * receives the mirror set, and checks every field of every element
 * against the expected sender rank.
 *
 * \param n number of elements per message
 * \param vcl cluster object
 *
 * NOTE(review): the declaration of pt_buf (the vector of 8 receive
 * buffers) is missing from this extract — confirm against the repository.
 */
void test_send_recv_complex(const size_t n, Vcluster & vcl)
{
	// Point test typedef
	typedef Point_test<float> p;

	// payload: n points, every property set to this rank's id
	openfpm::vector<Point_test<float>> v_send = allocate_openfpm_fill(n,vcl.getProcessUnitID());

	// Send to 8 processors
	for (size_t i = 0 ; i < 8 ; i++)
		vcl.send( mod(vcl.getProcessUnitID() + i * P_STRIDE, vcl.getProcessingUnits()) ,i,v_send);

	pt_buf.resize(8);

	// Recv from 8 processors (mirror of the send stride)
	for (size_t i = 0 ; i < 8 ; i++)
	{
		pt_buf.get(i).resize(n);
		vcl.recv( mod( (vcl.getProcessUnitID() - i * P_STRIDE), vcl.getProcessingUnits()) ,i,pt_buf.get(i));
	}

	// all sends/recvs are queued; this performs them
	vcl.execute();

	// Check the received buffers (careful at negative modulo)
	for (size_t i = 0 ; i < 8 ; i++)
	{
		for (size_t j = 0 ; j < n ; j++)
		{
			Point_test<float> pt = pt_buf.get(i).get(j);

			// the rank we received buffer i from
			size_t p_recv = mod( (vcl.getProcessUnitID() - i * P_STRIDE), vcl.getProcessingUnits());

			// every scalar, vector and tensor component must equal the sender rank
			BOOST_REQUIRE_EQUAL(pt.template get<p::x>(),p_recv);
			BOOST_REQUIRE_EQUAL(pt.template get<p::y>(),p_recv);
			BOOST_REQUIRE_EQUAL(pt.template get<p::z>(),p_recv);
			BOOST_REQUIRE_EQUAL(pt.template get<p::s>(),p_recv);
			BOOST_REQUIRE_EQUAL(pt.template get<p::v>()[0],p_recv);
			BOOST_REQUIRE_EQUAL(pt.template get<p::v>()[1],p_recv);
			BOOST_REQUIRE_EQUAL(pt.template get<p::v>()[2],p_recv);
			BOOST_REQUIRE_EQUAL(pt.template get<p::t>()[0][0],p_recv);
			BOOST_REQUIRE_EQUAL(pt.template get<p::t>()[0][1],p_recv);
			BOOST_REQUIRE_EQUAL(pt.template get<p::t>()[0][2],p_recv);
			BOOST_REQUIRE_EQUAL(pt.template get<p::t>()[1][0],p_recv);
			BOOST_REQUIRE_EQUAL(pt.template get<p::t>()[1][1],p_recv);
			BOOST_REQUIRE_EQUAL(pt.template get<p::t>()[1][2],p_recv);
			BOOST_REQUIRE_EQUAL(pt.template get<p::t>()[2][0],p_recv);
			BOOST_REQUIRE_EQUAL(pt.template get<p::t>()[2][1],p_recv);
			BOOST_REQUIRE_EQUAL(pt.template get<p::t>()[2][2],p_recv);
		}
	}
}
762 
/*! \brief Point-to-point send/recv test with a primitive payload
 *
 * Runs the same stride-P_STRIDE exchange as test_send_recv_complex twice:
 * first through the typed openfpm::vector send/recv overloads, then
 * through the raw pointer+size overloads, and verifies every received
 * element equals the sender's rank.
 *
 * \tparam T primitive element type
 * \param n number of elements per message
 * \param vcl cluster object
 *
 * NOTE(review): the declarations of pt_buf (one per scope) are missing
 * from this extract — confirm against the repository.
 */
template<typename T> void test_send_recv_primitives(size_t n, Vcluster & vcl)
{
	// payload: n values of T, all equal to this rank's id
	openfpm::vector<T> v_send = allocate_openfpm_primitive<T>(n,vcl.getProcessUnitID());

	{
		// ---- pass 1: typed (openfpm::vector) overloads ----

		// Send to 8 processors
		for (size_t i = 0 ; i < 8 ; i++)
			vcl.send( mod(vcl.getProcessUnitID() + i * P_STRIDE, vcl.getProcessingUnits()) ,i,v_send);

		pt_buf.resize(8);

		// Recv from 8 processors
		for (size_t i = 0 ; i < 8 ; i++)
		{
			pt_buf.get(i).resize(n);
			vcl.recv( mod( (vcl.getProcessUnitID() - i * P_STRIDE), vcl.getProcessingUnits()) ,i,pt_buf.get(i));
		}

		vcl.execute();

		// Check the received buffers (careful at negative modulo)
		for (size_t i = 0 ; i < 8 ; i++)
		{
			for (size_t j = 0 ; j < n ; j++)
			{
				T pt = pt_buf.get(i).get(j);

				T p_recv = mod( (vcl.getProcessUnitID() - i * P_STRIDE), vcl.getProcessingUnits());

				BOOST_REQUIRE_EQUAL(pt,p_recv);
			}
		}

	}

	{
		// ---- pass 2: raw pointer + byte-size overloads ----

		// Send to 8 processors
		for (size_t i = 0 ; i < 8 ; i++)
			vcl.send( mod(vcl.getProcessUnitID() + i * P_STRIDE, vcl.getProcessingUnits()) ,i,v_send.getPointer(),v_send.size()*sizeof(T));

		pt_buf.resize(8);

		// Recv from 8 processors
		for (size_t i = 0 ; i < 8 ; i++)
		{
			pt_buf.get(i).resize(n);
			vcl.recv( mod( (vcl.getProcessUnitID() - i * P_STRIDE), vcl.getProcessingUnits()) ,i,pt_buf.get(i).getPointer(),pt_buf.get(i).size()*sizeof(T));
		}

		vcl.execute();

		// Check the received buffers (careful at negative modulo)
		for (size_t i = 0 ; i < 8 ; i++)
		{
			for (size_t j = 0 ; j < n ; j++)
			{
				T pt = pt_buf.get(i).get(j);

				T p_recv = mod( (vcl.getProcessUnitID() - i * P_STRIDE), vcl.getProcessingUnits());

				BOOST_REQUIRE_EQUAL(pt,p_recv);
			}
		}

	}
}
846 
847 template<typename T> void test_single_all_gather_primitives(Vcluster & vcl)
848 {
850 
851  openfpm::vector<T> clt;
852  T data = vcl.getProcessUnitID();
853 
854  vcl.allGather(data,clt);
855  vcl.execute();
856 
857  for (size_t i = 0 ; i < vcl.getProcessingUnits() ; i++)
858  BOOST_REQUIRE_EQUAL(i,(size_t)clt.get(i));
859 
861 
862 }
863 
864 template<typename T> void test_single_all_broadcast_primitives(Vcluster & vcl)
865 {
867 
868  openfpm::vector<T> bdata;
869 
870  if (vcl.getProcessUnitID() == 0)
871  {
872  bdata.add(0);
873  bdata.add(1);
874  bdata.add(2);
875  bdata.add(3);
876  bdata.add(4);
877  bdata.add(5);
878  bdata.add(6);
879  }
880  else
881  {
882  bdata.resize(7);
883  }
884 
885  vcl.Bcast(bdata,0);
886  vcl.execute();
887 
888  for (size_t i = 0 ; i < bdata.size() ; i++)
889  BOOST_REQUIRE_EQUAL(i,(size_t)bdata.get(i));
890 
892 
893 }
894 
895 #endif /* VCLUSTER_UNIT_TEST_UTIL_HPP_ */
void sum(T &num)
Sum the numbers across all processors and get the result.
size_t getProcessUnitID()
Get the process unit id.
void execute()
Execute all the requests.
bool Bcast(openfpm::vector< T, Mem, gr > &v, size_t root)
Broadcast the data to all processors.
size_t size()
Stub size.
Definition: map_vector.hpp:70
void max(T &num)
Get the maximum number across all processors (or reduction with infinity norm)
void sendrecvMultipleMessagesNBX(openfpm::vector< size_t > &prc, openfpm::vector< T > &data, openfpm::vector< size_t > prc_recv, openfpm::vector< size_t > &recv_sz, void *(*msg_alloc)(size_t, size_t, size_t, size_t, size_t, void *), void *ptr_arg, long int opt=NONE)
Send and receive multiple messages.
double getwct()
Return the elapsed real time.
Definition: timer.hpp:108
bool send(size_t proc, size_t tag, const void *mem, size_t sz)
Send data to a processor.
Implementation of VCluster class.
Definition: VCluster.hpp:36
bool recv(size_t proc, size_t tag, void *v, size_t sz)
Recv data from a processor.
void start()
Start the timer.
Definition: timer.hpp:73
Test structure used for several test.
Definition: Point_test.hpp:105
Implementation of 1-D std::vector like structure.
Definition: map_vector.hpp:61
size_t getProcessingUnits()
Get the total number of processors.
bool allGather(T &send, openfpm::vector< T, Mem, gr > &v)
Gather the data from all processors.
Class for cpu time benchmarking.
Definition: timer.hpp:25
void stop()
Stop the timer.
Definition: timer.hpp:97