Coverage for rhodent/calculators/base.py: 89%

263 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-08-01 16:57 +0000

1from __future__ import annotations 

2 

3from abc import ABC, abstractmethod 

4from typing import Generator, Collection 

5 

6import numpy as np 

7from numpy.typing import NDArray 

8 

9from gpaw.lcaotddft.ksdecomposition import KohnShamDecomposition 

10from gpaw.mpi import world 

11 

12from ..typing import Communicator, Array2D, Array3D 

13from ..response import BaseResponse 

14from ..perturbation import create_perturbation, Perturbation, PerturbationLike 

15from ..density_matrices.base import BaseDensityMatrices, WorkMetadata 

16from ..density_matrices.time import ConvolutionDensityMatrices 

17from ..density_matrices.frequency import FrequencyDensityMatrices 

18from ..voronoi import VoronoiWeights, EmptyVoronoiWeights 

19from ..typing import Array1D 

20from ..utils import Logger, ResultKeys, Result, broaden_n2e, broaden_xn2e, broaden_ia2ou 

21from ..utils.memory import HasMemoryEstimate, MemoryEstimate 

22 

23 

24class BaseObservableCalculator(HasMemoryEstimate, ABC): 

25 

26 """ Object of this class compute observables. 

27 

28 Parameters 

29 ---------- 

30 response 

31 Response object. 

32 voronoi 

33 Voronoi weights object. 

34 energies_occ 

35 Energy grid in units of eV for occupied levels (holes). 

36 energies_unocc 

37 Energy grid in units of eV for unoccupied levels (electrons). 

38 sigma 

39 Gaussian broadening width for energy grid in units of eV. 

40 times 

41 Compute observables in the time domain, for these times (or as close to them as possible). 

42 In units of as. 

43 

44 May not be used together with :attr:`frequencies` or :attr:`frequency_broadening`. 

45 pulses 

46 Compute observables in the time domain, in response to these pulses. 

47 If none, then no pulse convolution is performed. 

48 

49 May not be used together with :attr:`frequencies` or :attr:`frequency_broadening`. 

50 frequencies 

51 Compute observables in the frequency domain, for these frequencies. In units of eV. 

52 

53 May not be used together with :attr:`times` or :attr:`pulses`. 

54 frequency_broadening 

55 Compute observables in the frequency domain, with Gaussian broadening of this width. 

56 In units of eV. 

57 

58 May not be used together with :attr:`times` or :attr:`pulses`. 

59 """ 

60 

61 def __init__(self, 

62 response: BaseResponse, 

63 voronoi: VoronoiWeights | None = None, 

64 *, 

65 energies_occ: list[float] | Array1D[np.float64] | None = None, 

66 energies_unocc: list[float] | Array1D[np.float64] | None = None, 

67 sigma: float | None = None, 

68 times: list[float] | Array1D[np.float64] | None = None, 

69 pulses: Collection[PerturbationLike] | None = None, 

70 frequencies: list[float] | Array1D[np.float64] | None = None, 

71 frequency_broadening: float = 0, 

72 ): 

73 self._log = Logger() 

74 self.log.startup_message() 

75 world.barrier() 

76 self._response = response 

77 

78 self._is_time_density_matrices: bool 

79 if times is not None: 

80 if frequencies is not None or frequency_broadening != 0: 

81 raise ValueError('The parameters frequencies and frequency_broadening may not ' 

82 'be used together with times and pulses.') 

83 # Time calculator 

84 if pulses is None: 

85 pulses = [None] # No perturbation 

86 pulses = [create_perturbation(pulse) for pulse in pulses] 

87 self._is_time_density_matrices = True 

88 else: 

89 if frequencies is None: 

90 raise ValueError('One of the parameters times or frequencies must be given.') 

91 if pulses is not None: 

92 raise ValueError('The parameters frequencies and frequency_broadening may not ' 

93 'be used together with times and pulses.') 

94 # Frequency calculator 

95 self._is_time_density_matrices = False 

