Coverage for rhodent/voronoi.py: 75%

421 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-08-01 16:57 +0000

1from __future__ import annotations 

2 

3from abc import ABC, abstractmethod 

4from typing import Any, Iterator, Sequence, Union 

5 

6import numpy as np 

7from numpy.typing import NDArray 

8 

9from gpaw import GPAW 

10from gpaw.analyse.wignerseitz import wignerseitz 

11from gpaw.io import Writer 

12from gpaw.mpi import broadcast, world 

13from gpaw.utilities.tools import tri2full 

14 

15from .utils import Logger, parulmopen, ParallelMatrix 

16from .typing import Communicator, GPAWCalculator 

17 

18 

19AtomProjectionsType = Sequence[Union[list[int], NDArray[np.float64]]] # | breaks on py3.9 

20 

21 

22class VoronoiWeights(ABC): 

23 

24 """ Abstract base class for Voronoi weights in Kohn-Sham basis. """ 

25 

26 def __init__(self, 

27 comm: Communicator | None = None, 

28 log: Logger | None = None): 

29 if comm is None: 

30 comm = world 

31 if log is None: 

32 log = Logger() 

33 

34 self._comm = comm 

35 self._log = log 

36 

37 def __enter__(self): 

38 return self 

39 

40 def __exit__(self, exc_type, exc_value, tb): 

41 pass 

42 

43 def __len__(self) -> int: 

44 """ Number of projections. """ 

45 return self.nproj 

46 

47 @abstractmethod 

48 def __iter__(self) -> Iterator[NDArray[np.float64] | None]: 

49 """ Iteratively yield Voronoi weights for each projection. 

50 

51 Yields 

52 ------ 

53 Matrix of Voronoi weights on root rank, `None` on other ranks. 

54 """ 

55 raise NotImplementedError 

56 

57 def __str__(self) -> str: 

58 lines = [f'{self.__class__.__name__} for ground state with {self.nn} bands', 

59 f'{self.nproj} projections:'] 

60 

61 for i, atoms in enumerate(self.atom_projections): 

62 atomsstr = str(atoms) 

63 if len(atomsstr) > 50: 

64 atomsstr = atomsstr[:47] + '...' 

65 lines.append(f' #{i:<3.0f}: On atoms {atomsstr}') 

66 

67 return '\n'.join(lines) 

68 

69 @property 

70 def root(self) -> bool: 

71 """ Whether this rank is the root rank. """ 

72 return self.comm.rank == 0 

73 

74 @property 

75 def log(self) -> Logger: 

76 """ Logger object. """ 

77 return self._log 

78 

79 @property 

80 @abstractmethod 

81 def atom_projections(self) -> AtomProjectionsType: 

82 """ Atom projections. """ 

83 raise NotImplementedError 

84 

85 @property 

86 @abstractmethod 

87 def nn(self) -> int: 

88 """ Number of bands. """ 

89 raise NotImplementedError 

90 

91 @property 

92 def nproj(self) -> int: 

93 """ Number of projections. """ 

94 return len(self.atom_projections) 

95 

96 @property 

97 def comm(self) -> Communicator: 

98 """ MPI Communicator. """ 

99 return self._comm 

100 

101 @property 

102 @abstractmethod 

103 def saved_fields(self) -> dict[str, Any]: 

104 """ Saved data fields associated with the object. 

105 

106 If this object is read from a ULM file there might be some extra 

107 data in that file. Return that data, or an empty dict if there is none. 

108 """ 

109 raise NotImplementedError 

110 

111 

112class VoronoiWeightCalculator(VoronoiWeights): 

113 

