OpenFPM_pdata  4.1.0
Project that contains the implementation of distributed structures
HDF5_reader_gd.hpp
/*
 * HDF5_reader_gd.hpp
 *
 *  Created on: May 2, 2017
 *      Author: i-bird
 */

#ifndef OPENFPM_IO_SRC_HDF5_WR_HDF5_READER_GD_HPP_
#define OPENFPM_IO_SRC_HDF5_WR_HDF5_READER_GD_HPP_


#include "Packer_Unpacker/Pack_selector.hpp"
#include "Packer_Unpacker/Packer.hpp"
#include "Packer_Unpacker/Unpacker.hpp"
#include "util/GBoxes.hpp"
template <>
class HDF5_reader<GRID_DIST>
{
    template<typename device_grid> void load_block(long int bid,
                                                   hssize_t mpi_size_old,
                                                   long int * metadata_out,
                                                   openfpm::vector<size_t> & metadata_accum,
                                                   hid_t plist_id,
                                                   hid_t dataset_2,
                                                   openfpm::vector<device_grid> & loc_grid_old,
                                                   openfpm::vector<GBoxes<device_grid::dims>> & gdb_ext_old)
    {
        hsize_t offset[1];
        hsize_t block[1];

        if (bid < mpi_size_old && bid != -1)
        {
            offset[0] = metadata_accum.get(bid);
            block[0] = metadata_out[bid];
        }
        else
        {
            offset[0] = 0;
            block[0] = 0;
        }

        hsize_t count[1] = {1};

        // allocate the memory
        HeapMemory pmem;
        //pmem.allocate(req);
        ExtPreAlloc<HeapMemory> & mem = *(new ExtPreAlloc<HeapMemory>(block[0],pmem));
        mem.incRef();

        //Select file dataspace
        hid_t file_dataspace_id_2 = H5Dget_space(dataset_2);

        size_t to_read = block[0];
        size_t coffset = 0;

        auto & v_cl = create_vcluster();

        int read_test = (to_read != 0);
        v_cl.max(read_test);
        v_cl.execute();

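        // Read the block in chunks of at most 0x7FFFFFFF bytes, apparently to stay
        // below the 2 GB limit of a single MPI-IO transfer; the collective max keeps
        // every rank looping the same number of times, since the H5Dread below uses
        // a collective transfer property list.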
        while (read_test)
        {
            hsize_t block_c[1];
            block_c[0] = std::min((size_t)(to_read),(size_t)0x7FFFFFFF);

            hsize_t offset_c[1] = {offset[0] + coffset};
            H5Sselect_hyperslab(file_dataspace_id_2, H5S_SELECT_SET, offset_c, NULL, count, block_c);

            hsize_t mdim_2[1] = {block_c[0]};

            //Create data space in memory
            hid_t mem_dataspace_id_2 = H5Screate_simple(1, mdim_2, NULL);

            // Read the dataset.
            H5Dread(dataset_2, H5T_NATIVE_CHAR, mem_dataspace_id_2, file_dataspace_id_2, plist_id, (char *)mem.getPointer() + coffset);

            coffset += std::min((size_t)(to_read),(size_t)0x7FFFFFFF);
            to_read -= std::min((size_t)(to_read),(size_t)0x7FFFFFFF);

            read_test = (to_read != 0);
            v_cl.max(read_test);
            v_cl.execute();
        }

        mem.allocate(pmem.size());

        Unpack_stat ps;

        openfpm::vector<device_grid> loc_grid_old_unp;
        openfpm::vector<GBoxes<device_grid::dims>> gdb_ext_old_unp;

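        // Unpack the sub-grids and their GBoxes (domain + ghost boxes) from the
        // raw buffer that was just read.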
        Unpacker<typename std::remove_reference<decltype(loc_grid_old)>::type,HeapMemory>::unpack(mem,loc_grid_old_unp,ps,1);
        Unpacker<typename std::remove_reference<decltype(gdb_ext_old)>::type,HeapMemory>::unpack(mem,gdb_ext_old_unp,ps,1);

        for (size_t i = 0; i < loc_grid_old_unp.size(); i++)
        {
            loc_grid_old.add();
            loc_grid_old.last().swap(loc_grid_old_unp.get(i));
        }

        for (size_t i = 0; i < gdb_ext_old_unp.size(); i++)
            gdb_ext_old.add(gdb_ext_old_unp.get(i));

        mem.decRef();
        delete &mem;
    }

public:

    template<typename device_grid> inline void load(const std::string & filename,
                                                     openfpm::vector<device_grid> & loc_grid_old,
                                                     openfpm::vector<GBoxes<device_grid::dims>> & gdb_ext_old)
    {
        Vcluster<> & v_cl = create_vcluster();

        MPI_Comm comm = v_cl.getMPIComm();
        MPI_Info info = MPI_INFO_NULL;

        int mpi_rank = v_cl.getProcessUnitID();
        //int mpi_size = v_cl.getProcessingUnits();

        // Set up file access property list with parallel I/O access
        hid_t plist_id = H5Pcreate(H5P_FILE_ACCESS);
        H5Pset_fapl_mpio(plist_id, comm, info);

        //Open a file
        hid_t file = H5Fopen (filename.c_str(), H5F_ACC_RDONLY, plist_id);
        H5Pclose(plist_id);

        //Open dataset
        hid_t dataset = H5Dopen (file, "metadata", H5P_DEFAULT);

        //Create property list for collective dataset read
        plist_id = H5Pcreate(H5P_DATASET_XFER);
        H5Pset_dxpl_mpio(plist_id, H5FD_MPIO_COLLECTIVE);

        //Select file dataspace
        hid_t file_dataspace_id = H5Dget_space(dataset);

        hssize_t mpi_size_old = H5Sget_select_npoints (file_dataspace_id);

        //if (mpi_rank == 0)
        //printf ("\nOld MPI size: %llu\n", mpi_size_old);

        //Where to read metadata
        long int metadata_out[mpi_size_old];

        for (int i = 0; i < mpi_size_old; i++)
        {
            metadata_out[i] = 0;
        }

        //Size for data space in memory
        hsize_t mdim[1] = {(size_t)mpi_size_old};

        //Create data space in memory
        hid_t mem_dataspace_id = H5Screate_simple(1, mdim, NULL);

        // Read the dataset.
        H5Dread(dataset, H5T_NATIVE_LLONG, mem_dataspace_id, file_dataspace_id, plist_id, metadata_out);

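        // Prefix-sum the per-rank packed sizes so that metadata_accum holds the
        // byte offset of each old rank's chunk inside the "grid_dist" dataset.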
        openfpm::vector<size_t> metadata_accum;
        metadata_accum.resize(mpi_size_old);

        metadata_accum.get(0) = 0;
        for (int i = 1 ; i < mpi_size_old ; i++)
            metadata_accum.get(i) = metadata_accum.get(i-1) + metadata_out[i-1];

        //Open dataset
        hid_t dataset_2 = H5Dopen (file, "grid_dist", H5P_DEFAULT);

        //Create property list for collective dataset read
        plist_id = H5Pcreate(H5P_DATASET_XFER);
        H5Pset_dxpl_mpio(plist_id, H5FD_MPIO_COLLECTIVE);

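        // Distribute the mpi_size_old chunks (one per rank of the run that wrote
        // the file) as evenly as possible over the current ranks.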
        openfpm::vector<size_t> n_block;
        n_block.resize(v_cl.getProcessingUnits());


        for(size_t i = 0 ; i < n_block.size() ; i++)
            n_block.get(i) = mpi_size_old / v_cl.getProcessingUnits();

        size_t rest_block = mpi_size_old % v_cl.getProcessingUnits();

        size_t max_block;

        if (rest_block != 0)
            max_block = n_block.get(0) + 1;
        else
            max_block = n_block.get(0);

        //for(size_t i = 0 ; i < n_block.size() ; i++)
        for(size_t i = 0 ; i < rest_block ; i++)
            n_block.get(i) += 1;


        //for(size_t i = 0 ; i < n_block.size() ; i++)
        //std::cout << "n_block.get(i): " << n_block.get(i) << std::endl;

        size_t start_block = 0;
        size_t stop_block = 0;


        if (v_cl.getProcessUnitID() != 0)
        {
            for(size_t i = 0 ; i < v_cl.getProcessUnitID() ; i++)
                start_block += n_block.get(i);
        }

        stop_block = start_block + n_block.get(v_cl.getProcessUnitID());

// std::cout << "ID: " << v_cl.getProcessUnitID() << "; Start block: " << start_block << "; " << "Stop block: " << stop_block << std::endl;

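        // Ranks beyond the old communicator size own no chunk; they still call
        // load_block once (with an out-of-range block id) so that its collective
        // reads stay matched across all ranks.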
        if (mpi_rank >= mpi_size_old)
            load_block(start_block,mpi_size_old,metadata_out,metadata_accum,plist_id,dataset_2,loc_grid_old,gdb_ext_old);
        else
        {
            size_t n_bl = 0;
            size_t lb = start_block;
            for ( ; lb < stop_block ; lb++, n_bl++)
                load_block(lb,mpi_size_old,metadata_out,metadata_accum,plist_id,dataset_2,loc_grid_old,gdb_ext_old);

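            // Pad with an empty load_block call so that every rank performs the
            // same number of collective read phases.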
            if (n_bl < max_block)
                load_block(-1,mpi_size_old,metadata_out,metadata_accum,plist_id,dataset_2,loc_grid_old,gdb_ext_old);
        }


        //std::cout << "LOAD: sum: " << sum << std::endl;

        // Close the dataset.
        H5Dclose(dataset);
        H5Dclose(dataset_2);
        // Close the file.
        H5Fclose(file);
        H5Pclose(plist_id);
    }

};


#endif /* OPENFPM_IO_SRC_HDF5_WR_HDF5_READER_GD_HPP_ */
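Usage note (illustrative, not part of the header): a minimal sketch of how the reader can be driven, assuming the include below pulls in the grid types and this reader, that openfpm_init() has already been called so create_vcluster() works, and that HDF5 was built with parallel (MPI-IO) support. The grid type grid_cpu<3, aggregate<float>>, the function name restore_old_grids and the dataset dimensionality are example choices; in practice the distributed grid's own load() typically drives this class.

    #include "Grid/grid_dist_id.hpp"   // assumed umbrella include for the grid types and the HDF5 reader

    void restore_old_grids(const std::string & filename)
    {
        // One packed sub-grid per rank of the run that wrote the file, plus the
        // corresponding domain + ghost boxes (GBoxes). 3D float grids are examples.
        openfpm::vector<grid_cpu<3, aggregate<float>>> loc_grid_old;
        openfpm::vector<GBoxes<3>> gdb_ext_old;

        HDF5_reader<GRID_DIST> reader;
        reader.load(filename, loc_grid_old, gdb_ext_old);   // collective: every rank must call it

        // loc_grid_old / gdb_ext_old now mirror the decomposition of the run that
        // wrote the file; mapping them onto the current decomposition is up to the caller.
    }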