Source code for rhodent.density_matrices.buffer

from __future__ import annotations

from os import getenv
from typing import Generator, Generic, Sequence
import numpy as np
from numpy.typing import NDArray
from numpy._typing import _DTypeLike as DTypeLike  # parametrizable wrt generic

from ..utils import DTypeT, Logger
from ..typing import ArrayIndex


class DensityMatrixBuffer(Generic[DTypeT]):

    """ Buffer for the density matrix.

    Objects of this class can hold buffers for real and imaginary parts
    and various derivatives at the same time. Each buffer has two dimensions
    corresponding to (part of) the density matrix, and optionally additional
    dimensions for e.g. time.

    Parameters
    ----------
    nnshape
        Shape of the dimensions corresponding to the density matrix.
        Must be of length 2.
    xshape
        Shape of the additional dimensions corresponding to, e.g., time.
    dtype
        Data type of the density matrices.
    re_buffers
        Buffers for the different derivatives of the real part of the density
        matrix as dictionaries, where the key is the derivative order
        (0, 1, 2, etc.) and the value is a numpy array of shape
        ``(nnshape, xshape)``.
    im_buffers
        Same as :attr:`re_buffers` but for the imaginary part.
    """

    def __init__(self,
                 nnshape: tuple[int, int],
                 xshape: tuple[int, ...],
                 dtype: DTypeLike[DTypeT],
                 re_buffers: dict[int, NDArray[DTypeT]] = dict(),
                 im_buffers: dict[int, NDArray[DTypeT]] = dict()):
        assert len(nnshape) == 2
        assert all(isinstance(dim, (int, np.integer)) and dim >= 0 for dim in nnshape)
        assert all(isinstance(dim, (int, np.integer)) and dim >= 0 for dim in xshape)
        assert isinstance(np.dtype(dtype), np.dtype)
        self._nnshape = nnshape
        self._xshape = xshape
        self._dtype = np.dtype(dtype)
        self._re_buffers: dict[int, NDArray[DTypeT]] = dict()
        self._im_buffers: dict[int, NDArray[DTypeT]] = dict()

        for derivative, buffer_nnx in re_buffers.items():
            self.store(True, derivative, buffer_nnx)
        for derivative, buffer_nnx in im_buffers.items():
            self.store(False, derivative, buffer_nnx)

    @property
    def real(self) -> NDArray[DTypeT]:
        """ Buffer of shape nnshape + xshape corresponding to real data. """
        return self._get_real(0)

    @property
    def real1(self) -> NDArray[DTypeT]:
        """ Buffer of shape nnshape + xshape corresponding to the real part of the first derivative. """
        return self._get_real(1)

    @property
    def real2(self) -> NDArray[DTypeT]:
        """ Buffer of shape nnshape + xshape corresponding to the real part of the second derivative. """
        return self._get_real(2)

    @property
    def imag(self) -> NDArray[DTypeT]:
        """ Buffer of shape nnshape + xshape corresponding to imaginary data. """
        return self._get_imag(0)

    @property
    def imag1(self) -> NDArray[DTypeT]:
        """ Buffer of shape nnshape + xshape corresponding to the imaginary part of the first derivative. """
        return self._get_imag(1)

    @property
    def imag2(self) -> NDArray[DTypeT]:
        """ Buffer of shape nnshape + xshape corresponding to the imaginary part of the second derivative. """
        return self._get_imag(2)

    def _get_real(self, derivative: int) -> NDArray[DTypeT]:
        """ Fetch the density matrix buffer for real data.

        Parameters
        ----------
        derivative
            Derivative order
        """
        return self._re_buffers[derivative]

    def _get_imag(self, derivative: int) -> NDArray[DTypeT]:
        """ Fetch the density matrix buffer for imaginary data.

        Parameters
        ----------
        derivative
            Derivative order
        """
        return self._im_buffers[derivative]

    def _get_data(self, real: bool, derivative: int) -> NDArray[DTypeT]:
        """ Fetch the density matrix buffer.

        Parameters
        ----------
        real
            True if real, False if imaginary
        derivative
            Derivative order
        """
        return self._get_real(derivative) if real else self._get_imag(derivative)
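    # Usage sketch (illustrative only, not part of the upstream module; the
    # shapes and dtype below are assumptions). A buffer for a 2x2 block of the
    # density matrix tracked over 5 time steps could be set up and read like:
    #
    #     dm = DensityMatrixBuffer(nnshape=(2, 2), xshape=(5,), dtype=float)
    #     dm.zero_buffers(real=True, imag=True, derivative_order_s=[0])
    #     dm.real[...] = 1.0           # real part, shape (2, 2, 5)
    #     rho_t0 = dm.real[:, :, 0]    # density matrix at the first x-index
    #     dm.imag                      # imaginary part, still all zeros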
    def copy(self) -> DensityMatrixBuffer:
        """ Return a deep copy of this object (buffers are copied). """
        re_buffers = {derivative: np.array(buffer_nnx)
                      for derivative, buffer_nnx in self._re_buffers.items()}
        im_buffers = {derivative: np.array(buffer_nnx)
                      for derivative, buffer_nnx in self._im_buffers.items()}
        dm_buffer = DensityMatrixBuffer(self.nnshape, self.xshape, dtype=self.dtype,
                                        re_buffers=re_buffers, im_buffers=im_buffers)
        return dm_buffer
    def __getitem__(self, value) -> DensityMatrixBuffer:
        """ Index the buffers and return a new DensityMatrixBuffer with buffers
        that are views of the buffers of this DensityMatrixBuffer. """
        if len(self._im_buffers) == 0 and len(self._re_buffers) == 0:
            # This case needs some special handling to get the dimension of
            # the output
            raise NotImplementedError

        # Wrap in a tuple
        if not isinstance(value, tuple):
            value = (value, )
        s = (slice(None), slice(None)) + value
        re_buffers = {derivative: buffer_nnx[s]
                      for derivative, buffer_nnx in self._re_buffers.items()}
        im_buffers = {derivative: buffer_nnx[s]
                      for derivative, buffer_nnx in self._im_buffers.items()}

        # Ugly hack: take the x-shape from any one of the buffers
        xshape = (list(re_buffers.values()) + list(im_buffers.values()))[0].shape[2:]

        return DensityMatrixBuffer(self.nnshape, xshape, dtype=self.dtype,
                                   re_buffers=re_buffers, im_buffers=im_buffers)

    @property
    def narrays(self) -> int:
        """ Number of arrays stored in this object. """
        return len(self.derivatives_imag) + len(self.derivatives_real)

    @property
    def nnshape(self) -> tuple[int, int]:
        """ Shape of the part of the density matrix. """
        return self._nnshape

    @property
    def xshape(self) -> tuple[int, ...]:
        """ Shape of the additional dimensions of the buffers. """
        return self._xshape

    @property
    def shape(self) -> tuple[int, ...]:
        """ Shape of the buffers. """
        return self.nnshape + self.xshape

    @property
    def dtype(self) -> np.dtype[DTypeT]:
        """ Dtype of the buffers. """
        return self._dtype
    def store(self, real: bool, derivative: int, buffer_nnx: NDArray[DTypeT]):
        """ Store a buffer for part of the density matrix.

        Parameters
        ----------
        real
            True if real, False if imaginary
        derivative
            Derivative order
        buffer_nnx
            Buffer of shape (nnshape, xshape)
        """
        assert isinstance(derivative, int) and derivative >= 0, derivative
        assert isinstance(buffer_nnx, np.ndarray)
        assert buffer_nnx.shape == self.shape, f'{buffer_nnx.shape} != {self.shape}'
        assert buffer_nnx.dtype == self.dtype

        if real:
            self._re_buffers[derivative] = buffer_nnx
        else:
            self._im_buffers[derivative] = buffer_nnx
    def zero_buffers(self, real: bool, imag: bool, derivative_order_s: list[int]):
        """ Initialize many buffers at once and fill them with zeros.

        Parameters
        ----------
        real
            Initialize buffers for real parts
        imag
            Initialize buffers for imaginary parts
        derivative_order_s
            Initialize buffers for these derivative orders
        """
        for derivative in derivative_order_s:
            if real:
                self.zeros(True, derivative)
            if imag:
                self.zeros(False, derivative)
    def zeros(self, real: bool, derivative: int):
        """ Initialize a buffer with zeros for part of the density matrix.

        Parameters
        ----------
        real
            True if real, False if imaginary
        derivative
            Derivative order
        """
        self.store(real, derivative, np.zeros(self.shape, dtype=self.dtype))
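    # For example (illustrative only), the following two calls allocate the
    # same zeroed buffer for the first derivative of the imaginary part:
    #
    #     dm.zeros(False, 1)
    #     dm.store(False, 1, np.zeros(dm.shape, dtype=dm.dtype))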
    def broadcast_x(self, data_nnx: NDArray[DTypeT]) -> NDArray[DTypeT]:
        """ Broadcast the x dimensions of data_nnx. """
        nnshape = data_nnx.shape[:2]
        data_xnn = np.moveaxis(np.moveaxis(data_nnx, 0, -1), 0, -1)
        data_xnn = np.broadcast_to(data_xnn, self.xshape + nnshape)
        data_nnx = np.moveaxis(np.moveaxis(data_xnn, -1, 0), -1, 0)
        return data_nnx
    @property
    def nnellipsis(self) -> tuple[slice, slice]:
        # Full slices over the two nn-dimensions
        return (slice(None), slice(None))

    @property
    def xellipsis(self) -> tuple[slice, ...]:
        # Full slices over all x-dimensions
        return tuple(len(self.xshape) * [slice(None)])
    def safe_fill(self, real: bool, derivative: int, data_nnx: NDArray[DTypeT]):
        """ Write data_nnx to the corresponding buffer, if the dimensions of
        data_nnx are equal to or smaller than the buffer.

        If the dimensions of data_nnx are smaller than or equal to the
        dimensions of the buffer, write to the first elements of the buffer.
        Otherwise raise an error.

        Parameters
        ----------
        real
            True if real, False if imaginary
        derivative
            Derivative order
        data_nnx
            Data of shape (nnshape, xshape)
        """
        assert len(data_nnx.shape) <= len(self.shape), f'{data_nnx.shape} > {self.shape}'
        assert all([dima >= dimb for dima, dimb in zip(self.nnshape, data_nnx.shape[:2])]), \
            f'{self.nnshape} < {data_nnx.shape[:2]}'
        data_nnx = self.broadcast_x(data_nnx)  # Broadcast the last dimensions
        assert data_nnx.shape[2:] == self.xshape, f'{data_nnx.shape[2:]} != {self.xshape}'
        s = tuple([slice(dim) for dim in data_nnx.shape[:2]]) + self.xellipsis
        buffer_nnx = self._get_data(real, derivative)
        buffer_nnx[s] = data_nnx
    def safe_add(self, real: bool, derivative: int, data_nnx: NDArray[DTypeT]):
        """ Add data_nnx to the corresponding buffer, if the dimensions of
        data_nnx are equal to or smaller than the buffer.

        If the dimensions of data_nnx are smaller than or equal to the
        dimensions of the buffer, add to the first elements of the buffer.
        Otherwise raise an error.

        Parameters
        ----------
        real
            True if real, False if imaginary
        derivative
            Derivative order
        data_nnx
            Data of shape (nnshape, xshape)
        """
        assert len(data_nnx.shape) <= len(self.shape), f'{data_nnx.shape} > {self.shape}'
        assert all([dima >= dimb for dima, dimb in zip(self.nnshape, data_nnx.shape[:2])]), \
            f'{self.nnshape} < {data_nnx.shape[:2]}'
        data_nnx = self.broadcast_x(data_nnx)  # Broadcast the last dimensions
        assert data_nnx.shape[2:] == self.xshape, f'{data_nnx.shape[2:]} != {self.xshape}'
        s = tuple([slice(dim) for dim in data_nnx.shape[:2]]) + self.xellipsis
        buffer_nnx = self._get_data(real, derivative)
        # Regarding ignore:
        # https://stackoverflow.com/questions/74633074/how-to-type-hint-a-generic-numpy-array/74634650#74634650
        buffer_nnx[s] += data_nnx  # type: ignore
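    # Illustrative sketch of safe_fill/safe_add (shapes are assumptions): with
    # a buffer of shape (4, 4, 10), a smaller (2, 3) block is written into the
    # first elements of the nn-dimensions and broadcast along the x-dimension:
    #
    #     dm = DensityMatrixBuffer(nnshape=(4, 4), xshape=(10,), dtype=float)
    #     dm.zeros(True, 0)
    #     dm.safe_fill(True, 0, np.ones((2, 3)))   # fills dm.real[:2, :3, :]
    #     dm.safe_add(True, 0, np.ones((2, 3)))    # adds on top of the same block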
    @property
    def derivatives_real(self) -> list[int]:
        """ Stored derivative orders of the real density matrices, in sorted order. """
        return list(sorted(self._re_buffers.keys()))

    @property
    def derivatives_imag(self) -> list[int]:
        """ Stored derivative orders of the imaginary density matrices, in sorted order. """
        return list(sorted(self._im_buffers.keys()))

    def _iter_buffers(self) -> Generator[NDArray[DTypeT], None, None]:
        """ Loop over all real and imaginary buffers in sorted order. """
        for derivative in self.derivatives_real:
            yield self._re_buffers[derivative]
        for derivative in self.derivatives_imag:
            yield self._im_buffers[derivative]

    def _iter_reim_derivatives(self) -> Generator[tuple[bool, int], None, None]:
        """ Loop over tuples (real, derivative) in sorted order.

        The parameter real is True for real buffers and the parameter
        derivative denotes the derivative order of the buffer.
        """
        for derivative in self.derivatives_real:
            yield (True, derivative)
        for derivative in self.derivatives_imag:
            yield (False, derivative)
    def ensure_contiguous_buffers(self):
        """ Make the buffers contiguous if they are not already. """
        for derivative in self.derivatives_real:
            self._re_buffers[derivative] = np.ascontiguousarray(self._re_buffers[derivative])
        for derivative in self.derivatives_imag:
            self._im_buffers[derivative] = np.ascontiguousarray(self._im_buffers[derivative])
    def send_arrays(self, comm, rank: int, log: Logger | None = None):
        """ Send data to another MPI rank.

        Parameters
        ----------
        comm
            Communicator
        rank
            Send to this rank
        log
            Optional logger
        """
        if log is not None:
            log.start('send_to_root')
        requests = []
        for mpitag, buffer_nnx in enumerate(self._iter_buffers(), start=987):
            buffer_nnx = np.ascontiguousarray(buffer_nnx)
            requests.append(comm.send(buffer_nnx, 0, tag=mpitag, block=False))
        comm.waitall(requests)
        if log is not None:
            log.log(f'Sending to root {log.elapsed("send_to_root"):.1f}s', flush=True)
    def recv_arrays(self, comm, rank: int, log: Logger | None = None):
        """ Receive data from another MPI rank.

        Parameters
        ----------
        comm
            Communicator
        rank
            Receive from this rank
        log
            Optional logger
        """
        if log is not None:
            log.start('root_recv')
        requests = []
        for mpitag, buffer_nnx in enumerate(self._iter_buffers(), start=987):
            requests.append(comm.receive(buffer_nnx, rank, tag=mpitag, block=False))
        comm.waitall(requests)
        if log is not None:
            log.log(f'Root received {log.elapsed("root_recv"):.1f}s from {rank}', flush=True)
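    # Illustrative pairing of send_arrays/recv_arrays (sketch only; ``world``
    # stands in for the caller's MPI communicator, assumed to provide the
    # send/receive/waitall interface used above):
    #
    #     if world.rank == 0:
    #         dm.recv_arrays(world, rank=1)    # receive rank 1's buffers
    #     elif world.rank == 1:
    #         dm.send_arrays(world, rank=0)    # send buffers towards root
    #
    # Both sides must hold the same set of derivatives with the same shape and
    # dtype, since arrays are matched purely by iteration order and MPI tag.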
    def redistribute(self,
                     target: DensityMatrixBuffer,
                     comm,
                     source_indices_r: Sequence[tuple[ArrayIndex, ...] | ArrayIndex | None],
                     target_indices_r: Sequence[tuple[ArrayIndex, ...] | ArrayIndex | None],
                     log: Logger | None = None,
                     ):
        """ Redistribute this DensityMatrixBuffer to another according to user specified options.

        The nn dimensions of the self and target buffers should be the same,
        but the x dimensions can be different. However, self needs to have the
        same shape on all ranks and target needs to have the same shape on all
        ranks.

        Parameters
        ----------
        target
            Target DensityMatrixBuffer
        comm
            MPI communicator
        source_indices_r
            List of x-indices. The length of the list must be equal to the
            communicator size. The x-index that is element r of the list
            corresponds to the data from the source that will be sent to rank r.
            If the x-index is None, then the rank corresponding to that element
            will not be receiving data. This argument must be the same on all ranks.
        target_indices_r
            List of x-indices. The length of the list must be equal to the
            communicator size. The x-index that is element r of the list
            corresponds to the data in the target that will be received from rank r.
            If the x-index is None, then the rank corresponding to that element
            will not be sending data. This argument must be the same on all ranks.
        log
            Optional logger
        """
        # Size of each density matrix (the nn-dimensions)
        nnsize = int(np.prod(self.nnshape))

        # Convert maxsize to maximum number of elements
        maxsize = float(getenv('RHODENT_REDISTRIBUTE_MAXSIZE', 1e7))
        maxtotalelems = int(np.ceil(maxsize / self.dtype.itemsize))

        assert len(source_indices_r) == comm.size, len(source_indices_r)
        assert len(target_indices_r) == comm.size, len(target_indices_r)

        # Extract source and target indices that are not None and make sure they are tuples
        source_indices_by_rank = {rank: x_indices if isinstance(x_indices, tuple) else (x_indices, )
                                  for rank, x_indices in enumerate(source_indices_r)
                                  if x_indices is not None}
        target_indices_by_rank = {rank: x_indices if isinstance(x_indices, tuple) else (x_indices, )
                                  for rank, x_indices in enumerate(target_indices_r)
                                  if x_indices is not None}
        assert len(source_indices_by_rank) > 0
        assert len(target_indices_by_rank) > 0

        # Make sure that the same derivatives and real/imaginary parts are stored
        # and that the dtypes are the same
        assert tuple(self.derivatives_real) == tuple(target.derivatives_real)
        assert tuple(self.derivatives_imag) == tuple(target.derivatives_imag)
        assert self.dtype == target.dtype, f'{self.dtype} != {target.dtype}'

        # Get the xshapes of all sources
        source_xshape_by_rank: dict[int, tuple[int, ...]] = dict()
        if comm.rank in target_indices_by_rank.keys():
            for buf_nnx in self._iter_buffers():
                source_xshape_by_rank = {rank: buf_nnx[self.nnellipsis + x_indices].shape[2:]
                                         for rank, x_indices in source_indices_by_rank.items()}
                break

        # Get the xshapes of the targets by an alltoall operation with the sources
        # -2 means nothing, -1 in first field means empty tuple
        xdims = max(len(self.xshape), len(target.xshape))
        pad_target_xshape_r = -2 * np.ones((comm.size, xdims), dtype=int)
        pad_source_xshape_r = -2 * np.ones((comm.size, xdims), dtype=int)
        for rank, xshape in source_xshape_by_rank.items():
            pad_source_xshape_r[rank, :len(xshape)] = xshape
            if xshape == ():
                pad_source_xshape_r[rank, 0] = -1
        comm.alltoallv(pad_source_xshape_r,
                       xdims * np.ones(comm.size, dtype=int),
                       xdims * np.arange(comm.size, dtype=int),
                       pad_target_xshape_r,
                       xdims * np.ones(comm.size, dtype=int),
                       xdims * np.arange(comm.size, dtype=int))
        target_xshape_by_rank = {rank: tuple(xshape[xshape > -1])
                                 for rank, xshape in enumerate(pad_target_xshape_r)
                                 if xshape[0] > -2}

        # Check that the target sizes supplied by the user are shorter than
        # or equal to the sizes from the alltoall operation
        if comm.rank in source_indices_by_rank.keys():
            for buf_nnx in target._iter_buffers():
                for rank, x_indices in target_indices_by_rank.items():
                    target_xshape = target_xshape_by_rank[rank]
                    xshape = buf_nnx[self.nnellipsis + x_indices].shape[2:]
                    assert all(np.less_equal(target_xshape, xshape))
                break

        # Get the total number of density matrices that this rank sends/receives
        source_xsize_by_rank = {rank: np.prod(xshape, dtype=int)
                                for rank, xshape in source_xshape_by_rank.items()}
        target_xsize_by_rank = {rank: np.prod(xshape, dtype=int)
                                for rank, xshape in target_xshape_by_rank.items()}
        my_sourcexsize = sum(source_xsize_by_rank.values())
        my_targetxsize = sum(target_xsize_by_rank.values())

        # Get the total number of array elements to be sent across all ranks
        sizes = np.array([my_sourcexsize, my_targetxsize], dtype=int)
        comm.sum(sizes, root=-1)
        total_sourcexsize, total_targetxsize = sizes
        totalsize = max(total_sourcexsize, total_targetxsize) * nnsize

        # Split the data across the nn-dimensions since they are always the same; how many times?
        factortoolarge = totalsize / maxtotalelems
        nnstride = int(np.ceil(nnsize / factortoolarge))
        nnstride = min(nnsize, nnstride)
        nsplits = int(np.ceil(nnsize / nnstride))

        if log is not None and comm.rank == 0:
            total_MiB = totalsize * self.dtype.itemsize / (1024 ** 2)
            buftotal_MiB = totalsize / nsplits * self.dtype.itemsize / (1024 ** 2)
            log.log(f'Redistribute: {len(target_indices_by_rank)} sending '
                    f'and {len(source_indices_by_rank)} receiving. '
                    f'Total size on all ranks ({total_MiB:.1f}MiB) '
                    f'splitting in {nsplits} parts ({buftotal_MiB:.1f}MiB on all ranks)', flush=True)

        # Prepare buffers for sending and receiving
        # counts - Number of elements to send to (s) or receive from (r) each rank
        # displs - Position of data to send to (s) or receive from (r) each rank
        sendbuf = np.zeros(my_sourcexsize * nnstride, dtype=self.dtype)
        recvbuf = np.zeros(my_targetxsize * nnstride, dtype=self.dtype)
        scounts_r = np.zeros(comm.size, dtype=int)
        sdispls_r = np.zeros(comm.size, dtype=int)
        rcounts_r = np.zeros(comm.size, dtype=int)
        rdispls_r = np.zeros(comm.size, dtype=int)

        displ = 0
        if comm.rank in target_indices_by_rank.keys():
            # This rank has some data to send. It will send to the ranks that are among the source keys
            sendbuf_by_rank = dict()
            for buf_nnx in self._iter_buffers():
                for destrank, xsize in source_xsize_by_rank.items():
                    size = xsize * nnstride
                    sendbuf_by_rank[destrank] = sendbuf[displ:displ + size]
                    scounts_r[destrank] = size
                    if size > 0:
                        sdispls_r[destrank] = displ
                    displ += size
                break

        displ = 0
        if comm.rank in source_indices_by_rank.keys():
            # This rank has some data to receive. It will receive from the ranks that are among the target keys
            recvbuf_by_rank = dict()
            for buf_nnx in self._iter_buffers():
                for destrank, xsize in target_xsize_by_rank.items():
                    size = xsize * nnstride
                    recvbuf_by_rank[destrank] = recvbuf[displ:displ + size]
                    rcounts_r[destrank] = size
                    if size > 0:
                        rdispls_r[destrank] = displ
                    displ += size
                break

        # Flattened nn-dimensions for splitting
        flatslices = [slice(start, start + nnstride, 1)
                      for start in range(0, nnsize, nnstride)]
        grid = np.mgrid[:self.nnshape[0], :self.nnshape[1]]

        # Loop over real and imaginary parts and derivatives
        for (real, derivative), sendbuf_nnx, recvbuf_nnx in zip(
                self._iter_reim_derivatives(), self._iter_buffers(), target._iter_buffers()):
            # List of data to send and list of buffers where data should be received
            if comm.rank in target_indices_by_rank.keys():
                senddata_by_rank = {rank: sendbuf_nnx[self.nnellipsis + x_indices]
                                    for rank, x_indices in source_indices_by_rank.items()}
            if comm.rank in source_indices_by_rank.keys():
                recvdata_by_rank = {rank: recvbuf_nnx[self.nnellipsis + x_indices]
                                    for rank, x_indices in target_indices_by_rank.items()}
                # The target data may be smaller than what is given by the user
                recvdata_by_rank = {rank: recvdata_by_rank[rank][
                    self.nnellipsis + tuple([slice(dim) for dim in xshape])]
                    for rank, xshape in target_xshape_by_rank.items()}
                for data, xshape in zip(recvdata_by_rank.values(), target_xshape_by_rank.values()):
                    assert data.shape[2:] == xshape, str(data.shape[2:]) + ' != ' + str(xshape)

            # Loop over the data in splits
            for flatslice in flatslices:
                # Slices of nn
                nnslices = (grid[0].ravel()[flatslice], grid[1].ravel()[flatslice])

                # Copy data to the contiguous send buffer
                if comm.rank in target_indices_by_rank.keys():
                    for destrank, buf in sendbuf_by_rank.items():
                        data = senddata_by_rank[destrank][nnslices].ravel()
                        buf[:len(data)] = data

                # Send the data
                comm.alltoallv(sendbuf, scounts_r, sdispls_r,
                               recvbuf, rcounts_r, rdispls_r)

                # Copy data from the contiguous receive buffer
                if comm.rank in source_indices_by_rank.keys():
                    for destrank, buf in recvbuf_by_rank.items():
                        # Copy the first elements of the receive buffer to the data position
                        datashape = recvdata_by_rank[destrank][nnslices].shape
                        datalen = np.prod(datashape, dtype=int)
                        buf = buf[:datalen]
                        recvdata_by_rank[destrank][nnslices] = buf.reshape(datashape)
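# Illustrative redistribution sketch (not part of the upstream module; the
# two-rank layout and the name ``comm`` are assumptions). Suppose every rank
# holds a source buffer and a target buffer with xshape (4,), and comm.size == 2:
#
#     source_indices_r = [np.s_[0:2], np.s_[2:4]]   # slice of my source sent to rank r
#     target_indices_r = [np.s_[0:2], np.s_[2:4]]   # slice of my target filled from rank r
#     source.redistribute(target, comm, source_indices_r, target_indices_r)
#
# With this choice rank 0 collects x-elements 0:2 of every rank's source into
# its target (one slot per sending rank), while rank 1 collects x-elements 2:4,
# following the conventions described in the docstring above.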