Coverage for tests/mock.py: 79%
193 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-08-01 16:57 +0000
« prev ^ index » next coverage.py v7.9.1, created at 2025-08-01 16:57 +0000
1from __future__ import annotations
3import numpy as np
4from numpy.typing import NDArray
5from typing import Collection, Generator, Iterator
7from gpaw.mpi import world
8from gpaw.lcaotddft.ksdecomposition import KohnShamDecomposition
10from rhodent.density_matrices.buffer import DensityMatrixBuffer
11from rhodent.density_matrices.density_matrix import DensityMatrix
12from rhodent.density_matrices.readers.gpaw import KohnShamRhoWfsReader, LCAORhoWfsReader
13from rhodent.density_matrices.frequency import FrequencyDensityMatrices
14from rhodent.density_matrices.time import ConvolutionDensityMatrices, ConvolutionDensityMatrixMetadata
15from rhodent.perturbation import PerturbationLike, PulsePerturbation
16from rhodent.response import BaseResponse
17from rhodent.utils import Logger, add_fake_kpts
18from rhodent.utils.logging import format_frequencies, format_times
19from rhodent.voronoi import AtomProjectionsType, VoronoiWeights
class MockVoronoiWeights(VoronoiWeights):

    """ Pretend Voronoi weights filled with reproducible random values.

    No file is read. For every projection, rank 0 of the communicator
    sums one uniformly distributed random matrix per atom, using a seed
    derived from the atom index. All other ranks yield ``None``.

    Parameters
    ----------
    nn
        Number of states
    atom_projections
        List of projections on atoms
    comm
        GPAW MPI communicator object. Defaults to world
    """

    def __init__(self,
                 nn: int,
                 atom_projections: AtomProjectionsType,
                 comm=None):
        self._log = Logger()
        self._comm = world if comm is None else comm
        self._atom_projections = atom_projections
        self._nn = nn

    @property
    def atom_projections(self) -> AtomProjectionsType:
        """ Projections on atoms, as given to the constructor. """
        return self._atom_projections

    @property
    def nn(self) -> int:
        """ Number of states, as given to the constructor. """
        return self._nn

    def __iter__(self) -> Iterator[NDArray[np.float64] | None]:
        """ Yield one ``(nn, nn)`` weight matrix per projection.

        The matrix is only built on rank 0; other ranks yield ``None``.
        """
        dims = (self.nn, self.nn)
        for atoms in self.atom_projections:
            if self.comm.rank != 0:
                yield None
                continue

            total_nn = np.zeros(dims, float)
            for atom in atoms:
                # Fixed per-atom seed keeps the mock data reproducible
                generator = np.random.default_rng(1423 + 13 * atom)
                total_nn += generator.uniform(-1e-5, 1e-5, dims)

            yield total_nn

    @property
    def saved_fields(self):
        """ The mock has no fields to save; always an empty dict. """
        return dict()
class MockLCAOWfsReader(LCAORhoWfsReader):

    """ Placeholder LCAO reader.

    The parent constructor is not called; only the two attributes
    below are set, both to zero.
    """

    def __init__(self):
        self._striden = self._rho0_skMM = 0