114 r"""Calculate Voronoi weights in the basis of ground state Kohn-Sham orbitals. 

115 

116 For each atomic projection, calculates 

117 

118 .. math:: 

119 

120 W_{nn'} 

121 = \left<\psi_n | \hat{w} | \psi_{n'}\right> 

122 = \left<\tilde{\psi}_n | \hat{w} | \tilde{\psi}_{n'}\right> 

123 + \sum_{aij} \left<\tilde{\psi}_n | \tilde{p}_i^a\right> 

124 \Delta S_{ij}^a \left<\tilde{p}_i^a | \tilde{\psi}_{n'}\right> 

125 

126 where the operator :math:`\hat{w} = w(\vec{r})` is 1 in the 

127 Voronoi region of atoms and 0 outside. 

128 The sum over atoms is restricted to atoms in the region :math:`w(\vec{r}) = 1`. 

129 

130 The PAW projectors 

131 

132 .. math:: 

133 

134 P_{ni}^a = \left<\tilde{p}_i^a | \tilde{\psi}_n \right> 

135 

136 and overlap matrix 

137 

138 .. math:: 

139 

140 \Delta S_{ij}^a 

141 = \left<\phi_i^a|\phi_j^a\right> 

142 - \left<\tilde{\phi}_i^a|\tilde{\phi}_j^a\right> 

143 

144 are obtained from the GPAW calculator and the smooth weight matrix is obtained by 

145 a basis change of the smooth LCAO weights 

146 

147 .. math:: 

148 

149 \left<\tilde{\psi}_n | \hat{w} | \tilde{\psi}_{n'}\right> = 

150 \sum_{\mu\nu} C_{n\mu} C_{n'\nu} 

151 \left<\tilde{\phi}_\mu | \hat{w} | \tilde{\phi}_{\nu}\right>. 

152 

153 Parameters 

154 ---------- 

155 voronoi_lcao 

156 Object that calculates or loads the LCAO weights from file. 

157 """ 

158 

159 _voronoi_lcao: VoronoiLCAOWeights 

160 

161 def __init__(self, 

162 voronoi_lcao: VoronoiLCAOWeights): 

163 assert isinstance(voronoi_lcao, VoronoiLCAOWeights) 

164 self._voronoi_lcao = voronoi_lcao 

165 

166 super().__init__(comm=voronoi_lcao.comm, log=voronoi_lcao.log) 

167 

168 def __iter__(self) -> Iterator[NDArray[np.float64] | None]: 

169 for proj_atoms, weight_MM in zip(self.atom_projections, self.voronoi_lcao): 

170 nM = self.voronoi_lcao.nM 

171 nn = self.voronoi_lcao.nn 

172 

173 # Transform to KS basis C_nM @ weight_MM @ C_nM.T 

174 w_MM = ParallelMatrix((nM, nM), np.float64, comm=self.comm, 

175 array=weight_MM) 

176 C_nM = ParallelMatrix((nn, nM), np.float64, comm=self.comm, 

177 array=self.voronoi_lcao.C_nM) 

178 

179 _v_nn = C_nM @ w_MM @ C_nM.T 

180 self.log('Transformed weights to KS basis', flush=True, who='Voronoi', rank=0) 

181 

182 # Calculate PAW corrections on the root rank 

183 if self.root: 

184 dS_aii = self.voronoi_lcao.dS_aii 

185 P_ani = self.voronoi_lcao.P_ani 

186 v_nn = _v_nn.array 

187 assert dS_aii is not None 

188 assert P_ani is not None 

189 

190 for a, P_ni in P_ani.items(): 

191 if a not in proj_atoms: 

192 continue 

193 v_nn += P_ni @ dS_aii[a] @ P_ni.T 

194 

195 self.log('Added PAW corrections to weights', flush=True, who='Voronoi', rank=0) 

196 

197 yield v_nn 

198 else: 

199 yield None 

200 

201 @property 

202 def voronoi_lcao(self) -> VoronoiLCAOWeights: 

203 return self._voronoi_lcao 

204 

205 @property 

206 def atom_projections(self) -> AtomProjectionsType: 

207 return self.voronoi_lcao.atom_projections 

208 

209 @property 

210 def nn(self) -> int: 

211 return self.voronoi_lcao.nn 

212 

213 @property 

214 def nM(self) -> int: 

215 return self.voronoi_lcao.nM 

