Source code for rhodent.density_matrices.distributed.frequency
from __future__ import annotations
from typing import Generator
import numpy as np
from numpy.typing import NDArray
from gpaw.tddft.units import au_to_eV
from .base import BaseDistributor, RhoParameters
from .time import TimeDistributor, AlltoallvTimeDistributor
from ..buffer import DensityMatrixBuffer
from ..readers.gpaw import KohnShamRhoWfsReader
from ...utils import get_array_filter, safe_fill_larger, fast_pad
from ...utils.logging import format_frequencies
from ...utils.memory import MemoryEstimate
from ...perturbation import create_perturbation, PerturbationLike
[docs]
class FourierTransformer(BaseDistributor):
""" Iteratively take the Fourier transform of density matrices.
Parameters
----------
rho_nn_reader
Object that can iteratively read density matrices in the time domain,
distributed such that different ranks have different chunks of the density
matrix, but each ranks has all times for the same chunk.
perturbation
The perturbation which the density matrices are a response to.
filter_frequencies
After Fourier transformation keep only these frequencies (or the frequencies
closest to them). In atomic units.
frequency_broadening
Gaussian broadening width in atomic units. Default (0) is no broadening.
result_on_ranks
List of ranks among which the resulting arrays will be distributed.
Empty list (default) to distribute among all ranks.
"""
def __init__(self,
rho_nn_reader: TimeDistributor,
perturbation: PerturbationLike,
filter_frequencies: list[float] | NDArray[np.float64] | None = None,
frequency_broadening: float = 0,
result_on_ranks: list[int] = []):
super().__init__(rho_nn_reader.rho_wfs_reader,
rho_nn_reader._parameters,
comm=rho_nn_reader.comm)
self.rho_nn_reader = rho_nn_reader
self.perturbation = create_perturbation(perturbation)
self.frequency_broadening = frequency_broadening
self._flt_w = get_array_filter(self._omega_w, filter_frequencies)
if len(result_on_ranks) == 0:
self._result_on_ranks = set(range(self.comm.size))
else:
assert all(isinstance(rank, int) and rank >= 0 and rank < self.comm.size
for rank in result_on_ranks), result_on_ranks
self._result_on_ranks = set(result_on_ranks)
self._dist_buffer: DensityMatrixBuffer | None = None
@property
def dtype(self):
return np.complex128
@property
def xshape(self):
return (self.nw, )
@property
def freq_w(self) -> NDArray[np.float64]:
return self._omega_w[self.flt_w]
@property
def _omega_w(self) -> NDArray[np.float64]:
padnt = fast_pad(self.rho_nn_reader.nt)
dt = self.rho_nn_reader.dt
omega_w = 2 * np.pi * np.fft.rfftfreq(padnt, dt)
return omega_w
@property
def nw(self) -> int:
return len(self.freq_w)
@property
def nlocalw(self) -> int:
return (self.nw + self.nranks_result - 1) // self.nranks_result
@property
def flt_w(self) -> slice | NDArray[np.bool_]:
return self._flt_w
@property
def result_on_ranks(self) -> list[int]:
""" Set of ranks among which the result will be distributed """
return sorted(self._result_on_ranks)
@property
def nranks_result(self) -> int:
""" Number of ranks that the resulting arrays will be distributed among """
return len(self._result_on_ranks)
def distributed_work(self) -> list[list[int]]:
freqw_r = self.comm.size * [[]]
for r, rank in enumerate(self.result_on_ranks):
freqw_r[rank] = list(range(r, self.nw, self.nranks_result))
return freqw_r
def my_work(self) -> list[int]:
freqw_r = self.distributed_work()
return freqw_r[self.comm.rank]
def __str__(self) -> str:
nt = len(self.rho_nn_reader.time_t)
niters = len(list(self.work_loop_by_ranks()))
lines = []
lines.append('Fourier transformer')
lines.append(f' Calculating Fourier transform on {self.maxnchunks} ranks')
lines.append(' Fast Fourier transform')
lines.append(f' matrix dimensions {self.rho_nn_reader._parameters.nnshape}')
lines.append(f' grid of {nt} times')
lines.append(f' {self.describe_reim()}')
if self.frequency_broadening == 0:
lines.append(' No frequency broadening')
else:
lines.append(f' Applying frequency broadening of {self.frequency_broadening * au_to_eV:.2f}eV')
lines.append(f' keeping frequency grid of {self.nw} frequencies')
lines.append(f' {format_frequencies(self.freq_w, units="au")}')
lines.append('')
lines.append(' Redistributing into full density matrices')
lines.append(f' {niters} iterations to process all chunks')
lines.append(f' matrix dimensions {self.rho_nn_reader._parameters.full_nnshape}')
lines.append(f' result stored on {self.nranks_result} ranks')
return '\n'.join(lines)
def get_memory_estimate(self) -> MemoryEstimate:
parameters = self.rho_nn_reader._parameters
narrays = (2 if self.yield_re and self.yield_im else 1) * len(self.derivative_order_s)
temp_shape = parameters.nnshape + (self.maxnchunks, self.nlocalw, narrays)
result_shape = parameters.full_nnshape + (self.nlocalw, narrays)
total_result_size = int(np.prod(parameters.full_nnshape)) * self.nw * narrays
comment = f'Buffers hold {narrays} arrays ({self.describe_reim()})'
own_memory_estimate = MemoryEstimate(comment=comment)
own_memory_estimate.add_key('Temporary buffer', temp_shape, complex,
on_num_ranks=self.nranks_result)
own_memory_estimate.add_key('Result buffer', result_shape, complex,
total_size=total_result_size,
on_num_ranks=self.nranks_result)
memory_estimate = MemoryEstimate()
memory_estimate.add_child('Time-dependent wave functions reader',
self.rho_nn_reader.rho_wfs_reader.get_memory_estimate())
memory_estimate.add_child('Parallel density matrices reader',
self.rho_nn_reader.get_memory_estimate())
memory_estimate.add_child('Fourier transformer',
own_memory_estimate)
return memory_estimate
def __iter__(self) -> Generator[DensityMatrixBuffer, None, None]:
time_t = self.rho_nn_reader.time_t # Times in wave functions file
dt = self.rho_nn_reader.dt # Time step
padnt = fast_pad(len(time_t)) # Pad with zeros
dm_buffer = DensityMatrixBuffer(self.rho_nn_reader._parameters.nnshape,
(self.nw, ),
np.complex128)
if self.yield_re:
dm_buffer.zeros(True, 0)
if self.yield_im:
dm_buffer.zeros(False, 0)
for read_buffer in self.rho_nn_reader:
for data_nnt, buffer_nnw in zip(read_buffer._iter_buffers(), dm_buffer._iter_buffers()):
if self.frequency_broadening == 0:
data_nnw = self.perturbation.normalize_frequency_response(data_nnt, time_t, padnt, axis=-1)
else:
data_nnt = self.perturbation.normalize_time_response(data_nnt, time_t, axis=-1)
data_nnt[..., :len(time_t)] *= np.exp(-0.5 * self.frequency_broadening ** 2 * time_t**2)
data_nnw = np.fft.rfft(data_nnt, n=padnt, axis=-1) * dt
buffer_nnw[:] = data_nnw[..., self.flt_w].conj() # Change sign convention
yield dm_buffer.copy()
@property
def dist_buffer(self) -> DensityMatrixBuffer:
""" Buffer of density matrices on this rank after redistribution """
if self._dist_buffer is None:
self._dist_buffer = self.redistribute()
return self._dist_buffer
[docs]
def create_out_buffer(self) -> DensityMatrixBuffer:
""" Create the DensityMatrixBuffer to hold the temporary density matrix after each redistribution """
parameters = self.rho_nn_reader._parameters
nlocalw = self.nlocalw if self.comm.rank in self.result_on_ranks else 0
out_dm = DensityMatrixBuffer(nnshape=parameters.nnshape,
xshape=(self.maxnchunks, nlocalw),
dtype=np.complex128)
out_dm.zero_buffers(real=self.yield_re, imag=self.yield_im, derivative_order_s=self.derivative_order_s)
return out_dm
[docs]
def create_result_buffer(self) -> DensityMatrixBuffer:
""" Create the DensityMatrixBuffer to hold the resulting density matrix """
parameters = self.rho_nn_reader._parameters
nnshape = parameters.full_nnshape
full_dm = DensityMatrixBuffer(nnshape=nnshape,
xshape=(len(self.my_work()), ),
dtype=np.complex128)
full_dm.zero_buffers(real=self.yield_re, imag=self.yield_im, derivative_order_s=self.derivative_order_s)
return full_dm
[docs]
def redistribute(self) -> DensityMatrixBuffer:
""" Perform the Fourier transform and redistribute the data
When the Fourier transform is performed, the data is distributed such that each rank
stores the entire time/frequency series for one chunk of the density matrices, i.e. indices n1, n2.
This function then performs a redistribution of the data such that each rank stores full
density matrices, for certain frequencies.
If the density matrices are split into more chunks than there are ranks, then the
chunks are read, Fourier transformed and distributed in a loop several times until all
data has been processed.
Returns
-------
Density matrix buffer with x-dimensions (Number of local frequencies, )
where the Number of local frequencies variers between the ranks.
"""
local_work = iter(self)
parameters = self.rho_nn_reader._parameters
log = self.log
self.rho_nn_reader.rho_wfs_reader.lcao_rho_reader.striden == 0, \
'n stride must be 0 (index all) for redistribute'
# Frequency indices of result on each rank
freqw_r = self.distributed_work()
niters = len(list(self.work_loop_by_ranks()))
out_dm = self.create_out_buffer()
full_dm = self.create_result_buffer()
_exhausted = object()
# Loop over the chunks of the density matrix
for chunki, indices_r in enumerate(self.work_loop_by_ranks()):
# At this point, each rank stores one unique chunk of the density matrix.
# All ranks have the entire time series of data for their own chunk.
# If there are more chunks than ranks, then this loop will run
# for several iterations. If the number of chunks is not divisible by the number of
# ranks then, during the last iteration, some of the chunks are None (meaning the rank
# currently has no data).
# List of chunks that each rank currently stores, where element r of the list
# contains the chunk that rank r works with. Ranks higher than the length of the list
# currently store no chunks.
# The list itself is identical on all ranks.
chunks_by_rank = [indices[2:] for indices in indices_r if indices is not None]
ntargets = len(chunks_by_rank)
if self.comm.rank < ntargets:
# This rank has data to send. Compute the Fourier transform and store the result
dm_buffer = next(local_work)
else:
# This rank has no data to send
assert next(local_work, _exhausted) is _exhausted
# Still, we need to create a dummy buffer
dm_buffer = DensityMatrixBuffer(nnshape=parameters.nnshape,
xshape=(0, ), dtype=np.complex128)
dm_buffer.zero_buffers(real=self.yield_re, imag=self.yield_im,
derivative_order_s=self.derivative_order_s)
log.start('alltoallv')
# Redistribute the data:
# - dm_buffer stores single chunks of density matrices, for all frequencies.
# - out_dm will store several chunks, for a few frequencies.
# source_indices_r describes which slices of dm_buffer should be sent to which rank
# target_indices_r describes to which positions of the out_dm buffer should be received
# from which rank
source_indices_r = [None if len(w) == 0 else w for w in freqw_r]
target_indices_r = [r if r < ntargets else None for r in range(self.comm.size)]
dm_buffer.redistribute(out_dm,
comm=self.comm,
source_indices_r=source_indices_r,
target_indices_r=target_indices_r,
log=log)
if self.comm.rank == 0:
log(f'Chunk {chunki+1}/{niters}: distributed frequency response in '
f'{log.elapsed("alltoallv"):.1f}s', flush=True, who='Response')
for array_nnrw, full_array_nnw in zip(out_dm._iter_buffers(), full_dm._iter_buffers()):
for r, nn_indices in enumerate(chunks_by_rank):
safe_fill_larger(full_array_nnw[nn_indices], array_nnrw[:, :, r])
assert next(local_work, _exhausted) is _exhausted
return full_dm
[docs]
@classmethod
def from_reader(cls, # type: ignore
rho_nn_reader: KohnShamRhoWfsReader,
parameters: RhoParameters,
*,
perturbation: PerturbationLike,
filter_frequencies: list[float] | NDArray[np.float64] | None = None,
frequency_broadening: float = 0,
result_on_ranks: list[int] = []) -> FourierTransformer:
time_distributor = AlltoallvTimeDistributor(rho_nn_reader, parameters)
fourier_transformer = FourierTransformer(time_distributor,
perturbation=perturbation,
filter_frequencies=filter_frequencies,
frequency_broadening=frequency_broadening,
result_on_ranks=result_on_ranks)
return fourier_transformer