class MockKohnShamRhoWfsReader(KohnShamRhoWfsReader):
    """ Pretend reader of Kohn-Sham density matrices.

    Instead of reading from file, the density matrices are drawn from
    seeded random number generators and yielded time by time.

    Parameters
    ----------
    ksd
        KohnShamDecomposition object or file name to the ksd file.
    comm
        MPI communicator.
    yield_re
        Whether to yield the real part of wave functions/density matrices.
    yield_im
        Whether to yield the imaginary part of wave functions/density matrices.
    filter_times
        A list of times to generate in atomic units.
    log
        Logger object.
    """
    def __init__(self,
                 ksd: str | KohnShamDecomposition,
                 comm=world,
                 yield_re: bool = True,
                 yield_im: bool = True,
                 filter_times: list[float] | NDArray[np.float64] | None = None,
                 log: Logger | None = None):
        self._comm = world if comm is None else comm

        # Set up ksd; a file name is loaded into a new decomposition object
        if isinstance(ksd, KohnShamDecomposition):
            self._ksd = ksd
        else:
            self._ksd = KohnShamDecomposition(filename=ksd)
            add_fake_kpts(self._ksd)

        self._log = Logger() if log is None else log
        self._time_t = np.array(filter_times)
        self._flt_t = slice(None)
        self._yield_re = yield_re
        self._yield_im = yield_im

        self._C0S_sknM: NDArray[np.float64] | None = None
        self._rho0_sknn: NDArray[np.float64] | None = None
        self.lcao_rho_reader = MockLCAOWfsReader()

    def iread(self,
              s: int,
              k: int,
              n1: slice,
              n2: slice) -> Generator[DensityMatrixBuffer, None, None]:
        """ Generate the density matrices time by time.

        The same buffer object is refilled and yielded for every time.
        Seeds depend on the global time index, ``s`` and ``k`` only.

        Parameters
        ----------
        s, k, n1, n2
            Read these indices.
        """
        dm_buffer = DensityMatrixBuffer(self.nnshape(s, k, n1, n2), (), np.float64)
        if self.yield_re:
            dm_buffer.zeros(True, 0)
        if self.yield_im:
            dm_buffer.zeros(False, 0)
        self.C0S_sknM  # Read this on all ranks
        nstates = self.C0S_sknM.shape[3]
        square = (nstates, nstates)

        for globalt in self.work_loop(self.comm.rank):
            if globalt is None:
                continue
            self.log.start('read')
            base_seed = globalt + 832 * s + 42140 * k

            # The full square matrix is drawn first and sliced afterwards
            if self.yield_re:
                real_rng = np.random.default_rng(32636 + base_seed)
                real_part = real_rng.uniform(-1e-5, 1e-5, square)[n1, n2]
                dm_buffer.safe_fill(True, 0, real_part)
            if self.yield_im:
                imag_rng = np.random.default_rng(94234 + base_seed)
                imag_part = imag_rng.uniform(-1e-5, 1e-5, square)[n1, n2]
                dm_buffer.safe_fill(False, 0, imag_part)

            yield dm_buffer
