Source code for rhodent.density_matrices.distributed.time

from __future__ import annotations

from typing import Generator
import numpy as np

from .base import BaseDistributor, RhoIndices, RhoParameters
from rhodent.density_matrices.buffer import DensityMatrixBuffer
from rhodent.density_matrices.readers.gpaw import KohnShamRhoWfsReader
from rhodent.utils import safe_fill
from ...typing import Communicator
from ...utils.memory import MemoryEstimate


class TimeDistributor(BaseDistributor):

    """ Iteratively read density matrices in the time domain.

    This class uses the KohnShamRhoWfsReader to iteratively read wave
    functions (each rank reading the same times) and construct density
    matrices in the ground state Kohn-Sham basis for each time. Different
    ranks read different chunks of the density matrices. The density
    matrices are accumulated in a buffer and yielded when all times have
    been read.

    Parameters
    ----------
    rho_reader
        Density matrices reader
    parameters
        Parameters describing the division of the density matrices into chunks
    comm
        Communicator
    """

    def __init__(self,
                 rho_reader: KohnShamRhoWfsReader,
                 parameters: RhoParameters | None = None,
                 comm: Communicator | None = None):
        if rho_reader.comm.size > 1:
            raise ValueError('Serial TimeDistributor must have serial reader.')
        super().__init__(rho_reader, parameters, comm=comm)

    @property
    def dtype(self):
        return np.float64

    @property
    def xshape(self):
        return (self.nt, )

    @property
    def dt(self):
        return self.rho_wfs_reader.dt

    @property
    def nt(self):
        return self.rho_wfs_reader.nt

    @property
    def time_t(self):
        return self.rho_wfs_reader.time_t

    def __str__(self) -> str:
        lines = []
        lines.append('Density matrices reader')
        lines.append('  Receiving density matrices in continuous chunks.')
        lines.append(f'    shape {self._parameters.nnshape}')
        lines.append(f'    received by {self.maxntimes} ranks')
        lines.append(f'  {self.niters} iterations to process all chunks')
        return '\n'.join(lines)

    def get_memory_estimate(self) -> MemoryEstimate:
        narrays = (2 if self.yield_re and self.yield_im else 1) * len(self.derivative_order_s)
        shape = self._parameters.nnshape + (self.nt, narrays)

        comment = f'Buffers hold {narrays} arrays ({self.describe_reim()})'
        memory_estimate = MemoryEstimate(comment=comment)
        memory_estimate.add_key('Density matrix buffers', shape, float,
                                on_num_ranks=self.comm.size)

        return memory_estimate

    def __iter__(self) -> Generator[DensityMatrixBuffer, None, None]:
        """ Yield density matrices for all times, chunk by chunk.

        The wave function file is read in chunks, time by time, with the
        reading of times in the inner loop.

        Yields
        ------
        Chunks of the density matrix
        """
        read_dm = DensityMatrixBuffer(self._parameters.nnshape, (self.nt, ), np.float64)
        if self.yield_re:
            read_dm.zeros(True, 0)
        if self.yield_im:
            read_dm.zeros(False, 0)

        self.rho_wfs_reader.parallel_prepare()

        # Loop over the chunks this rank should gather
        for indices in self.work_loop(self.comm.rank):
            if indices is None:
                continue

            # Convert chunk-local slices to global reading indices
            n1 = slice(self._parameters.n1min + indices.n1.start,
                       self._parameters.n1min + indices.n1.stop)
            n2 = slice(self._parameters.n2min + indices.n2.start,
                       self._parameters.n2min + indices.n2.stop)

            gen = self.rho_wfs_reader.iread(indices.s, indices.k, n1, n2)
            for t in self.rho_wfs_reader.work_loop(self.rho_wfs_reader.comm.rank):
                if t is None:
                    continue
                dm_buffer = next(gen)
                for source_nn, dest_nn in zip(dm_buffer._iter_buffers(),
                                              read_dm[t]._iter_buffers()):
                    dest_nn[:] = source_nn

            yield read_dm

    @classmethod
    def from_reader(cls,
                    rho_nn_reader: KohnShamRhoWfsReader,
                    parameters: RhoParameters,
                    **kwargs) -> TimeDistributor:
        return cls(rho_nn_reader, parameters, **kwargs)
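
# A minimal sketch of the chunk-to-global index conversion performed in
# TimeDistributor.__iter__ above. The chunk slices and the n1min/n2min
# offsets below are assumed example values; only the slice arithmetic
# mirrors the code.
def _example_reading_indices():
    n1min, n2min = 2, 4                            # assumed chunk-grid offsets
    chunk_n1, chunk_n2 = slice(0, 3), slice(1, 3)  # chunk-local slices

    # Convert to reading indices, as in TimeDistributor.__iter__
    n1 = slice(n1min + chunk_n1.start, n1min + chunk_n1.stop)  # slice(2, 5)
    n2 = slice(n2min + chunk_n2.start, n2min + chunk_n2.stop)  # slice(5, 7)

    rho_nn = np.arange(100).reshape(10, 10)  # stand-in for a full density matrix
    chunk = rho_nn[n1, n2]                   # the block this rank would read
    assert chunk.shape == (3, 2)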
class AlltoallvTimeDistributor(TimeDistributor):

    """ Iteratively read density matrices in the time domain.

    This class uses the KohnShamRhoWfsReader to iteratively read wave
    functions (one time per rank) and construct density matrices in the
    ground state Kohn-Sham basis for each time. When all ranks have read
    one time each, this class performs a redistribution of data, such that
    each rank only gets one chunk of the density matrices, but for all
    times. The density matrices are accumulated in a buffer and yielded
    when all times have been read.

    Parameters
    ----------
    rho_reader
        Density matrices reader
    parameters
        Parameters describing the division of the density matrices into chunks
    """

    def __init__(self,
                 rho_reader: KohnShamRhoWfsReader,
                 parameters: RhoParameters | None = None):
        if rho_reader.lcao_rho_reader.striden != 0:
            raise ValueError('n stride must be 0 (index all) for alltoallv parallelized method')
        BaseDistributor.__init__(self, rho_reader, parameters, comm=rho_reader.comm)

    def __str__(self) -> str:
        nnshape = self.rho_wfs_reader.nnshape(*self.first_indices())
        lines = []
        lines.append('Parallel density matrices reader')
        lines.append('  Receiving density matrices in continuous chunks.')
        lines.append(f'    density matrix continuous chunk {nnshape}')
        lines.append(f'    split into smaller chunks {self._parameters.nnshape}')
        lines.append(f'    received by {self.maxntimes} ranks')
        lines.append('  Redistributing into continuous time form.')
        lines.append(f'    sent to {self.maxnchunks} ranks')
        lines.append(f'  {self.niters} iterations to process all chunks')
        return '\n'.join(lines)

    def get_memory_estimate(self) -> MemoryEstimate:
        narrays = (2 if self.yield_re and self.yield_im else 1) * len(self.derivative_order_s)
        nnshape = self.rho_wfs_reader.nnshape(*self.first_indices())
        before_shape = self._parameters.nnshape + (self.maxnchunks, narrays)
        after_shape = self._parameters.nnshape + (self.nt, narrays)
        total_after_size = np.prod(self._parameters.nnshape + (self.nt, )) * self.maxnchunks * narrays
        shape = nnshape + (narrays, )

        comment = f'Buffers hold {narrays} arrays ({self.describe_reim()})'
        memory_estimate = MemoryEstimate(comment=comment)
        memory_estimate.add_key('Density matrix chunks', shape, float,
                                on_num_ranks=self.maxntimes)
        memory_estimate.add_key('Before parallel redistribution', before_shape, float,
                                on_num_ranks=self.maxntimes)
        memory_estimate.add_key('After parallel redistribution', after_shape, float,
                                total_size=total_after_size,
                                on_num_ranks=self.maxnchunks)

        return memory_estimate

    def first_indices(self):
        for first_indices_r in self.work_loop_by_ranks():
            break
        indices_by_rank = [chunk for chunk in first_indices_r if chunk is not None]
        indices_concat, _ = RhoIndices.concatenate_indices(indices_by_rank)
        return indices_concat

    def __iter__(self) -> Generator[DensityMatrixBuffer, None, None]:
        """ Yield density matrices for all times, chunk by chunk.

        The wave function file is read in chunks, time by time. However,
        chunks are grouped together so that the density matrix at each time
        is read in large chunks. Each rank reads the same chunk for a
        different time. Then, the chunks and times are redistributed, so
        that each rank now holds a small chunk, but for many times. The
        same chunk is read for all times before it is yielded.

        Yields
        ------
        Chunks of the density matrix
        """
        log = self.log
        self.rho_wfs_reader.parallel_prepare()

        # Here x is a compound index for a slice of both n and M
        for chunki, chunks_r in enumerate(self.work_loop_by_ranks()):
            log.start('read_alltoallv')

            # The work this rank is supposed to read
            indices = chunks_r[self.comm.rank]

            indices_by_rank = [chunk for chunk in chunks_r if chunk is not None]
            # Number of chunks of nn-indices being read
            nchunks = len(indices_by_rank)
            assert nchunks > 0

            # Find out how much of the total density matrix needs to be read
            # to get only the required chunks
            indices_concat, reduced_indices_by_rank = RhoIndices.concatenate_indices(indices_by_rank)

            if indices is None:
                # This rank does not want any slices of n1 and n2.
                # It will still potentially participate in the parallel reading of times
                assert self.comm.rank >= nchunks

            n1 = slice(self._parameters.n1min + indices_concat.n1.start,
                       self._parameters.n1min + indices_concat.n1.stop)
            n2 = slice(self._parameters.n2min + indices_concat.n2.start,
                       self._parameters.n2min + indices_concat.n2.stop)

            if self.comm.rank < self.maxntimes:
                # This rank will read
                contiguous_chunks_buffer = DensityMatrixBuffer(self._parameters.nnshape,
                                                               (nchunks, ), np.float64)
            else:
                # This rank does not read any times
                contiguous_chunks_buffer = DensityMatrixBuffer(self._parameters.nnshape,
                                                               (0, ), np.float64)
            if self.comm.rank < nchunks:
                # This rank will get a chunk of the density matrices after redistribution
                contiguous_time_buffer = DensityMatrixBuffer(self._parameters.nnshape,
                                                             (self.nt, ), np.float64)
            else:
                contiguous_time_buffer = DensityMatrixBuffer(self._parameters.nnshape,
                                                             (0, ), np.float64)

            if self.yield_re:
                contiguous_chunks_buffer.zeros(True, 0)
                contiguous_time_buffer.zeros(True, 0)
            if self.yield_im:
                contiguous_chunks_buffer.zeros(False, 0)
                contiguous_time_buffer.zeros(False, 0)

            gen = self.rho_wfs_reader.iread(indices_concat.s, indices_concat.k, n1, n2)

            for t_r in self.rho_wfs_reader.work_loop_by_ranks():
                # Number of times being read
                ntimes = sum(1 for t in t_r if t is not None)

                # Time index this rank is reading, or None if not reading
                globalt = t_r[self.comm.rank]

                # Read the density matrices for one time per rank,
                # with each rank reading a large chunk of the density matrix
                if globalt is not None:
                    read_dm = next(gen)
                    for recvrank, readindices in enumerate(reduced_indices_by_rank):
                        for source_nn, dest_nn in zip(read_dm._iter_buffers(),
                                                      contiguous_chunks_buffer[recvrank]._iter_buffers()):
                            safe_fill(dest_nn, source_nn[readindices.n1, readindices.n2])
                else:
                    # This rank actually has no time to read (the number of times
                    # is not evenly divisible by the number of ranks, and this rank
                    # is trying to read past the end).
                    # This rank will still participate in the alltoallv operation
                    assert self.comm.rank >= ntimes

                # Perform the redistribution, so that each rank now holds
                # a smaller chunk of the density matrix, but for many times
                contiguous_chunks_buffer.redistribute(
                    contiguous_time_buffer, comm=self.comm,
                    source_indices_r=[(r, ) if r < nchunks else None for r in range(self.comm.size)],
                    target_indices_r=[None if t is None else (t, ) for t in t_r],
                    log=log if 0 in t_r else None)

            if self.comm.rank == 0:
                log(f'Chunk {chunki+1}/{self.niters}: Read and distributed density matrices in '
                    f'{log.elapsed("read_alltoallv"):.1f}s', who='Response', flush=True)

            if indices is not None:
                yield contiguous_time_buffer
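
# A minimal, numpy-only sketch of the redistribution pattern that
# AlltoallvTimeDistributor performs via DensityMatrixBuffer.redistribute:
# before the exchange, rank r holds every nn-chunk for a single time t = r;
# afterwards, rank r holds a single nn-chunk c = r for all times. The rank,
# chunk and time counts are assumed example values, and the exchange is
# simulated in one process rather than performed over MPI.
def _example_alltoallv_pattern():
    nranks = 4        # plays the role of comm.size
    ntimes = nranks   # one time read per rank per round
    nchunks = nranks  # one chunk owned per rank after the exchange
    nnshape = (3, 3)  # shape of one nn-chunk

    # before[r][c]: chunk c of the density matrix at time t = r, read by rank r
    before = [[np.full(nnshape, 10 * r + c) for c in range(nchunks)]
              for r in range(nranks)]

    # The alltoallv step: rank t sends its copy of chunk c to rank c,
    # which stores it at time index t
    after = [[before[t][c] for t in range(ntimes)] for c in range(nchunks)]

    # after[c] is now contiguous in time: one small chunk, for all times
    for c in range(nchunks):
        for t in range(ntimes):
            assert np.all(after[c][t] == 10 * t + c)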