Coverage for rhodent/density_matrices/frequency.py: 98%

138 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-08-01 16:57 +0000

1from __future__ import annotations 

2 

3from typing import Generator 

4from pathlib import Path 

5 

6import numpy as np 

7 

8from gpaw.lcaotddft.ksdecomposition import KohnShamDecomposition 

9from gpaw.tddft.units import au_to_eV, eV_to_au 

10from gpaw.mpi import world 

11 

12from .distributed.frequency import FourierTransformer 

13from .density_matrix import DensityMatrix 

14from .readers.numpy import FrequencyDensityMatrixReader 

15from .base import BaseDensityMatrices, WorkMetadata 

16from ..perturbation import create_perturbation, PerturbationLike, DeltaKick, NoPerturbation 

17from ..utils import Logger, two_communicator_sizes 

18from ..utils.memory import MemoryEstimate 

19from ..typing import Array1D 

20 

21 

22class FrequencyDensityMatrixMetadata(WorkMetadata): 

23 

24 """ Metadata to the density matrices. 

25 

26 Parameters 

27 ---------- 

28 density_matrices 

29 Parent of this object. 

30 globalw 

31 Frequency index. 

32 localw 

33 Frequency index on this rank. 

34 globalr 

35 Real/imaginary index. 

36 localr 

37 Real/imaginary index on this rank. 

38 """ 

39 

40 density_matrices: FrequencyDensityMatrices 

41 globalw: int 

42 localw: int 

43 globalr: int 

44 localr: int 

45 

46 def __new__(cls, 

47 density_matrices: FrequencyDensityMatrices, 

48 globalw: int, 

49 globalr: int, 

50 localw: int, 

51 localr: int): 

52 self = WorkMetadata.__new__(cls, density_matrices=density_matrices) 

53 self.globalw = globalw 

54 self.globalr = globalr 

55 self.localw = localw 

56 self.localr = localr 

57 return self 

58 

59 @property 

60 def global_indices(self): 

61 return (self.globalw, self.globalr) 

62 

63 @property 

64 def freq(self) -> float: 

65 """ Frequency in units of eV. """ 

66 return self.density_matrices.frequencies[self.globalw] 

67 

68 @property 

69 def reim(self) -> str: 

70 """ Returns real/imaginary tag ``'Re'`` or ``'Im'``. 

71 

72 The tag corresponds to the Fourier transform of the real 

73 or imaginary part of the density matrix. 

74 """ 

75 return self.density_matrices.reim[self.globalr] 

76 

77 @property 

78 def desc(self) -> str: 

79 return f'{self.reim} @ Freq. {self.freq:.3f}eV' 

80 

81 @property 

82 def real(self) -> bool: 

83 return self.reim == 'Re' 

84 

85 @property 

86 def imag(self) -> bool: 

87 return not self.real 

88 

89 

90class FrequencyDensityMatrices(BaseDensityMatrices[FrequencyDensityMatrixMetadata]): 

91 

92 """ 

93 Collection of density matrices in the Kohn-Sham basis in the frequency 

94 domain, for different frequencies. 

95 

96 Parameters 

97 ---------- 

98 ksd 

99 KohnShamDecomposition object or file name. 

100 frequencies 

101 Compute density matrices for these frequencies (or as close to them as possible). In units of eV. 

102 frequency_broadening 

103 Gaussian broadening width in units of eV. Default (0) is no broadening. 

104 real 

105 Calculate the Fourier transform of the real part of the density matrix. 

106 imag 

107 Calculate the Fourier transform of the imaginary part of the density matrix. 

108 calc_size 

109 Size of the calculation communicator. 

110 """ 

111 

112 def __init__(self, 

113 ksd: KohnShamDecomposition | str, 

114 frequencies: list[float] | Array1D[np.float64], 

115 frequency_broadening: float = 0, 

116 real: bool = True, 

117 imag: bool = True, 

118 calc_size: int = 1, 

119 log: Logger | None = None): 

120 self._freq_w = np.array(frequencies) 

121 self._frequency_broadening = frequency_broadening 

122 

123 super().__init__(ksd=ksd, 

124 real=real, imag=imag, 

125 calc_size=calc_size, log=log) 

126 

127 @property 

128 def frequencies(self) -> Array1D[np.float64]: 

129 """ Frequencies in units of eV. """ 

130 return self._freq_w # type: ignore 

131 

132 @property 

133 def frequency_broadening(self) -> float: 

134 """ Gaussian broadening width for frequencies in units of eV. """ 

135 return self._frequency_broadening 

136 

137 def work_loop(self, 

138 rank: int) -> Generator[FrequencyDensityMatrixMetadata | None, None, None]: 

139 nw = len(self.frequencies) 

140 nwperrank = (nw + self.loop_comm.size - 1) // self.loop_comm.size 

141 

142 for localw in range(nwperrank): 

143 globalw = rank + localw * self.loop_comm.size 

144 for r in range(len(self.reim)): 

