OpenFPM_pdata  1.1.0
Project that contains the implementation of distributed structures
 All Data Structures Namespaces Functions Variables Typedefs Enumerations Friends Pages
HDF5_reader_vd.hpp
1 /*
2  * HDF5_reader_vd.hpp
3  *
4  * Created on: May 1, 2017
5  * Author: i-bird
6  */
7 
8 #ifndef OPENFPM_IO_SRC_HDF5_WR_HDF5_READER_VD_HPP_
9 #define OPENFPM_IO_SRC_HDF5_WR_HDF5_READER_VD_HPP_
10 
11 
12 template <>
13 class HDF5_reader<VECTOR_DIST>
14 {
15 private:
16 
17  template<unsigned int dim, typename St,typename prp>
18  bool load_block(long int bid,
19  hssize_t mpi_size_old,
20  int * metadata_out,
21  openfpm::vector<size_t> metadata_accum,
22  hid_t plist_id,
23  hid_t dataset_2,
24  size_t & g_m,
26  openfpm::vector<prp> & v_prp)
27  {
28  hsize_t offset[1];
29  hsize_t block[1];
30 
31  if (bid < mpi_size_old && bid != -1)
32  {
33  offset[0] = metadata_accum.get(bid);
34  block[0] = metadata_out[bid];
35  }
36  else
37  {
38  offset[0] = 0;
39  block[0] = 0;
40  }
41 
42  hsize_t count[1] = {1};
43 
44 
45  //Select file dataspace
46  hid_t file_dataspace_id_2 = H5Dget_space(dataset_2);
47  if (file_dataspace_id_2 < 0) {return false;}
48 
49  herr_t err = H5Sselect_hyperslab(file_dataspace_id_2, H5S_SELECT_SET, offset, NULL, count, block);
50  if (err < 0) {return false;}
51 
52  hsize_t mdim_2[1] = {block[0]};
53 
54 
55  //Create data space in memory
56  hid_t mem_dataspace_id_2 = H5Screate_simple(1, mdim_2, NULL);
57  if (mem_dataspace_id_2 < 0) {return false;}
58 
59  size_t sum = 0;
60 
61  for (int i = 0; i < mpi_size_old; i++)
62  sum += metadata_out[i];
63 
64  // allocate the memory
65  HeapMemory pmem;
66 
67  ExtPreAlloc<HeapMemory> & mem = *(new ExtPreAlloc<HeapMemory>(block[0],pmem));
68  mem.incRef();
69 
70  // Read the dataset.
71  err = H5Dread(dataset_2, H5T_NATIVE_CHAR, mem_dataspace_id_2, file_dataspace_id_2, plist_id, (char *)mem.getPointer());
72  if (err < 0) {return false;}
73 
74  mem.allocate(pmem.size());
75 
76  Unpack_stat ps;
77 
79 
80  openfpm::vector<prp> v_prp_unp;
81 
84 
85  mem.decRef();
86  delete &mem;
87 
88  for (size_t i = 0; i < v_pos_unp.size(); i++)
89  v_pos.add(v_pos_unp.get(i));
90 
91  for (size_t i = 0; i < v_prp_unp.size(); i++)
92  v_prp.add(v_prp_unp.get(i));
93 
94  g_m = v_pos.size();
95 
96  H5Sclose(file_dataspace_id_2);
97  H5Sclose(mem_dataspace_id_2);
98 
99  return true;
100  }
101 
102 public:
103 
104  template<unsigned int dim, typename St, typename prp> inline bool load(const std::string & filename,
106  openfpm::vector<prp> & v_prp,
107  size_t & g_m)
108  {
109  Vcluster & v_cl = create_vcluster();
110 
111  v_pos.clear();
112  v_prp.clear();
113 
114  g_m = 0;
115 
116  MPI_Comm comm = v_cl.getMPIComm();
117  MPI_Info info = MPI_INFO_NULL;
118 
119  int mpi_rank = v_cl.getProcessUnitID();
120 
121  // Set up file access property list with parallel I/O access
122  hid_t plist_id = H5Pcreate(H5P_FILE_ACCESS);
123  if (plist_id == -1) {return false;}
124  herr_t err = H5Pset_fapl_mpio(plist_id, comm, info);
125  if (err < 0)
126  return false;
127 
128  //Open a file
129  hid_t file = H5Fopen (filename.c_str(), H5F_ACC_RDONLY, plist_id);
130  if (file < 0) {return false;}
131  H5Pclose(plist_id);
132 
133  //Open dataset
134  hid_t dataset = H5Dopen (file, "metadata", H5P_DEFAULT);
135  if (dataset < 0) {return false;}
136 
137  //Create property list for collective dataset read
138  plist_id = H5Pcreate(H5P_DATASET_XFER);
139  if (plist_id == -1) {return false;}
140  H5Pset_dxpl_mpio(plist_id, H5FD_MPIO_COLLECTIVE);
141 
142  //Select file dataspace
143  hid_t file_dataspace_id = H5Dget_space(dataset);
144  if (file_dataspace_id < 0) {return false;}
145 
146  hssize_t mpi_size_old = H5Sget_select_npoints (file_dataspace_id);
147  if (mpi_size_old < 0) {return false;}
148 
149  //Where to read metadata
150  int metadata_out[mpi_size_old];
151 
152  for (int i = 0; i < mpi_size_old; i++)
153  {metadata_out[i] = 0;}
154 
155  //Size for data space in memory
156  hsize_t mdim[1] = {(size_t)mpi_size_old};
157 
158  //Create data space in memory
159  hid_t mem_dataspace_id = H5Screate_simple(1, mdim, NULL);
160  if (mem_dataspace_id < 0) {return false;}
161 
162  // Read the dataset.
163  err = H5Dread(dataset, H5T_NATIVE_INT, mem_dataspace_id, file_dataspace_id, plist_id, metadata_out);
164  if (err < 0) {return false;}
165 
166  openfpm::vector<size_t> metadata_accum;
167  metadata_accum.resize(mpi_size_old);
168 
169  metadata_accum.get(0) = 0;
170  for (int i = 1 ; i < mpi_size_old ; i++)
171  metadata_accum.get(i) = metadata_accum.get(i-1) + metadata_out[i-1];
172 
173  //Open dataset
174  hid_t dataset_2 = H5Dopen (file, "vector_dist", H5P_DEFAULT);
175  if (dataset_2 < 0) {return false;}
176 
177  //Create property list for collective dataset read
178  plist_id = H5Pcreate(H5P_DATASET_XFER);
179  if (plist_id == -1) {return false;}
180 
181  err = H5Pset_dxpl_mpio(plist_id, H5FD_MPIO_COLLECTIVE);
182  if (err < 0) {return false;}
183 
184  openfpm::vector<size_t> n_block;
185  n_block.resize(v_cl.getProcessingUnits());
186 
187 
188  for(size_t i = 0 ; i < n_block.size() ; i++)
189  n_block.get(i) = mpi_size_old / v_cl.getProcessingUnits();
190 
191  size_t rest_block = mpi_size_old % v_cl.getProcessingUnits();
192 
193  size_t max_block;
194 
195  if (rest_block != 0)
196  max_block = n_block.get(0) + 1;
197  else
198  max_block = n_block.get(0);
199 
200  //for(size_t i = 0 ; i < n_block.size() ; i++)
201  for(size_t i = 0 ; i < rest_block ; i++)
202  n_block.get(i) += 1;
203 
204  size_t start_block = 0;
205  size_t stop_block = 0;
206 
207 
208  if (v_cl.getProcessUnitID() != 0)
209  {
210  for(size_t i = 0 ; i < v_cl.getProcessUnitID() ; i++)
211  start_block += n_block.get(i);
212  }
213 
214  stop_block = start_block + n_block.get(v_cl.getProcessUnitID());
215 
216  if (mpi_rank >= mpi_size_old)
217  load_block(start_block,mpi_size_old,metadata_out,metadata_accum,plist_id,dataset_2,g_m,v_pos,v_prp);
218  else
219  {
220  size_t n_bl = 0;
221  size_t lb = start_block;
222  for ( ; lb < stop_block ; lb++, n_bl++)
223  load_block(lb,mpi_size_old,metadata_out,metadata_accum,plist_id,dataset_2,g_m,v_pos,v_prp);
224 
225  if (n_bl < max_block)
226  load_block(-1,mpi_size_old,metadata_out,metadata_accum,plist_id,dataset_2,g_m,v_pos,v_prp);
227  }
228 
229  // Close open object
230  H5Sclose(mem_dataspace_id);
231  H5Sclose(file_dataspace_id);
232  // Close the dataset.
233  H5Dclose(dataset);
234  H5Dclose(dataset_2);
235  // Close the file.
236  H5Fclose(file);
237  H5Pclose(plist_id);
238 
239  return true;
240  }
241 };
242 
243 
244 #endif /* OPENFPM_IO_SRC_HDF5_WR_HDF5_READER_VD_HPP_ */
Unpacker class.
Definition: Packer_util.hpp:20
size_t getProcessUnitID()
Get the process unit id.
MPI_Comm getMPIComm()
Get the MPI_Communicator (or processor group) this VCluster is using.
virtual void * getPointer()
Return the pointer of the last allocation.
size_t size()
Stub size.
Definition: map_vector.hpp:70
This class implements the point shape in an N-dimensional space.
Definition: Point.hpp:22
virtual size_t size() const
The size of the allocated memory
Definition: HeapMemory.cpp:157
This class allocates and destroys CPU memory.
Definition: HeapMemory.hpp:39
virtual bool allocate(size_t sz)
Allocate a chunk of memory.
Definition: ExtPreAlloc.hpp:92
Implementation of VCluster class.
Definition: VCluster.hpp:36
virtual void incRef()
Increment the reference counter.
Definition: ExtPreAlloc.hpp:69
Unpacking status object.
Definition: Pack_stat.hpp:15
It models an expression expr1 + ... + exprn.
Definition: sum.hpp:92
size_t getProcessingUnits()
Get the total number of processors.