from __future__ import annotations
from typing import Generator
from pathlib import Path
import numpy as np
from gpaw.lcaotddft.ksdecomposition import KohnShamDecomposition
from gpaw.tddft.units import au_to_eV, eV_to_au
from gpaw.mpi import world
from .distributed.frequency import FourierTransformer
from .density_matrix import DensityMatrix
from .readers.numpy import FrequencyDensityMatrixReader
from .base import BaseDensityMatrices, WorkMetadata
from ..perturbation import create_perturbation, PerturbationLike, DeltaKick, NoPerturbation
from ..utils import Logger, two_communicator_sizes
from ..utils.memory import MemoryEstimate
from ..typing import Array1D
class FrequencyDensityMatrices(BaseDensityMatrices[FrequencyDensityMatrixMetadata]):
"""
Collection of density matrices in the Kohn-Sham basis in the frequency
domain, for different frequencies.
Parameters
----------
ksd
KohnShamDecomposition object or file name.
frequencies
Compute density matrices for these frequencies (or as close to them as possible). In units of eV.
frequency_broadening
Gaussian broadening width in units of eV. Default (0) is no broadening.
real
Calculate the Fourier transform of the real part of the density matrix.
imag
Calculate the Fourier transform of the imaginary part of the density matrix.
calc_size
Size of the calculation communicator.
"""
def __init__(self,
ksd: KohnShamDecomposition | str,
frequencies: list[float] | Array1D[np.float64],
frequency_broadening: float = 0,
real: bool = True,
imag: bool = True,
calc_size: int = 1,
log: Logger | None = None):
self._freq_w = np.array(frequencies)
self._frequency_broadening = frequency_broadening
super().__init__(ksd=ksd,
real=real, imag=imag,
calc_size=calc_size, log=log)
@property
def frequencies(self) -> Array1D[np.float64]:
""" Frequencies in units of eV. """
return self._freq_w # type: ignore
@property
def frequency_broadening(self) -> float:
""" Gaussian broadening width for frequencies in units of eV. """
return self._frequency_broadening
def work_loop(self,
rank: int) -> Generator[FrequencyDensityMatrixMetadata | None, None, None]:
nw = len(self.frequencies)
nwperrank = (nw + self.loop_comm.size - 1) // self.loop_comm.size
for localw in range(nwperrank):
globalw = rank + localw * self.loop_comm.size
for r in range(len(self.reim)):
if globalw < nw:
yield FrequencyDensityMatrixMetadata(density_matrices=self, globalw=globalw,
localw=localw, globalr=r, localr=r)
else:
yield None
def write_to_disk(self,
frho_fmt: str):
""" Calculate the density matrices and save to disk.
Parameters
----------
frho_fmt
Formatting string for the density matrices saved to disk.
The formatting string should be a plain string containing variable
placeholders within curly brackets ``{}``. It should not be confused with
a formatted string literal (f-string).
Example:
* frho_fmt = ``frho/w{freq:05.2f}-{reim}.npy``.
Accepts variables:
* ``{freq}`` - Frequency in units of eV.
* ``{reim}`` - ``'Re'`` or ``'Im'`` for Fourier transform of real/imaginary
part of density matrix.
"""
nlocaltot = len(self.local_work_plan)
# Iterate over density matrices on all ranks
for ndone, (work, dm) in enumerate(self, 1):
avg = self.log.elapsed('read')/ndone
estrem = avg * (nlocaltot - ndone)
self.log(f'Obtained density matrix {ndone:4.0f}/{nlocaltot:4.0f} on this rank. '
f'Avg: {avg:10.3f}s, estimated remaining: {estrem:10.3f}s', who='Writer', flush=True)
fname_kw = dict(freq=work.freq, reim=work.reim)
fpath = Path(frho_fmt.format(**fname_kw))
rho_ia = dm.rho_ia
if self.calc_comm.rank == 0:
assert isinstance(rho_ia, np.ndarray)
fpath.parent.mkdir(parents=True, exist_ok=True)
if fpath.suffix == '.npz':
# Write numpy archive
np.savez(str(fpath), rho_ia=rho_ia)
else:
# Save numpy binary file
np.save(str(fpath), rho_ia)
self.log(f'Written {fpath}', who='Writer', flush=True)
world.barrier()
[docs]
class FrequencyDensityMatricesFromDisk(FrequencyDensityMatrices):
"""
Collection of density matrices in the Kohn-Sham basis in the frequency
domain, for different frequencies. Read from disk.
Parameters
----------
ksd
KohnShamDecomposition object or file name.
frho_fmt
Formatting string for the density matrices
in frequency space saved to disk.
The formatting string should be a plain string containing variable
placeholders within curly brackets ``{}``. It should not be confused with
a formatted string literal (f-string).
Example:
* frho_fmt = ``frho/w{freq:05.2f}-{reim}.npy``.
Accepts variables:
* ``{freq}`` - Frequency in units of eV.
* ``{reim}`` - ``'Re'`` or ``'Im'`` for Fourier transform of real/imaginary
part of density matrix.
perturbation
The perturbation which the density matrices are a response to.
frequencies
Compute density matrices for these frequencies (or as close to them as possible). In units of eV.
real
Calculate the Fourier transform of the real part of the density matrix
imag
Calculate the Fourier transform of the imaginary part of the density matrix.
calc_size
Size of the calculation communicator.
"""
def __init__(self,
ksd: KohnShamDecomposition | str,
frho_fmt: str,
perturbation: PerturbationLike,
frequencies: list[float] | Array1D[np.float64],
real: bool = True,
imag: bool = True,
calc_size: int = 1,
log: Logger | None = None):
reader = FrequencyDensityMatrixReader(frho_fmt=frho_fmt,
ksd=ksd,
filter_frequencies=frequencies,
real=real, imag=imag,
log=log)
self.reader = reader
super().__init__(ksd=reader.ksd, frequencies=reader.frequencies,
real=real, imag=imag,
calc_size=calc_size,
log=reader.log)
self.perturbation = create_perturbation(perturbation)
assert isinstance(self.perturbation, (DeltaKick, NoPerturbation))
def __str__(self) -> str:
return str(self.reader)
def __iter__(self) -> Generator[tuple[FrequencyDensityMatrixMetadata, DensityMatrix], None, None]:
for work in self.local_work_plan:
self.log.start('read')
rho = self.reader.read(work.freq, work.reim == 'Re')
if isinstance(self.perturbation, DeltaKick):
# Only delta kick supported at this point for normalization
rho /= self.perturbation.strength
matrices = {0: rho if self.calc_comm.rank == 0 else None}
dm = DensityMatrix(ksd=self.ksd, matrices=matrices)
yield work, dm
[docs]
class FrequencyDensityMatricesFromWaveFunctions(FrequencyDensityMatrices):
"""
Collection of density matrices in the Kohn-Sham basis in the frequency
domain, for different frequencies. Obtained from the time-dependent wave functions file,
which is read from disk.
Parameters
----------
ksd
KohnShamDecomposition object or file name.
wfs_fname
File name of the time-dependent wave functions file.
perturbation
The perturbation which the density matrices are a response to.
frequencies
Compute density matrices for these frequencies (or as close to them as possible). In units of eV.
frequency_broadening
Gaussian broadening width in units of eV. Default (0) is no broadening.
real
Calculate the real part of density matrices.
imag
Calculate the imaginary part of density matrices.
calc_size
Size of the calculation communicator.
stridet
Skip this many steps when reading the time-dependent wave functions file.
"""
def __init__(self,
ksd: KohnShamDecomposition | str,
wfs_fname: str,
perturbation: PerturbationLike,
frequencies: list[float] | Array1D[np.float64],
frequency_broadening: float = 0,
real: bool = True,
imag: bool = True,
log: Logger | None = None,
calc_size: int = 1,
stridet: int = 1):
_, calc_size = two_communicator_sizes(-1, calc_size)
# The calc_comm rank 0's are world ranks 0, with a spacing of calc_size
result_on_ranks = list(range(0, world.size, calc_size))
rho_nn_fft = FourierTransformer.from_parameters(
wfs_fname, ksd,
perturbation=perturbation,
yield_re=real,
yield_im=imag,
filter_frequencies=np.array(frequencies) * eV_to_au,
frequency_broadening=frequency_broadening * eV_to_au,
stridet=stridet,
result_on_ranks=result_on_ranks,
log=log)
self.rho_nn_fft = rho_nn_fft
super().__init__(ksd=rho_nn_fft.ksd, frequencies=rho_nn_fft.freq_w * au_to_eV,
frequency_broadening=frequency_broadening,
real=real, imag=imag,
calc_size=calc_size,
log=rho_nn_fft.log)
def __str__(self) -> str:
lines = []
lines.append('Response from time-dependent wave functions')
lines.append('')
lines += ['' + line for line in str(self.rho_nn_fft.rho_nn_reader.rho_wfs_reader).split('\n')]
lines.append('')
lines += ['' + line for line in str(self.rho_nn_fft.rho_nn_reader).split('\n')]
lines.append('')
lines += ['' + line for line in str(self.rho_nn_fft).split('\n')]
return '\n'.join(lines)
def get_memory_estimate(self) -> MemoryEstimate:
return self.rho_nn_fft.get_memory_estimate()
@property
def myw(self) -> list[int]:
""" List of indices corresponding to the frequency indices on held on this rank. """
return self.rho_nn_fft.my_work()
def __iter__(self) -> Generator[tuple[FrequencyDensityMatrixMetadata, DensityMatrix], None, None]:
parameters = self.rho_nn_fft.rho_nn_reader._parameters
flt = (slice(parameters.n1size), slice(parameters.n2size))
dist_buffer = self.rho_nn_fft.dist_buffer # Perform the redistribution
self.ksd.distribute(self.calc_comm)
for work in self.local_work_plan:
if self.calc_comm.rank == 0:
assert self.myw[work.localw] == work.globalw
if self.calc_comm.rank == 0:
# Buffer shape is i, a, frequencies
rho_ia = dist_buffer._get_data(work.reim == 'Re', 0)[flt + (work.localw, )]
matrices = {0: rho_ia if self.calc_comm.rank == 0 else None}
dm = DensityMatrix(ksd=self.ksd, matrices=matrices, comm=self.calc_comm)
yield work, dm
[docs]
def parallel_prepare(self):
self.rho_nn_fft.dist_buffer # Perform the redistribution