145 if globalw < nw: 

146 yield FrequencyDensityMatrixMetadata(density_matrices=self, globalw=globalw, 

147 localw=localw, globalr=r, localr=r) 

148 else: 

149 yield None 

150 

151 def write_to_disk(self, 

152 frho_fmt: str): 

153 """ Calculate the density matrices and save to disk. 

154 

155 Parameters 

156 ---------- 

157 frho_fmt 

158 Formatting string for the density matrices saved to disk. 

159 

160 The formatting string should be a plain string containing variable 

161 placeholders within curly brackets ``{}``. It should not be confused with 

162 a formatted string literal (f-string). 

163 

164 Example: 

165 

166 * frho_fmt = ``frho/w{freq:05.2f}-{reim}.npy``. 

167 

168 Accepts variables: 

169 

170 * ``{freq}`` - Frequency in units of eV. 

171 * ``{reim}`` - ``'Re'`` or ``'Im'`` for Fourier transform of real/imaginary 

172 part of density matrix. 

173 """ 

174 nlocaltot = len(self.local_work_plan) 

175 

176 # Iterate over density matrices on all ranks 

177 for ndone, (work, dm) in enumerate(self, 1): 

178 avg = self.log.elapsed('read')/ndone 

179 estrem = avg * (nlocaltot - ndone) 

180 self.log(f'Obtained density matrix {ndone:4.0f}/{nlocaltot:4.0f} on this rank. ' 

181 f'Avg: {avg:10.3f}s, estimated remaining: {estrem:10.3f}s', who='Writer', flush=True) 

182 fname_kw = dict(freq=work.freq, reim=work.reim) 

183 fpath = Path(frho_fmt.format(**fname_kw)) 

184 rho_ia = dm.rho_ia 

185 if self.calc_comm.rank == 0: 

186 assert isinstance(rho_ia, np.ndarray) 

187 fpath.parent.mkdir(parents=True, exist_ok=True) 

188 if fpath.suffix == '.npz': 

189 # Write numpy archive 

190 np.savez(str(fpath), rho_ia=rho_ia) 

191 else: 

192 # Save numpy binary file 

193 np.save(str(fpath), rho_ia) 

194 self.log(f'Written {fpath}', who='Writer', flush=True) 

195 world.barrier() 

196 

197 

198class FrequencyDensityMatricesFromDisk(FrequencyDensityMatrices): 

199 

200 """ 

201 Collection of density matrices in the Kohn-Sham basis in the frequency 

202 domain, for different frequencies. Read from disk. 

203 

204 Parameters 

205 ---------- 

206 ksd 

207 KohnShamDecomposition object or file name. 

208 frho_fmt 

209 Formatting string for the density matrices 

210 in frequency space saved to disk. 

211 

212 The formatting string should be a plain string containing variable 

213 placeholders within curly brackets ``{}``. It should not be confused with 

214 a formatted string literal (f-string). 

215 

216 Example: 

217 

218 * frho_fmt = ``frho/w{freq:05.2f}-{reim}.npy``. 

219 

220 Accepts variables: 

221 

222 * ``{freq}`` - Frequency in units of eV. 

223 * ``{reim}`` - ``'Re'`` or ``'Im'`` for Fourier transform of real/imaginary 

224 part of density matrix. 

225 perturbation 

226 The perturbation which the density matrices are a response to. 

227 frequencies 

228 Compute density matrices for these frequencies (or as close to them as possible). In units of eV. 

229 real 

230 Calculate the Fourier transform of the real part of the density matrix 

231 imag 

232 Calculate the Fourier transform of the imaginary part of the density matrix. 

233 calc_size 

234 Size of the calculation communicator. 

235 """ 

236 

237 def __init__(self, 

238 ksd: KohnShamDecomposition | str, 

239 frho_fmt: str, 

240 perturbation: PerturbationLike, 

241 frequencies: list[float] | Array1D[np.float64], 

242 real: bool = True, 

243 imag: bool = True, 

244 calc_size: int = 1, 

245 log: Logger | None = None): 

246 reader = FrequencyDensityMatrixReader(frho_fmt=frho_fmt, 

247 ksd=ksd, 

248 filter_frequencies=frequencies, 

249 real=real, imag=imag, 

250 log=log) 

251 self.reader = reader 

252 super().__init__(ksd=reader.ksd, frequencies=reader.frequencies, 

253 real=real, imag=imag, 

254 calc_size=calc_size, 

255 log=reader.log) 

256 self.perturbation = create_perturbation(perturbation) 

257 assert isinstance(self.perturbation, (DeltaKick, NoPerturbation)) 

258 

259 def __str__(self) -> str: 

260 return str(self.reader) 

261 

262 def __iter__(self) -> Generator[tuple[FrequencyDensityMatrixMetadata, DensityMatrix], None, None]: 

263 for work in self.local_work_plan: 

264 self.log.start('read') 

265 rho = self.reader.read(work.freq, work.reim == 'Re') 

