Coverage for tests/mock.py: 79%

193 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-08-01 16:57 +0000

1from __future__ import annotations 

2 

3import numpy as np 

4from numpy.typing import NDArray 

5from typing import Collection, Generator, Iterator 

6 

7from gpaw.mpi import world 

8from gpaw.lcaotddft.ksdecomposition import KohnShamDecomposition 

9 

10from rhodent.density_matrices.buffer import DensityMatrixBuffer 

11from rhodent.density_matrices.density_matrix import DensityMatrix 

12from rhodent.density_matrices.readers.gpaw import KohnShamRhoWfsReader, LCAORhoWfsReader 

13from rhodent.density_matrices.frequency import FrequencyDensityMatrices 

14from rhodent.density_matrices.time import ConvolutionDensityMatrices, ConvolutionDensityMatrixMetadata 

15from rhodent.perturbation import PerturbationLike, PulsePerturbation 

16from rhodent.response import BaseResponse 

17from rhodent.utils import Logger, add_fake_kpts 

18from rhodent.utils.logging import format_frequencies, format_times 

19from rhodent.voronoi import AtomProjectionsType, VoronoiWeights 

20 

21 

class MockVoronoiWeights(VoronoiWeights):

    """ Pretend Voronoi weights that are filled with random values.

    Nothing is read from file. For each projection, the weight matrix is the
    sum of one uniformly distributed random matrix per atom in the projection.
    The random generators are seeded by the atom index, so the values are
    reproducible.

    Parameters
    ----------
    nn
        Number of states
    atom_projections
        List of projections on atoms
    comm
        GPAW MPI communicator object. Defaults to world
    """

    def __init__(self,
                 nn: int,
                 atom_projections: AtomProjectionsType,
                 comm=None):
        self._nn = nn
        self._atom_projections = atom_projections
        self._comm = world if comm is None else comm
        self._log = Logger()

    @property
    def atom_projections(self) -> AtomProjectionsType:
        """ The projections passed to the constructor. """
        return self._atom_projections

    @property
    def nn(self) -> int:
        """ The number of states passed to the constructor. """
        return self._nn

    def __iter__(self) -> Iterator[NDArray[np.float64] | None]:
        """ Yield one ``(nn, nn)`` weight matrix per projection.

        The matrix is only built on rank 0 of the communicator; other ranks
        yield ``None``.
        """
        dims = (self.nn, self.nn)
        for atoms in self.atom_projections:
            # NOTE(review): `comm` is not defined in this class; presumably
            # the parent class exposes `self._comm` under this name.
            if self.comm.rank != 0:
                yield None
                continue

            total_nn = np.zeros(dims, float)
            for atom in atoms:
                # One generator per atom, seeded by the atom index
                generator = np.random.default_rng(1423 + 13 * atom)
                total_nn += generator.uniform(-1e-5, 1e-5, dims)

            yield total_nn

    @property
    def saved_fields(self):
        """ The mock has no fields to save; always an empty dict. """
        return {}

72 

73 

class MockLCAOWfsReader(LCAORhoWfsReader):

    """ Placeholder LCAO reader that only carries two attributes.

    The initializer of the parent class is not called. The attributes
    ``_striden`` and ``_rho0_skMM`` are both set to ``0``.
    """

    def __init__(self):
        self._rho0_skMM = 0
        self._striden = 0

79 

80 

