from __future__ import annotations
from typing import Generator
import numpy as np
from numpy.typing import NDArray
from gpaw.lcaotddft.ksdecomposition import KohnShamDecomposition
from gpaw.tddft.units import au_to_eV, eV_to_au
from gpaw.mpi import world
from .distributed.frequency import FourierTransformer
from .density_matrix import DensityMatrix
from .base import BaseDensityMatrices, WorkMetadata
from ..utils import Logger, two_communicator_sizes
from ..utils.logging import format_frequencies
from ..utils.memory import MemoryEstimate
from ..perturbation import create_perturbation, PerturbationLike, DeltaKick
def read_frho(fname: str,
omega: float,
derivative: int = 0,
only_ia: bool = True) -> NDArray[np.complex128]:
""" Read density matrix in frequency space from ULM file
and apply the correct scaling.
Parameters
----------
fname
Input file name.
omega
Frequency in atomic units.
derivative
0 for plain density matrix and positive integers for derivatives.
only_ia
Whether the density matrix was written with the
:class:`KohnShamDecomposition` in :attr:`only_ia` mode.
Returns
-------
Density matrix in the Kohn-Sham basis (a 1D array of electron-hole pairs).
"""
f = np.load(fname)
if isinstance(f, np.lib.npyio.NpzFile):
# Read npz file
rho = f['rho_p']
f.close()
else:
# Read npy file
assert isinstance(f, np.ndarray)
rho = f
# Apply scale
if derivative == 1:
rho *= -1.0j * omega
elif derivative == 2:
rho *= -1.0 * omega ** 2
if only_ia:
# Twice the rho is saved by the KohnShamDecomposition transform
rho /= 2
return rho
class FrequencyDensityMatrices(BaseDensityMatrices[FrequencyDensityMatrixMetadata]):
"""
Collection of density matrices in the Kohn-Sham basis in the frequency
domain, for different frequencies.
Plain density matrices and/or derivatives thereof may be represented.
Parameters
----------
ksd
KohnShamDecomposition object or filename.
frequencies
Compute density matrices for these frequencies (or as close to them as possible). In units of eV.
frequency_broadening
Gaussian broadening width in units of eV. Default (0) is no broadening.
derivative_order_s
Compute density matrix derivatives of the following orders.
``0`` for plain density matrix and positive integers for derivatives.
real
Calculate the Fourier transform of the real part of the density matrix.
imag
Calculate the Fourier transform of the imaginary part of the density matrix.
calc_size
Size of the calculation communicator.
"""
def __init__(self,
ksd: KohnShamDecomposition | str,
frequencies: list[float] | NDArray[np.float64],
frequency_broadening: float = 0,
derivative_order_s: list[int] = [0],
real: bool = True,
imag: bool = True,
calc_size: int = 1,
log: Logger | None = None):
self._freq_w = np.array(frequencies)
self._frequency_broadening = frequency_broadening
super().__init__(ksd=ksd,
derivative_order_s=derivative_order_s,
real=real, imag=imag,
calc_size=calc_size, log=log)
@property
def frequencies(self) -> NDArray[np.float64]:
""" Frequencies in units of eV. """
return self._freq_w
@property
def frequency_broadening(self) -> float:
""" Gaussian broadening width for frequencies in units of eV. """
return self._frequency_broadening
def work_loop(self,
rank: int) -> Generator[FrequencyDensityMatrixMetadata | None, None, None]:
nw = len(self.frequencies)
nwperrank = (nw + self.loop_comm.size - 1) // self.loop_comm.size
for localw in range(nwperrank):
globalw = rank + localw * self.loop_comm.size
for r in range(len(self.reim)):
if globalw < nw:
yield FrequencyDensityMatrixMetadata(density_matrices=self, globalw=globalw,
localw=localw, globalr=r, localr=r)
else:
yield None
[docs]
class FrequencyDensityMatricesFromDisk(FrequencyDensityMatrices):
"""
Collection of density matrices in the Kohn-Sham basis in the frequency
domain, for different frequencies. Read from disk.
Plain density matrices and/or derivatives thereof may be represented.
Parameters
----------
ksd
KohnShamDecomposition object or filename.
frho_fmt
Formatting string for the density matrices
in frequency space saved to disk.
Example:
* frho_fmt = `frho/w{freq:05.2f}-{reim}.npy`.
Accepts variables:
* `{freq}` - Frequency in units of eV.
* `{reim}` - ``'Re'`` or ``'Im'`` for Fourier transform of real/imaginary
part of density matrix.
perturbation
The perturbation which the density matrices are a response to.
frequencies
Compute density matrices for these frequencies (or as close to them as possible). In units of eV.
derivative_order_s
Compute density matrix derivatives of the following orders.
``0`` for plain density matrix and positive integers for derivatives.
real
Calculate the Fourier transform of the real part of the density matrix
imag
Calculate the Fourier transform of the imaginary part of the density matrix.
calc_size
Size of the calculation communicator.
"""
def __init__(self,
ksd: KohnShamDecomposition | str,
frho_fmt: str,
perturbation: PerturbationLike,
frequencies: list[float] | NDArray[np.float64],
real: bool = True,
imag: bool = True,
derivative_order_s: list[int] = [0],
calc_size: int = 1,
log: Logger | None = None):
super().__init__(ksd=ksd, frequencies=frequencies,
real=real, imag=imag,
derivative_order_s=derivative_order_s, calc_size=calc_size,
log=log)
self.frho_fmt = frho_fmt
self.perturbation = create_perturbation(perturbation)
assert isinstance(self.perturbation, DeltaKick)
def __str__(self) -> str:
lines = ['Response from Fourier transform of density matrices on disk']
lines.append('')
lines.append(f'Calculating response for {len(self.frequencies)} frequencies')
lines.append(f' frequencies: {format_frequencies(self.frequencies)}')
return '\n'.join(lines)
def __iter__(self) -> Generator[tuple[FrequencyDensityMatrixMetadata, DensityMatrix], None, None]:
from gpaw.tddft.units import eV_to_au
omega_w = self.frequencies * eV_to_au
for work in self.local_work_plan:
self.log.start('read')
matrices: dict[int, NDArray[np.complex128] | None] = dict()
for derivative in self.derivative_order_s:
if self.calc_comm.rank > 0:
# Don't read on non calc comm root ranks
matrices[derivative] = None
continue
fname_kw = dict(freq=work.freq, reim=work.reim)
fname = self.frho_fmt.format(**fname_kw)
rho = read_frho(fname, omega_w[work.globalw], derivative, self.ksd.only_ia)
rho /= self.perturbation.strength
matrices[derivative] = rho
dm = DensityMatrix(ksd=self.ksd, matrices=matrices, data_is_ravelled=True)
yield work, dm
[docs]
class FrequencyDensityMatricesFromWaveFunctions(FrequencyDensityMatrices):
"""
Collection of density matrices in the Kohn-Sham basis in the frequency
domain, for different frequencies. Obtained from the time-dependent wave functions file,
which is read from disk.
Plain density matrices and/or derivatives thereof may be represented.
Parameters
----------
ksd
KohnShamDecomposition object or filename.
wfs_fname
Filename of the time-dependent wave functions file.
perturbation
The perturbation which the density matrices are a response to.
frequencies
Compute density matrices for these frequencies (or as close to them as possible). In units of eV.
frequency_broadening
Gaussian broadening width in units of eV. Default (0) is no broadening.
derivative_order_s
Compute density matrix derivatives of the following orders.
``0`` for plain density matrix and positive integers for derivatives.
real
Calculate the real part of density matrices.
imag
Calculate the imaginary part of density matrices.
calc_size
Size of the calculation communicator.
stridet
Skip this many steps when reading the time-dependent wave functions file.
"""
def __init__(self,
ksd: KohnShamDecomposition | str,
wfs_fname: str,
perturbation: PerturbationLike,
frequencies: list[float] | NDArray[np.float64],
frequency_broadening: float = 0,
real: bool = True,
imag: bool = True,
derivative_order_s: list[int] = [0],
log: Logger | None = None,
calc_size: int = 1,
stridet: int = 1):
_, calc_size = two_communicator_sizes(-1, calc_size)
# The calc_comm rank 0's are world ranks 0, with a spacing of calc_size
result_on_ranks = list(range(0, world.size, calc_size))
rho_nn_fft = FourierTransformer.from_parameters(
wfs_fname, ksd,
perturbation=perturbation,
yield_re=real,
yield_im=imag,
filter_frequencies=np.array(frequencies) * eV_to_au,
frequency_broadening=frequency_broadening * eV_to_au,
stridet=stridet,
result_on_ranks=result_on_ranks,
log=log)
self.rho_nn_fft = rho_nn_fft
super().__init__(ksd=rho_nn_fft.ksd, frequencies=rho_nn_fft.freq_w * au_to_eV,
frequency_broadening=frequency_broadening,
real=real, imag=imag,
derivative_order_s=derivative_order_s, calc_size=calc_size,
log=rho_nn_fft.log)
def __str__(self) -> str:
lines = []
lines.append('Response from time-dependent wave functions')
lines.append('')
lines += ['' + line for line in str(self.rho_nn_fft.rho_nn_reader.rho_wfs_reader).split('\n')]
lines.append('')
lines += ['' + line for line in str(self.rho_nn_fft.rho_nn_reader).split('\n')]
lines.append('')
lines += ['' + line for line in str(self.rho_nn_fft).split('\n')]
return '\n'.join(lines)
def get_memory_estimate(self) -> MemoryEstimate:
return self.rho_nn_fft.get_memory_estimate()
@property
def myw(self) -> list[int]:
""" List of indices corresponding to the frequency indices on held on this rank. """
return self.rho_nn_fft.my_work()
def __iter__(self) -> Generator[tuple[FrequencyDensityMatrixMetadata, DensityMatrix], None, None]:
parameters = self.rho_nn_fft.rho_nn_reader._parameters
flt = (slice(parameters.n1size), slice(parameters.n2size))
dist_buffer = self.rho_nn_fft.dist_buffer # Perform the redistribution
self.ksd.distribute(self.calc_comm)
omega_w = self.frequencies * eV_to_au
for work in self.local_work_plan:
if self.calc_comm.rank == 0:
assert self.myw[work.localw] == work.globalw
matrices: dict[int, NDArray[np.complex128] | None] = dict()
for derivative in self.derivative_order_s:
if self.calc_comm.rank > 0:
matrices[derivative] = None
continue
# Buffer shape is i, a, frequencies
rho_ia = dist_buffer._get_data(work.reim == 'Re', 0)[flt + (work.localw, )]
if derivative == 1:
rho_ia = rho_ia * 1.0j * omega_w[work.globalw] # TODO * -1j
elif derivative == 2:
rho_ia = - rho_ia * omega_w[work.globalw] ** 2
matrices[derivative] = rho_ia
dm = DensityMatrix(ksd=self.ksd, matrices=matrices, data_is_ravelled=False)
yield work, dm
[docs]
def parallel_prepare(self):
self.rho_nn_fft.dist_buffer # Perform the redistribution