96 

97 derivatives, real, imag = self._need_derivatives_real_imag 

98 

99 density_matrices: BaseDensityMatrices 

100 if self._is_time_density_matrices: 

101 assert times is not None 

102 assert pulses is not None 

103 density_matrices = response._get_time_density_matrices( 

104 times, pulses, derivatives, real, imag, self.log) 

105 else: 

106 assert pulses is None 

107 assert frequencies is not None 

108 density_matrices = response._get_frequency_density_matrices( 

109 frequencies, frequency_broadening, real, imag, self.log) 

110 

111 self._density_matrices = density_matrices 

112 self._voronoi: VoronoiWeights 

113 if voronoi is None: 

114 self._voronoi = EmptyVoronoiWeights() 

115 else: 

116 self._voronoi = voronoi 

117 if energies_occ is None: 

118 energies_occ = [] 

119 if energies_unocc is None: 

120 energies_unocc = [] 

121 self._energies_occ = np.asarray(energies_occ) 

122 self._energies_unocc = np.asarray(energies_unocc) 

123 self._sigma = sigma 

124 self._weight_In: Array2D[np.float64] | None = None 

125 self._weight_Iii: Array3D[np.float64] | None = None 

126 self._weight_Iaa: Array3D[np.float64] | None = None 

127 world.barrier() 

128 self.log(f'Set up calculator:\n' 

129 f'{self}\n\n' 

130 '==================================\n' 

131 ' Procedure for obtaining response \n' 

132 '==================================\n' 

133 f'{self.density_matrices}\n\n' 

134 '=================\n' 

135 ' Memory estimate \n' 

136 '=================\n' 

137 f'{self.get_memory_estimate()}\n', 

138 rank=0) 

139 

140 def __str__(self) -> str: 

141 lines = [f' {self.__class__.__name__} '] 

142 

143 lines.append('response:') 

144 lines += [' ' + line for line in str(self.response).split('\n')] 

145 lines.append('') 

146 

147 lines.append('voronoi:') 

148 if self.nproj == 0: 

149 lines.append(' No Voronoi decomposition') 

150 else: 

151 lines += [' ' + line for line in str(self.voronoi).split('\n')] 

152 lines.append('') 

153 

154 if len(self.energies_occ) == 0 or len(self.energies_unocc) == 0 or self.sigma is None: 

155 lines.append('No energies for broadening') 

156 else: 

157 lines += [f'Energies for broadening (sigma = {self.sigma:.1f} eV)', 

158 f' Occupied: {len(self.energies_occ)} from ' 

159 f'{self.energies_occ[0]:.1f} to {self.energies_occ[-1]:.1f} eV', 

160 f' Unoccupied: {len(self.energies_unocc)} from ' 

161 f'{self.energies_unocc[0]:.1f} to {self.energies_unocc[-1]:.1f} eV', 

162 ] 

163 

164 # Make a cute frame 

165 maxlen = max(len(line) for line in lines) 

166 lines[0] = '{s:=^{n}}'.format(s=f' {lines[0]} ', n=maxlen) 

167 lines.append(maxlen * '-') 

168 

169 return '\n'.join(lines) 

170 

171 def get_memory_estimate(self) -> MemoryEstimate: 

172 memory_estimate = MemoryEstimate() 

173 memory_estimate.add_child('Reading response', self.density_matrices.get_memory_estimate()) 

174 for key, shape in self._voronoi_shapes.items(): 

175 memory_estimate.add_key(f'Voronoi weights {key}', shape, float, 

176 on_num_ranks=self.loop_comm.size) 

177 

178 return memory_estimate 

179 

180 @property 

181 def response(self) -> BaseResponse: 

182 """ Response object """ 

183 return self._response 

184 

185 @property 

186 def density_matrices(self) -> BaseDensityMatrices: 

187 """ Object that gives the density matrix in the time or freqency domain. """ 

188 return self._density_matrices 

189 

190 @property 

191 def nproj(self) -> int: 