class MockKohnShamRhoWfsReader(KohnShamRhoWfsReader):
    """ Pretend reader of Kohn-Sham density matrices.

    Yield density matrices time by time. The matrices are not read from
    file, but filled with uniformly distributed random values drawn from
    generators with fixed seeds.

    Parameters
    ----------
    ksd
        KohnShamDecomposition object or file name to the ksd file.
    comm
        MPI communicator.
    yield_re
        Whether to yield the real part of wave functions/density matrices.
    yield_im
        Whether to yield the imaginary part of wave functions/density matrices.
    filter_times
        A list of times to generate in atomic units.
        The value is passed to ``np.array`` as is, also when it is ``None``.
    log
        Logger object.
    """
    def __init__(self,
                 ksd: str | KohnShamDecomposition,
                 comm=world,
                 yield_re: bool = True,
                 yield_im: bool = True,
                 filter_times: list[float] | NDArray[np.float64] | None = None,
                 log: Logger | None = None):
        self._comm = world if comm is None else comm

        # Set up ksd
        if isinstance(ksd, KohnShamDecomposition):
            self._ksd = ksd
        else:
            # A file name was given; load it and attach fake k-points
            self._ksd = KohnShamDecomposition(filename=ksd)
            add_fake_kpts(self._ksd)

        self._log = Logger() if log is None else log
        self._yield_re = yield_re
        self._yield_im = yield_im
        self._flt_t = slice(None)
        self._time_t = np.array(filter_times)

        self._C0S_sknM: NDArray[np.float64] | None = None
        self._rho0_sknn: NDArray[np.float64] | None = None
        self.lcao_rho_reader = MockLCAOWfsReader()

    def iread(self,
              s: int,
              k: int,
              n1: slice,
              n2: slice) -> Generator[DensityMatrixBuffer, None, None]:
        """ Generate the density matrices time by time.

        The same buffer object is refilled and yielded for every time.

        Parameters
        ----------
        s, k, n1, n2
            Generate for these indices. ``s`` and ``k`` enter the random seed,
            while ``n1`` and ``n2`` select the block of the full matrix.
        """
        buffer = DensityMatrixBuffer(self.nnshape(s, k, n1, n2), (), np.float64)
        if self.yield_re:
            buffer.zeros(True, 0)
        if self.yield_im:
            buffer.zeros(False, 0)

        # Accessed outside the work loop, i.e. on all ranks
        nstates = self.C0S_sknM.shape[3]
        full_shape = (nstates, nstates)

        def draw(seed: int) -> NDArray[np.float64]:
            # Random full matrix from a freshly seeded generator, then sliced
            generator = np.random.default_rng(seed)
            return generator.uniform(-1e-5, 1e-5, full_shape)[n1, n2]

        for globalt in self.work_loop(self.comm.rank):
            if globalt is None:
                # This rank has no work in this round
                continue
            self.log.start('read')
            base_seed = globalt + 832 * s + 42140 * k

            if self.yield_re:
                buffer.safe_fill(True, 0, draw(32636 + base_seed))
            if self.yield_im:
                buffer.safe_fill(False, 0, draw(94234 + base_seed))

            yield buffer

168 

169 

class MockTimeDensityMatrices(ConvolutionDensityMatrices):

    """
    Pretend ConvolutionDensityMatrices that are filled with random values.

    The random values are generated using a fixed seed, which depends on
    the global time index.

    Parameters
    ----------
    ksd
        KohnShamDecomposition object or file name.
    times
        Produce density matrices for these times. In units of as.
    real
        Calculate the real part of density matrices.
    imag
        Calculate the imaginary part of density matrices.
    calc_size
        Size of the calculation communicator. The value is currently not
        forwarded; the parent class is always initialized with a size of 1.
    """

    def __init__(self,
                 ksd: KohnShamDecomposition | str,
                 times: list[float] | NDArray[np.float64],
                 real: bool = True,
                 imag: bool = True,
                 calc_size: int = 1):
        # A single placeholder pulse and a calculation communicator of size 1
        super().__init__(ksd=ksd, times=times, pulses=[None], calc_size=1, real=real, imag=imag)

        i_first, i_last, a_first, a_last = self.ksd.ialims()

        # Slices corresponding to the ksd ialims (upper limits inclusive)
        self._n1slice = slice(i_first, i_last + 1)
        self._n2slice = slice(a_first, a_last + 1)

        self._runtime_verify_work_loop()

        self._time_t = np.array(times)

    def __str__(self) -> str:
        return '\n'.join([
            'Mock response',
            '',
            f'Calculating response for {self.nt} times and {len(self.pulses)} pulses',
            f' times: {format_times(self.times)}',
        ])

    def __iter__(self) -> Generator[tuple[ConvolutionDensityMatrixMetadata, DensityMatrix], None, None]:
        """ Yield work metadata together with a random density matrix.

        Iteration stops at the first ``None`` produced by the work loop.
        """
        n_rows = self._n1slice.stop - self._n1slice.start
        n_cols = self._n2slice.stop - self._n2slice.start
        shape = (n_rows, n_cols)
        assert self.calc_comm.size == 1  # TODO

        for work in self.work_loop(self.loop_comm.rank):
            if work is None:
                # Nothing more to do
                return

            rho_ia = np.zeros(shape, dtype=complex)

            if 'Re' in self.reim:
                real_rng = np.random.default_rng(248203 + work.globalt)
                rho_ia += real_rng.uniform(-1e-5, 1e-5, shape)
            if 'Im' in self.reim:
                imag_rng = np.random.default_rng(614203 + work.globalt)
                rho_ia += 1.0j * imag_rng.uniform(-1e-5, 1e-5, shape)

            dm = DensityMatrix(ksd=self.ksd, matrices={0: rho_ia}, comm=self.calc_comm)

            yield work, dm

    def work_loop(self,
                  rank: int) -> Generator[ConvolutionDensityMatrixMetadata | None, None, None]:
        """ Distribute the times round-robin over the ranks of the loop communicator.

        Every rank runs the same number of rounds. ``None`` is yielded in
        rounds where the global time index is past the last time.
        """
        nt = self.nt
        nranks = self.loop_comm.size
        # Number of rounds, rounded up
        nrounds = (nt + nranks - 1) // nranks

        for localt, globalt in enumerate(range(rank, rank + nrounds * nranks, nranks)):
            if globalt >= nt:
                yield None
                continue
            yield ConvolutionDensityMatrixMetadata(density_matrices=self, globalt=globalt, localt=localt,
                                                   globalp=0, localp=0)

