Coverage for rhodent/density_matrices/distributed/time.py: 85%
145 statements
from __future__ import annotations

from typing import Generator
import numpy as np

from .base import BaseDistributor, RhoIndices, RhoParameters
from rhodent.density_matrices.buffer import DensityMatrixBuffer
from rhodent.density_matrices.readers.gpaw import KohnShamRhoWfsReader
from rhodent.utils import safe_fill
from ...typing import Communicator
from ...utils.memory import MemoryEstimate


class TimeDistributor(BaseDistributor):

    """ Iteratively read density matrices in the time domain

    This class uses the KohnShamRhoWfsReader to iteratively read wave functions
    (each rank reading the same times) and construct density matrices in the
    ground state Kohn-Sham basis for each time. Different ranks read different
    chunks of the density matrices. The density matrices are accumulated in a
    buffer and yielded when all times have been read.

    Parameters
    ----------
    rho_reader
        Density matrices reader
    parameters
        Parameters describing the chunks of the density matrices to read
    comm
        Communicator
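
    Examples
    --------
    A minimal usage sketch. The reader construction is illustrative only;
    see KohnShamRhoWfsReader for its actual arguments:

    >>> reader = KohnShamRhoWfsReader(...)  # hypothetical setup
    >>> dist = TimeDistributor(reader)
    >>> for dm_buffer in dist:
    ...     ...  # each dm_buffer holds one chunk of the density matrices, for all times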
33 """

    def __init__(self,
                 rho_reader: KohnShamRhoWfsReader,
                 parameters: RhoParameters | None = None,
                 comm: Communicator | None = None):
        if rho_reader.comm.size > 1:
            raise ValueError('Serial TimeDistributor must have a serial reader.')

        super().__init__(rho_reader, parameters, comm=comm)

    @property
    def dtype(self):
        return np.float64

    @property
    def xshape(self):
        return (self.nt, )

    @property
    def dt(self):
        return self.rho_wfs_reader.dt

    @property
    def nt(self):
        return self.rho_wfs_reader.nt

    @property
    def time_t(self):
        return self.rho_wfs_reader.time_t

    def __str__(self) -> str:
        lines = []
        lines.append('Density matrices reader')
        lines.append(' Receiving density matrices in contiguous chunks.')
        lines.append(f' shape {self._parameters.nnshape}')
        lines.append(f' received by {self.maxntimes} ranks')
        lines.append(f' {self.niters} iterations to process all chunks')

        return '\n'.join(lines)

    def get_memory_estimate(self) -> MemoryEstimate:
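        # One array per derivative order, doubled when both real and imaginary
        # parts are kept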
        narrays = (2 if self.yield_re and self.yield_im else 1) * len(self.derivative_order_s)
        shape = self._parameters.nnshape + (self.nt, narrays)

        comment = f'Buffers hold {narrays} arrays ({self.describe_reim()})'
        memory_estimate = MemoryEstimate(comment=comment)
        memory_estimate.add_key('Density matrix buffers', shape, float,
                                on_num_ranks=self.comm.size)

        return memory_estimate

    def __iter__(self) -> Generator[DensityMatrixBuffer, None, None]:
        """ Yield density matrices for all times, chunk by chunk

        The wave function file is read in chunks, time by time, with
        the reading of times in the inner loop.

        Yields
        ------
        Chunks of the density matrix
        """
        read_dm = DensityMatrixBuffer(self._parameters.nnshape,
                                      (self.nt, ),
                                      np.float64)
        if self.yield_re:
            read_dm.zeros(True, 0)
        if self.yield_im:
            read_dm.zeros(False, 0)

        self.rho_wfs_reader.parallel_prepare()

        # Loop over the chunks this rank should gather
        for indices in self.work_loop(self.comm.rank):
            if indices is None:
                continue
            # Convert to reading indices
            n1 = slice(self._parameters.n1min + indices.n1.start, self._parameters.n1min + indices.n1.stop)
            n2 = slice(self._parameters.n2min + indices.n2.start, self._parameters.n2min + indices.n2.stop)
            gen = self.rho_wfs_reader.iread(indices.s, indices.k, n1, n2)
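            # The reader is serial, so its work loop visits every time; step the
            # generator in lockstep to get this chunk's density matrix at each time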
            for t in self.rho_wfs_reader.work_loop(self.rho_wfs_reader.comm.rank):
                if t is None:
                    continue
                dm_buffer = next(gen)
                for source_nn, dest_nn in zip(dm_buffer._iter_buffers(), read_dm[t]._iter_buffers()):
                    dest_nn[:] = source_nn

            yield read_dm

    @classmethod
    def from_reader(cls,
                    rho_nn_reader: KohnShamRhoWfsReader,
                    parameters: RhoParameters,
                    **kwargs) -> TimeDistributor:
        return cls(rho_nn_reader, parameters, **kwargs)


class AlltoallvTimeDistributor(TimeDistributor):

    """ Iteratively read density matrices in the time domain

    This class uses the KohnShamRhoWfsReader to iteratively read wave functions
    (one time per rank) and construct density matrices in the ground state
    Kohn-Sham basis for each time. When all ranks have read one time each, this
    class redistributes the data, such that each rank gets only one chunk of the
    density matrices, but for all times. The density matrices are accumulated in
    a buffer and yielded when all times have been read.

    Parameters
    ----------
    rho_reader
        Density matrices reader
    parameters
        Parameters describing the chunks of the density matrices to read
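
    Examples
    --------
    A minimal usage sketch. The reader construction is illustrative only; the
    reader should be opened on a parallel communicator for the redistribution
    to be useful:

    >>> reader = KohnShamRhoWfsReader(...)  # hypothetical setup
    >>> dist = AlltoallvTimeDistributor(reader)
    >>> for dm_buffer in dist:
    ...     ...  # each rank receives its own chunk, for all times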
145 """

    def __init__(self,
                 rho_reader: KohnShamRhoWfsReader,
                 parameters: RhoParameters | None = None):
        if rho_reader.lcao_rho_reader.striden != 0:
            raise ValueError('n stride must be 0 (index all) for the alltoallv-parallelized method')

        BaseDistributor.__init__(self, rho_reader, parameters, comm=rho_reader.comm)

    def __str__(self) -> str:
        nnshape = self.rho_wfs_reader.nnshape(*self.first_indices())

        lines = []
        lines.append('Parallel density matrices reader')
        lines.append(' Receiving density matrices in contiguous chunks.')
        lines.append(f' density matrix contiguous chunk {nnshape}')
        lines.append(f' split into smaller chunks {self._parameters.nnshape}')
        lines.append(f' received by {self.maxntimes} ranks')
        lines.append(' Redistributing into contiguous time form.')
        lines.append(f' sent to {self.maxnchunks} ranks')
        lines.append(f' {self.niters} iterations to process all chunks')

        return '\n'.join(lines)

    def get_memory_estimate(self) -> MemoryEstimate:
        narrays = (2 if self.yield_re and self.yield_im else 1) * len(self.derivative_order_s)

        nnshape = self.rho_wfs_reader.nnshape(*self.first_indices())
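
        # Send-side buffers hold one small chunk per receiving rank, for a single
        # time; receive-side buffers hold a single small chunk, for all times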
        before_shape = self._parameters.nnshape + (self.maxnchunks, narrays)
        after_shape = self._parameters.nnshape + (self.nt, narrays)
        total_after_size = np.prod(self._parameters.nnshape + (self.nt, )) * self.maxnchunks * narrays

        shape = nnshape + (narrays, )

        comment = f'Buffers hold {narrays} arrays ({self.describe_reim()})'
        memory_estimate = MemoryEstimate(comment=comment)
        memory_estimate.add_key('Density matrix chunks', shape, float,
                                on_num_ranks=self.maxntimes)
        memory_estimate.add_key('Before parallel redistribution', before_shape, float,
                                on_num_ranks=self.maxntimes)
        memory_estimate.add_key('After parallel redistribution', after_shape, float,
                                total_size=total_after_size,
                                on_num_ranks=self.maxnchunks)

        return memory_estimate

    def first_indices(self):
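        # Take only the first chunk layout yielded by work_loop_by_ranks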
        for first_indices_r in self.work_loop_by_ranks():
            break

        indices_by_rank = [chunk for chunk in first_indices_r if chunk is not None]
        indices_concat, _ = RhoIndices.concatenate_indices(indices_by_rank)
        return indices_concat

    def __iter__(self) -> Generator[DensityMatrixBuffer, None, None]:
        """ Yield density matrices for all times, chunk by chunk

        The wave function file is read in chunks, time by time. However,
        chunks are grouped together so that the density matrix at each
        time is read in large chunks. Each rank reads the same chunk for
        a different time. Then, the chunks and times are redistributed,
        so that each rank now holds a small chunk, but for many times.
        The same chunk is read for all times before it is yielded.

        Yields
        ------
        Chunks of the density matrix
        """
        log = self.log

        self.rho_wfs_reader.parallel_prepare()

        # Loop over rounds of chunks; chunks_r holds the chunk of nn-indices
        # assigned to each rank in this round
        for chunki, chunks_r in enumerate(self.work_loop_by_ranks()):
            log.start('read_alltoallv')

            # The work this rank is supposed to read
            indices = chunks_r[self.comm.rank]
            indices_by_rank = [chunk for chunk in chunks_r if chunk is not None]

            # Number of chunks of nn-indices being read
            nchunks = len(indices_by_rank)
            assert nchunks > 0

            # Find out how much of the total density matrix needs to be read
            # to get only the required chunks
            indices_concat, reduced_indices_by_rank = RhoIndices.concatenate_indices(indices_by_rank)

            if indices is None:
                # This rank does not want any slices of n1 and n2.
                # It will still potentially participate in the parallel reading of times.
                assert self.comm.rank >= nchunks

            n1 = slice(self._parameters.n1min + indices_concat.n1.start,
                       self._parameters.n1min + indices_concat.n1.stop)
            n2 = slice(self._parameters.n2min + indices_concat.n2.start,
                       self._parameters.n2min + indices_concat.n2.stop)

            if self.comm.rank < self.maxntimes:
                # This rank will read
                contiguous_chunks_buffer = DensityMatrixBuffer(self._parameters.nnshape,
                                                               (nchunks, ),
                                                               np.float64)
            else:
                # This rank does not read any times
                contiguous_chunks_buffer = DensityMatrixBuffer(self._parameters.nnshape,
                                                               (0, ),
                                                               np.float64)
            if self.comm.rank < nchunks:
                # This rank will get a chunk of the density matrices after redistribution
                contiguous_time_buffer = DensityMatrixBuffer(self._parameters.nnshape,
                                                             (self.nt, ),
                                                             np.float64)
            else:
                contiguous_time_buffer = DensityMatrixBuffer(self._parameters.nnshape,
                                                             (0, ),
                                                             np.float64)
            if self.yield_re:
                contiguous_chunks_buffer.zeros(True, 0)
                contiguous_time_buffer.zeros(True, 0)
            if self.yield_im:
                contiguous_chunks_buffer.zeros(False, 0)
                contiguous_time_buffer.zeros(False, 0)

            gen = self.rho_wfs_reader.iread(indices_concat.s, indices_concat.k, n1, n2)

            for t_r in self.rho_wfs_reader.work_loop_by_ranks():
                # Number of times being read
                ntimes = sum(1 for t in t_r if t is not None)
                # Time index this rank is reading, or None if not reading
                globalt = t_r[self.comm.rank]

                # Read the density matrices for one time per rank,
                # with each rank reading a large chunk of the density matrix
                if globalt is not None:
                    read_dm = next(gen)

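                    # Slice this rank's large read chunk into the per-rank pieces
                    # that will be sent during the redistribution below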
                    for recvrank, readindices in enumerate(reduced_indices_by_rank):
                        for source_nn, dest_nn in zip(read_dm._iter_buffers(),
                                                      contiguous_chunks_buffer[recvrank]._iter_buffers()):
                            safe_fill(dest_nn, source_nn[readindices.n1, readindices.n2])
                else:
                    # This rank has no time to read (the number of times is not
                    # evenly divisible by the number of ranks, and this rank would
                    # read past the end). It will still participate in the
                    # alltoallv operation.
                    assert self.comm.rank >= ntimes

                # Perform the redistribution, so that each rank now holds
                # a smaller chunk of the density matrix, but for many times
                contiguous_chunks_buffer.redistribute(
                    contiguous_time_buffer, comm=self.comm,
                    source_indices_r=[(r, ) if r < nchunks else None for r in range(self.comm.size)],
                    target_indices_r=[None if t is None else (t, ) for t in t_r],
                    log=log if 0 in t_r else None)

            if self.comm.rank == 0:
                log(f'Chunk {chunki+1}/{self.niters}: Read and distributed density matrices in '
                    f'{log.elapsed("read_alltoallv"):.1f}s', who='Response', flush=True)

            if indices is not None:
                yield contiguous_time_buffer