from __future__ import annotations
from typing import Collection, Generator
import numpy as np
from numpy.typing import NDArray
from gpaw.tddft.units import au_to_as
from .base import BaseDistributor, RhoParameters
from .time import TimeDistributor, AlltoallvTimeDistributor
from ..buffer import DensityMatrixBuffer
from ..readers.gpaw import KohnShamRhoWfsReader
from ...utils import get_array_filter, safe_fill_larger, fast_pad
from ...perturbation import create_perturbation, PerturbationLike, PulsePerturbation
from ...utils.logging import format_times
from ...utils.memory import MemoryEstimate
[docs]
class PulseConvolver(BaseDistributor):
r""" Class performing pulse convolution of density matrices.
The procedure of the pulse convolution is the following:
- The entire time series of (real and/or imaginary parts of) density matrices
is read, in several chunks of indices (n1, n2). Each MPI rank works on
different chunks.
- Each chunk is Fourier transformed, divided by the Fourier transform of the
original perturbation and multiplied by the Fourier transform of the
new pulse(s).
- Optionally, derivatives are computed by multiplying the density matrices
in the frequency domain by factors of :math:`i \omega`.
- Each chunk is inverse Fourier tranformed, and only selected times are kept.
Additionally, this class can redistribute the resulting convoluted density matrices
so that the each rank holds the entire density matrix, for a few times.
Parameters
----------
rho_nn_reader
Object begin able to iteratively read density matrices in the time domain.
Density matrices are split in chunks and distributed among ranks.
perturbation
The perturbation which the density matrices read by :attr:`rho_nn_reader`
are a response to.
pulses
List of pulses to perform to convolution with.
derivative_order_s
List of derivative orders to compute.
filter_times
After convolution keep only these times (or the times closest to them).
In atomic units.
result_on_ranks
List of ranks among which the resulting arrays will be distributed.
Empty list (default) to distribute among all ranks.
"""
def __init__(self,
rho_nn_reader: TimeDistributor,
perturbation: PerturbationLike,
pulses: Collection[PerturbationLike],
derivative_order_s: list[int] = [0],
filter_times: list[float] | NDArray[np.float64] | None = None,
result_on_ranks: list[int] = []):
super().__init__(rho_nn_reader.rho_wfs_reader,
rho_nn_reader._parameters,
comm=rho_nn_reader.comm)
self.rho_nn_reader = rho_nn_reader
# Check if we need to perform upscaling
wfs_time_t = self.rho_nn_reader.time_t
dt = self.rho_nn_reader.dt
self._warn_too_small_dt = False
if filter_times is None or len(filter_times) < 2:
self._upscaling = 1
else:
# See if there is a mismatch between wanted and existing time grids.
# If so, then upscale the data in the Fourier transform step.
self._requested_dt = min(np.diff(np.sort(filter_times)))
upscaling = int(np.round(dt / self._requested_dt))
if upscaling < 1:
upscaling = 1
elif upscaling > 100:
self._warn_too_small_dt = True
upscaling = 1
self._upscaling = upscaling
# Construct an upscaled times grid
self._time_t = wfs_time_t[0] + dt/self.upscaling * np.arange(len(wfs_time_t) * self.upscaling)
# And filter it
self._flt_t = get_array_filter(self._time_t, filter_times)
# Set up pulses and perturbation
self.pulses = [create_perturbation(pulse) for pulse in pulses]
if not all(isinstance(pulse, PulsePerturbation) for pulse in self.pulses):
raise ValueError('Pulse convolution can only be performed with pulses of type PulsePerturbation.')
self.perturbation = create_perturbation(perturbation)
# Set up derivatives
assert all(order in [0, 1, 2] for order in derivative_order_s)
assert all(np.diff(derivative_order_s) > 0), 'Derivative orders must be strictly increasing'
self.derivative_order_s = derivative_order_s
# Check which ranks the result should be stored on
if len(result_on_ranks) == 0:
self._result_on_ranks = set(range(self.comm.size))
else:
assert all(isinstance(rank, int) and rank >= 0 and rank < self.comm.size
for rank in result_on_ranks), result_on_ranks
self._result_on_ranks = set(result_on_ranks)
self._dist_buffer: DensityMatrixBuffer | None = None
@property
def dtype(self):
return np.float64
@property
def xshape(self):
return (len(self.pulses), self.nt)
@property
def time_t(self) -> NDArray[np.float64]:
""" Array of times corresponding to convoluted density matrices; in atomic units. """
return self._time_t[self._flt_t]
@property
def nt(self) -> int:
""" Number of times for which convoluted density matrices are calculated. """
return len(self.time_t)
@property
def nlocalt(self) -> int:
""" Number of times stored on this rank after redistribution of the result. """
return (self.nt + self.nranks_result - 1) // self.nranks_result
@property
def upscaling(self) -> int:
""" Upscaling factor.
Data is upscaled in time by this factor during the Fourier transformation step,
in order to calculate convoluted density matrices at a denser grid of times than
what is present in the time-dependent wave functions file.
"""
return self._upscaling
@property
def result_on_ranks(self) -> list[int]:
""" Set of ranks among which the result will be distributed. """
return sorted(self._result_on_ranks)
@property
def nranks_result(self) -> int:
""" Number of ranks storing part of the result after redistribution. """
return len(self._result_on_ranks)
def distributed_work(self) -> list[list[int]]:
# Empty list for ranks that will not have any part of the result
timet_r = self.comm.size * [[]]
for r, rank in enumerate(self.result_on_ranks):
timet_r[rank] = list(range(r, self.nt, self.nranks_result))
return timet_r
def my_work(self) -> list[int]:
timet_r = self.distributed_work()
return timet_r[self.comm.rank]
def __str__(self) -> str:
wfs_nt = len(self.rho_nn_reader.time_t)
dt = self.rho_nn_reader.dt
lines = []
lines.append('Pulse convolver')
lines.append(f' Performing convolution trick on {self.maxnchunks} ranks')
lines.append(' Fast Fourier transform')
lines.append(f' matrix dimensions {self.rho_nn_reader._parameters.nnshape}')
lines.append(f' grid of {wfs_nt} times')
lines.append(f' {self.describe_reim()}')
lines.append(' In frequency domain')
lines.append(f' calculating {self.describe_derivatives()}')
if self._warn_too_small_dt:
lines.append('WARNING:, the smallest spacing between requested times is ')
lines.append(f'{self._requested_dt * au_to_as:.2f}. This is much smaller than the time step ')
lines.append(f'in the time-dependent wave functions file ({dt * au_to_as:.2f} as). ')
lines.append('No upscaling will be done.')
elif self.upscaling == 1:
lines.append(' not upscaling data')
else:
lines.append(f' upscaling by factor {self.upscaling}')
lines.append(f' requested time step {self._requested_dt * au_to_as:.2f} as')
lines.append(f' time stpe in file {dt * au_to_as:.2f} as.')
lines.append(f' convolution with {len(self.pulses)} pulses')
lines.append(' Fast inverse Fourier transform')
lines.append(f' keeping time grid of {self.nt} times')
lines.append(f' {format_times(self.time_t, units="au")}')
lines.append('')
lines.append(' Redistributing into full density matrices')
lines.append(f' {self.niters} iterations to process all chunks')
lines.append(f' matrix dimensions {self.rho_nn_reader._parameters.full_nnshape}')
lines.append(f' result stored on {self.nranks_result} ranks')
return '\n'.join(lines)
def get_memory_estimate(self) -> MemoryEstimate:
parameters = self.rho_nn_reader._parameters
narrays = (2 if self.yield_re and self.yield_im else 1) * len(self.derivative_order_s)
temp_shape = parameters.nnshape + (self.maxnchunks, len(self.pulses), self.nlocalt, narrays)
result_shape = parameters.full_nnshape + (len(self.pulses), self.nlocalt, narrays)
total_result_size = int(np.prod(parameters.full_nnshape + (len(self.pulses), self.nt))) * narrays
comment = f'Buffers hold {narrays} arrays ({self.describe_reim()}, {self.describe_derivatives()})'
own_memory_estimate = MemoryEstimate(comment=comment)
own_memory_estimate.add_key('Temporary buffer', temp_shape, float,
on_num_ranks=self.nranks_result)
own_memory_estimate.add_key('Result buffer', result_shape, float,
total_size=total_result_size,
on_num_ranks=self.nranks_result)
memory_estimate = MemoryEstimate()
memory_estimate.add_child('Time-dependent wave functions reader',
self.rho_nn_reader.rho_wfs_reader.get_memory_estimate())
memory_estimate.add_child('Parallel density matrices reader',
self.rho_nn_reader.get_memory_estimate())
memory_estimate.add_child('Pulse convolver',
own_memory_estimate)
return memory_estimate
def _freq_domain_derivative(self,
order: int) -> NDArray[np.complex128 | np.float64]:
r""" Take derivative in frequency space by multiplying by .. math:
(i \omega)^n.
Parameters
----------
order
Order :math:`n` of the derivative.
"""
if order == 0:
return np.array([1])
padnt = fast_pad(self.rho_nn_reader.nt)
dt = self.rho_nn_reader.dt
omega_w = 2 * np.pi * np.fft.rfftfreq(padnt, dt)
return (1.0j * omega_w) ** order
@property
def dist_buffer(self) -> DensityMatrixBuffer:
""" Buffer of denisty matrices on this rank after redistribution. """
if self._dist_buffer is None:
self._dist_buffer = self.redistribute()
return self._dist_buffer
def __iter__(self) -> Generator[DensityMatrixBuffer, None, None]:
""" Iteratively read density matrices and perform the pulse convolution.
Each iteration performs the calculation for a different chunk of the
density matrix. Each rank works on a different set of chunks.
All ranks always work on the entire grid of times, during all interations.
Yields
------
A chunk of the convoluted density matrices, for the requested times.
"""
wfs_time_t = self.rho_nn_reader.time_t # Times in wave functions file
padnt = fast_pad(len(wfs_time_t)) # Pad with zeros
# Take Fourier transform of pulses
pulse_pt = [pulse.pulse.strength(wfs_time_t) for pulse in self.pulses]
pulse_pw = np.fft.rfft(pulse_pt, axis=-1, n=padnt)
# Create buffer for result
dm_buffer = DensityMatrixBuffer(self.rho_nn_reader._parameters.nnshape,
(len(self.pulses), self.nt),
np.float64)
dm_buffer.zero_buffers(real=self.yield_re, imag=self.yield_im, derivative_order_s=self.derivative_order_s)
for read_buffer in self.rho_nn_reader:
x = []
if self.yield_re:
x.append((read_buffer._re_buffers[0], dm_buffer._re_buffers))
if self.yield_im:
x.append((read_buffer._im_buffers[0], dm_buffer._im_buffers))
for data_nnt, buffers in x:
# Take the Fourier transform of the data (Rerho or Imrho) and divide by
# the Fourier transform of the perturbation
# The data is padded by zeros, circumventing the periodicity of the Fourier transform
data_nnw = self.perturbation.normalize_frequency_response(data_nnt, wfs_time_t, padnt, axis=-1)
# Loop over the desired outputs (and which derivative orders they are)
for derivative, buffer_nnpt in buffers.items():
deriv_w = self._freq_domain_derivative(derivative)
for p, pulse_w in enumerate(pulse_pw):
# Multiply factor for derivative (power of I*omega)
# All timesteps cancel when taking fft->ifft, so do not scale by it
_data_nnw = data_nnw * (deriv_w * pulse_w)
# Inverse Fourier transform
# Optionally, the data is upscaled by padding with even more zeros
conv_nnt = np.fft.irfft(_data_nnw, n=padnt * self.upscaling, axis=-1) * self.upscaling
buffer_nnpt[..., p, :] = conv_nnt[..., :len(self._time_t)][..., self._flt_t]
yield dm_buffer.copy()
[docs]
def create_out_buffer(self) -> DensityMatrixBuffer:
""" Create the DensityMatrixBuffer to hold the temporary density matrix after each redistribution """
parameters = self.rho_nn_reader._parameters
nlocalt = self.nlocalt if self.comm.rank in self.result_on_ranks else 0
out_dm = DensityMatrixBuffer(nnshape=parameters.nnshape,
xshape=(self.maxnchunks, len(self.pulses), nlocalt),
dtype=np.float64)
out_dm.zero_buffers(real=self.yield_re, imag=self.yield_im, derivative_order_s=self.derivative_order_s)
return out_dm
[docs]
def create_result_buffer(self) -> DensityMatrixBuffer:
""" Create the DensityMatrixBuffer to hold the resulting density matrix """
parameters = self.rho_nn_reader._parameters
nnshape = parameters.full_nnshape
full_dm = DensityMatrixBuffer(nnshape=nnshape,
xshape=(len(self.pulses), len(self.my_work())),
dtype=np.float64)
full_dm.zero_buffers(real=self.yield_re, imag=self.yield_im, derivative_order_s=self.derivative_order_s)
return full_dm
[docs]
def redistribute(self) -> DensityMatrixBuffer:
""" Perform the pulse convolution and redistribute the resulting density matrices.
During the pulse convolution step, the data is distributed such that each rank
stores the entire time series for one chunk of the density matrices, i.e. indices n1, n2.
This function then performs a redistribution of the data such that each rank stores full
density matrices, for certain times.
If the density matrices are split into more chunks than there are ranks, then the
chunks are read, convoluted with pulses and distributed in a loop several times until all
data has been processed.
Returns
-------
Density matrix buffer with x-dimensions (number of pulses, number of local times)
where the number of local times variers between the ranks.
"""
local_work = iter(self)
parameters = self.rho_nn_reader._parameters
log = self.log
self.rho_nn_reader.rho_wfs_reader.lcao_rho_reader.striden == 0, \
'n stride must be 0 (index all) for redistribute'
# Time indices of result on each rank
timet_r = self.distributed_work()
out_dm = self.create_out_buffer()
full_dm = self.create_result_buffer()
_exhausted = object()
# Loop over the chunks of the density matrix
for chunki, indices_r in enumerate(self.work_loop_by_ranks()):
# At this point, each rank stores one unique chunk of the density matrix.
# All ranks have the entire time series of data for their own chunk.
# If there are more chunks than ranks, then this loop will run
# for several iterations. If the number of chunks is not divisible by the number of
# ranks then, during the last iteration, some of the chunks are None (meaning the rank
# currently has no data).
# List of chunks that each rank currently stores, where element r of the list
# contains the chunk that rank r works with. Ranks higher than the length of the list
# currently store no chunks.
# The list itself is identical on all ranks.
chunks_by_rank = [indices[2:] for indices in indices_r if indices is not None]
ntargets = len(chunks_by_rank)
if self.comm.rank < ntargets:
# This rank has data to send. Compute the pulse convolution and store the result
dm_buffer = next(local_work)
else:
# This rank has no data to send
assert next(local_work, _exhausted) is _exhausted
# Still, we need to create a dummy buffer
dm_buffer = DensityMatrixBuffer(nnshape=parameters.nnshape,
xshape=(0, 0), dtype=np.float64)
dm_buffer.zero_buffers(real=self.yield_re, imag=self.yield_im,
derivative_order_s=self.derivative_order_s)
log.start('alltoallv')
# Redistribute the data:
# - dm_buffer stores single chunks of density matrices, for all times and pulses.
# - out_dm will store several chunks, for a few times
# source_indices_r describes which slices of dm_buffer should be sent to which rank
# target_indices_r describes to which positions of the out_dm buffer should be received
# from which rank
source_indices_r = [None if len(t) == 0 else (slice(None), t) for t in timet_r]
target_indices_r = [r if r < ntargets else None for r in range(self.comm.size)]
dm_buffer.redistribute(out_dm,
comm=self.comm,
source_indices_r=source_indices_r,
target_indices_r=target_indices_r,
log=log)
if self.comm.rank == 0:
log(f'Chunk {chunki+1}/{self.niters}: distributed convoluted response in '
f'{log.elapsed("alltoallv"):.1f}s', who='Response', flush=True)
# Copy the redistributed data into the aggregated results buffer
for array_nnrpt, full_array_nnpt in zip(out_dm._iter_buffers(), full_dm._iter_buffers()):
for r, nn_indices in enumerate(chunks_by_rank):
safe_fill_larger(full_array_nnpt[nn_indices], array_nnrpt[:, :, r])
assert next(local_work, _exhausted) is _exhausted
return full_dm
[docs]
@classmethod
def from_reader(cls, # type: ignore
rho_nn_reader: KohnShamRhoWfsReader,
parameters: RhoParameters,
perturbation: PerturbationLike,
pulses: Collection[PerturbationLike],
derivative_order_s: list[int] = [0],
filter_times: list[float] | NDArray[np.float64] | None = None,
result_on_ranks: list[int] = []) -> PulseConvolver:
time_distributor = AlltoallvTimeDistributor(rho_nn_reader, parameters)
pulse_convolver = cls(time_distributor,
pulses=pulses,
perturbation=perturbation,
derivative_order_s=derivative_order_s,
filter_times=filter_times,
result_on_ranks=result_on_ranks)
return pulse_convolver