192 """ Number of projections in the Voronoi weights object """ 

193 return self.voronoi.nproj 

194 

195 @property 

196 def voronoi(self) -> VoronoiWeights: 

197 """ Voronoi weights object """ 

198 return self._voronoi 

199 

200 @property 

201 def energies_occ(self) -> NDArray[np.float64]: 

202 """ Energy grid (in units of eV) for occupied levels (hot holes). """ 

203 return self._energies_occ 

204 

205 @property 

206 def energies_unocc(self) -> NDArray[np.float64]: 

207 """ Energy grid (in units of eV) for unoccupied levels (hot electrons). """ 

208 return self._energies_unocc 

209 

210 @property 

211 def sigma(self) -> float | None: 

212 """ Gaussian broadening width in units of eV. """ 

213 return self._sigma 

214 

215 @property 

216 def ksd(self) -> KohnShamDecomposition: 

217 """ Kohn-Sham decomposition object """ 

218 return self.density_matrices.ksd 

219 

220 @property 

221 def log(self) -> Logger: 

222 """ Logger """ 

223 return self._log 

224 

225 def log_parallel(self, *args, **kwargs) -> Logger: 

226 """ Log message with communicator information. """ 

227 return self._log(*args, **kwargs, comm=self.loop_comm, who='Calculator') 

228 

229 @property 

230 def frequencies(self) -> Array1D[np.float64]: 

231 """ Frequencies (in units of eV) at which the density matrices are evaluated. 

232 

233 Only valid when the density matrices object is defined in the frequency domain. """ 

234 assert isinstance(self.density_matrices, FrequencyDensityMatrices) 

235 return self.density_matrices.frequencies 

236 

237 @property 

238 def frequency_broadening(self) -> float: 

239 """ Value of frequency broadening in units of eV. 

240 

241 Only valid when the density matrices object is defined in the frequency domain. """ 

242 assert isinstance(self.density_matrices, FrequencyDensityMatrices) 

243 return self.density_matrices.frequency_broadening 

244 

245 @property 

246 def times(self) -> Array1D[np.float64]: 

247 """ Times (in units of as) at which the density matrices are evaluated. 

248 

249 Only valid when the density matrices object is defined in the time domain. """ 

250 assert isinstance(self.density_matrices, ConvolutionDensityMatrices) 

251 return self.density_matrices.times 

252 

253 @property 

254 def pulses(self) -> list[Perturbation]: 

255 """ List of pulses which the density matrices are responses to. 

256 

257 Only valid when the density matrices object is a convolution with pulses. """ 

258 assert isinstance(self.density_matrices, ConvolutionDensityMatrices) 

259 return self.density_matrices.pulses 

260 

261 @property 

262 def calc_comm(self) -> Communicator: 

263 """ Calculation communicator. 

264 

265 Each rank of this communicator calculates the observables corresponding to 

266 a part (in electron-hole space) of the density matrices. """ 

267 return self.density_matrices.calc_comm 

268 

269 @property 

270 def loop_comm(self) -> Communicator: 

271 """ Loop communicator. 

272 

273 Each rank of this communicator calculates the density matrices corresponding to 

274 different times, frequencies or after convolution with a different pulse. """ 

275 return self.density_matrices.loop_comm 

276 

277 @property 

278 def eig_n(self) -> Array1D[np.float64]: 

279 """ Eigenvalues (in units of eV) relative to Fermi level in the full ground state KS basis. """ 

280 eig_n, _ = self.ksd.get_eig_n(zero_fermilevel=True) 

281 return eig_n # type: ignore 

282 

283 @property 

284 def eig_i(self) -> Array1D[np.float64]: 

285 """ Eigenvalues (in units of eV) relative to Fermi level of occupied states (holes). """ 

286 return self.eig_n[self.flti] # type: ignore 

287 

288 @property 

289 def eig_a(self) -> Array1D[np.float64]: 

290 """ Eigenvalues (in units of eV) relative to Fermi level of unoccupied states (electrons). """ 

