Coverage for rhodent/density_matrices/distributed/time.py: 85%

145 statements

coverage.py v7.9.1, created at 2025-08-01 16:57 +0000

from __future__ import annotations

from typing import Generator
import numpy as np

from .base import RhoParameters
from rhodent.density_matrices.buffer import DensityMatrixBuffer
from rhodent.density_matrices.readers.gpaw import KohnShamRhoWfsReader
from rhodent.utils import safe_fill
from .base import BaseDistributor, RhoIndices
from ...typing import Communicator
from ...utils.memory import MemoryEstimate


class TimeDistributor(BaseDistributor):

    """ Iteratively read density matrices in the time domain

    This class uses the KohnShamRhoWfsReader to iteratively read wave functions
    (each rank reading the same times) and construct density matrices in the ground
    state Kohn-Sham basis for each time. Different ranks read different chunks of
    the density matrices. The density matrices are accumulated in a buffer and
    yielded when all times have been read.

    Parameters
    ----------
    rho_reader
        Density matrices reader
    parameters
        Parameters determining the chunks of the density matrix to read
    comm
        Communicator
    """
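
    # A minimal usage sketch (hypothetical setup; the constructor arguments
    # of KohnShamRhoWfsReader are omitted):
    #
    #     wfs_reader = KohnShamRhoWfsReader(...)
    #     for dm_buffer in TimeDistributor(wfs_reader):
    #         ...  # this rank's chunk of the density matrix, for all times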

    def __init__(self,
                 rho_reader: KohnShamRhoWfsReader,
                 parameters: RhoParameters | None = None,
                 comm: Communicator | None = None):
        if rho_reader.comm.size > 1:
            raise ValueError('Serial TimeDistributor must have serial reader.')

        super().__init__(rho_reader, parameters, comm=comm)

    @property
    def dtype(self):
        return np.float64

    @property
    def xshape(self):
        return (self.nt, )

    @property
    def dt(self):
        return self.rho_wfs_reader.dt

    @property
    def nt(self):
        return self.rho_wfs_reader.nt

    @property
    def time_t(self):
        return self.rho_wfs_reader.time_t

    def __str__(self) -> str:
        lines = []
        lines.append('Density matrices reader')
        lines.append(' Receiving density matrices in continuous chunks.')
        lines.append(f' shape {self._parameters.nnshape}')
        lines.append(f' received by {self.maxntimes} ranks')
        lines.append(f' {self.niters} iterations to process all chunks')

        return '\n'.join(lines)

    def get_memory_estimate(self) -> MemoryEstimate:
        narrays = (2 if self.yield_re and self.yield_im else 1) * len(self.derivative_order_s)
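        # E.g. yielding both the real and imaginary parts for a single
        # derivative order gives narrays = 2.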

        shape = self._parameters.nnshape + (self.nt, narrays)

        comment = f'Buffers hold {narrays} arrays ({self.describe_reim()})'
        memory_estimate = MemoryEstimate(comment=comment)
        memory_estimate.add_key('Density matrix buffers', shape, float,
                                on_num_ranks=self.comm.size)

        return memory_estimate

    def __iter__(self) -> Generator[DensityMatrixBuffer, None, None]:
        """ Yield density matrices for all times, chunk by chunk

        The wave function file is read in chunks, time by time, with
        the reading of times in the inner loop.

        Yields
        ------
        Chunks of the density matrix
        """
        read_dm = DensityMatrixBuffer(self._parameters.nnshape,
                                      (self.nt, ),
                                      np.float64)
        if self.yield_re:
            read_dm.zeros(True, 0)
        if self.yield_im:
            read_dm.zeros(False, 0)

        self.rho_wfs_reader.parallel_prepare()

        # Loop over the chunks this rank should gather
        for indices in self.work_loop(self.comm.rank):
            if indices is None:
                continue
            # Convert to reading indices
            n1 = slice(self._parameters.n1min + indices.n1.start, self._parameters.n1min + indices.n1.stop)
            n2 = slice(self._parameters.n2min + indices.n2.start, self._parameters.n2min + indices.n2.stop)
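            # For example (hypothetical numbers): with n1min = 10 and
            # indices.n1 = slice(0, 4), rows 10 to 13 of the full
            # density matrix are read for this chunk.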

            gen = self.rho_wfs_reader.iread(indices.s, indices.k, n1, n2)
            for t in self.rho_wfs_reader.work_loop(self.rho_wfs_reader.comm.rank):
                if t is None:
                    continue
                dm_buffer = next(gen)
                for source_nn, dest_nn in zip(dm_buffer._iter_buffers(), read_dm[t]._iter_buffers()):
                    dest_nn[:] = source_nn

            yield read_dm

    @classmethod
    def from_reader(cls,
                    rho_nn_reader: KohnShamRhoWfsReader,
                    parameters: RhoParameters,
                    **kwargs) -> TimeDistributor:
        return cls(rho_nn_reader, parameters, **kwargs)


class AlltoallvTimeDistributor(TimeDistributor):

    """ Iteratively read density matrices in the time domain

    This class uses the KohnShamRhoWfsReader to iteratively read wave functions
    (one time per rank) and construct density matrices in the ground state Kohn-Sham
    basis for each time. When all ranks have read one time each, the data is
    redistributed so that each rank holds only one chunk of the density matrices,
    but for all times. The density matrices are accumulated in a buffer and
    yielded when all times have been read.

    Parameters
    ----------
    rho_reader
        Density matrices reader
    parameters
        Parameters determining the chunks of the density matrix to read
    """
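
    # Schematic of the data movement (hypothetical sizes): with 4 ranks and
    # 4 times, each rank first reads one large nn-chunk of the density matrix
    # at its own time; the alltoallv redistribution then transposes this
    # layout, leaving each rank with one small nn-chunk for all 4 times.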

    def __init__(self,
                 rho_reader: KohnShamRhoWfsReader,
                 parameters: RhoParameters | None = None):
        if rho_reader.lcao_rho_reader.striden != 0:
            raise ValueError('n stride must be 0 (index all) for the alltoallv parallelized method')

        BaseDistributor.__init__(self, rho_reader, parameters, comm=rho_reader.comm)

    def __str__(self) -> str:
        nnshape = self.rho_wfs_reader.nnshape(*self.first_indices())

        lines = []
        lines.append('Parallel density matrices reader')
        lines.append(' Receiving density matrices in continuous chunks.')
        lines.append(f' density matrix continuous chunk {nnshape}')
        lines.append(f' split into smaller chunks {self._parameters.nnshape}')
        lines.append(f' received by {self.maxntimes} ranks')
        lines.append(' Redistributing into continuous time form.')
        lines.append(f' sent to {self.maxnchunks} ranks')
        lines.append(f' {self.niters} iterations to process all chunks')

        return '\n'.join(lines)

    def get_memory_estimate(self) -> MemoryEstimate:
        narrays = (2 if self.yield_re and self.yield_im else 1) * len(self.derivative_order_s)

        nnshape = self.rho_wfs_reader.nnshape(*self.first_indices())

        before_shape = self._parameters.nnshape + (self.maxnchunks, narrays)
        after_shape = self._parameters.nnshape + (self.nt, narrays)
        total_after_size = np.prod(self._parameters.nnshape + (self.nt, )) * self.maxnchunks * narrays

        shape = nnshape + (narrays, )

        comment = f'Buffers hold {narrays} arrays ({self.describe_reim()})'
        memory_estimate = MemoryEstimate(comment=comment)
        memory_estimate.add_key('Density matrix chunks', shape, float,
                                on_num_ranks=self.maxntimes)
        memory_estimate.add_key('Before parallel redistribution', before_shape, float,
                                on_num_ranks=self.maxntimes)
        memory_estimate.add_key('After parallel redistribution', after_shape, float,
                                total_size=total_after_size,
                                on_num_ranks=self.maxnchunks)

        return memory_estimate

    def first_indices(self):
        for first_indices_r in self.work_loop_by_ranks():
            break

        indices_by_rank = [chunk for chunk in first_indices_r if chunk is not None]
        indices_concat, _ = RhoIndices.concatenate_indices(indices_by_rank)
        return indices_concat

    def __iter__(self) -> Generator[DensityMatrixBuffer, None, None]:
        """ Yield density matrices for all times, chunk by chunk

        The wave function file is read in chunks, time by time. However,
        chunks are grouped together so that the density matrix at each
        time is read in large chunks. Each rank reads the same chunk for
        a different time. Then, the chunks and times are redistributed,
        so that each rank now holds a small chunk, but for many times.
        The same chunk is read for all times before it is yielded.

        Yields
        ------
        Chunks of the density matrix
        """
        log = self.log

        self.rho_wfs_reader.parallel_prepare()

        # Here x is a compound index for a slice of both n and M
        for chunki, chunks_r in enumerate(self.work_loop_by_ranks()):
            log.start('read_alltoallv')

            # The work this rank is supposed to read
            indices = chunks_r[self.comm.rank]
            indices_by_rank = [chunk for chunk in chunks_r if chunk is not None]

            # Number of chunks of nn-indices being read
            nchunks = len(indices_by_rank)
            assert nchunks > 0

            # Find out how much of the total density matrix needs to be read
            # to obtain only the required chunks
            indices_concat, reduced_indices_by_rank = RhoIndices.concatenate_indices(indices_by_rank)
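            # reduced_indices_by_rank holds each rank's chunk expressed
            # relative to indices_concat, the concatenated region that is
            # actually read from file.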

            if indices is None:
                # This rank does not want any slices of n1 and n2.
                # It will still potentially participate in the parallel reading of times
                assert self.comm.rank >= nchunks

            n1 = slice(self._parameters.n1min + indices_concat.n1.start,
                       self._parameters.n1min + indices_concat.n1.stop)
            n2 = slice(self._parameters.n2min + indices_concat.n2.start,
                       self._parameters.n2min + indices_concat.n2.stop)

            if self.comm.rank < self.maxntimes:
                # This rank will read
                contiguous_chunks_buffer = DensityMatrixBuffer(self._parameters.nnshape,
                                                               (nchunks, ),
                                                               np.float64)
            else:
                # This rank does not read any times
                contiguous_chunks_buffer = DensityMatrixBuffer(self._parameters.nnshape,
                                                               (0, ),
                                                               np.float64)
            if self.comm.rank < nchunks:
                # This rank will get a chunk of the density matrices after redistribution
                contiguous_time_buffer = DensityMatrixBuffer(self._parameters.nnshape,
                                                             (self.nt, ),
                                                             np.float64)
            else:
                contiguous_time_buffer = DensityMatrixBuffer(self._parameters.nnshape,
                                                             (0, ),
                                                             np.float64)
            if self.yield_re:
                contiguous_chunks_buffer.zeros(True, 0)
                contiguous_time_buffer.zeros(True, 0)
            if self.yield_im:
                contiguous_chunks_buffer.zeros(False, 0)
                contiguous_time_buffer.zeros(False, 0)

            gen = self.rho_wfs_reader.iread(indices_concat.s, indices_concat.k, n1, n2)

            for t_r in self.rho_wfs_reader.work_loop_by_ranks():
                # Number of times being read
                ntimes = sum(1 for t in t_r if t is not None)
                # Time index this rank is reading, or None if not reading
                globalt = t_r[self.comm.rank]

                # Read the density matrices for one time per rank,
                # with each rank reading a large chunk of the density matrix
                if globalt is not None:
                    read_dm = next(gen)

                    for recvrank, readindices in enumerate(reduced_indices_by_rank):
                        for source_nn, dest_nn in zip(read_dm._iter_buffers(),
                                                      contiguous_chunks_buffer[recvrank]._iter_buffers()):
                            safe_fill(dest_nn, source_nn[readindices.n1, readindices.n2])
                else:
                    # This rank actually has no time to read (the number of times
                    # is not evenly divisible by the number of ranks, and this rank
                    # would be reading past the end).
                    # This rank will still participate in the alltoallv operation.
                    assert self.comm.rank >= ntimes

                # Perform the redistribution, so that each rank now holds
                # a smaller chunk of the density matrix, but for many times
                contiguous_chunks_buffer.redistribute(
                    contiguous_time_buffer, comm=self.comm,
                    source_indices_r=[(r, ) if r < nchunks else None for r in range(self.comm.size)],
                    target_indices_r=[None if t is None else (t, ) for t in t_r],
                    log=log if 0 in t_r else None)
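                # For example (hypothetical numbers): with comm.size = 4,
                # nchunks = 3 and t_r = (8, 9, None, None), the lists above are
                # source_indices_r = [(0, ), (1, ), (2, ), None] and
                # target_indices_r = [(8, ), (9, ), None, None].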

            if self.comm.rank == 0:
                log(f'Chunk {chunki+1}/{self.niters}: Read and distributed density matrices in '
                    f'{log.elapsed("read_alltoallv"):.1f}s', who='Response', flush=True)

            if indices is not None:
                yield contiguous_time_buffer