Source code for rhodent.density_matrices.frequency

from __future__ import annotations

from typing import Generator

import numpy as np
from numpy.typing import NDArray

from gpaw.lcaotddft.ksdecomposition import KohnShamDecomposition
from gpaw.tddft.units import au_to_eV, eV_to_au
from gpaw.mpi import world

from .distributed import create_fourier_transformer
from .density_matrix import DensityMatrix
from .base import BaseDensityMatrices, WorkMetadata
from ..utils import Logger, two_communicator_sizes


class FrequencyDensityMatrixMetadata(WorkMetadata):
    """ Metadata to the density matrices """
    density_matrices: FrequencyDensityMatrices
    globalw: int
    localw: int
    globalr: int
    localr: int

    def __new__(cls,
                density_matrices: FrequencyDensityMatrices,
                globalw: int,
                globalr: int,
                localw: int,
                localr: int):
        self = WorkMetadata.__new__(cls, density_matrices=density_matrices)
        self.globalw = globalw
        self.globalr = globalr
        self.localw = localw
        self.localr = localr
        return self

    @property
    def global_indices(self):
        return (self.globalw, self.globalr)

    @property
    def freq(self) -> float:
        """ Frequency in eV """
        return self.density_matrices.frequencies[self.globalw]

    @property
    def reim(self) -> str:
        """ Returns real/imaginary tag 'Re' or 'Im'

        The tag corresponds to the Fourier transform of the real
        or imaginary part of the density matrix
        """
        return self.density_matrices.reim[self.globalr]

    @property
    def desc(self) -> str:
        return f'{self.reim} @ Freq. {self.freq:.3f}eV'


class FrequencyDensityMatrices(BaseDensityMatrices[FrequencyDensityMatrixMetadata]):

    """
    Collection of density matrices in the Kohn-Sham basis in the frequency
    domain, for different frequencies.

    Plain density matrices and/or derivatives thereof may be represented.

    Parameters
    ----------
    ksd
        KohnShamDecomposition object or filename
    frequencies
        Compute density matrices for these frequencies (or as close to them as possible). In eV
    derivative_order_s
        Compute density matrix derivatives of the following orders.
        0 for plain density matrix and positive integers for derivatives
    real
        Calculate the Fourier transform of the real part of the density matrix
    imag
        Calculate the Fourier transform of the imaginary part of the density matrix
    calc_size
        Size of the calculation communicator
    """

    def __init__(self,
                 ksd: KohnShamDecomposition | str,
                 frequencies: list[float] | NDArray[np.float64],
                 derivative_order_s: list[int] = [0],
                 real: bool = True,
                 imag: bool = True,
                 calc_size: int = 1,
                 log: Logger | None = None):
        self._freq_w = np.array(frequencies)

        super().__init__(ksd=ksd,
                         derivative_order_s=derivative_order_s,
                         real=real, imag=imag,
                         calc_size=calc_size, log=log)

    @property
    def frequencies(self) -> NDArray[np.float64]:
        """ Frequncies in eV """
        return self._freq_w

    def work_loop(self,
                  rank: int) -> Generator[FrequencyDensityMatrixMetadata | None, None, None]:
        nw = len(self.frequencies)
        nwperrank = (nw + self.loop_comm.size - 1) // self.loop_comm.size

        for localw in range(nwperrank):
            globalw = rank + localw * self.loop_comm.size
            for r in range(len(self.reim)):
                if globalw < nw:
                    yield FrequencyDensityMatrixMetadata(density_matrices=self, globalw=globalw,
                                                         localw=localw, globalr=r, localr=r)
                else:
                    yield None