class MockTimeDensityMatrices(ConvolutionDensityMatrices):

    """
    Pretend ConvolutionDensityMatrices that are filled with random values.

    The random values are generated using a fixed seed.

    Parameters
    ----------
    ksd
        KohnShamDecomposition object or file name.
    times
        Produce density matrices for these times. In units of as.
    real
        Calculate the real part of density matrices.
    imag
        Calculate the imaginary part of density matrices.
    calc_size
        Size of the calculation communicator. Currently not forwarded:
        the parent class is always set up with ``calc_size=1``.
    """

    def __init__(self,
                 ksd: KohnShamDecomposition | str,
                 times: list[float] | NDArray[np.float64],
                 real: bool = True,
                 imag: bool = True,
                 calc_size: int = 1):
        super().__init__(ksd=ksd, times=times, pulses=[None], calc_size=1, real=real, imag=imag)

        # Read density matrices corresponding to ksd ialims
        imin, imax, amin, amax = self.ksd.ialims()
        self._n1slice = slice(imin, imax + 1)
        self._n2slice = slice(amin, amax + 1)

        self._runtime_verify_work_loop()

        self._time_t = np.array(times)

    def __str__(self) -> str:
        """ Short multi-line description of the mock. """
        return '\n'.join([
            'Mock response',
            '',
            f'Calculating response for {self.nt} times and {len(self.pulses)} pulses',
            f'  times: {format_times(self.times)}',
        ])

    def __iter__(self) -> Generator[tuple[ConvolutionDensityMatrixMetadata, DensityMatrix], None, None]:
        """ Yield ``(work, density matrix)`` pairs for this rank of the loop communicator. """
        nrows = self._n1slice.stop - self._n1slice.start
        ncols = self._n2slice.stop - self._n2slice.start
        shape = (nrows, ncols)
        assert self.calc_comm.size == 1  # TODO

        def draw(seed: int) -> NDArray[np.float64]:
            # One uniformly distributed matrix from a freshly seeded generator
            return np.random.default_rng(seed).uniform(-1e-5, 1e-5, shape)

        for work in self.work_loop(self.loop_comm.rank):
            if work is None:
                # Nothing more to do
                return

            rho_ia = np.zeros(shape, dtype=complex)

            if 'Re' in self.reim:
                rho_ia += draw(248203 + work.globalt)
            if 'Im' in self.reim:
                rho_ia += 1.0j * draw(614203 + work.globalt)

            dm = DensityMatrix(ksd=self.ksd, matrices={0: rho_ia}, comm=self.calc_comm)

            yield work, dm

    def work_loop(self,
                  rank: int) -> Generator[ConvolutionDensityMatrixMetadata | None, None, None]:
        """ Distribute the times round-robin over the loop communicator.

        Every rank gets the same number of iterations; iterations that fall
        beyond the last time yield ``None``.
        """
        nt = self.nt
        nranks = self.loop_comm.size
        ntperrank = -(-nt // nranks)  # Ceiling division

        strided_times = range(rank, rank + ntperrank * nranks, nranks)
        for localt, globalt in enumerate(strided_times):
            if globalt >= nt:
                yield None
                continue
            yield ConvolutionDensityMatrixMetadata(density_matrices=self, globalt=globalt, localt=localt,
                                                   globalp=0, localp=0)
class MockConvolutionDensityMatrices(ConvolutionDensityMatrices):

    """
    Pretend ConvolutionDensityMatrices that are filled with random values.

    The random values are generated using a fixed seed.

    Parameters
    ----------
    ksd
        KohnShamDecomposition object or file name.
    pulses
        Convolute the density matrices with these pulses.
    times
        Produce density matrices for these times. In units of as.
    derivative_order_s
        Density matrix derivatives of the following orders.
        ``0`` for plain density matrix and positive integers for derivatives.
    real
        Calculate the real part of density matrices.
    imag
        Calculate the imaginary part of density matrices.
    calc_size
        Size of the calculation communicator.
    """

    def __init__(self,
                 ksd: KohnShamDecomposition | str,
                 pulses: Collection[PerturbationLike],
                 times: list[float] | NDArray[np.float64],
                 derivative_order_s: list[int] = [0],
                 real: bool = True,
                 imag: bool = True,
                 calc_size: int = 1):
        super().__init__(ksd=ksd, times=times, pulses=pulses,
                         derivative_order_s=derivative_order_s, calc_size=calc_size, real=real, imag=imag)

        imin, imax, amin, amax = self.ksd.ialims()

        # Read density matrices corresponding to ksd ialims
        self._n1slice = slice(imin, imax + 1)
        self._n2slice = slice(amin, amax + 1)

        self._runtime_verify_work_loop()

        self._time_t = np.array(times)

    def __str__(self) -> str:
        """ Short multi-line description of the mock. """
        lines = ['Mock response']

        lines.append('')
        lines.append(f'Calculating response for {self.nt} times')
        lines.append(f'  times: {format_times(self.times)}')

        return '\n'.join(lines)

    def __iter__(self) -> Generator[tuple[ConvolutionDensityMatrixMetadata, DensityMatrix], None, None]:
        """ Yield ``(work, density matrix)`` pairs of the local work plan.

        Matrices are only generated on rank 0 of the calculation communicator;
        the other ranks get ``None`` for every derivative order.
        """
        shape = (self._n1slice.stop - self._n1slice.start, self._n2slice.stop - self._n2slice.start)

        # Seed offset per derivative order; only orders 0, 1 and 2 are known
        seeds = {0: 0, 1: 992292, 2: 1281934}

        for work in self.local_work_plan:
            matrices = dict()
            for derivative in self.derivative_order_s:
                if self.calc_comm.rank > 0:
                    matrices[derivative] = None
                    continue
                rho_ia = np.zeros(shape, dtype=complex)
                # Different pulse frequencies give different random data
                omega0 = work.pulse.pulse.omega0 if isinstance(work.pulse, PulsePerturbation) else 0
                seed = seeds[derivative] + work.globalt + int(11403 * omega0)

                if 'Re' in self.reim:
                    rng = np.random.default_rng(seed + 573929)
                    rho_ia += rng.uniform(-1e-5, 1e-5, shape)
                # The imaginary part must follow the 'Im' request; this
                # condition previously tested 'Re' a second time.
                if 'Im' in self.reim:
                    rng = np.random.default_rng(seed + 156305)
                    rho_ia += 1.0j * rng.uniform(-1e-5, 1e-5, shape)
                matrices[derivative] = rho_ia

            dm = DensityMatrix(ksd=self.ksd, matrices=matrices, comm=self.calc_comm)

            yield work, dm
class MockFrequencyDensityMatrices(FrequencyDensityMatrices):

    """
    Pretend FrequencyDensityMatrices that are filled with random values.

    The random values are generated using a fixed seed

    Parameters
    ----------
    ksd
        KohnShamDecomposition object or file name
    frequencies
        Produce density matrices for these frequencies. In units of eV
    real
        Calculate the real part of density matrices
    imag
        Calculate the imaginary part of density matrices
    calc_size
        Size of the calculation communicator
    """

    def __init__(self,
                 ksd: KohnShamDecomposition | str,
                 frequencies: list[float] | NDArray[np.float64],
                 real: bool = True,
                 imag: bool = True,
                 calc_size: int = 1):
        super().__init__(ksd=ksd, frequencies=frequencies,
                         calc_size=calc_size, real=real, imag=imag)

        imin, imax, amin, amax = self.ksd.ialims()

        # Read density matrices corresponding to ksd ialims
        self._n1slice = slice(imin, imax + 1)
        self._n2slice = slice(amin, amax + 1)

        self._runtime_verify_work_loop()

    def __str__(self) -> str:
        """ Short multi-line description of the mock. """
        lines = [' Mock response ']

        lines.append('')
        lines.append(f'Calculating response for {len(self.frequencies)} frequencies')
        lines.append(f'  frequencies: {format_frequencies(self.frequencies)}')

        return '\n'.join(lines)

    # NOTE(review): the work items used below expose ``globalw`` and ``reim``,
    # so the metadata type in this annotation is presumably a
    # frequency-specific one rather than the convolution one -- confirm.
    def __iter__(self) -> Generator[tuple[ConvolutionDensityMatrixMetadata, DensityMatrix], None, None]:
        """ Yield ``(work, density matrix)`` pairs of the local work plan.

        The random matrix is generated on all ranks, but only handed to
        the density matrix on rank 0 of the calculation communicator.
        """
        shape = (self._n1slice.stop - self._n1slice.start, self._n2slice.stop - self._n2slice.start)

        for work in self.local_work_plan:
            # Parentheses are required: without them the conditional
            # expression swallows ``work.globalw`` and every non-'Re'
            # work item ends up with the same seed.
            seed = work.globalw + (1412 if work.reim == 'Re' else 0)
            rng = np.random.default_rng(seed + 421420)
            rho_ia = np.zeros(shape, dtype=complex)
            rho_ia += rng.uniform(-1e-5, 1e-5, shape)
            rho_ia += 1.0j * rng.uniform(-1e-5, 1e-5, shape)

            matrices = {0: rho_ia if self.calc_comm.rank == 0 else None}
            dm = DensityMatrix(ksd=self.ksd, matrices=matrices, comm=self.calc_comm)

            yield work, dm
class MockResponse(BaseResponse):

    """ Pretend response that gives pretend density matrices

    Parameters
    ----------
    ksd
        KohnShamDecomposition object or file name.
    perturbation
        Perturbation that was present during time propagation.
    calc_size
        Size of the calculation communicator.
    """

    def _get_time_density_matrices(self,
                                   times: list[float] | NDArray[np.float64],
                                   pulses: Collection[PerturbationLike],
                                   derivative_order_s: list[int] = [0],
                                   real: bool = True,
                                   imag: bool = True,
                                   log: Logger | None = None,
                                   ) -> ConvolutionDensityMatrices:
        """ Set up mock density matrices in the time domain.

        Parameters
        ----------
        times
            Passed on to the mock density matrices.
        pulses
            Passed on to the mock density matrices.
        derivative_order_s
            Passed on to the mock density matrices.
        real, imag
            Passed on to the mock density matrices.
        log
            Not used by the mock.
        """
        # Forward calc_size, like the frequency-domain method does, so that
        # both kinds of mock use the same communicator layout.
        density_matrices = MockConvolutionDensityMatrices(
            ksd=self.ksd,
            times=times,
            pulses=pulses,
            derivative_order_s=derivative_order_s,
            real=real,
            imag=imag,
            calc_size=self.calc_size)

        return density_matrices

    def _get_frequency_density_matrices(self,
                                        frequencies: list[float] | NDArray[np.float64],
                                        frequency_broadening: float = 0,
                                        real: bool = True,
                                        imag: bool = True,
                                        log: Logger | None = None,
                                        ) -> FrequencyDensityMatrices:
        """ Set up mock density matrices in the frequency domain.

        Parameters
        ----------
        frequencies
            Passed on to the mock density matrices.
        frequency_broadening
            Not used by the mock.
        real, imag
            Passed on to the mock density matrices.
        log
            Not used by the mock.
        """
        density_matrices = MockFrequencyDensityMatrices(
            ksd=self.ksd,
            frequencies=frequencies,
            real=real,
            imag=imag,
            calc_size=self.calc_size)

        return density_matrices