Coverage for rhodent/density_matrices/frequency.py: 98%
138 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-08-01 16:57 +0000
« prev ^ index » next coverage.py v7.9.1, created at 2025-08-01 16:57 +0000
1from __future__ import annotations
3from typing import Generator
4from pathlib import Path
6import numpy as np
8from gpaw.lcaotddft.ksdecomposition import KohnShamDecomposition
9from gpaw.tddft.units import au_to_eV, eV_to_au
10from gpaw.mpi import world
12from .distributed.frequency import FourierTransformer
13from .density_matrix import DensityMatrix
14from .readers.numpy import FrequencyDensityMatrixReader
15from .base import BaseDensityMatrices, WorkMetadata
16from ..perturbation import create_perturbation, PerturbationLike, DeltaKick, NoPerturbation
17from ..utils import Logger, two_communicator_sizes
18from ..utils.memory import MemoryEstimate
19from ..typing import Array1D
22class FrequencyDensityMatrixMetadata(WorkMetadata):
24 """ Metadata to the density matrices.
26 Parameters
27 ----------
28 density_matrices
29 Parent of this object.
30 globalw
31 Frequency index.
32 localw
33 Frequency index on this rank.
34 globalr
35 Real/imaginary index.
36 localr
37 Real/imaginary index on this rank.
38 """
40 density_matrices: FrequencyDensityMatrices
41 globalw: int
42 localw: int
43 globalr: int
44 localr: int
46 def __new__(cls,
47 density_matrices: FrequencyDensityMatrices,
48 globalw: int,
49 globalr: int,
50 localw: int,
51 localr: int):
52 self = WorkMetadata.__new__(cls, density_matrices=density_matrices)
53 self.globalw = globalw
54 self.globalr = globalr
55 self.localw = localw
56 self.localr = localr
57 return self
59 @property
60 def global_indices(self):
61 return (self.globalw, self.globalr)
63 @property
64 def freq(self) -> float:
65 """ Frequency in units of eV. """
66 return self.density_matrices.frequencies[self.globalw]
68 @property
69 def reim(self) -> str:
70 """ Returns real/imaginary tag ``'Re'`` or ``'Im'``.
72 The tag corresponds to the Fourier transform of the real
73 or imaginary part of the density matrix.
74 """
75 return self.density_matrices.reim[self.globalr]
77 @property
78 def desc(self) -> str:
79 return f'{self.reim} @ Freq. {self.freq:.3f}eV'
81 @property
82 def real(self) -> bool:
83 return self.reim == 'Re'
85 @property
86 def imag(self) -> bool:
87 return not self.real
90class FrequencyDensityMatrices(BaseDensityMatrices[FrequencyDensityMatrixMetadata]):
92 """
93 Collection of density matrices in the Kohn-Sham basis in the frequency
94 domain, for different frequencies.
96 Parameters
97 ----------
98 ksd
99 KohnShamDecomposition object or file name.
100 frequencies
101 Compute density matrices for these frequencies (or as close to them as possible). In units of eV.
102 frequency_broadening
103 Gaussian broadening width in units of eV. Default (0) is no broadening.
104 real
105 Calculate the Fourier transform of the real part of the density matrix.
106 imag
107 Calculate the Fourier transform of the imaginary part of the density matrix.
108 calc_size
109 Size of the calculation communicator.
110 """
112 def __init__(self,
113 ksd: KohnShamDecomposition | str,
114 frequencies: list[float] | Array1D[np.float64],
115 frequency_broadening: float = 0,
116 real: bool = True,
117 imag: bool = True,
118 calc_size: int = 1,
119 log: Logger | None = None):
120 self._freq_w = np.array(frequencies)
121 self._frequency_broadening = frequency_broadening
123 super().__init__(ksd=ksd,
124 real=real, imag=imag,
125 calc_size=calc_size, log=log)
127 @property
128 def frequencies(self) -> Array1D[np.float64]:
129 """ Frequencies in units of eV. """
130 return self._freq_w # type: ignore
132 @property
133 def frequency_broadening(self) -> float:
134 """ Gaussian broadening width for frequencies in units of eV. """
135 return self._frequency_broadening
137 def work_loop(self,
138 rank: int) -> Generator[FrequencyDensityMatrixMetadata | None, None, None]:
139 nw = len(self.frequencies)
140 nwperrank = (nw + self.loop_comm.size - 1) // self.loop_comm.size
142 for localw in range(nwperrank):
143 globalw = rank + localw * self.loop_comm.size
144 for r in range(len(self.reim)):
145 if globalw < nw:
146 yield FrequencyDensityMatrixMetadata(density_matrices=self, globalw=globalw,
147 localw=localw, globalr=r, localr=r)
148 else:
149 yield None
151 def write_to_disk(self,
152 frho_fmt: str):
153 """ Calculate the density matrices and save to disk.
155 Parameters
156 ----------
157 frho_fmt
158 Formatting string for the density matrices saved to disk.
160 The formatting string should be a plain string containing variable
161 placeholders within curly brackets ``{}``. It should not be confused with
162 a formatted string literal (f-string).
164 Example:
166 * frho_fmt = ``frho/w{freq:05.2f}-{reim}.npy``.
168 Accepts variables:
170 * ``{freq}`` - Frequency in units of eV.
171 * ``{reim}`` - ``'Re'`` or ``'Im'`` for Fourier transform of real/imaginary
172 part of density matrix.
173 """
174 nlocaltot = len(self.local_work_plan)
176 # Iterate over density matrices on all ranks
177 for ndone, (work, dm) in enumerate(self, 1):
178 avg = self.log.elapsed('read')/ndone
179 estrem = avg * (nlocaltot - ndone)
180 self.log(f'Obtained density matrix {ndone:4.0f}/{nlocaltot:4.0f} on this rank. '
181 f'Avg: {avg:10.3f}s, estimated remaining: {estrem:10.3f}s', who='Writer', flush=True)
182 fname_kw = dict(freq=work.freq, reim=work.reim)
183 fpath = Path(frho_fmt.format(**fname_kw))
184 rho_ia = dm.rho_ia
185 if self.calc_comm.rank == 0:
186 assert isinstance(rho_ia, np.ndarray)
187 fpath.parent.mkdir(parents=True, exist_ok=True)
188 if fpath.suffix == '.npz':
189 # Write numpy archive
190 np.savez(str(fpath), rho_ia=rho_ia)
191 else:
192 # Save numpy binary file
193 np.save(str(fpath), rho_ia)
194 self.log(f'Written {fpath}', who='Writer', flush=True)
195 world.barrier()
198class FrequencyDensityMatricesFromDisk(FrequencyDensityMatrices):
200 """
201 Collection of density matrices in the Kohn-Sham basis in the frequency
202 domain, for different frequencies. Read from disk.
204 Parameters
205 ----------
206 ksd
207 KohnShamDecomposition object or file name.
208 frho_fmt
209 Formatting string for the density matrices
210 in frequency space saved to disk.
212 The formatting string should be a plain string containing variable
213 placeholders within curly brackets ``{}``. It should not be confused with
214 a formatted string literal (f-string).
216 Example:
218 * frho_fmt = ``frho/w{freq:05.2f}-{reim}.npy``.
220 Accepts variables:
222 * ``{freq}`` - Frequency in units of eV.
223 * ``{reim}`` - ``'Re'`` or ``'Im'`` for Fourier transform of real/imaginary
224 part of density matrix.
225 perturbation
226 The perturbation which the density matrices are a response to.
227 frequencies
228 Compute density matrices for these frequencies (or as close to them as possible). In units of eV.
229 real
230 Calculate the Fourier transform of the real part of the density matrix
231 imag
232 Calculate the Fourier transform of the imaginary part of the density matrix.
233 calc_size
234 Size of the calculation communicator.
235 """
237 def __init__(self,
238 ksd: KohnShamDecomposition | str,
239 frho_fmt: str,
240 perturbation: PerturbationLike,
241 frequencies: list[float] | Array1D[np.float64],
242 real: bool = True,
243 imag: bool = True,
244 calc_size: int = 1,
245 log: Logger | None = None):
246 reader = FrequencyDensityMatrixReader(frho_fmt=frho_fmt,
247 ksd=ksd,
248 filter_frequencies=frequencies,
249 real=real, imag=imag,
250 log=log)
251 self.reader = reader
252 super().__init__(ksd=reader.ksd, frequencies=reader.frequencies,
253 real=real, imag=imag,
254 calc_size=calc_size,
255 log=reader.log)
256 self.perturbation = create_perturbation(perturbation)
257 assert isinstance(self.perturbation, (DeltaKick, NoPerturbation))
259 def __str__(self) -> str:
260 return str(self.reader)
262 def __iter__(self) -> Generator[tuple[FrequencyDensityMatrixMetadata, DensityMatrix], None, None]:
263 for work in self.local_work_plan:
264 self.log.start('read')
265 rho = self.reader.read(work.freq, work.reim == 'Re')
266 if isinstance(self.perturbation, DeltaKick):
267 # Only delta kick supported at this point for normalization
268 rho /= self.perturbation.strength
269 matrices = {0: rho if self.calc_comm.rank == 0 else None}
270 dm = DensityMatrix(ksd=self.ksd, matrices=matrices)
272 yield work, dm
275class FrequencyDensityMatricesFromWaveFunctions(FrequencyDensityMatrices):
277 """
278 Collection of density matrices in the Kohn-Sham basis in the frequency
279 domain, for different frequencies. Obtained from the time-dependent wave functions file,
280 which is read from disk.
282 Parameters
283 ----------
284 ksd
285 KohnShamDecomposition object or file name.
286 wfs_fname
287 File name of the time-dependent wave functions file.
288 perturbation
289 The perturbation which the density matrices are a response to.
290 frequencies
291 Compute density matrices for these frequencies (or as close to them as possible). In units of eV.
292 frequency_broadening
293 Gaussian broadening width in units of eV. Default (0) is no broadening.
294 real
295 Calculate the real part of density matrices.
296 imag
297 Calculate the imaginary part of density matrices.
298 calc_size
299 Size of the calculation communicator.
300 stridet
301 Skip this many steps when reading the time-dependent wave functions file.
302 """
304 def __init__(self,
305 ksd: KohnShamDecomposition | str,
306 wfs_fname: str,
307 perturbation: PerturbationLike,
308 frequencies: list[float] | Array1D[np.float64],
309 frequency_broadening: float = 0,
310 real: bool = True,
311 imag: bool = True,
312 log: Logger | None = None,
313 calc_size: int = 1,
314 stridet: int = 1):
315 _, calc_size = two_communicator_sizes(-1, calc_size)
316 # The calc_comm rank 0's are world ranks 0, with a spacing of calc_size
317 result_on_ranks = list(range(0, world.size, calc_size))
319 rho_nn_fft = FourierTransformer.from_parameters(
320 wfs_fname, ksd,
321 perturbation=perturbation,
322 yield_re=real,
323 yield_im=imag,
324 filter_frequencies=np.array(frequencies) * eV_to_au,
325 frequency_broadening=frequency_broadening * eV_to_au,
326 stridet=stridet,
327 result_on_ranks=result_on_ranks,
328 log=log)
329 self.rho_nn_fft = rho_nn_fft
330 super().__init__(ksd=rho_nn_fft.ksd, frequencies=rho_nn_fft.freq_w * au_to_eV,
331 frequency_broadening=frequency_broadening,
332 real=real, imag=imag,
333 calc_size=calc_size,
334 log=rho_nn_fft.log)
336 def __str__(self) -> str:
337 lines = []
338 lines.append('Response from time-dependent wave functions')
339 lines.append('')
340 lines += ['' + line for line in str(self.rho_nn_fft.rho_nn_reader.rho_wfs_reader).split('\n')]
341 lines.append('')
342 lines += ['' + line for line in str(self.rho_nn_fft.rho_nn_reader).split('\n')]
343 lines.append('')
344 lines += ['' + line for line in str(self.rho_nn_fft).split('\n')]
346 return '\n'.join(lines)
348 def get_memory_estimate(self) -> MemoryEstimate:
349 return self.rho_nn_fft.get_memory_estimate()
351 @property
352 def myw(self) -> list[int]:
353 """ List of indices corresponding to the frequency indices on held on this rank. """
354 return self.rho_nn_fft.my_work()
356 def __iter__(self) -> Generator[tuple[FrequencyDensityMatrixMetadata, DensityMatrix], None, None]:
357 parameters = self.rho_nn_fft.rho_nn_reader._parameters
358 flt = (slice(parameters.n1size), slice(parameters.n2size))
360 dist_buffer = self.rho_nn_fft.dist_buffer # Perform the redistribution
361 self.ksd.distribute(self.calc_comm)
363 for work in self.local_work_plan:
364 if self.calc_comm.rank == 0:
365 assert self.myw[work.localw] == work.globalw
367 if self.calc_comm.rank == 0:
368 # Buffer shape is i, a, frequencies
369 rho_ia = dist_buffer._get_data(work.reim == 'Re', 0)[flt + (work.localw, )]
370 matrices = {0: rho_ia if self.calc_comm.rank == 0 else None}
371 dm = DensityMatrix(ksd=self.ksd, matrices=matrices, comm=self.calc_comm)
373 yield work, dm
375 def parallel_prepare(self):
376 self.rho_nn_fft.dist_buffer # Perform the redistribution