[docs] class FrequencyDensityMatricesFromDisk(FrequencyDensityMatrices): """ Collection of density matrices in the Kohn-Sham basis in the frequency domain, for different frequencies. Read from disk. Plain density matrices and/or derivatives thereof may be represented. Parameters ---------- ksd KohnShamDecomposition object or filename frho_fmt The pulserho_fmt is a formatting string for the density matrices in frequency space saved to disk. Example: frho_fmt = 'frho/w{freq:05.2f}-{reim}.npy' Should accept variables {freq} and {reim} frequencies Compute density matrices for these frequencies (or as close to them as possible). In eV derivative_order_s Compute density matrix derivatives of the following orders. 0 for plain density matrix and positive integers for derivatives real Calculate the Fourier transform of the real part of the density matrix imag Calculate the Fourier transform of the imaginary part of the density matrix calc_size Size of the calculation communicator kickstr Strength of the delta kick used during time propagation """ def __init__(self, ksd: KohnShamDecomposition | str, frho_fmt: str, frequencies: list[float] | NDArray[np.float64], real: bool = True, imag: bool = True, derivative_order_s: list[int] = [0], calc_size: int = 1, kickstr: float = 1e-5): super().__init__(ksd=ksd, frequencies=frequencies, real=real, imag=imag, derivative_order_s=derivative_order_s, calc_size=calc_size) self.frho_fmt = frho_fmt self.kickstr = kickstr def __iter__(self) -> Generator[tuple[FrequencyDensityMatrixMetadata, DensityMatrix], None, None]: from gpaw.tddft.units import eV_to_au omega_w = self.frequencies * eV_to_au for work in self.local_work_plan: self.log.start('read') matrices: dict[int, NDArray[np.complex64] | None] = dict() for derivative in self.derivative_order_s: if self.calc_comm.rank > 0: # Don't read on non calc comm root ranks matrices[derivative] = None continue fname_kw = dict(freq=work.freq, reim=work.reim) fname = self.frho_fmt.format(**fname_kw) f = np.load(fname) if isinstance(f, np.lib.npyio.NpzFile): # Read npz file rho = f['rho_p'] f.close() else: # Read npy file assert isinstance(f, np.ndarray) rho = f # Apply scale if derivative == 1: rho *= -1.0j * omega_w[work.globalw] elif derivative == 2: rho *= -1.0 * omega_w[work.globalw] ** 2 if self.ksd.only_ia: # Twice the rho is saved by the KohnShamDecomposition transform rho /= 2 matrices[derivative] = rho dm = DensityMatrix(ksd=self.ksd, matrices=matrices, data_is_ravelled=True) yield work, dm
[docs] class FrequencyDensityMatricesFromWaveFunctions(FrequencyDensityMatrices): """ Collection of density matrices in the Kohn-Sham basis in the frequency domain, for different frequencies. Obtained from the wave functions dump file, which is read from disk. Plain density matrices and/or derivatives thereof may be represented. Parameters ---------- ksd KohnShamDecomposition object or filename wfs_fname Filename of the GPAW wave functions dump file frequencies Compute density matrices for these frequencies (or as close to them as possible). In eV derivative_order_s Compute density matrix derivatives of the following orders. 0 for plain density matrix and positive integers for derivatives real Calculate the real part of density matrices imag Calculate the imaginary part of density matrices calc_size Size of the calculation communicator stride_opts Options passed to the GPAW wave functions reader stridet Skip this many steps when reading wave function dump file """ def __init__(self, ksd: KohnShamDecomposition | str, wfs_fname: str, frequencies: list[float] | NDArray[np.float64], real: bool = True, imag: bool = True, derivative_order_s: list[int] = [0], calc_size: int = 1, stride_opts=None, stridet: int = 1): _, calc_size = two_communicator_sizes(-1, calc_size) # The calc_comm rank 0's are world ranks 0, with a spacing of calc_size result_on_ranks = list(range(0, world.size, calc_size)) rho_nn_fft = create_fourier_transformer(wfs_fname, ksd, yield_re=real, yield_im=imag, filter_frequencies=np.array(frequencies) * eV_to_au, stride_opts=stride_opts, stridet=stridet, result_on_ranks=result_on_ranks) self.rho_nn_fft = rho_nn_fft super().__init__(ksd=rho_nn_fft.ksd, frequencies=rho_nn_fft.freq_w * au_to_eV, real=real, imag=imag, derivative_order_s=derivative_order_s, calc_size=calc_size, log=rho_nn_fft.log) @property def myw(self) -> list[int]: """ List of indices corresponding to the frequency indices on held on this rank """ return self.rho_nn_fft.my_work() def __iter__(self) -> Generator[tuple[FrequencyDensityMatrixMetadata, DensityMatrix], None, None]: parameters = self.rho_nn_fft.rho_nn_reader._parameters flt = (slice(parameters.n1size), slice(parameters.n2size)) dist_buffer = self.rho_nn_fft.dist_buffer # Perform the redistribution self.log.comm = self.loop_comm # Indicate in the logger that we are now working on the loop comm self.ksd.distribute(self.calc_comm) omega_w = self.frequencies * eV_to_au for work in self.local_work_plan: if self.calc_comm.rank == 0: assert self.myw[work.localw] == work.globalw matrices: dict[int, NDArray[np.complex64] | None] = dict() for derivative in self.derivative_order_s: if self.calc_comm.rank > 0: matrices[derivative] = None continue # Buffer shape is i, a, frequencies rho_ia = dist_buffer._get_data(work.reim == 'Re', 0)[flt + (work.localw, )] if derivative == 1: rho_ia = rho_ia * 1.0j * omega_w[work.globalw] # TODO * -1j elif derivative == 2: rho_ia = - rho_ia * omega_w[work.globalw] ** 2 matrices[derivative] = rho_ia dm = DensityMatrix(ksd=self.ksd, matrices=matrices, data_is_ravelled=False) yield work, dm
[docs] def parallel_prepare(self): self.rho_nn_fft.dist_buffer # Perform the redistribution