216 

217 @property 

218 def comm(self) -> Communicator: 

219 return self.voronoi_lcao.comm 

220 

221 @property 

222 def saved_fields(self) -> dict[str, Any]: 

223 return dict() 

224 

225 def calculate_and_write(self, 

226 out_fname: str, 

227 write_extra: dict[str, Any] = dict()): 

228 """ Calculate the Voronoi weights in the Kohn-Sham basis and write to file. 

229 

230 The weights are saved in a numpy archive if the file extension is `.npz` 

231 or in a ULM file if the file extension is `.ulm`. 

232 

233 Parameters 

234 ---------- 

235 out_fname 

236 File name of the resulting data file. 

237 write_extra 

238 Dictionary of additional data written to the file. 

239 """ 

240 to_be_written = dict() 

241 if world.rank == 0: 

242 to_be_written.update(self.saved_fields) 

243 to_be_written.update(write_extra) 

244 

245 if out_fname.endswith('.npz'): 

246 # Calculate weights 

247 weight_inn = list(self) 

248 

249 # Write on root ranks 

250 if self.root: 

251 return 

252 

253 to_be_written['atom_projections'] = atom_projections_to_numpy(self.atom_projections) 

254 np.savez(out_fname, weight_inn=np.array(weight_inn), **to_be_written) 

255 elif out_fname.endswith('.ulm'): 

256 with Writer(out_fname, world, mode='w', tag='Voronoi') as writer: 

257 writer.write(version=1) 

258 writer.write('atom_projections', self.atom_projections) 

259 writer.write(**to_be_written) 

260 

261 writer.add_array('weight_inn', (self.nproj, self.nn, self.nn), dtype=float) 

262 

263 # Calculate weights 

264 for weight_nn in self: 

265 # Write on root (DummyWriter on other ranks) 

266 writer.fill(weight_nn) 

267 else: 

268 raise ValueError(f'output-file must have ending .npz or .ulm, is {out_fname}') 

269 self.log(f'Weights written to {out_fname}', flush=True, who='Voronoi', rank=0) 

270 world.barrier() 

271 

272 @classmethod 

273 def from_gpw(cls, 

274 atom_projections: AtomProjectionsType, 

275 gpw_file: str, 

276 voronoi_grid: VoronoiGrid | None | str = None, 

277 comm: Communicator | None = None) -> VoronoiWeightCalculator: 

278 """ 

279 Set up calculation from GPAW file. 

280 

281 Parameters 

282 ---------- 

283 atom_projections 

284 List of atom groups. Each atom group is a list of integers (of any length). 

285 gpw_file 

286 File name of GPAW ground state file. 

287 voronoi_grid 

288 Voronoi grid, or `None` to calculate it, or a file name to read it from file. 

289 comm 

290 Communicator. 

291 """ 

292 voronoi_lcao = VoronoiLCAOWeightCalculator( 

293 atom_projections=atom_projections, 

294 gpw_file=gpw_file, 

295 comm=comm) 

296 return cls(voronoi_lcao) 

297 

298 @classmethod 

299 def from_lcao_file(cls, 

300 voronoi_lcao_file: str, 

301 comm: Communicator | None = None) -> VoronoiWeightCalculator: 

302 """ 

303 Set up calculation from file of already computed weights in LCAO basis. 

304 

305 Parameters 

306 ---------- 

307 voronoi_lcao_file 

308 File name of file with Voronoi weights in LCAO basis. 

309 comm 

310 Communicator. 

311 """ 

312 voronoi_lcao = VoronoiLCAOReader(voronoi_lcao_file, comm=comm) 

313 return cls(voronoi_lcao) 

314 

315 

316class VoronoiReader(VoronoiWeights): 

317 

318 """ Read Voronoi weights from ulm file. 

319 

320 Parameters 

321 ---------- 

322 ulm_fname 

323 File name. 

324 comm 

325 GPAW MPI communicator object. Defaults to world. 

326 """ 

327 