291 return self.eig_n[self.flta] # type: ignore 

292 

293 @property 

294 def flti(self) -> slice: 

295 """ Slice for extracting indices corresponding to occupied states. """ 

296 imin, imax, _, _ = self.ksd.ialims() 

297 return slice(imin, imax+1) 

298 

299 @property 

300 def flta(self) -> slice: 

301 """ Slice for extracting indices corresponding to unoccupied states. """ 

302 _, _, amin, amax = self.ksd.ialims() 

303 return slice(amin, amax+1) 

304 

305 @property 

306 def _need_derivatives_real_imag(self) -> tuple[list[int], bool, bool]: 

307 """ Derivatives needed by this calculator, and 

308 whether real and imaginary parts are needed 

309 """ 

310 raise NotImplementedError 

311 

312 @property 

313 def _voronoi_shapes(self) -> dict[str, tuple[int, ...]]: 

314 """ List of shapes for the Voronoi weights stored on this calculator. """ 

315 return {} 

316 

317 def _read_weights_diagonal(self) -> None: 

318 """ Read the diagonal weights from the voronoi object and store them in memory. """ 

319 nI = self.voronoi.nproj 

320 if nI == 0: 

321 return 

322 Nn = self.voronoi.nn 

323 

324 if self.calc_comm.rank == 0: 

325 weight_In = np.empty((nI, Nn), dtype=float) 

326 else: 

327 weight_In = None 

328 for iI, weight_nn in enumerate(self.voronoi): 

329 if self.voronoi.comm.rank == 0: 

330 assert weight_In is not None 

331 assert weight_nn is not None 

332 weight_In[iI, ...] = weight_nn.diagonal() 

333 else: 

334 assert weight_nn is None 

335 

336 if self.calc_comm.rank == 0: 

337 # Broadcast to all calc_comm rank 0's 

338 self.loop_comm.broadcast(weight_In, 0) 

339 

340 self._weight_In = weight_In 

341 

342 def _read_weights_eh(self) -> None: 

343 """ Read the electron-hole weights from the voronoi object and store them in memory. """ 

344 nI = self.voronoi.nproj 

345 if nI == 0: 

346 return 

347 

348 if self.calc_comm.rank == 0: 

349 weight_Iii = np.empty((nI, len(self.eig_i), len(self.eig_i)), dtype=float) 

350 weight_Iaa = np.empty((nI, len(self.eig_a), len(self.eig_a)), dtype=float) 

351 else: 

352 weight_Iii = None 

353 weight_Iaa = None 

354 for iI, weight_nn in enumerate(self.voronoi): 

355 if self.voronoi.comm.rank == 0: 

356 assert weight_nn is not None 

357 assert weight_Iii is not None 

358 assert weight_Iaa is not None 

359 weight_Iii[iI, ...] = weight_nn[self.flti, self.flti] 

360 weight_Iaa[iI, ...] = weight_nn[self.flta, self.flta] 

361 else: 

362 assert weight_nn is None 

363 

364 if self.calc_comm.rank == 0: 

365 # Broadcast to all calc_comm rank 0's 

366 self.loop_comm.broadcast(weight_Iii, 0) 

367 self.loop_comm.broadcast(weight_Iaa, 0) 

368 

369 self._weight_Iii = weight_Iii 

370 self._weight_Iaa = weight_Iaa 

371 

372 @property 

373 def _iterate_weights_diagonal(self) -> Generator[NDArray[np.float64], None, None]: 

374 """ Iterate over the diagonal weights. 

375 

376 Yields 

377 ------ 

378 The diagonal of the Voronoi weights, one projection at a time """ 

379 assert self.calc_comm.rank == 0 

380 if self.nproj == 0: 

381 return 

382 

383 if self._weight_In is None: 

384 self._read_weights_diagonal() 

385 assert self._weight_In is not None 

386 

387 for weight_n in self._weight_In: 

388 yield weight_n 

389 

390 @property 