253 

254 

class MockConvolutionDensityMatrices(ConvolutionDensityMatrices):

    """
    Pretend ConvolutionDensityMatrices that are filled with random values.

    The random values are generated using a fixed seed, which depends on
    the derivative order, the global time index and, for pulse
    perturbations, the pulse frequency.

    Parameters
    ----------
    ksd
        KohnShamDecomposition object or file name.
    pulses
        Convolute the density matrices with these pulses.
    times
        Produce density matrices for these times. In units of as.
    derivative_order_s
        Density matrix derivatives of the following orders.
        ``0`` for plain density matrix and positive integers for derivatives.
        Only orders 0, 1 and 2 have a seed defined.
    real
        Calculate the real part of density matrices.
    imag
        Calculate the imaginary part of density matrices.
    calc_size
        Size of the calculation communicator.
    """

    def __init__(self,
                 ksd: KohnShamDecomposition | str,
                 pulses: Collection[PerturbationLike],
                 times: list[float] | NDArray[np.float64],
                 derivative_order_s: list[int] = [0],
                 real: bool = True,
                 imag: bool = True,
                 calc_size: int = 1):
        super().__init__(ksd=ksd, times=times, pulses=pulses,
                         derivative_order_s=derivative_order_s, calc_size=calc_size, real=real, imag=imag)

        imin, imax, amin, amax = self.ksd.ialims()

        # Read density matrices corresponding to ksd ialims
        self._n1slice = slice(imin, imax + 1)
        self._n2slice = slice(amin, amax + 1)

        self._runtime_verify_work_loop()

        self._time_t = np.array(times)

    def __str__(self) -> str:
        lines = ['Mock response']

        lines.append('')
        lines.append(f'Calculating response for {self.nt} times')
        lines.append(f' times: {format_times(self.times)}')

        return '\n'.join(lines)

    def __iter__(self) -> Generator[tuple[ConvolutionDensityMatrixMetadata, DensityMatrix], None, None]:
        """ Yield work metadata together with random density matrices.

        One matrix per derivative order is generated on rank 0 of the
        calculation communicator; on the other ranks the entries are ``None``.
        """
        shape = (self._n1slice.stop - self._n1slice.start, self._n2slice.stop - self._n2slice.start)
        # Seed offset for each supported derivative order
        seeds = {0: 0, 1: 992292, 2: 1281934}

        for work in self.local_work_plan:
            matrices = dict()
            for derivative in self.derivative_order_s:
                if self.calc_comm.rank > 0:
                    matrices[derivative] = None
                    continue
                rho_ia = np.zeros(shape, dtype=complex)
                # Different pulse frequencies give different random values
                seed = work.pulse.pulse.omega0 if isinstance(work.pulse, PulsePerturbation) else 0
                seed = seeds[derivative] + work.globalt + int(11403 * seed)

                if 'Re' in self.reim:
                    rng = np.random.default_rng(seed + 573929)
                    rho_ia += rng.uniform(-1e-5, 1e-5, shape)
                # The imaginary part must be controlled by 'Im'. This branch
                # used to test 'Re' a second time, so that the imaginary part
                # followed the `real` argument instead of `imag`.
                if 'Im' in self.reim:
                    rng = np.random.default_rng(seed + 156305)
                    rho_ia += 1.0j * rng.uniform(-1e-5, 1e-5, shape)
                matrices[derivative] = rho_ia

            dm = DensityMatrix(ksd=self.ksd, matrices=matrices, comm=self.calc_comm)

            yield work, dm

336 

337 