328 _nn: int 

329 _atom_projections: AtomProjectionsType 

330 

331 def __init__(self, 

332 ulm_fname: str, 

333 comm: Communicator | None = None): 

334 super().__init__(comm=comm) 

335 

336 self.ulm_fname = ulm_fname 

337 self.reader = parulmopen(self.ulm_fname, self.comm) 

338 

339 # Read size 

340 if self.root: 

341 weight_inn = self.reader.proxy('weight_inn') 

342 assert weight_inn.dtype is np.dtype(float) 

343 assert weight_inn.shape[1] == weight_inn.shape[2] 

344 atom_projections = self.reader.atom_projections 

345 nn = weight_inn.shape[1] 

346 brdcast = (atom_projections, nn) 

347 else: 

348 brdcast = None 

349 

350 brdcast = broadcast(brdcast, root=0, comm=self.comm) 

351 self._atom_projections, self._nn = brdcast # type: ignore 

352 

353 @property 

354 def atom_projections(self) -> AtomProjectionsType: 

355 return self._atom_projections 

356 

357 @property 

358 def nn(self) -> int: 

359 return self._nn 

360 

361 def __exit__(self, exc_type, exc_value, tb): 

362 self.reader.close() 

363 

364 def __iter__(self) -> Iterator[NDArray[np.float64] | None]: 

365 for i in range(len(self)): 

366 if self.root: 

367 weight_nn = self.reader.proxy('weight_inn', i)[:] 

368 else: 

369 weight_nn = None 

370 

371 yield weight_nn 

372 

373 @property 

374 def saved_fields(self) -> dict[str, Any]: 

375 """ Saved data fields associated with the object 

376 

377 If this object is read from a ULM file there might be some extra 

378 data in that file. Return that data, or an empty dict if there is none. 

379 """ 

380 data = {key: getattr(self.reader, key) for key in self.reader.keys() 

381 if key not in ['weight_inn', 'atom_projections']} 

382 

383 return data 

384 

385 

386class EmptyVoronoiWeights(VoronoiWeights): 

387 

388 """ Object representing a lack of weights. """ 

389 

390 def __iter__(self): 

391 return 

392 yield 

393 

394 @property 

395 def atom_projections(self): 

396 return [] 

397 

398 @property 

399 def nn(self): 

400 return 0 

401 

402 @property 

403 def saved_fields(self): 

404 return {} 

405 

406 

407class VoronoiLCAOWeights(ABC): 

408 

409 """ Abstract base class for Voronoi weights in LCAO basis. """ 

410 

411 _atom_projections: AtomProjectionsType 

412 _comm: Communicator 

413 _log: Logger 

414 _dS_aii: Any 

415 _P_ani: Any 

416 _C_nM: Any 

417 

418 def __enter__(self): 

419 return self 

420 

421 def __exit__(self, exc_type, exc_value, tb): 

422 pass 

423 

424 def __len__(self) -> int: 

425 """ Return the number of projections. """ 

426 return self.nproj 

427 

428 @abstractmethod 

429 def __iter__(self) -> Iterator[NDArray[np.float64] | None]: 

430 """ Iteratively yield Voronoi weights in the LCAO basis for each projection. 

431 

432 Yields 

433 ------ 

434 Matrix of Voronoi weights on root rank, `None` on other ranks. 

435 """ 

436 raise NotImplementedError 

437 

438 @property 

439 def log(self) -> Logger: 

440 return self._log 

441 

442 def log_parallel(self, *args, **kwargs) -> Logger: 

443 """ Log message with communicator information. """ 

444 return self._log(*args, **kwargs, comm=self.comm, who='Voronoi') 

445 

446 @property 

447 @abstractmethod 

448 def nn(self) -> int: 

449 """ Number of bands. """ 

450 raise NotImplementedError 

451 

452 @property 

453 @abstractmethod 

454 def nM(self) -> int: 

455 """ Number of basis functions. """ 

456 raise NotImplementedError 

457 

458 @property 

