Source code for rhodent.density_matrices.time

from __future__ import annotations

from typing import Generator, Collection

import numpy as np
from numpy.typing import NDArray

from gpaw.lcaotddft.ksdecomposition import KohnShamDecomposition
from gpaw.lcaotddft.laser import GaussianPulse, Laser
from gpaw.tddft.units import as_to_au, au_to_as, eV_to_au, fs_to_au
from gpaw.mpi import world

from .distributed.pulse import PulseConvolver
from .readers.gpaw import KohnShamRhoWfsReader
from .density_matrix import DensityMatrix
from .base import BaseDensityMatrices, WorkMetadata
from .frequency import FrequencyDensityMatricesFromDisk
from ..perturbation import create_perturbation, Perturbation, PerturbationLike, PulsePerturbation
from ..utils import Logger, two_communicator_sizes, get_gaussian_pulse_values
from ..utils.logging import format_times
from ..utils.memory import MemoryEstimate


[docs] class ConvolutionDensityMatrixMetadata(WorkMetadata): """ Metadata to the density matrices. Properties ---------- density_matrices Parent of this object. globalt Time index. localt Time index on this rank. globalp Pulse index. localp Pulse index on this rank. """ density_matrices: ConvolutionDensityMatrices globalt: int globalp: int localt: int localp: int def __new__(cls, density_matrices: ConvolutionDensityMatrices, globalt: int, globalp: int, localt: int, localp: int): self = WorkMetadata.__new__(cls, density_matrices=density_matrices) self.globalt = globalt self.globalp = globalp self.localt = localt self.localp = localp return self @property def global_indices(self): return (self.globalp, self.globalt) @property def time(self) -> float: """ Simulation time in units of as. """ return self.density_matrices.times[self.globalt] @property def pulse(self) -> Perturbation: """ Pulse. """ return self.density_matrices.pulses[self.globalp] @property def desc(self) -> str: timestr = f'{self.time:.1f}as' if len(self.density_matrices.pulses) == 1: return timestr d = self.pulse.todict() if d['name'] == 'GaussianPulse': pulsestr = f'{d["frequency"]:.1f}eV' else: pulsestr = f'#{self.globalp}' return f'{timestr} @ Pulse {pulsestr}'
class ConvolutionDensityMatrices(BaseDensityMatrices[ConvolutionDensityMatrixMetadata]): """ Collection of density matrices in the Kohn-Sham basis for different times, after convolution with various pulses. Plain density matrices and/or derivatives thereof may be represented. Parameters ---------- ksd KohnShamDecomposition object or filename. pulses Convolute the density matrices with these pulses. times Compute density matrices for these times (or as close to them as possible). In units of as. derivative_order_s Compute density matrix derivatives of the following orders. 0 for plain density matrix and positive integers for derivatives. real Calculate the real part of density matrices. imag Calculate the imaginary part of density matrices. calc_size Size of the calculation communicator. """ def __init__(self, ksd: KohnShamDecomposition | str, pulses: Collection[PerturbationLike], times: list[float] | NDArray[np.float64], derivative_order_s: list[int] = [0], real: bool = True, imag: bool = True, calc_size: int = 1, log: Logger | None = None): self._time_t = np.array(times) self._pulses = [create_perturbation(pulse) for pulse in pulses] super().__init__(ksd=ksd, derivative_order_s=derivative_order_s, real=real, imag=imag, calc_size=calc_size, log=log) @property def times(self) -> NDArray[np.float64]: """ Simulation time in units of as. """ return self._time_t @property def nt(self) -> int: """ Number of times. """ return len(self.times) @property def pulses(self) -> list[Perturbation]: """ Pulses with which density matrices are convoluted. """ return self._pulses def work_loop(self, rank: int) -> Generator[ConvolutionDensityMatrixMetadata | None, None, None]: nt = len(self.times) ntperrank = (nt + self.loop_comm.size - 1) // self.loop_comm.size # Do convolution pulse-by-pulse, time-by-time for p, pulse in enumerate(self.pulses): # Determine which times to compute on this loop_comm rank for good load balancing shift = (p * nt + rank) % self.loop_comm.size for localt in range(ntperrank): globalt = shift + localt * self.loop_comm.size if globalt < nt: yield ConvolutionDensityMatrixMetadata(density_matrices=self, globalt=globalt, localt=localt, globalp=p, localp=p) else: yield None
[docs] class TimeDensityMatricesFromWaveFunctions(ConvolutionDensityMatrices): """ Collection of density matrices in the Kohn-Sham basis for different times, obtained by reading the time-dependent wave functions file. Parameters ---------- ksd KohnShamDecomposition object or filename. wfs_fname Filename of the time-dependent wave functions file. times Compute density matrices for these times (or as close to them as possible). In units of as. real Calculate the real part of density matrices. imag Calculate the imaginary part of density matrices. calc_size Size of the calculation communicator. stridet Skip this many steps when reading the time-dependent wave functions file. """ def __init__(self, ksd: KohnShamDecomposition | str, wfs_fname: str, times: list[float] | NDArray[np.float64], real: bool = True, imag: bool = True, calc_size: int = 1, log: Logger | None = None, stridet: int = 1): rho_nn_direct = KohnShamRhoWfsReader(wfs_fname=wfs_fname, ksd=ksd, yield_re=real, yield_im=imag, filter_times=np.array(times) * as_to_au, stridet=stridet, log=log) self.rho_nn_direct = rho_nn_direct super().__init__(ksd=rho_nn_direct.ksd, times=rho_nn_direct.time_t * au_to_as, real=real, imag=imag, pulses=[None], calc_size=1, log=rho_nn_direct.log) imin, imax, amin, amax = self.ksd.ialims() # Read density matrices corresponding to ksd ialims self._n1slice = slice(imin, imax + 1) self._n2slice = slice(amin, amax + 1) def __str__(self) -> str: lines = ['Response from time-dependent wave functions'] lines.append('') lines += [' ' + line for line in str(self.rho_nn_direct).split('\n')] shape = (self._n1slice.stop - self._n1slice.start, self._n2slice.stop - self._n2slice.start,) if 'Re' in self.reim and 'Im' in self.reim: reim_desc = 'real and imaginary parts' elif 'Re' in self.reim: reim_desc = 'real parts' else: reim_desc = 'imaginary parts' lines.append('') lines.append(f'Density matrix shape {shape}') lines.append(f'reading {reim_desc}') return '\n'.join(lines) def get_memory_estimate(self) -> MemoryEstimate: shape = (self._n1slice.stop - self._n1slice.start, self._n2slice.stop - self._n2slice.start,) narrays = 2 if 'Re' in self.reim and 'Im' in self.reim else 1 for t_r in self.rho_nn_direct.work_loop_by_ranks(): nreading = len(t_r) break memory_estimate = MemoryEstimate() memory_estimate.add_child('Time-dependent wave functions reader', self.rho_nn_direct.get_memory_estimate()) memory_estimate.add_key('Density matrix', (shape) + (narrays, ), on_num_ranks=nreading) return memory_estimate def __iter__(self) -> Generator[tuple[ConvolutionDensityMatrixMetadata, DensityMatrix], None, None]: assert self.calc_comm.size == 1 # TODO for work, dm_buffer in zip(self.work_loop(self.loop_comm.rank), self.rho_nn_direct.iread(0, 0, self._n1slice, self._n2slice)): assert work is not None if 'Re' in self.reim: Rerho_ia = dm_buffer._get_real(0) if 'Im' in self.reim: Imrho_ia = dm_buffer._get_imag(0) if 'Re' in self.reim and 'Im' in self.reim: # Complex result # Compared to numpy, we use another convention, hence the minus sign rho_ia = Rerho_ia - 1j * Imrho_ia elif 'Re' in self.reim: # Real result rho_ia = Rerho_ia else: rho_ia = -1j * Imrho_ia matrices = {0: rho_ia} dm = DensityMatrix(ksd=self.ksd, matrices=matrices, data_is_ravelled=False) yield work, dm
[docs] def parallel_prepare(self): self.rho_nn_direct.parallel_prepare()
[docs] class ConvolutionDensityMatricesFromDisk(ConvolutionDensityMatrices): """ Collection of density matrices in the Kohn-Sham basis after convolution with one or several pulses, for different times. Read from disk. Parameters ---------- ksd KohnShamDecomposition object or filename. pulserho_fmt Formatting string for the density matrices saved to disk. Example: * pulserho_fmt = `pulserho/t{time:09.1f}{tag}.npy`. Accepts variables * `{time}` - Time in units of as. * `{tag}` - Derivative tag, `''`, `'-Iomega'`, or `'-omega2'`. * `{pulsefreq}` - Pulse frequency in units of eV. * `{pulsefwhm}` - Pulse FWHM in units of fs. pulses Convolute the density matrices with these pulses. times Compute density matrices for these times (or as close to them as possible). In units of as. derivative_order_s Compute density matrix derivatives of the following orders. ``0`` for plain density matrix and positive integers for derivatives. real Calculate real part of density matrices. imag Calculate imaginary part of density matrices. calc_size Size of the calculation communicator. """ def __init__(self, ksd: KohnShamDecomposition | str, pulserho_fmt: str, pulses: Collection[PerturbationLike], times: list[float] | NDArray[np.float64], derivative_order_s: list[int] = [0], log: Logger | None = None, real: bool = True, imag: bool = True, calc_size: int = 1): super().__init__(ksd=ksd, pulses=pulses, times=times, real=real, imag=imag, derivative_order_s=derivative_order_s, calc_size=calc_size, log=log) self.ksd.distribute(self.calc_comm) self.pulserho_fmt = pulserho_fmt def __str__(self) -> str: lines = ['Response from density matrices on disk'] lines.append('') lines.append(f'Calculating response for {self.nt} times and {len(self.pulses)} pulses') lines.append(f' times: {format_times(self.times)}') return '\n'.join(lines) def __iter__(self) -> Generator[tuple[ConvolutionDensityMatrixMetadata, DensityMatrix], None, None]: tag_s = ['', '-Iomega', '-omega2'] for work in self.local_work_plan: self.log.start('read') matrices: dict[int, NDArray[np.complex128] | None] = dict() for derivative in self.derivative_order_s: if self.calc_comm.rank > 0: # Don't read on non calc comm root ranks matrices[derivative] = None continue fname_kw = dict(time=work.time, tag=tag_s[derivative], **get_gaussian_pulse_values(work.pulse)) fname = self.pulserho_fmt.format(**fname_kw) f = np.load(fname) if isinstance(f, np.lib.npyio.NpzFile): # Read npz file rho = f['rho_p'] f.close() else: # Read npy file assert isinstance(f, np.ndarray) rho = f matrices[derivative] = rho dm = DensityMatrix(ksd=self.ksd, matrices=matrices, data_is_ravelled=True) yield work, dm
[docs] class ConvolutionDensityMatricesFromFrequency(ConvolutionDensityMatrices): """ Collection of density matrices in the Kohn-Sham basis after convolution with one or several pulses, for different times. Obtained from the the density matrices in frequency space, which are read from disk. Parameters ---------- ksd KohnShamDecomposition object or filename. frho_fmt Formatting string for the density matrices in frequency space saved to disk. Example: * frho_fmt = `frho/w{freq:05.2f}-{reim}.npy'`. Accepts variables: * `{freq}` - Frequency in units of eV. * `{reim}` - 'Re' or 'Im' for Fourier transform of real/imaginary part of density matrix. perturbation Perturbation that was present during time propagation. pulses Convolute the density matrices with these pulses. times Compute density matrices for these times (or as close to them as possible). In units of as. derivative_order_s Compute density matrix derivatives of the following orders. `0` for plain density matrix and positive integers for derivatives. real Calculate real part of density matrices. imag Calculate imaginary part of density matrices. calc_size Size of the calculation communicator. """ def __init__(self, ksd: KohnShamDecomposition | str, frho_fmt: str, perturbation: PerturbationLike, pulses: Collection[PerturbationLike], times: list[float] | NDArray[np.float64], derivative_order_s: list[int] = [0], log: Logger | None = None, real: bool = True, imag: bool = True, calc_size: int = 1): super().__init__(ksd=ksd, pulses=pulses, times=times, real=real, imag=imag, derivative_order_s=derivative_order_s, calc_size=calc_size, log=log) # Frequency grid for convolution if not all(pulse.todict()['name'] == 'GaussianPulse' for pulse in self.pulses): raise NotImplementedError('Only Gaussian pulses implemented for ResponseFromFourierTransform') full_freq_w = convolution_freq_w_many([pulse.pulse for pulse in self.pulses]) # type: ignore freq_density_matrices = FrequencyDensityMatricesFromDisk( ksd=self.ksd, frho_fmt=frho_fmt, perturbation=perturbation, frequencies=full_freq_w, real=real, imag=imag, derivative_order_s=derivative_order_s, calc_size=calc_size) self.freq_density_matrices = freq_density_matrices # Currently, both objects are creating their own communicators assert self.loop_comm.rank == freq_density_matrices.loop_comm.rank assert self.loop_comm.size == freq_density_matrices.loop_comm.size assert self.calc_comm.rank == freq_density_matrices.calc_comm.rank assert self.calc_comm.size == freq_density_matrices.calc_comm.size def __str__(self) -> str: lines = ['Response from Fourier transform of density matrices on disk'] lines.append('') lines.append(f'Calculating response for {self.nt} times and {len(self.pulses)} pulses') lines.append(f' times: {format_times(self.times)}') return '\n'.join(lines) def __iter__(self) -> Generator[tuple[ConvolutionDensityMatrixMetadata, DensityMatrix], None, None]: import gc # Begin by reading the full matrix of all frequency density matrices on all ranks # Distribute the ksd over the calc_comm self.freq_density_matrices.ksd.distribute(self.calc_comm) self.ksd.distribute(self.calc_comm) # Read density matrix wsrq_shape = (len(self.freq_density_matrices.frequencies), len(self.tag_s), 2, len(self.ksd.w_q)) full_rho_wsrq = np.zeros(wsrq_shape, dtype=complex) for fwork, dm in self.freq_density_matrices.iread_gather_on_root(): w = fwork.globalw r = fwork.globalr self.log.start('distribute') for s, derivative in enumerate(self.derivative_order_s): rho_p = dm._get_rho(derivative, ravelled=True) if self.calc_comm.rank == 0 else None self.ksd.distribute_p(rho_p, full_rho_wsrq[w, s, r], root=0) del rho_p if self.loop_comm.rank == 0 and self.calc_comm.rank == 0: self.log(f'Distributed density matrix in {self.log.elapsed("distribute"):.1f} s', who='Response', if_elapsed=5, flush=True) gc.collect() full_freq_w = self.freq_density_matrices.frequencies matrices: dict[int, NDArray[np.complex128] | None] for work in self.local_work_plan: # Determine which subset of the full_freq_w is needed freq_w = convolution_freq_w(self.pulses[work.globalp].pulse) # type: ignore after_w = full_freq_w > freq_w[-1] + 1e-3 startw: int = np.argmax(full_freq_w > freq_w[0] - 1e-3) # type: ignore stopw: int = len(full_freq_w) if not any(after_w) else np.argmax(after_w) # type: ignore # Extract the same subset of the density matrices rho_wsrq = full_rho_wsrq[startw:stopw] # Convolve assert isinstance(work.pulse, PulsePerturbation) convolver = FourierConvolver(work.pulse.pulse, freq_w * eV_to_au, rho_wsrq) self.log.start('convolve') pulserho_srq = convolver.convolve(work.time, units='as') if self.loop_comm.rank == 0 and self.calc_comm.rank == 0: self.log(f'Convolved density matrices in {self.log.elapsed("convolve"):.1f} s', who='Response', flush=True) matrices = {} for derivative, pulserho_q in zip(self.derivative_order_s, pulserho_srq[:, 0] + 1.0j * pulserho_srq[:, 1]): pulserho_p = self.ksd.collect_q(pulserho_q, root=0) if self.calc_comm.rank == 0: matrices[derivative] = pulserho_p else: assert pulserho_p is None matrices[derivative] = None del pulserho_srq del pulserho_p gc.collect() dm = DensityMatrix(ksd=self.ksd, matrices=matrices, data_is_ravelled=True) yield work, dm
[docs] class ConvolutionDensityMatricesFromWaveFunctions(ConvolutionDensityMatrices): """ Collection of density matrices in the Kohn-Sham basis after convolution with one or several pulses, for different times. Obtained from the time-dependent wave functions file, which is read from disk. Parameters ---------- ksd KohnShamDecomposition object or filename. wfs_fname Filename of the time-dependent wave functions file. perturbation Perturbation that was present during time propagation. pulses Convolute the density matrices with these pulses. times Compute density matrices for these times (or as close to them as possible). In units of as. derivative_order_s Compute density matrix derivatives of the following orders. ``0`` for plain density matrix and positive integers for derivatives. real Calculate real part of density matrices. imag Calculate imaginary part of density matrices. calc_size Size of the calculation communicator. stridet Skip this many steps when reading the time-dependent wave functions file. """ def __init__(self, ksd: KohnShamDecomposition | str, wfs_fname: str, perturbation: PerturbationLike, pulses: Collection[PerturbationLike], times: list[float] | NDArray[np.float64], derivative_order_s: list[int] = [0], real: bool = True, imag: bool = True, calc_size: int = 1, log: Logger | None = None, stridet: int = 1): _, calc_size = two_communicator_sizes(-1, calc_size) # The calc_comm rank 0's are world ranks 0, with a spacing of calc_size result_on_ranks = list(range(0, world.size, calc_size)) rho_nn_conv = PulseConvolver.from_parameters( wfs_fname, ksd, pulses=pulses, perturbation=perturbation, yield_re=real, yield_im=imag, filter_times=np.array(times) * as_to_au, derivative_order_s=list(sorted(derivative_order_s)), stridet=stridet, result_on_ranks=result_on_ranks, log=log) self.rho_nn_conv = rho_nn_conv super().__init__(ksd=rho_nn_conv.ksd, pulses=pulses, times=rho_nn_conv.time_t * au_to_as, real=real, imag=imag, derivative_order_s=derivative_order_s, calc_size=calc_size, log=rho_nn_conv.log) def __str__(self) -> str: lines = [] lines.append('Response from time-dependent wave functions ') lines.append('') lines += ['' + line for line in str(self.rho_nn_conv.rho_nn_reader.rho_wfs_reader).split('\n')] lines.append('') lines += ['' + line for line in str(self.rho_nn_conv.rho_nn_reader).split('\n')] lines.append('') lines += ['' + line for line in str(self.rho_nn_conv).split('\n')] return '\n'.join(lines) def get_memory_estimate(self) -> MemoryEstimate: return self.rho_nn_conv.get_memory_estimate() @property def myt(self) -> list[int]: """ List of indices corresponding to the time indices on held on this rank. """ return self.rho_nn_conv.my_work()
[docs] def work_loop(self, rank: int) -> Generator[ConvolutionDensityMatrixMetadata | None, None, None]: nt = len(self.times) ntperrank = (nt + self.loop_comm.size - 1) // self.loop_comm.size for p, pulse in enumerate(self.pulses): for localt in range(ntperrank): globalt = rank + localt * self.loop_comm.size if globalt < nt: yield ConvolutionDensityMatrixMetadata(density_matrices=self, globalt=globalt, localt=localt, globalp=p, localp=p) else: yield None
def __iter__(self) -> Generator[tuple[ConvolutionDensityMatrixMetadata, DensityMatrix], None, None]: parameters = self.rho_nn_conv.rho_nn_reader._parameters flt = (slice(parameters.n1size), slice(parameters.n2size)) dist_buffer = self.rho_nn_conv.dist_buffer # Perform the redistribution self.ksd.distribute(self.calc_comm) if self.calc_comm.rank != 0: assert len(self.myt) == 0, self.myt for work in self.local_work_plan: if self.calc_comm.rank == 0: assert self.myt[work.localt] == work.globalt localflt = flt + (work.localp, work.localt) matrices: dict[int, NDArray[np.complex128] | None] = dict() for derivative in self.derivative_order_s: if self.calc_comm.rank > 0: matrices[derivative] = None continue # Buffer shape is i, a, pulses, times if 'Re' in self.reim: Rerho_ia = dist_buffer._get_real(derivative)[localflt] if 'Im' in self.reim: Imrho_ia = dist_buffer._get_imag(derivative)[localflt] if 'Re' in self.reim and 'Im' in self.reim: # Complex result # Compared to numpy, we use another convention, hence the minus sign rho_ia = Rerho_ia - 1j * Imrho_ia elif 'Re' in self.reim: # Real result rho_ia = Rerho_ia else: rho_ia = -1j * Imrho_ia matrices[derivative] = rho_ia dm = DensityMatrix(ksd=self.ksd, matrices=matrices, data_is_ravelled=False) yield work, dm
[docs] def parallel_prepare(self): self.rho_nn_conv.dist_buffer # Perform the redistribution
class FourierConvolver: """ Convolve using Fourier transforms. Use given frequencies in Fourier transform. """ def __init__(self, pulse: Laser, omega_w: NDArray[np.float64], data_wX: NDArray[np.complex128]): self.pulse = pulse # Impulse response data self.omega_w = omega_w d_wX = data_wX # Pulse in frequency space pulse_w = self.pulse.fourier(omega_w) # Convolution product d_Xw = np.moveaxis(d_wX, 0, -1) pd_Xw = d_Xw * pulse_w self.pd_wX = np.moveaxis(pd_Xw, -1, 0) def convolve(self, time, units='au'): # Convert time to ndarray time_t = np.array(time, dtype=float) if units == 'fs': time_t *= fs_to_au elif units == 'as': time_t *= as_to_au else: assert units == 'au' if time_t.ndim == 0: # If argument is a single number, # return value without time axis return self._convolve(time_t[np.newaxis])[0] return self._convolve(time_t) def _convolve(self, time_t): pd_tX = inversefourier(self.omega_w, self.pd_wX, time_t, None) return pd_tX def fourier(time_t, a_tX, omega_w, folding='Gauss', width=0.08, sign=1.0): if width is None: folding = None if folding is None: def envelope(t): return 1.0 else: width = width * eV_to_au if folding == 'Gauss': # f(w) = Nw exp(-w^2/2/sigma^2) # f(t) = exp(-t^2 sigma^2/2) def envelope(t): return np.exp(-0.5 * width**2 * t**2) elif folding == 'Lorentz': # f(w) = Nw eta / [omega^2 + (eta)^2] # f(t) = exp(-t eta) def envelope(t): return np.exp(-width * t) else: raise RuntimeError('unknown folding "' + folding + '"') dt_t = np.insert(time_t[1:] - time_t[:-1], 0, 0.0) f_wt = np.exp(sign * 1j * np.outer(omega_w, time_t)) a_Xt = np.rollaxis(a_tX, 0, len(a_tX.shape)) a_wX = np.tensordot(f_wt, dt_t * envelope(time_t) * a_Xt, axes=(1, len(a_Xt.shape) - 1)) return a_wX def inversefourier(omega_w, a_wX, time_t, folding='Gauss', width=0.08): # This function assumes that time-domain quantity is real a_tX = fourier(omega_w, a_wX / (2 * np.pi), time_t, folding, width, sign=-1) return 2 * a_tX.real def convolution_freq_w(pulse: GaussianPulse) -> NDArray[np.float64]: """ Compute necessary frequency grid for the supplied pulses Parameters ---------- pulse Laser pulse. The wider the pulse is in frequency the larger frequency grid is to be returned Returns ------- Necessary frequency grid to perform the convolution """ from gpaw.tddft.units import au_to_eV frequency = pulse.omega0 * au_to_eV sigma = pulse.sigma * au_to_eV freqmin = np.floor((frequency - 4 * sigma) * 20) / 20 freqmin = max(freqmin, 0.05) freqmax = np.ceil((frequency + 4 * sigma) * 20) / 20 freq_w = np.arange(freqmin, freqmax + 1e-4, 0.05) return freq_w def convolution_freq_w_many(pulses: Collection[GaussianPulse]) -> NDArray[np.float64]: """ Compute necessary frequency grid for the supplied pulses Parameters ---------- pulses List of laser pulses to consider. The wider the pulses are in frequency the larger frequency grid is to be returned Returns ------- Necessary frequency grid to perform the convolution """ from gpaw.tddft.units import au_to_eV omega0s = np.array([pulse.omega0 for pulse in pulses]) sigmas = np.array([pulse.sigma for pulse in pulses]) sigma = np.max(sigmas) * au_to_eV freqmin = np.floor((np.min(omega0s) * au_to_eV - 4 * sigma) * 20) / 20 freqmin = max(freqmin, 0.05) freqmax = np.ceil((np.max(omega0s) * au_to_eV + 4 * sigma) * 20) / 20 freq_w = np.arange(freqmin, freqmax + 1e-4, 0.05) return freq_w