class MockFrequencyDensityMatrices(FrequencyDensityMatrices):

    """
    Pretend FrequencyDensityMatrices that are filled with random values.

    The random values are generated using a fixed seed, which depends on
    the global frequency index and on whether the work item is tagged
    ``'Re'`` or not.

    Parameters
    ----------
    ksd
        KohnShamDecomposition object or file name
    frequencies
        Produce density matrices for these frequencies. In units of eV
    real
        Calculate the real part of density matrices
    imag
        Calculate the imaginary part of density matrices
    calc_size
        Size of the calculation communicator
    """

    def __init__(self,
                 ksd: KohnShamDecomposition | str,
                 frequencies: list[float] | NDArray[np.float64],
                 real: bool = True,
                 imag: bool = True,
                 calc_size: int = 1):
        super().__init__(ksd=ksd, frequencies=frequencies,
                         calc_size=calc_size, real=real, imag=imag)

        imin, imax, amin, amax = self.ksd.ialims()

        # Read density matrices corresponding to ksd ialims
        self._n1slice = slice(imin, imax + 1)
        self._n2slice = slice(amin, amax + 1)

        self._runtime_verify_work_loop()

    def __str__(self) -> str:
        lines = [' Mock response ']

        lines.append('')
        lines.append(f'Calculating response for {len(self.frequencies)} frequencies')
        lines.append(f' frequencies: {format_frequencies(self.frequencies)}')

        return '\n'.join(lines)

    # NOTE(review): the work items used here carry `globalw` and `reim`, so
    # the metadata type in the return annotation is presumably the frequency
    # counterpart of ConvolutionDensityMatrixMetadata -- confirm and adjust.
    def __iter__(self) -> Generator[tuple[ConvolutionDensityMatrixMetadata, DensityMatrix], None, None]:
        """ Yield work metadata together with a random complex density matrix.

        The matrix is generated on all ranks, but only passed on by rank 0
        of the calculation communicator; the other ranks pass ``None``.
        """
        shape = (self._n1slice.stop - self._n1slice.start, self._n2slice.stop - self._n2slice.start)

        for work in self.local_work_plan:
            # The parentheses matter: without them the conditional expression
            # covers the whole sum, and every work item not tagged 'Re' gets
            # seed 0 regardless of its frequency index.
            seed = work.globalw + (1412 if work.reim == 'Re' else 0)
            rng = np.random.default_rng(seed + 421420)
            rho_ia = np.zeros(shape, dtype=complex)
            rho_ia += rng.uniform(-1e-5, 1e-5, shape)
            rho_ia += 1.0j * rng.uniform(-1e-5, 1e-5, shape)

            matrices = {0: rho_ia if self.calc_comm.rank == 0 else None}
            dm = DensityMatrix(ksd=self.ksd, matrices=matrices, comm=self.calc_comm)

            yield work, dm

399 

400 

class MockResponse(BaseResponse):

    """ Pretend response that gives pretend density matrices

    The density matrices are instances of the mock classes, which are
    filled with random values instead of being computed.

    Parameters
    ----------
    ksd
        KohnShamDecomposition object or file name.
    perturbation
        Perturbation that was present during time propagation.
    calc_size
        Size of the calculation communicator.
    """

    def _get_time_density_matrices(self,
                                   times: list[float] | NDArray[np.float64],
                                   pulses: Collection[PerturbationLike],
                                   derivative_order_s: list[int] = [0],
                                   real: bool = True,
                                   imag: bool = True,
                                   log: Logger | None = None,
                                   ) -> ConvolutionDensityMatrices:
        """ Set up mock density matrices in the time domain.

        Parameters
        ----------
        times
            Produce density matrices for these times.
        pulses
            Convolute the density matrices with these pulses.
        derivative_order_s
            Density matrix derivatives of the following orders.
        real
            Calculate the real part of density matrices.
        imag
            Calculate the imaginary part of density matrices.
        log
            Not used by the mock.
        """
        # Forward the size of the calculation communicator, in the same way
        # as for the density matrices in the frequency domain
        density_matrices = MockConvolutionDensityMatrices(
            ksd=self.ksd,
            times=times,
            pulses=pulses,
            derivative_order_s=derivative_order_s,
            real=real,
            imag=imag,
            calc_size=self.calc_size)

        return density_matrices

    def _get_frequency_density_matrices(self,
                                        frequencies: list[float] | NDArray[np.float64],
                                        frequency_broadening: float = 0,
                                        real: bool = True,
                                        imag: bool = True,
                                        log: Logger | None = None,
                                        ) -> FrequencyDensityMatrices:
        """ Set up mock density matrices in the frequency domain.

        Parameters
        ----------
        frequencies
            Produce density matrices for these frequencies.
        frequency_broadening
            Not used by the mock.
        real
            Calculate the real part of density matrices.
        imag
            Calculate the imaginary part of density matrices.
        log
            Not used by the mock.
        """
        density_matrices = MockFrequencyDensityMatrices(
            ksd=self.ksd,
            frequencies=frequencies,
            real=real,
            imag=imag,
            calc_size=self.calc_size)

        return density_matrices