391 def _iterate_weights_eh(self) -> Generator[tuple[NDArray[np.float64], NDArray[np.float64]], None, None]: 

392 """ Iterate over the electron-hole part of the weights. 

393 

394 Yields 

395 ------ 

396 The electron-hole part of the Voronoi weights, one projection at a time """ 

397 assert self.calc_comm.rank == 0 

398 if self.nproj == 0: 

399 return 

400 

401 if self._weight_Iaa is None or self._weight_Iii is None: 

402 self._read_weights_eh() 

403 assert self._weight_Iii is not None 

404 assert self._weight_Iaa is not None 

405 

406 for weight_ii, weight_aa in zip(self._weight_Iii, self._weight_Iaa): 

407 yield weight_ii, weight_aa 

408 

409 @abstractmethod 

410 def get_result_keys(self) -> ResultKeys: 

411 """ Get the keys that each result will contain, and dimensions thereof. 

412 

413 Returns 

414 ------- 

415 Object representing the data that will be present in the result objects. """ 

416 raise NotImplementedError 

417 

418 @abstractmethod 

419 def icalculate(self) -> Generator[tuple[WorkMetadata, Result], None, None]: 

420 """ Iteratively calculate results. 

421 

422 Yields 

423 ------ 

424 Tuple (work, result) on the root rank of the calculation communicator: 

425 

426 work 

427 An object representing the metadata (time, frequency or pulse) for the work done 

428 result 

429 Object containg the calculation results for this time, frequency or pulse 

430 

431 Yields nothing on non-root ranks of the calculation communicator. 

432 """ 

433 raise NotImplementedError 

434 

435 def icalculate_gather_on_root(self, **kwargs) -> Generator[ 

436 tuple[WorkMetadata, Result], None, None]: 

437 """ Iteratively calculate results and gather to the root rank. 

438 

439 Yields 

440 ------ 

441 Tuple (work, result) on the root rank of the both calculation and loop communicators: 

442 

443 work 

444 An object representing the metadata (time, frequency or pulse) for the work done 

445 result 

446 Object containg the calculation results for this time, frequency or pulse 

447 

448 Yields nothing on non-root ranks of the communicators. 

449 """ 

450 resultkeys = self.get_result_keys(**kwargs) 

451 gen = iter(self.icalculate(**kwargs)) 

452 

453 # Loop over the work to be done, and the ranks that are supposed to do it 

454 for rank, work in self.density_matrices.global_work_loop_with_idle(): 

455 if work is None: 

456 # Rank rank will not do any work at this point 

457 continue 

458 

459 if self.calc_comm.rank > 0: 

460 continue 

461 

462 if self.loop_comm.rank == 0 and rank == 0: 

463 # This is the root rank, and the root rank should yield its own work 

464 mywork, result = next(gen) 

465 assert work.global_indices == mywork.global_indices, f'{work.desc} != {mywork.desc}' 

466 yield mywork, result 

467 # self.log.start('communicate') 

468 elif self.loop_comm.rank == 0: 

469 # This is the root rank, and the root rank should receive the work 

470 # done by rank rank, and yield that 

471 result.inplace_receive(resultkeys, rank, comm=self.loop_comm) 

472 yield work, result 

473 # self.log(f'Communicated for {self.log.elapsed("communicate"):.2f}s', flush=True) 

474 elif self.loop_comm.rank == rank: 

475 # This is not the root rank, but this rank should send its data to root 

476 _, result = next(gen) 

477 result.send(resultkeys, 0, comm=self.loop_comm) 

478 

479 _exhausted = object() 

480 rem = next(gen, _exhausted) 

481 assert rem is _exhausted, rem 

482 

483 def write_response_to_disk(self, 

484 fmt: str): 