459 def nproj(self) -> int: 

460 """ Number of atomic projections """ 

461 return len(self.atom_projections) 

462 

463 @property 

464 def atom_projections(self) -> AtomProjectionsType: 

465 """ List of atom projections. """ 

466 return self._atom_projections 

467 

468 @property 

469 def calc(self) -> GPAWCalculator | None: 

470 """ GPAW calculator instance. """ 

471 return None 

472 

473 @property 

474 def comm(self) -> Communicator: 

475 """ MPI Communicator. """ 

476 return self._comm 

477 

478 @property 

479 def root(self) -> bool: 

480 """ Whether this rank is the root rank. """ 

481 return self.comm.rank == 0 

482 

483 @property 

484 def C_nM(self) -> NDArray[np.float64] | None: 

485 """ LCAO wave function coefficients on root rank, `None` on other ranks. """ 

486 return self._C_nM if self.root else None 

487 

488 @property 

489 def P_ani(self) -> dict[int, NDArray[np.float64]] | None: 

490 r""" PAW projectors :math:`P_{ni}^a` on the root rank, `None` on other ranks. 

491 

492 .. math:: 

493 

494 P_{ni}^a = \left<\tilde{p}_i^a | \tilde{\psi}_n \right> 

495 """ 

496 return self._P_ani if self.root else None 

497 

498 @property 

499 def dS_aii(self) -> dict[int, NDArray[np.float64]] | None: 

500 r""" Overlap matrix PAW corrections :math:`\Delta S_{ij}^a` on the root rank, `None` on other ranks. 

501 

502 .. math:: 

503 

504 \Delta S_{ij}^a 

505 = \left<\phi_i^a|\phi_j^a\right> 

506 - \left<\tilde{\phi}_i^a|\tilde{\phi}_j^a\right> 

507 """ 

508 return self._dS_aii if self.root else None 

509 

510 @property 

511 @abstractmethod 

512 def saved_fields(self) -> dict[str, Any]: 

513 """ Saved data fields associated with the object. 

514 

515 If this object is read from a ULM file there might be some extra 

516 data in that file. Return that data, or an empty dict if there is none. 

517 """ 

518 raise NotImplementedError 

519 

520 @property 

521 def arrays(self) -> dict[str, Any]: 

522 if self.comm.rank != 0: 

523 return {} 

524 

525 arrays = dict(P_ani=self.P_ani, 

526 C_nM=self.C_nM, 

527 dS_aii=self.dS_aii) 

528 

529 return arrays 

530 

531 

532class VoronoiLCAOReader(VoronoiLCAOWeights): 

533 

534 """ Read Voronoi weights in the LCAO basis from ULM file. 

535 

536 Parameters 

537 ---------- 

538 ulm_fname 

539 File name. 

540 comm 

541 GPAW MPI communicator object. Defaults to world. 

542 """ 

543 

544 def __init__(self, 

545 ulm_fname: str, 

546 comm=None): 

547 self.ulm_fname = ulm_fname 

548 

549 if comm is None: 

550 comm = world 

551 

552 self._log = Logger() 

553 self._comm = comm 

554 self.reader = parulmopen(self.ulm_fname, self.comm) 

555 

556 self._dS_aii: dict[int, NDArray[np.float64]] | None 

557 self._P_ani: dict[int, NDArray[np.float64]] | None 

558 self._C_nM = NDArray[np.float64] | None 

559 

560 # Read arrays on root rank 

561 if self.root: 

562 weight_iMM = self.reader.proxy('weight_iMM') 

563 assert weight_iMM.dtype is np.dtype(float) 

564 assert weight_iMM.shape[1] == weight_iMM.shape[2] 

565 self._dS_aii = self.reader.dS_aii 

566 self._P_ani = self.reader.P_ani 

567 self._C_nM = self.reader.C_nM[:] 

568 atom_projections = self.reader.atom_projections 

569 brdcast = (atom_projections, ) + self._C_nM.shape 

570 else: 

571 brdcast = None 

