OpenFPM_pdata  4.1.0
Project that contains the implementation of distributed structures
HDF5_writer_gd.hpp
1 /*
2  * HDF5_writer_gd.hpp
3  *
4  * Created on: May 2, 2017
5  * Author: i-bird
6  */
7 
8 #ifndef OPENFPM_IO_SRC_HDF5_WR_HDF5_WRITER_GD_HPP_
9 #define OPENFPM_IO_SRC_HDF5_WR_HDF5_WRITER_GD_HPP_
10 
11 
#include <vector>

#include "Packer_Unpacker/Pack_selector.hpp"
#include "Packer_Unpacker/Packer.hpp"
#include "Packer_Unpacker/Unpacker.hpp"
#include "util/GBoxes.hpp"
16 
17 template <>
18 class HDF5_writer<GRID_DIST>
19 {
20 public:
21 
22  template<typename device_grid>
23  inline void save(const std::string & filename,
24  const openfpm::vector<device_grid> & loc_grid,
25  const openfpm::vector<GBoxes<device_grid::dims>> & gdb_ext) const
26  {
27  Vcluster<> & v_cl = create_vcluster();
28 
29  //Pack_request vector
30  size_t req = 0;
31 
32  //Pack request
33  Packer<typename std::remove_reference<decltype(loc_grid)>::type,HeapMemory>::packRequest(loc_grid,req);
34  Packer<typename std::remove_reference<decltype(gdb_ext)>::type,HeapMemory>::packRequest(gdb_ext,req);
35 
36  //std::cout << "Req: " << req << std::endl;
37 
38  // allocate the memory
39  HeapMemory pmem;
40  //pmem.allocate(req);
41  ExtPreAlloc<HeapMemory> & mem = *(new ExtPreAlloc<HeapMemory>(req,pmem));
42  mem.incRef();
43 
44  //Packing
45 
46  Pack_stat sts;
47 
48  Packer<typename std::remove_reference<decltype(loc_grid)>::type,HeapMemory>::pack(mem,loc_grid,sts);
49  Packer<typename std::remove_reference<decltype(gdb_ext)>::type,HeapMemory>::pack(mem,gdb_ext,sts);
50 
51  /*****************************************************************
52  * Create a new file with default creation and access properties.*
53  * Then create a dataset and write data to it and close the file *
54  * and dataset. *
55  *****************************************************************/
56 
57  int mpi_rank = v_cl.getProcessUnitID();
58  int mpi_size = v_cl.getProcessingUnits();
59 
60  MPI_Comm comm = v_cl.getMPIComm();
61  MPI_Info info = MPI_INFO_NULL;
62 
63  // Set up file access property list with parallel I/O access
64 
65  hid_t plist_id = H5Pcreate(H5P_FILE_ACCESS);
66  H5Pset_fapl_mpio(plist_id, comm, info);
67 
68  // Create a new file collectively and release property list identifier.
69  hid_t file = H5Fcreate (filename.c_str(), H5F_ACC_TRUNC, H5P_DEFAULT, plist_id);
70  H5Pclose(plist_id);
71 
72  size_t sz = pmem.size();
73  //std::cout << "Pmem.size: " << pmem.size() << std::endl;
74  openfpm::vector<size_t> sz_others;
75  v_cl.allGather(sz,sz_others);
76  v_cl.execute();
77 
78  size_t sum = 0;
79 
80  for (size_t i = 0; i < sz_others.size(); i++)
81  sum += sz_others.get(i);
82 
83  //Size for data space in file
84  hsize_t fdim[1] = {sum};
85 
86  //Size for data space in file
87  hsize_t fdim2[1] = {(size_t)mpi_size};
88 
89  //Create data space in file
90  hid_t file_dataspace_id = H5Screate_simple(1, fdim, NULL);
91 
92  //Create data space in file
93  hid_t file_dataspace_id_2 = H5Screate_simple(1, fdim2, NULL);
94 
95  //if (mpi_rank == 0)
96  //std::cout << "Total object size: " << sum << std::endl;
97 
98  //Create data set in file
99  hid_t file_dataset = H5Dcreate (file, "grid_dist", H5T_NATIVE_CHAR, file_dataspace_id, H5P_DEFAULT, H5P_DEFAULT, H5P_DEFAULT);
100 
101  //Create data set 2 in file
102  hid_t file_dataset_2 = H5Dcreate (file, "metadata", H5T_NATIVE_LLONG, file_dataspace_id_2, H5P_DEFAULT, H5P_DEFAULT, H5P_DEFAULT);
103 
104  //H5Pclose(plist_id);
105  H5Sclose(file_dataspace_id);
106  H5Sclose(file_dataspace_id_2);
107 
108  hsize_t block[1] = {pmem.size()};
109 
110  //hsize_t stride[1] = {1};
111 
112  hsize_t count[1] = {1};
113 
114  hsize_t offset[1] = {0};
115 
116  for (int i = 0; i < mpi_rank; i++)
117  {
118  if (mpi_rank == 0)
119  {
120  /* coverity[dead_error_line] */
121  offset[0] = 0;
122  }
123  else
124  offset[0] += sz_others.get(i);
125  }
126 
127  // std::cout << "MPI rank: " << mpi_rank << ", MPI size: " << mpi_size << ", Offset: " << offset[0] << ", Block: " << block[0] << std::endl;
128 
129  long int metadata[mpi_size];
130 
131  for (int i = 0; i < mpi_size; i++)
132  metadata[i] = sz_others.get(i);
133 
134  //Select hyperslab in the file.
135  file_dataspace_id = H5Dget_space(file_dataset);
136 
137  //Create property list for collective dataset write.
138  plist_id = H5Pcreate(H5P_DATASET_XFER);
139  H5Pset_dxpl_mpio(plist_id, H5FD_MPIO_COLLECTIVE);
140 
141  // We slipt the write in chunk of 2GB maximum
142  size_t to_write = block[0];
143  size_t coffset = 0;
144  while (to_write)
145  {
146  hsize_t block_c[1];
147  block_c[0] = std::min((size_t)(to_write),(size_t)0x7FFFFFFF);
148 
149  //Create data space in memory
150  hid_t mem_dataspace_id = H5Screate_simple(1, block_c, NULL);
151 
152  hsize_t offset_c[1] = {offset[0] + coffset};
153  H5Sselect_hyperslab(file_dataspace_id, H5S_SELECT_SET, offset_c, NULL, count, block_c);
154 
155  //Write a data set to a file
156  H5Dwrite(file_dataset, H5T_NATIVE_CHAR, mem_dataspace_id, file_dataspace_id, plist_id, (const char *)pmem.getPointer() + coffset);
157 
158  coffset += std::min((size_t)(to_write),(size_t)0x7FFFFFFF);
159  to_write -= std::min((size_t)(to_write),(size_t)0x7FFFFFFF);
160 
161  H5Sclose(mem_dataspace_id);
162  }
163 
164  file_dataspace_id_2 = H5Dget_space(file_dataset_2);
165 
166  //Write a data set 2 to a file
167  H5Dwrite(file_dataset_2, H5T_NATIVE_LLONG, H5S_ALL, file_dataspace_id_2, plist_id, metadata);
168 
169  //Close/release resources.
170  H5Dclose(file_dataset);
171  H5Sclose(file_dataspace_id);
172  H5Dclose(file_dataset_2);
173  H5Sclose(file_dataspace_id_2);
174  H5Pclose(plist_id);
175  H5Fclose(file);
176 
177  mem.decRef();
178  delete &mem;
179  }
180 
181 };
182 
183 #endif /* OPENFPM_IO_SRC_HDF5_WR_HDF5_WRITER_GD_HPP_ */
size_t getProcessUnitID()
Get the process unit id.
MPI_Comm getMPIComm()
Get the MPI_Communicator (or processor group) this VCluster is using.
bool allGather(T &send, openfpm::vector< T, Mem, gr > &v)
Gather the data from all processors.
size_t size()
Stub size.
Definition: map_vector.hpp:211
This class allocates and destroys CPU memory.
Definition: HeapMemory.hpp:39
Implementation of VCluster class.
Definition: VCluster.hpp:58
virtual void * getPointer()
get a readable pointer with the data
Definition: HeapMemory.cpp:228
This structure stores the Box that defines the domain inside the Ghost + domain box.
Definition: GBoxes.hpp:39
void execute()
Execute all the requests.
virtual size_t size() const
the size of the allocated memory
Definition: HeapMemory.cpp:153
Packing class.
Definition: Packer.hpp:49
size_t getProcessingUnits()
Get the total number of processors.
virtual void incRef()
Increment the reference counter.
Definition: ExtPreAlloc.hpp:98
virtual void decRef()
Decrement the reference counter.
It models an expression expr1 + ... exprn.
Definition: sum.hpp:92
Packing status object.
Definition: Pack_stat.hpp:60