266 if isinstance(self.perturbation, DeltaKick): 

267 # Only delta kick supported at this point for normalization 

268 rho /= self.perturbation.strength 

269 matrices = {0: rho if self.calc_comm.rank == 0 else None} 

270 dm = DensityMatrix(ksd=self.ksd, matrices=matrices) 

271 

272 yield work, dm 

273 

274 

275class FrequencyDensityMatricesFromWaveFunctions(FrequencyDensityMatrices): 

276 

277 """ 

278 Collection of density matrices in the Kohn-Sham basis in the frequency 

279 domain, for different frequencies. Obtained from the time-dependent wave functions file, 

280 which is read from disk. 

281 

282 Parameters 

283 ---------- 

284 ksd 

285 KohnShamDecomposition object or file name. 

286 wfs_fname 

287 File name of the time-dependent wave functions file. 

288 perturbation 

289 The perturbation which the density matrices are a response to. 

290 frequencies 

291 Compute density matrices for these frequencies (or as close to them as possible). In units of eV. 

292 frequency_broadening 

293 Gaussian broadening width in units of eV. Default (0) is no broadening. 

294 real 

295 Calculate the real part of density matrices. 

296 imag 

297 Calculate the imaginary part of density matrices. 

298 calc_size 

299 Size of the calculation communicator. 

300 stridet 

301 Skip this many steps when reading the time-dependent wave functions file. 

302 """ 

303 

304 def __init__(self, 

305 ksd: KohnShamDecomposition | str, 

306 wfs_fname: str, 

307 perturbation: PerturbationLike, 

308 frequencies: list[float] | Array1D[np.float64], 

309 frequency_broadening: float = 0, 

310 real: bool = True, 

311 imag: bool = True, 

312 log: Logger | None = None, 

313 calc_size: int = 1, 

314 stridet: int = 1): 

315 _, calc_size = two_communicator_sizes(-1, calc_size) 

316 # The calc_comm rank 0's are world ranks 0, with a spacing of calc_size 

317 result_on_ranks = list(range(0, world.size, calc_size)) 

318 

319 rho_nn_fft = FourierTransformer.from_parameters( 

320 wfs_fname, ksd, 

321 perturbation=perturbation, 

322 yield_re=real, 

323 yield_im=imag, 

324 filter_frequencies=np.array(frequencies) * eV_to_au, 

325 frequency_broadening=frequency_broadening * eV_to_au, 

326 stridet=stridet, 

327 result_on_ranks=result_on_ranks, 

328 log=log) 

329 self.rho_nn_fft = rho_nn_fft 

330 super().__init__(ksd=rho_nn_fft.ksd, frequencies=rho_nn_fft.freq_w * au_to_eV, 

331 frequency_broadening=frequency_broadening, 

332 real=real, imag=imag, 

333 calc_size=calc_size, 

334 log=rho_nn_fft.log) 

335 

336 def __str__(self) -> str: 

337 lines = [] 

338 lines.append('Response from time-dependent wave functions') 

339 lines.append('') 

340 lines += ['' + line for line in str(self.rho_nn_fft.rho_nn_reader.rho_wfs_reader).split('\n')] 

341 lines.append('') 

342 lines += ['' + line for line in str(self.rho_nn_fft.rho_nn_reader).split('\n')] 

343 lines.append('') 

344 lines += ['' + line for line in str(self.rho_nn_fft).split('\n')] 

345 

346 return '\n'.join(lines) 

347 

348 def get_memory_estimate(self) -> MemoryEstimate: 

349 return self.rho_nn_fft.get_memory_estimate() 

350 

351 @property 

352 def myw(self) -> list[int]: 

353 """ List of indices corresponding to the frequency indices on held on this rank. """ 

354 return self.rho_nn_fft.my_work() 

355 

356 def __iter__(self) -> Generator[tuple[FrequencyDensityMatrixMetadata, DensityMatrix], None, None]: 

357 parameters = self.rho_nn_fft.rho_nn_reader._parameters 

358 flt = (slice(parameters.n1size), slice(parameters.n2size)) 

359 

360 dist_buffer = self.rho_nn_fft.dist_buffer # Perform the redistribution 

361 self.ksd.distribute(self.calc_comm) 

362 

363 for work in self.local_work_plan: 

364 if self.calc_comm.rank == 0: 

365 assert self.myw[work.localw] == work.globalw 

366 

367 if self.calc_comm.rank == 0: 

368 # Buffer shape is i, a, frequencies 

369 rho_ia = dist_buffer._get_data(work.reim == 'Re', 0)[flt + (work.localw, )] 

370 matrices = {0: rho_ia if self.calc_comm.rank == 0 else None} 

371 dm = DensityMatrix(ksd=self.ksd, matrices=matrices, comm=self.calc_comm) 

372 

373 yield work, dm 

374 

375 def parallel_prepare(self): 

376 self.rho_nn_fft.dist_buffer # Perform the redistribution