572 self._C_nM = None 

573 self._P_ani = None 

574 self._dS_aii = None 

575 

576 # Broadcast atom projections and number of bands and basis functions 

577 brdcast = broadcast(brdcast, root=0, comm=self.comm) 

578 self._atom_projections, self._nn, self._nM = brdcast # type: ignore 

579 

580 def __exit__(self, exc_type, exc_value, tb): 

581 self.reader.close() 

582 

583 @property 

584 def nn(self) -> int: 

585 return self._nn 

586 

587 @property 

588 def nM(self) -> int: 

589 return self._nM 

590 

591 def __iter__(self) -> Iterator[NDArray[np.float64]]: 

592 for i in range(len(self)): 

593 if self.root: 

594 weight_MM = self.reader.proxy('weight_iMM', i)[:] 

595 else: 

596 weight_MM = None 

597 

598 yield weight_MM 

599 

600 @property 

601 def saved_fields(self) -> dict[str, Any]: 

602 data = {key: getattr(self.reader, key) for key in self.reader.keys() 

603 if key not in ['weight_iMM', 'atom_projections', 

604 'dS_aii', 'P_ani', 'C_nM']} 

605 

606 return data 

607 

608 

609class VoronoiLCAOWeightCalculator(VoronoiLCAOWeights): 

610 

611 r"""Calculate Voronoi weights in LCAO basis. 

612 

613 For each atomic projection, calculates 

614 

615 .. math:: 

616 

617 \tilde{v}_{\mu\nu} 

618 = \left<\tilde{\phi}_{\mu} | \hat{w} | \tilde{\phi}_{\nu}\right> 

619 = \int w(\vec{r}) \tilde{\phi}_{\mu}^*(\vec{r}) \tilde{\phi}_{\nu}(\vec{r}) d\vec{r} 

620 

621 where the operator :math:`\hat{w} = w(\vec{r})` is 1 in the 

622 Voronoi region of the atomic projections and 0 outside, and :math:`\tilde{\phi}_{\nu}` are 

623 the smooth LCAO basis functions. 

624 

625 Parameters 

626 ---------- 

627 atom_projections 

628 List of atom groups. Each atom group is a list of integers (of any length). 

629 gpw_file 

630 File name of GPAW ground state file. 

631 voronoi_grid 

632 Voronoi grid, or `None` to calculate it, or a file name to read it from file. 

633 domain 

634 Domain size. 

635 comm 

636 Communicator. 

637 """ 

638 

639 _calc: GPAWCalculator 

640 

641 def __init__(self, 

642 atom_projections: AtomProjectionsType, 

643 gpw_file: str, 

644 voronoi_grid: VoronoiGrid | None | str = None, 

645 domain: int = -1, 

646 comm: Communicator | None = None): 

647 assert all([isinstance(proj_atoms, list) or isinstance(proj_atoms, np.ndarray) 

648 for proj_atoms in atom_projections]) 

649 self._atom_projections = atom_projections 

650 

651 if comm is None: 

652 comm = world 

653 

654 self._comm = comm 

655 self._log = Logger() 

656 

657 if voronoi_grid is None: 

658 voronoi_grid = VoronoiGridCalculator() 

659 if isinstance(voronoi_grid, str): 

660 voronoi_grid = VoronoiGridReader(voronoi_grid) 

661 self.voronoi_grid = voronoi_grid 

662 

663 if domain == -1: 

664 domain = self.comm.size 

665 

666 # Set up GPAW calculator 

667 calc = GPAW(gpw_file, txt=None, communicator=self.comm, parallel={'domain': domain}) 

668 calc.initialize_positions() 

669 self.log('Loaded and initialized GPAW', rank=0, flush=True, who='Voronoi') 

670 self._calc = calc # type: ignore 

671 

672 # Collect wave functions, projectors and overlap to the root rank 

673 C_nM = calc.wfs.collect_array('C_nM', k=0, s=0) 

