Coverage for rhodent/calculators/base.py: 89%
263 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-08-01 16:57 +0000
« prev ^ index » next coverage.py v7.9.1, created at 2025-08-01 16:57 +0000
1from __future__ import annotations
3from abc import ABC, abstractmethod
4from typing import Generator, Collection
6import numpy as np
7from numpy.typing import NDArray
9from gpaw.lcaotddft.ksdecomposition import KohnShamDecomposition
10from gpaw.mpi import world
12from ..typing import Communicator, Array2D, Array3D
13from ..response import BaseResponse
14from ..perturbation import create_perturbation, Perturbation, PerturbationLike
15from ..density_matrices.base import BaseDensityMatrices, WorkMetadata
16from ..density_matrices.time import ConvolutionDensityMatrices
17from ..density_matrices.frequency import FrequencyDensityMatrices
18from ..voronoi import VoronoiWeights, EmptyVoronoiWeights
19from ..typing import Array1D
20from ..utils import Logger, ResultKeys, Result, broaden_n2e, broaden_xn2e, broaden_ia2ou
21from ..utils.memory import HasMemoryEstimate, MemoryEstimate
24class BaseObservableCalculator(HasMemoryEstimate, ABC):
26 """ Object of this class compute observables.
28 Parameters
29 ----------
30 response
31 Response object.
32 voronoi
33 Voronoi weights object.
34 energies_occ
35 Energy grid in units of eV for occupied levels (holes).
36 energies_unocc
37 Energy grid in units of eV for unoccupied levels (electrons).
38 sigma
39 Gaussian broadening width for energy grid in units of eV.
40 times
41 Compute observables in the time domain, for these times (or as close to them as possible).
42 In units of as.
44 May not be used together with :attr:`frequencies` or :attr:`frequency_broadening`.
45 pulses
46 Compute observables in the time domain, in response to these pulses.
47 If none, then no pulse convolution is performed.
49 May not be used together with :attr:`frequencies` or :attr:`frequency_broadening`.
50 frequencies
51 Compute observables in the frequency domain, for these frequencies. In units of eV.
53 May not be used together with :attr:`times` or :attr:`pulses`.
54 frequency_broadening
55 Compute observables in the frequency domain, with Gaussian broadening of this width.
56 In units of eV.
58 May not be used together with :attr:`times` or :attr:`pulses`.
59 """
61 def __init__(self,
62 response: BaseResponse,
63 voronoi: VoronoiWeights | None = None,
64 *,
65 energies_occ: list[float] | Array1D[np.float64] | None = None,
66 energies_unocc: list[float] | Array1D[np.float64] | None = None,
67 sigma: float | None = None,
68 times: list[float] | Array1D[np.float64] | None = None,
69 pulses: Collection[PerturbationLike] | None = None,
70 frequencies: list[float] | Array1D[np.float64] | None = None,
71 frequency_broadening: float = 0,
72 ):
73 self._log = Logger()
74 self.log.startup_message()
75 world.barrier()
76 self._response = response
78 self._is_time_density_matrices: bool
79 if times is not None:
80 if frequencies is not None or frequency_broadening != 0:
81 raise ValueError('The parameters frequencies and frequency_broadening may not '
82 'be used together with times and pulses.')
83 # Time calculator
84 if pulses is None:
85 pulses = [None] # No perturbation
86 pulses = [create_perturbation(pulse) for pulse in pulses]
87 self._is_time_density_matrices = True
88 else:
89 if frequencies is None:
90 raise ValueError('One of the parameters times or frequencies must be given.')
91 if pulses is not None:
92 raise ValueError('The parameters frequencies and frequency_broadening may not '
93 'be used together with times and pulses.')
94 # Frequency calculator
95 self._is_time_density_matrices = False
97 derivatives, real, imag = self._need_derivatives_real_imag
99 density_matrices: BaseDensityMatrices
100 if self._is_time_density_matrices:
101 assert times is not None
102 assert pulses is not None
103 density_matrices = response._get_time_density_matrices(
104 times, pulses, derivatives, real, imag, self.log)
105 else:
106 assert pulses is None
107 assert frequencies is not None
108 density_matrices = response._get_frequency_density_matrices(
109 frequencies, frequency_broadening, real, imag, self.log)
111 self._density_matrices = density_matrices
112 self._voronoi: VoronoiWeights
113 if voronoi is None:
114 self._voronoi = EmptyVoronoiWeights()
115 else:
116 self._voronoi = voronoi
117 if energies_occ is None:
118 energies_occ = []
119 if energies_unocc is None:
120 energies_unocc = []
121 self._energies_occ = np.asarray(energies_occ)
122 self._energies_unocc = np.asarray(energies_unocc)
123 self._sigma = sigma
124 self._weight_In: Array2D[np.float64] | None = None
125 self._weight_Iii: Array3D[np.float64] | None = None
126 self._weight_Iaa: Array3D[np.float64] | None = None
127 world.barrier()
128 self.log(f'Set up calculator:\n'
129 f'{self}\n\n'
130 '==================================\n'
131 ' Procedure for obtaining response \n'
132 '==================================\n'
133 f'{self.density_matrices}\n\n'
134 '=================\n'
135 ' Memory estimate \n'
136 '=================\n'
137 f'{self.get_memory_estimate()}\n',
138 rank=0)
140 def __str__(self) -> str:
141 lines = [f' {self.__class__.__name__} ']
143 lines.append('response:')
144 lines += [' ' + line for line in str(self.response).split('\n')]
145 lines.append('')
147 lines.append('voronoi:')
148 if self.nproj == 0:
149 lines.append(' No Voronoi decomposition')
150 else:
151 lines += [' ' + line for line in str(self.voronoi).split('\n')]
152 lines.append('')
154 if len(self.energies_occ) == 0 or len(self.energies_unocc) == 0 or self.sigma is None:
155 lines.append('No energies for broadening')
156 else:
157 lines += [f'Energies for broadening (sigma = {self.sigma:.1f} eV)',
158 f' Occupied: {len(self.energies_occ)} from '
159 f'{self.energies_occ[0]:.1f} to {self.energies_occ[-1]:.1f} eV',
160 f' Unoccupied: {len(self.energies_unocc)} from '
161 f'{self.energies_unocc[0]:.1f} to {self.energies_unocc[-1]:.1f} eV',
162 ]
164 # Make a cute frame
165 maxlen = max(len(line) for line in lines)
166 lines[0] = '{s:=^{n}}'.format(s=f' {lines[0]} ', n=maxlen)
167 lines.append(maxlen * '-')
169 return '\n'.join(lines)
171 def get_memory_estimate(self) -> MemoryEstimate:
172 memory_estimate = MemoryEstimate()
173 memory_estimate.add_child('Reading response', self.density_matrices.get_memory_estimate())
174 for key, shape in self._voronoi_shapes.items():
175 memory_estimate.add_key(f'Voronoi weights {key}', shape, float,
176 on_num_ranks=self.loop_comm.size)
178 return memory_estimate
180 @property
181 def response(self) -> BaseResponse:
182 """ Response object """
183 return self._response
185 @property
186 def density_matrices(self) -> BaseDensityMatrices:
187 """ Object that gives the density matrix in the time or freqency domain. """
188 return self._density_matrices
190 @property
191 def nproj(self) -> int:
192 """ Number of projections in the Voronoi weights object """
193 return self.voronoi.nproj
195 @property
196 def voronoi(self) -> VoronoiWeights:
197 """ Voronoi weights object """
198 return self._voronoi
200 @property
201 def energies_occ(self) -> NDArray[np.float64]:
202 """ Energy grid (in units of eV) for occupied levels (hot holes). """
203 return self._energies_occ
205 @property
206 def energies_unocc(self) -> NDArray[np.float64]:
207 """ Energy grid (in units of eV) for unoccupied levels (hot electrons). """
208 return self._energies_unocc
210 @property
211 def sigma(self) -> float | None:
212 """ Gaussian broadening width in units of eV. """
213 return self._sigma
215 @property
216 def ksd(self) -> KohnShamDecomposition:
217 """ Kohn-Sham decomposition object """
218 return self.density_matrices.ksd
220 @property
221 def log(self) -> Logger:
222 """ Logger """
223 return self._log
225 def log_parallel(self, *args, **kwargs) -> Logger:
226 """ Log message with communicator information. """
227 return self._log(*args, **kwargs, comm=self.loop_comm, who='Calculator')
229 @property
230 def frequencies(self) -> Array1D[np.float64]:
231 """ Frequencies (in units of eV) at which the density matrices are evaluated.
233 Only valid when the density matrices object is defined in the frequency domain. """
234 assert isinstance(self.density_matrices, FrequencyDensityMatrices)
235 return self.density_matrices.frequencies
237 @property
238 def frequency_broadening(self) -> float:
239 """ Value of frequency broadening in units of eV.
241 Only valid when the density matrices object is defined in the frequency domain. """
242 assert isinstance(self.density_matrices, FrequencyDensityMatrices)
243 return self.density_matrices.frequency_broadening
245 @property
246 def times(self) -> Array1D[np.float64]:
247 """ Times (in units of as) at which the density matrices are evaluated.
249 Only valid when the density matrices object is defined in the time domain. """
250 assert isinstance(self.density_matrices, ConvolutionDensityMatrices)
251 return self.density_matrices.times
253 @property
254 def pulses(self) -> list[Perturbation]:
255 """ List of pulses which the density matrices are responses to.
257 Only valid when the density matrices object is a convolution with pulses. """
258 assert isinstance(self.density_matrices, ConvolutionDensityMatrices)
259 return self.density_matrices.pulses
261 @property
262 def calc_comm(self) -> Communicator:
263 """ Calculation communicator.
265 Each rank of this communicator calculates the observables corresponding to
266 a part (in electron-hole space) of the density matrices. """
267 return self.density_matrices.calc_comm
269 @property
270 def loop_comm(self) -> Communicator:
271 """ Loop communicator.
273 Each rank of this communicator calculates the density matrices corresponding to
274 different times, frequencies or after convolution with a different pulse. """
275 return self.density_matrices.loop_comm
277 @property
278 def eig_n(self) -> Array1D[np.float64]:
279 """ Eigenvalues (in units of eV) relative to Fermi level in the full ground state KS basis. """
280 eig_n, _ = self.ksd.get_eig_n(zero_fermilevel=True)
281 return eig_n # type: ignore
283 @property
284 def eig_i(self) -> Array1D[np.float64]:
285 """ Eigenvalues (in units of eV) relative to Fermi level of occupied states (holes). """
286 return self.eig_n[self.flti] # type: ignore
288 @property
289 def eig_a(self) -> Array1D[np.float64]:
290 """ Eigenvalues (in units of eV) relative to Fermi level of unoccupied states (electrons). """
291 return self.eig_n[self.flta] # type: ignore
293 @property
294 def flti(self) -> slice:
295 """ Slice for extracting indices corresponding to occupied states. """
296 imin, imax, _, _ = self.ksd.ialims()
297 return slice(imin, imax+1)
299 @property
300 def flta(self) -> slice:
301 """ Slice for extracting indices corresponding to unoccupied states. """
302 _, _, amin, amax = self.ksd.ialims()
303 return slice(amin, amax+1)
305 @property
306 def _need_derivatives_real_imag(self) -> tuple[list[int], bool, bool]:
307 """ Derivatives needed by this calculator, and
308 whether real and imaginary parts are needed
309 """
310 raise NotImplementedError
312 @property
313 def _voronoi_shapes(self) -> dict[str, tuple[int, ...]]:
314 """ List of shapes for the Voronoi weights stored on this calculator. """
315 return {}
317 def _read_weights_diagonal(self) -> None:
318 """ Read the diagonal weights from the voronoi object and store them in memory. """
319 nI = self.voronoi.nproj
320 if nI == 0:
321 return
322 Nn = self.voronoi.nn
324 if self.calc_comm.rank == 0:
325 weight_In = np.empty((nI, Nn), dtype=float)
326 else:
327 weight_In = None
328 for iI, weight_nn in enumerate(self.voronoi):
329 if self.voronoi.comm.rank == 0:
330 assert weight_In is not None
331 assert weight_nn is not None
332 weight_In[iI, ...] = weight_nn.diagonal()
333 else:
334 assert weight_nn is None
336 if self.calc_comm.rank == 0:
337 # Broadcast to all calc_comm rank 0's
338 self.loop_comm.broadcast(weight_In, 0)
340 self._weight_In = weight_In
342 def _read_weights_eh(self) -> None:
343 """ Read the electron-hole weights from the voronoi object and store them in memory. """
344 nI = self.voronoi.nproj
345 if nI == 0:
346 return
348 if self.calc_comm.rank == 0:
349 weight_Iii = np.empty((nI, len(self.eig_i), len(self.eig_i)), dtype=float)
350 weight_Iaa = np.empty((nI, len(self.eig_a), len(self.eig_a)), dtype=float)
351 else:
352 weight_Iii = None
353 weight_Iaa = None
354 for iI, weight_nn in enumerate(self.voronoi):
355 if self.voronoi.comm.rank == 0:
356 assert weight_nn is not None
357 assert weight_Iii is not None
358 assert weight_Iaa is not None
359 weight_Iii[iI, ...] = weight_nn[self.flti, self.flti]
360 weight_Iaa[iI, ...] = weight_nn[self.flta, self.flta]
361 else:
362 assert weight_nn is None
364 if self.calc_comm.rank == 0:
365 # Broadcast to all calc_comm rank 0's
366 self.loop_comm.broadcast(weight_Iii, 0)
367 self.loop_comm.broadcast(weight_Iaa, 0)
369 self._weight_Iii = weight_Iii
370 self._weight_Iaa = weight_Iaa
372 @property
373 def _iterate_weights_diagonal(self) -> Generator[NDArray[np.float64], None, None]:
374 """ Iterate over the diagonal weights.
376 Yields
377 ------
378 The diagonal of the Voronoi weights, one projection at a time """
379 assert self.calc_comm.rank == 0
380 if self.nproj == 0:
381 return
383 if self._weight_In is None:
384 self._read_weights_diagonal()
385 assert self._weight_In is not None
387 for weight_n in self._weight_In:
388 yield weight_n
390 @property
391 def _iterate_weights_eh(self) -> Generator[tuple[NDArray[np.float64], NDArray[np.float64]], None, None]:
392 """ Iterate over the electron-hole part of the weights.
394 Yields
395 ------
396 The electron-hole part of the Voronoi weights, one projection at a time """
397 assert self.calc_comm.rank == 0
398 if self.nproj == 0:
399 return
401 if self._weight_Iaa is None or self._weight_Iii is None:
402 self._read_weights_eh()
403 assert self._weight_Iii is not None
404 assert self._weight_Iaa is not None
406 for weight_ii, weight_aa in zip(self._weight_Iii, self._weight_Iaa):
407 yield weight_ii, weight_aa
409 @abstractmethod
410 def get_result_keys(self) -> ResultKeys:
411 """ Get the keys that each result will contain, and dimensions thereof.
413 Returns
414 -------
415 Object representing the data that will be present in the result objects. """
416 raise NotImplementedError
418 @abstractmethod
419 def icalculate(self) -> Generator[tuple[WorkMetadata, Result], None, None]:
420 """ Iteratively calculate results.
422 Yields
423 ------
424 Tuple (work, result) on the root rank of the calculation communicator:
426 work
427 An object representing the metadata (time, frequency or pulse) for the work done
428 result
429 Object containg the calculation results for this time, frequency or pulse
431 Yields nothing on non-root ranks of the calculation communicator.
432 """
433 raise NotImplementedError
435 def icalculate_gather_on_root(self, **kwargs) -> Generator[
436 tuple[WorkMetadata, Result], None, None]:
437 """ Iteratively calculate results and gather to the root rank.
439 Yields
440 ------
441 Tuple (work, result) on the root rank of the both calculation and loop communicators:
443 work
444 An object representing the metadata (time, frequency or pulse) for the work done
445 result
446 Object containg the calculation results for this time, frequency or pulse
448 Yields nothing on non-root ranks of the communicators.
449 """
450 resultkeys = self.get_result_keys(**kwargs)
451 gen = iter(self.icalculate(**kwargs))
453 # Loop over the work to be done, and the ranks that are supposed to do it
454 for rank, work in self.density_matrices.global_work_loop_with_idle():
455 if work is None:
456 # Rank rank will not do any work at this point
457 continue
459 if self.calc_comm.rank > 0:
460 continue
462 if self.loop_comm.rank == 0 and rank == 0:
463 # This is the root rank, and the root rank should yield its own work
464 mywork, result = next(gen)
465 assert work.global_indices == mywork.global_indices, f'{work.desc} != {mywork.desc}'
466 yield mywork, result
467 # self.log.start('communicate')
468 elif self.loop_comm.rank == 0:
469 # This is the root rank, and the root rank should receive the work
470 # done by rank rank, and yield that
471 result.inplace_receive(resultkeys, rank, comm=self.loop_comm)
472 yield work, result
473 # self.log(f'Communicated for {self.log.elapsed("communicate"):.2f}s', flush=True)
474 elif self.loop_comm.rank == rank:
475 # This is not the root rank, but this rank should send its data to root
476 _, result = next(gen)
477 result.send(resultkeys, 0, comm=self.loop_comm)
479 _exhausted = object()
480 rem = next(gen, _exhausted)
481 assert rem is _exhausted, rem
483 def write_response_to_disk(self,
484 fmt: str):
485 """ Calculate the response needed for this calculator and write the response to disk.
487 The necessary derivatives and real and imaginary parts of the density matrices
488 will be written. Can be used both in the time and frequency domains.
490 Parameters
491 ----------
492 fmt
493 Formatting string for the density matrices saved to disk.
495 The formatting string should be a plain string containing variable
496 placeholders within curly brackets `{}`. It should not be confused with
497 a formatted string literal (f-string).
499 Examples:
501 * frho_fmt = `'frho/w{freq:05.2f}-{reim}.npy'`.
502 * pulserho_fmt = `pulserho/t{time:09.1f}{tag}.npy`.
504 Accepts variables in the time domain:
506 * `{time}` - Time in units of as.
507 * `{tag}` - Derivative tag, `''`, `'-Iomega'`, or `'-omega2'`.
508 * `{pulsefreq}` - Pulse frequency in units of eV.
509 * `{pulsefwhm}` - Pulse FWHM in units of fs.
511 Accepts variables in the frequency domain:
513 * `{freq}` - Frequency in units of eV.
514 * `{reim}` - `'Re'` or `'Im'` for Fourier transform of real/imaginary
515 part of density matrix.
516 """
517 self._density_matrices.write_to_disk(fmt)
519 def broaden_occ(self,
520 M_i: NDArray[np.float64]) -> NDArray[np.float64]:
521 """ Broaden array corresponding to occupied levels with Gaussians of width sigma
523 Parameters
524 ----------
525 M_i
526 Array to broaden
528 Returns
529 -------
530 Broadened array
531 """
532 assert self.sigma is not None
534 return broaden_n2e(M_i, self.eig_i, self.energies_occ, self.sigma)
536 def broaden_unocc(self,
537 M_a: NDArray[np.float64]) -> NDArray[np.float64]:
538 """ Broaden array corresponding to unoccupied levels with Gaussians of width sigma
540 Parameters
541 ----------
542 M_a
543 Array to broaden
545 Returns
546 -------
547 Broadened array
548 """
549 assert self.sigma is not None
551 return broaden_n2e(M_a, self.eig_a, self.energies_unocc, self.sigma)
553 def broaden_xi2o(self,
554 M_xi: NDArray[np.float64]) -> NDArray[np.float64]:
555 """ Broaden array corresponding to occupied levels with Gaussians of width sigma
557 Parameters
558 ----------
559 M_xa
560 Array to broaden. The last dimension should correspond to the occupied levels
562 Returns
563 -------
564 Broadened array
565 """
566 assert self.sigma is not None
568 return broaden_xn2e(M_xi, self.eig_i, self.energies_occ, self.sigma)
570 def broaden_xi2u(self,
571 M_xa: NDArray[np.float64]) -> NDArray[np.float64]:
572 """ Broaden array corresponding to unoccupied levels with Gaussians of width sigma
574 Parameters
575 ----------
576 M_xa
577 Array to broaden. The last dimension should correspond to the unoccupied levels
579 Returns
580 -------
581 Broadened array
582 """
583 assert self.sigma is not None
585 return broaden_xn2e(M_xa, self.eig_a, self.energies_unocc, self.sigma)
587 def broaden_ia2ou(self,
588 M_ia: NDArray[np.float64]) -> NDArray[np.float64]:
589 """ Broaden matrix in electron-hole basis with Gaussians of width sigma.
591 Parameters
592 ----------
593 M_ia
594 Matrix to broaden. The first dimension should correspond to occupied levels
595 and the second to unoccupied levels.
597 Returns
598 -------
599 Broadened array
600 """
601 assert self.sigma is not None
603 return broaden_ia2ou(M_ia, self.eig_i, self.eig_a,
604 self.energies_occ, self.energies_unocc, self.sigma)