485 """ Calculate the response needed for this calculator and write the response to disk. 

486 

487 The necessary derivatives and real and imaginary parts of the density matrices 

488 will be written. Can be used both in the time and frequency domains. 

489 

490 Parameters 

491 ---------- 

492 fmt 

493 Formatting string for the density matrices saved to disk. 

494 

495 The formatting string should be a plain string containing variable 

496 placeholders within curly brackets `{}`. It should not be confused with 

497 a formatted string literal (f-string). 

498 

499 Examples: 

500 

501 * frho_fmt = `'frho/w{freq:05.2f}-{reim}.npy'`. 

502 * pulserho_fmt = `pulserho/t{time:09.1f}{tag}.npy`. 

503 

504 Accepts variables in the time domain: 

505 

506 * `{time}` - Time in units of as. 

507 * `{tag}` - Derivative tag, `''`, `'-Iomega'`, or `'-omega2'`. 

508 * `{pulsefreq}` - Pulse frequency in units of eV. 

509 * `{pulsefwhm}` - Pulse FWHM in units of fs. 

510 

511 Accepts variables in the frequency domain: 

512 

513 * `{freq}` - Frequency in units of eV. 

514 * `{reim}` - `'Re'` or `'Im'` for Fourier transform of real/imaginary 

515 part of density matrix. 

516 """ 

517 self._density_matrices.write_to_disk(fmt) 

518 

519 def broaden_occ(self, 

520 M_i: NDArray[np.float64]) -> NDArray[np.float64]: 

521 """ Broaden array corresponding to occupied levels with Gaussians of width sigma 

522 

523 Parameters 

524 ---------- 

525 M_i 

526 Array to broaden 

527 

528 Returns 

529 ------- 

530 Broadened array 

531 """ 

532 assert self.sigma is not None 

533 

534 return broaden_n2e(M_i, self.eig_i, self.energies_occ, self.sigma) 

535 

536 def broaden_unocc(self, 

537 M_a: NDArray[np.float64]) -> NDArray[np.float64]: 

538 """ Broaden array corresponding to unoccupied levels with Gaussians of width sigma 

539 

540 Parameters 

541 ---------- 

542 M_a 

543 Array to broaden 

544 

545 Returns 

546 ------- 

547 Broadened array 

548 """ 

549 assert self.sigma is not None 

550 

551 return broaden_n2e(M_a, self.eig_a, self.energies_unocc, self.sigma) 

552 

553 def broaden_xi2o(self, 

554 M_xi: NDArray[np.float64]) -> NDArray[np.float64]: 

555 """ Broaden array corresponding to occupied levels with Gaussians of width sigma 

556 

557 Parameters 

558 ---------- 

559 M_xa 

560 Array to broaden. The last dimension should correspond to the occupied levels 

561 

562 Returns 

563 ------- 

564 Broadened array 

565 """ 

566 assert self.sigma is not None 

567 

568 return broaden_xn2e(M_xi, self.eig_i, self.energies_occ, self.sigma) 

569 

570 def broaden_xi2u(self, 

571 M_xa: NDArray[np.float64]) -> NDArray[np.float64]: 

572 """ Broaden array corresponding to unoccupied levels with Gaussians of width sigma 

573 

574 Parameters 

575 ---------- 

576 M_xa 

577 Array to broaden. The last dimension should correspond to the unoccupied levels 

578 

579 Returns 

580 ------- 

581 Broadened array 

582 """ 

583 assert self.sigma is not None 

584 

585 return broaden_xn2e(M_xa, self.eig_a, self.energies_unocc, self.sigma) 

586 

587 def broaden_ia2ou(self, 

588 M_ia: NDArray[np.float64]) -> NDArray[np.float64]: 

589 """ Broaden matrix in electron-hole basis with Gaussians of width sigma. 

590 

591 Parameters 

592 ---------- 

593 M_ia 

594 Matrix to broaden. The first dimension should correspond to occupied levels 

595 and the second to unoccupied levels. 

596 

597 Returns 

598 ------- 

599 Broadened array 

600 """ 

601 assert self.sigma is not None 

602 

603 return broaden_ia2ou(M_ia, self.eig_i, self.eig_a, 

604 self.energies_occ, self.energies_unocc, self.sigma)