674 P_ani = proj_as_dict_on_master(self.calc.wfs.kpt_u[0].projections, 0, self.nn) 

675 dS_aii = {a: setup.dO_ii for a, setup in enumerate(calc.wfs.setups)} 

676 

677 self._C_nM = C_nM if self.root else None 

678 self._P_ani = P_ani if self.root else None 

679 self._dS_aii = dS_aii if self.root else None 

680 

681 @property 

682 def nn(self) -> int: 

683 return self.calc.wfs.bd.nbands 

684 

685 @property 

686 def nM(self) -> int: 

687 return self.calc.wfs.setups.nao 

688 

689 @property 

690 def calc(self) -> GPAWCalculator: 

691 return self._calc 

692 

693 def __iter__(self) -> Iterator[NDArray[np.float64] | None]: 

694 # Get the Voronoi grid 

695 a_G = self.voronoi_grid.a_G(self.calc, self.log) 

696 

697 for proj_atoms in self.atom_projections: 

698 w_G = np.where(np.isin(a_G, proj_atoms), 1.0, 0.0) 

699 if self.calc.comms['b'].rank != 0: 

700 # The band comm ranks compute the same information 

701 return None 

702 

703 # Calculate the weights on the domain communicator 

704 weight_MM = self.calc.wfs.basis_functions.calculate_potential_matrices(w_G)[0] 

705 tri2full(weight_MM) 

706 

707 # Sum to the root rank 

708 self.calc.comms['d'].sum(weight_MM, root=0) 

709 

710 self.log(f'Computed LCAO weights for projection {proj_atoms}', rank=0, flush=True, who='Voronoi') 

711 

712 if self.root: 

713 yield weight_MM 

714 else: 

715 yield None 

716 

717 @property 

718 def saved_fields(self) -> dict[str, Any]: 

719 return dict() 

720 

721 def calculate_and_write(self, 

722 out_fname: str, 

723 write_extra: dict[str, Any] = dict()): 

724 """ Calculate the Voronoi weights in the LCAO basis and write to file. 

725 

726 The weights are saved in a numpy archive if the file extension is `.npz` 

727 or in a ULM file if the file extension is `.ulm`. 

728 

729 Parameters 

730 ---------- 

731 out_fname 

732 File name of the resulting data file. 

733 write_extra 

734 Dictionary of additional data written to the file. 

735 """ 

736 to_be_written = dict() 

737 if world.rank == 0: 

738 to_be_written.update(self.saved_fields) 

739 to_be_written.update(self.arrays) 

740 to_be_written.update(write_extra) 

741 

742 if out_fname.endswith('.npz'): 

743 # Calculate weights 

744 weight_iMM = list(self) 

745 

746 # Write on root ranks 

747 if self.root: 

748 return 

749 

750 to_be_written['atom_projections'] = atom_projections_to_numpy(self.atom_projections) 

751 np.savez(out_fname, weight_iMM=np.array(weight_iMM), **to_be_written) 

752 elif out_fname.endswith('.ulm'): 

753 with Writer(out_fname, self.comm, mode='w', tag='Voronoi') as writer: 

754 writer.write(version=1) 

755 writer.write('atom_projections', self.atom_projections) 

756 writer.write(**to_be_written) 

757 

758 writer.add_array('weight_iMM', (self.nproj, self.nM, self.nM), dtype=float) 

759 

760 # Calculate weights 

761 for weight_MM in self: 

762 # Write on root (DummyWriter on other ranks) 

763 writer.fill(weight_MM) 

764 else: 

765 raise ValueError(f'output-file must have ending .npz or .ulm, is {out_fname}') 

766 self.log(f'Written weights in LCAO basis to {out_fname}', flush=True, who='Voronoi', rank=0) 

767 world.barrier() 

768 

769 

770class VoronoiGrid(ABC): 

771 

772 @abstractmethod 

773 def a_G(self, 

774 calc: GPAWCalculator, 

775 log: Logger) -> NDArray[np.int_]: 

776 """ Voronoi grid (on the coarse grid of the GPAW calculator) 

777 distributed on the domain communicator. 

778 

779 Each element in the grid is an integer corresponding to the closest atom. 

780 

781 Parameters 

782 ---------- 

783 GPAW calculator. 

784 """ 

785 raise NotImplementedError 

786 

787 def write(self, 

788 filename: str, 

789 calc: GPAWCalculator, 

790 log: Logger): 

791 """ Write grid to file. 

792 

793 Parameters 

794 ---------- 

795 file name 

796 File name. 

797 calc 

798 GPAW calculator. 

799 """ 

800 filename = str(filename) 

801 

802 # Calculate grid and collect to root 

803 big_a_G = calc.density.gd.collect(self.a_G(calc, log)) 

804 if filename.endswith('.npz'): 

805 np.savez_compressed(filename, a_G=big_a_G) 

806 else: 

807 np.save(filename, big_a_G) 

808 log(f'Written Voronoi grid to {filename}', who='Voronoi', flush=True, rank=0) 

809 

810 

811class VoronoiGridReader(VoronoiGrid): 

812 

813 """ Read Voronoi grid from file. """ 

814 

815 def __init__(self, 

816 filename: str): 

817 self.filename = filename 

818 

819 def a_G(self, 

820 calc: GPAWCalculator, 

821 log: Logger) -> NDArray[np.int_]: 

822 # Read the grid on the domain communicator root 

823 domain_comm = calc.comms['d'] 

824 if domain_comm.rank == 0: 

825 if self.filename.endswith('.npz'): 

826 files = np.load(self.filename) 

827 big_a_G = files['a_G'] 

828 else: 

829 big_a_G = np.load(self.filename) 

830 

831 assert big_a_G.dtype == np.int16 

832 

833 # Distribute grid across domain communicator 

834 gd = calc.density.gd 

835 a_G = gd.zeros(dtype=np.int16) 

836 gd.distribute(big_a_G if domain_comm.rank == 0 else None, a_G) 

837 

838 log('Loaded Voronoi grid', rank=0, flush=True, who='Voronoi') 

839 

840 return a_G 

841 

842 

843class VoronoiGridCalculator(VoronoiGrid): 

844 

845 """ Calculate the Voronoi grid. """ 

846 

847 def a_G(self, 

848 calc: GPAWCalculator, 

849 log: Logger) -> NDArray[np.int_]: 

850 log('Computing Voronoi grid', rank=0, flush=True, who='Voronoi') 

851 atoms = calc.get_atoms() 

852 a_G = wignerseitz(calc.density.gd, atoms) 

853 a_G = a_G.astype(np.int16) 

854 log('Computed Voronoi grid', rank=0, flush=True, who='Voronoi') 

855 

856 return a_G 

857 

858 

859def atom_projections_to_numpy(atom_projections: AtomProjectionsType) -> NDArray[np.int_]: 

860 Ni = len(atom_projections) 

861 if Ni == 0: 

862 Nj = 0 

863 else: 

864 Nj = max([len(proj_atoms) for proj_atoms in atom_projections]) 

865 atom_projections_ij = np.full((Ni, Nj), -1, dtype=int) 

866 for i, proj_atoms in enumerate(atom_projections): 

867 na = len(proj_atoms) 

868 atom_projections_ij[i, :na] = proj_atoms 

869 

870 return atom_projections_ij 

871 

872 

873def proj_as_dict_on_master(proj, n1: int, n2: int) -> dict[int, NDArray[np.float64]]: 

874 """ Collect the projectors to a dictionary on the root rank.""" 

875 # In newer versions of GPAW this is proj.as_dict_on_master 

876 P_nI = proj.collect() 

877 if P_nI is None: 

878 return {} 

879 I1 = 0 

880 P_ani = {} 

881 for a, ni in enumerate(proj.nproj_a): 

882 I2 = I1 + ni 

883 P_ani[a] = P_nI[n1:n2, I1:I2] 

884 I1 = I2 

885 return P_ani