Source code for rhodent.voronoi

from __future__ import annotations

from abc import ABC, abstractmethod
from typing import Any, Iterator, Sequence, Union

import numpy as np
from numpy.typing import NDArray

from gpaw import GPAW
from gpaw.analyse.wignerseitz import wignerseitz
from gpaw.arraydict import ArrayDict
from gpaw.matrix import Matrix, suggest_blocking
from gpaw.mpi import SerialCommunicator, broadcast, world
from gpaw.utilities.partition import AtomPartition
from gpaw.utilities.tools import tri2full

from .utils import Logger, parulmopen, proj_as_dict_on_master, redistribute_LCAO_into_block_cyclic_form
from .typing import Communicator, GPAWCalculator


AtomProjectionsType = Sequence[Union[list[int], NDArray[np.float64]]]  # | breaks on py3.9


[docs] class VoronoiWeights(ABC): broadcast_weights: bool _comm: Any # MPI communicator _log: Logger def __enter__(self): return self def __exit__(self, exc_type, exc_value, tb): pass def __len__(self) -> int: """ Return the number of projections """ return self.nproj @abstractmethod def __iter__(self) -> Iterator[NDArray[np.float64] | None]: """ Yields the weights None is yielded on non-root ranks unless broadcast_weights is True """ raise NotImplementedError def __str__(self) -> str: lines = [f'{self.__class__.__name__} for ground state with {self.nn} bands', f'{self.nproj} projections:'] for i, atoms in enumerate(self.atom_projections): atomsstr = str(atoms) if len(atomsstr) > 50: atomsstr = atomsstr[:47] + '...' lines.append(f'- #{i:<3.0f}: On atoms {atomsstr}') return '\n'.join(lines) @property def log(self) -> Logger: return self._log @property @abstractmethod def atom_projections(self) -> AtomProjectionsType: """ Atom projections """ raise NotImplementedError @property @abstractmethod def nn(self) -> int: """ Global number of bands """ raise NotImplementedError @property @abstractmethod def dtype(self) -> np.dtype: """ dtype of wave functions """ raise NotImplementedError @property def nproj(self) -> int: return len(self.atom_projections) @property def comm(self) -> Communicator: """ Communicator """ return self._comm @property @abstractmethod def saved_fields(self) -> dict[str, Any]: """ Saved data fields associated with the object If this object is read from a ULM file there might be some extra data in that file. Return that data, or an empty dict if there is none. """ raise NotImplementedError
[docs] class EmptyVoronoiWeights(VoronoiWeights): def __init__(self): self._log = Logger() self._comm = self._log.comm self.broadcast_weights = True def __iter__(self): return yield @property def atom_projections(self): return [] @property def nn(self): return 0 @property def dtype(self): return float @property def saved_fields(self): return {}
[docs] class VoronoiLCAOWeights(ABC): _atom_projections: AtomProjectionsType broadcast_weights: bool _comm: Any # MPI communicator _log: Logger _dS_aii: Any _dist_C_nM: NDArray[np.float64] | None = None _P_ani: ArrayDict _gather_P_ani: ArrayDict | None = None def __enter__(self): return self def __exit__(self, exc_type, exc_value, tb): pass def __len__(self) -> int: """ Return the number of projections """ return self.nproj @abstractmethod def __iter__(self) -> Iterator[NDArray[np.float64] | None]: """ Yields the weights. Different data is returned on different ranks of the domain communicator. Duplicate data is returned on different ranks of the band communicator. """ raise NotImplementedError @property def log(self) -> Logger: return self._log @property @abstractmethod def dtype(self) -> np.dtype: """ dtype of the wave functions """ raise NotImplementedError @property @abstractmethod def nn(self) -> int: """ Global number of bands """ raise NotImplementedError @property @abstractmethod def nM(self) -> int: """ Global number of bands """ raise NotImplementedError @property def nproj(self) -> int: """ Number of atomic projections """ return len(self.atom_projections) @property def atom_projections(self) -> AtomProjectionsType: return self._atom_projections @property def calc(self) -> GPAWCalculator | None: return None @property def comm(self) -> Communicator: """ Communicator """ return self._comm @property @abstractmethod def C_nM(self): """ LCAO wave function coefficients. Distributed over bands. """ raise NotImplementedError @property @abstractmethod def P_ani(self) -> ArrayDict: r""" PAW projectors. Distributed over domains and bands. .. math:: P_{ni}^a = \left<\tilde{p}_i^a | \tilde{\psi}_n \right> """ raise NotImplementedError @property def dist_C_nM(self): """ LCAO wave function coefficients. Distributed in block cyclic form. """ if self._dist_C_nM is None: self._prepare_C() return self._dist_C_nM @property def gather_P_ani(self) -> ArrayDict: """ PAW projectors. Gathered to band comm rank 0. None on all other band comm ranks. """ if self._gather_P_ani is None: self._prepare_P() assert self._gather_P_ani is not None return self._gather_P_ani @property def dS_aii(self): r""" Overlap matrix PAW corrections. Same data on all ranks .. math:: \Delta S_{ij}^a= = \left<\phi_i^a|\phi_j^a\right> - \left<\tilde{\phi}_i^a|\tilde{\phi}_j^a\right> """ return self._dS_aii @property def dist(self): return self.dist_C_nM.dist @abstractmethod def _prepare_P(self): raise NotImplementedError @abstractmethod def _prepare_C(self): raise NotImplementedError @property @abstractmethod def band_comm(self) -> Communicator: raise NotImplementedError @property @abstractmethod def domain_comm(self) -> Communicator: raise NotImplementedError @property @abstractmethod def saved_fields(self) -> dict[str, Any]: """ Saved data fields associated with the object. If this object is read from a ULM file there might be some extra data in that file. Return that data, or an empty dict if there is none. """ raise NotImplementedError
[docs] class VoronoiReader(VoronoiWeights): """ Read Voronoi weights from ulm file. Parameters ---------- ulm_fname filename broadcast_weights If true, the array of weights is broadcasted and yielded on all ranks comm GPAW MPI communicator object. Defaults to world """ _nn: int _dtype: np.dtype _atom_projections: AtomProjectionsType def __init__(self, ulm_fname: str, broadcast_weights: bool = False, comm=None): self.ulm_fname = ulm_fname self.broadcast_weights = broadcast_weights if comm is None: comm = world self._log = Logger(comm=comm) self._comm = comm self.reader = parulmopen(self.ulm_fname, self.comm) # Read size if self.comm.rank == 0: weight_inn = self.reader.proxy('weight_inn') assert weight_inn.dtype is np.dtype(float) assert weight_inn.shape[1] == weight_inn.shape[2] atom_projections = self.reader.atom_projections nn = weight_inn.shape[1] brdcast = (atom_projections, nn, weight_inn.dtype) else: brdcast = None brdcast = broadcast(brdcast, root=0, comm=self.comm) self._atom_projections, self._nn, self._dtype = brdcast # type: ignore @property def atom_projections(self) -> AtomProjectionsType: return self._atom_projections @property def nn(self) -> int: return self._nn @property def dtype(self) -> np.dtype: return self._dtype def __exit__(self, exc_type, exc_value, tb): self.reader.close() def __iter__(self) -> Iterator[NDArray[np.float64] | None]: for i in range(len(self)): if self.comm.rank == 0: weight_nn = self.reader.proxy('weight_inn', i)[:] else: if self.broadcast_weights: weight_nn = np.empty((self.nn, self.nn), self.dtype) else: weight_nn = None if self.broadcast_weights: self.comm.broadcast(weight_nn, 0) yield weight_nn @property def saved_fields(self) -> dict[str, Any]: """ Saved data fields associated with the object If this object is read from a ULM file there might be some extra data in that file. Return that data, or an empty dict if there is none. """ data = {key: getattr(self.reader, key) for key in self.reader.keys() if key not in ['weight_inn', 'atom_projections']} return data
[docs] class VoronoiLCAOReader(VoronoiLCAOWeights): """ Read Voronoi weights in LCAO basis from ulm file. Parameters ---------- ulm_fname filename broadcast_weights If true, the array of weights is broadcasted and yielded on all ranks comm GPAW MPI communicator object. Defaults to world """ _comm_is_domain: bool def __init__(self, ulm_fname: str, comm_is_domain: bool = False, comm=None): self.ulm_fname = ulm_fname self._comm_is_domain = comm_is_domain if comm is None: comm = world self._log = Logger(comm=comm) self._comm = comm self.reader = parulmopen(self.ulm_fname, self.comm) # Read size if self.comm.rank == 0: weight_iMM = self.reader.proxy('weight_iMM') assert weight_iMM.dtype is np.dtype(float) assert weight_iMM.shape[1] == weight_iMM.shape[2] dS_aii = self.reader.dS_aii P_ani = self.reader.P_ani shapes_a = [P_ani[a].shape for a in range(len(P_ani))] self._C_nM = self.reader.C_nM[:] atom_projections = self.reader.atom_projections brdcast = (atom_projections, dS_aii, self._C_nM.dtype) + self._C_nM.shape else: brdcast = None P_ani = {} self._C_nM = None brdcast = broadcast(brdcast, root=0, comm=self.comm) self._atom_projections, self._dS_aii, self._dtype, self._nn, self._nM = brdcast # type: ignore natoms = len(self._dS_aii) if self.domain_comm.rank == 0: if self.band_comm.rank != 0: shapes_a = natoms*[(0, 0)] P_ani = {a: np.zeros(shape) for a, shape in enumerate(shapes_a)} partition = AtomPartition(self.domain_comm, natoms*[0], 'All on root partition') else: shapes_a = [] partition = AtomPartition(self.domain_comm, [], 'Empty partition') self._P_ani = ArrayDict(partition, shapes_a, d=P_ani) self._gather_P_ani = self._P_ani def __exit__(self, exc_type, exc_value, tb): self.reader.close() @property def C_nM(self): return self._C_nM @property def P_ani(self) -> ArrayDict: return self._P_ani @property def dtype(self) -> np.dtype: return self._dtype @property def nn(self) -> int: return self._nn @property def nM(self) -> int: return self._nM def __iter__(self) -> Iterator[NDArray[np.float64]]: for i in range(len(self)): if self.domain_comm.rank == 0: if self.band_comm.rank == 0: weight_MM = self.reader.proxy('weight_iMM', i)[:] else: weight_MM = np.zeros((self.nM, self.nM), float) self.band_comm.broadcast(weight_MM, 0) else: weight_MM = np.zeros((self.nM, self.nM), float) yield weight_MM def _prepare_P(self): pass def _prepare_C(self): dist = (self.comm, ) + suggest_blocking(self.nn, self.comm.size) dist_C_nM = Matrix(self.nn, self.nM, dtype=self.dtype, dist=dist) serial_C_nM = Matrix(self.nn, self.nM, dtype=self.dtype, dist=(self.comm, 1, 1, None), data=self.C_nM) serial_C_nM.redist(dist_C_nM) self._dist_C_nM = dist_C_nM self.log('Done distributing', rank=0) @property def band_comm(self) -> Communicator: if self._comm_is_domain: return SerialCommunicator() # type: ignore else: return self.comm @property def domain_comm(self) -> Communicator: if self._comm_is_domain: return self.comm else: return SerialCommunicator() # type: ignore @property def saved_fields(self) -> dict[str, Any]: data = {key: getattr(self.reader, key) for key in self.reader.keys() if key not in ['weight_iMM', 'atom_projections', 'dS_aii', 'P_ani', 'C_nM']} return data
[docs] class VoronoiWeightCalculator(VoronoiWeights): r"""Calculates KS wave function overlaps under operator :math:`\hat{w}` using LCAO basis function overlaps and PAW corrections. .. math:: W_{nn'} = \left<\psi_n | \hat{w} | \psi_{n'}\right> = \left<\tilde{\psi}_n | \hat{T}^\dagger \hat{w} \hat{T} | \tilde{\psi}_{n'}\right> = \left<\tilde{\psi}_n | \hat{w} | \tilde{\psi}_{n'}\right> + \sum_{aij} \left<\tilde{\psi}_n | \tilde{p}_i^a\right> \Delta S_{ij}^a \left<\tilde{p}_i^a | \tilde{\psi}_{n'}\right> where the operator :math:`\hat{w} = w(\vec{r})` is 1 in the Voronoi region of atoms :attr:`proj_atoms` and 0 outside. The sum over atoms is restricted to atoms in the region :math:`w(\vec{r}) = 1`. :math:`\hat{T}` is the PAW transformation operator. Parameters ---------- voronoi_lcao_gen Object that calculates or loads the LCAO weights from file use_pblas Whether PBLAS/ScaLAPACK should be used (more efficient parallelization) """ _voronoi_lcao_gen: VoronoiLCAOWeights def __init__(self, voronoi_lcao_gen: VoronoiLCAOWeights, use_pblas: bool = False): assert isinstance(voronoi_lcao_gen, VoronoiLCAOWeights) self._voronoi_lcao_gen = voronoi_lcao_gen self._log = self.voronoi_lcao_gen.log self.use_pblas = use_pblas if not use_pblas: assert self.band_comm.size == 1, 'Only domain parallelization allowed when not using PBLAS' self.log('Start', rank=0, flush=True) def __iter__(self) -> Iterator[NDArray[np.float64] | None]: """ Yields weights in KS basis. """ for proj_atoms, weight_MM in zip(self.atom_projections, self.voronoi_lcao_gen): if self.use_pblas: yield self.calculate_weight_nn_pblas(proj_atoms, weight_MM) else: self.voronoi_lcao_gen.gather_P_ani # Needs to be called by all domains to be gathered yield self.calculate_weight_nn(proj_atoms, weight_MM)
[docs] def calculate_weight_nn_pblas(self, proj_atoms: list[int] | NDArray[np.float64], weight_MM: NDArray[np.float64] | None, out: NDArray[np.float64] | None = None) -> NDArray[np.float64] | None: r""" Calculates weights using PBLAS. Parameters ---------- proj_atoms Do the Voronoi decomposition for these atoms weight_MM The weights in LCAO without PAW corrections. The weights should be summed to the rank 0 of the domain communicator The weights on other ranks than 0 on the band communicator are ignored (they should be identical on all ranks) out If this is specified the result is written to here on rank 0 Returns ------- The matrix elements :math:`W_{nn'}` on root and ``None`` on other ranks """ if out is not None and self.comm.rank == 0: assert out.shape == (self.nn, self.nn), out.shape # Only non-None on comm root gather_P_ani = self.voronoi_lcao_gen.gather_P_ani if weight_MM is None: # It is permissible to let weight_MM be None. Then all projectors # must be on the root rank of the domain comm assert self.domain_comm.rank != 0, 'Weights must not be None on non-root rank' assert gather_P_ani.partition.natoms == 0, gather_P_ani.partition.natoms if self.band_comm.rank == 0: v_nn = np.zeros((self.nn, self.nn), dtype=self.dtype) else: # Perform the transformation to KS basis only on domain comms where # weight_MM is not None if self.band_comm.rank != 0: # Weights should be identical over the entire band communicator weight_MM = np.zeros((0, 0)) dist = self.voronoi_lcao_gen.dist # redistribute the overlaps into block cyclic form serial_vt_MM: Matrix | None serial_vt_MM = Matrix(self.nM, self.nM, dtype=float, dist=(self.band_comm, 1, 1, None), data=weight_MM) dist_vt_MM = Matrix(self.nM, self.nM, dtype=float, dist=(self.band_comm, dist.rows, dist.columns, dist.blocksize)) serial_vt_MM.redist(dist_vt_MM) serial_vt_MM = None weight_MM = None if self.band_comm.rank == 0: self.log('Distributed vt_MM') # Transform into KS basis vt_nn = Matrix(self.nn, self.nn, dtype=float, dist=(self.band_comm, dist.rows, dist.columns, dist.blocksize)) calc_vt_nn_pblas(self.voronoi_lcao_gen.dist_C_nM, dist_vt_MM, vt_nn) if self.band_comm.rank == 0: self.log('Calculated vt_nn in parallel') # redistribute the v_nn matric to serial again serial_vt_nn = Matrix(self.nn, self.nn, dtype=float, dist=(self.band_comm, 1, 1, None)) vt_nn.redist(serial_vt_nn) v_nn = serial_vt_nn.array if self.band_comm.rank == 0: self.log('Distributed vt_nn', rank=0) if self.band_comm.rank == 0: calc_correction(proj_atoms, gather_P_ani, self.voronoi_lcao_gen.dS_aii, self.nn, out=v_nn) self.domain_comm.sum(v_nn) self.log('Calculated correction', rank=0) if self.comm.rank == 0: if out is not None: out[:] = v_nn return v_nn else: return None
[docs] def calculate_weight_nn(self, proj_atoms: list[int] | NDArray[np.float64], weight_MM: NDArray[np.float64] | None, out: NDArray[np.float64] | None = None) -> NDArray[np.float64] | None: r""" Calculates weights. Parameters ---------- proj_atoms Do the Voronoi decomposition for these atoms weight_MM The weights in LCAO without PAW corrections out If this is specified the result is written to here on rank 0 Returns ------- The matrix elements :math:`W_{nn'}` on root and ``None`` on other ranks """ if out is not None and self.comm.rank == 0: assert out.shape == (self.nn, self.nn), out.shape gather_P_ani = self.voronoi_lcao_gen.gather_P_ani if weight_MM is None: # It is permissible to let weight_MM be None. Then all projectors # must be on the root rank of the domain comm assert self.comm.rank != 0, 'Weights must not be None on non-root rank' assert gather_P_ani.partition.natoms == 0, gather_P_ani.partition.natoms v_nn = np.zeros((self.nn, self.nn), dtype=self.dtype) else: # Since we assume that all parallelization is over the domain # C_nM is only distributed over domain v_nn = calc_vt_nn(self.voronoi_lcao_gen.C_nM, weight_MM) calc_correction(proj_atoms, gather_P_ani, self.voronoi_lcao_gen.dS_aii, self.nn, out=v_nn) self.domain_comm.sum(v_nn) if self.comm.rank == 0: if out is not None: out[:] = v_nn return v_nn else: return None
@property def voronoi_lcao_gen(self) -> VoronoiLCAOWeights: return self._voronoi_lcao_gen @property def atom_projections(self) -> AtomProjectionsType: return self.voronoi_lcao_gen.atom_projections @property def nn(self) -> int: return self.voronoi_lcao_gen.nn @property def dtype(self) -> np.dtype: return self.voronoi_lcao_gen.dtype @property def nM(self) -> int: return self.voronoi_lcao_gen.nM @property def comm(self) -> Communicator: return self.voronoi_lcao_gen.comm @property def band_comm(self) -> Communicator: return self.voronoi_lcao_gen.band_comm @property def domain_comm(self) -> Communicator: return self.voronoi_lcao_gen.domain_comm @property def saved_fields(self) -> dict[str, Any]: return dict()
[docs] class VoronoiLCAOWeightCalculator(VoronoiLCAOWeights): r"""Loads Voronoi grid and calculates Voronoi weights from ground state file using LCAO basis function overlaps and PAW corrections. .. math:: W_{nn'} = \left<\psi_n|\hat{w}|\psi_{n'}\right> = \int w(\vec{r}) \psi_n^*(\vec{r}) \psi_{n'}(\vec{r}) d\vec{r} where the operator :math:`\hat{w} = w(\vec{r})` is 1 in the Voronoi region of the atomic projections and 0 outside. Parameters ---------- atom_projections List of atom groups (length ``Ni``). Each atom group is a list of integers (of any length) gpw_file Filename of GPAW ground state file """ _big_a_G: NDArray[np.int_] | None _a_G: NDArray[np.int_] | None _calc: GPAWCalculator def __init__(self, atom_projections: AtomProjectionsType, gpw_file: str, voronoi_grid_file: str | None = None, recalculate_grid: bool = False, domain: int = -1, use_pblas: bool = False, comm=None): assert all([isinstance(proj_atoms, list) or isinstance(proj_atoms, np.ndarray) for proj_atoms in atom_projections]) self._atom_projections = atom_projections if comm is None: comm = world self._comm = comm self._log = Logger(comm=comm) self.log('Start', rank=0, flush=True) if domain == -1: domain = self.comm.size calc = GPAW(gpw_file, txt=None, communicator=self.comm, parallel={'domain': domain}) calc.initialize_positions() self._calc = calc self._dS_aii = {a: setup.dO_ii for a, setup in enumerate(calc.wfs.setups)} # Same data on all ranks self._proj = self.calc.wfs.kpt_u[0].projections.toarraydict() # Load or calculate (and optionally save) Voronoi grid if recalculate_grid or voronoi_grid_file is None: self.log('compute a_g', rank=0, flush=True) big_a_G = calculate_a_g(fine_grid=False, calc=calc) if self.comm.rank == 0: if voronoi_grid_file is not None: if voronoi_grid_file[-4:] == '.npz': np.savez_compressed(voronoi_grid_file, a_G=big_a_G) else: np.save(voronoi_grid_file, big_a_G) self.log('Computed Voronoi decomposition grid', rank=0, flush=True) else: if self.comm.rank == 0: if voronoi_grid_file[-4:] == '.npz': files = np.load(voronoi_grid_file) big_a_G = files['a_G'] else: big_a_G = np.load(voronoi_grid_file) else: big_a_G = None self.log('Loaded Voronoi decomposition grid', rank=0, flush=True) self._big_a_G = big_a_G self._a_G = None @property def dtype(self) -> np.dtype: """ Global number of bands """ return self.C_nM.dtype @property def nn(self) -> int: """ Global number of bands """ return self.calc.wfs.bd.nbands @property def nM(self) -> int: """ Global number of bands """ return self.C_nM.shape[1] @property def calc(self) -> GPAWCalculator: return self._calc @property def C_nM(self): return self.calc.wfs.kpt_u[0].C_nM @property def P_ani(self): return self.calc.wfs.kpt_u[0].P_ani def _prepare_P(self): self._gather_P_ani_on_band_comm_root() def _gather_P_on_comm_root(self): # Do not use this one self.log('Gather P_ani to comm root', rank=0) gather_P_ani = proj_as_dict_on_master(self.calc.wfs.kpt_u[0].projections, 0, self.nn) if self.comm.rank != 0: gather_P_ani = None self._gather_P_ani = gather_P_ani self.log('Done gathering', rank=0) def _gather_P_ani_on_band_comm_root(self): self.log('Gather P_ani', rank=0) proj = self.calc.wfs.kpt_u[0].projections serial_proj = proj.new(bcomm=None) proj.matrix.redist(serial_proj.matrix) if self.band_comm.rank == 0: gather_P_ani = serial_proj.toarraydict() else: # Array dict with dimension n == 0 shape = proj.myshape[:-2] + (0, ) shapes = [shape + (nproj,) for nproj in proj.nproj_a] gather_P_ani = proj.atom_partition.arraydict(shapes, proj.matrix.array.dtype) self._gather_P_ani = gather_P_ani self.log('Done gathering', rank=0) def _prepare_C(self): # mynn is the number of bands on this rank mynn = len(self.C_nM) maxlocalnn = (self.nn + self.band_comm.size - 1) // self.band_comm.size assert mynn <= maxlocalnn # Distribute the LCAO coefficients and projectors into block cyclic form self.log('Distribute C_nM and P_ani', rank=0) dist_C_nM = redistribute_LCAO_into_block_cyclic_form(self.C_nM, self.nn, self.band_comm) self._dist_C_nM = dist_C_nM self.log('Done distributing', rank=0) @property def big_a_G(self): return self._big_a_G @property def a_G(self): """ Returns the voronoi decomposition on the coarse grid, distributed over domain_comm If this quantity is not stored, the voronoi decomposition on the master rank is distributed """ if self._a_G is None: self.distribute_a_G() return self._a_G
[docs] def distribute_a_G(self): """ Distribute the voronoi decomposition on the coarse grid over domain_comm Afterwards delete the non-distributed array """ domain_comm = self.domain_comm gd = self.calc.density.gd if domain_comm.rank == 0: assert self.big_a_G is not None, \ 'Full Voronoi grid must be given on rank 0 if distributed grid is not given' assert self.big_a_G.dtype == np.int16 # Now distribute to other domain ranks a_G = gd.zeros(dtype=np.int16) gd.distribute(self.big_a_G, a_G) self._a_G = a_G self._big_a_G = None
def __iter__(self) -> Iterator[NDArray[np.float64]]: """ Yields weights in LCAO basis. """ for proj_atoms in self.atom_projections: # assert isinstance(proj_atoms, list) or isinstance(proj_atoms, np.ndarray) weight_MM = self.calculate_voronoi_weights_LCAO_only(proj_atoms) self.log(f'Computed LCAO weights for projection {proj_atoms}', rank=0, flush=True) yield weight_MM def calculate_voronoi_weights_LCAO_only(self, proj_atoms: list[int] | NDArray[np.float64], out: NDArray[np.float64] | None = None) -> NDArray[np.float64]: if out is not None: assert out.shape == (self.nM, self.nM), out.shape w_G = np.where(np.isin(self.a_G, proj_atoms), 1.0, 0.0) v_MM = calc_vt_MM(self.calc, w_G) if out is not None: out[:] = v_MM return v_MM @property def domain_comm(self) -> Communicator: return self.calc.comms['d'] @property def band_comm(self) -> Communicator: return self.calc.comms['b'] @property def saved_fields(self) -> dict[str, Any]: return dict()
[docs] def calculate_a_g(gpw_file: str | None = None, fine_grid: bool = True, calc: GPAWCalculator | None = None): """Calculate Voronoi grid Parameters ---------- gpw_file Filename of ground state file. Ignored if calc is not None calc Initialized GPAW calculator fine_grid True if Voronoi grid on fine grid should be calculated, False if on coarse Returns ------- Voronoi grid as nx,ny,nz array """ calc_comm = world if calc is None: calc = GPAW(gpw_file, txt='/dev/null', communicator=calc_comm, parallel={'domain': calc_comm.size}) calc.initialize_positions() atoms = calc.get_atoms() if fine_grid: gd = calc.density.finegd else: gd = calc.density.gd a_g = wignerseitz(gd, atoms) a_g = a_g.astype(np.int16) # Storing as 16 bit integers reduces file size significantly big_g = gd.collect(a_g) return big_g
[docs] def calc_vt_MM(calc: GPAWCalculator, w_G: NDArray[np.float64]) -> NDArray[np.float64]: r""" Calculate LCAO basis function overlaps. .. math:: \tilde{v}_{\mu\nu} = \left<\tilde{\phi}_{\mu} | \hat{w} | \tilde{\phi}_{\nu}\right> = \int w(\vec{r}) \tilde{\phi}_{\mu}^*(\vec{r}) \tilde{\phi}_{\nu}(\vec{r}) d\vec{r} Parameters ---------- calc GPAW calculator object. w_G Operator w in real space (distributed on the domain of each rank). Should be 0 or 1 in each grid point. Returns ------- The contribution to :math:`\tilde{v}_{\mu\nu}` from this domain """ vt_MM = calc.wfs.basis_functions.calculate_potential_matrices(w_G)[0] tri2full(vt_MM) return vt_MM
[docs] def calc_vt_nn(C_nM: NDArray[np.float64], vt_MM: NDArray[np.float64]) -> NDArray[np.float64]: r""" Calculate pseudo KS wave function overlaps from LCAO overlaps. .. math:: \widetilde{W}_{nn'} = \left<\tilde{\psi}_n | \hat{w} | \tilde{\psi}_{n'}\right> = \sum_{\mu\nu} C_{\mu n}^* C_{\nu n'} \left<\tilde{\phi}_{\mu} | \hat{w} | \tilde{\phi}_{\nu}\right> = \sum_{\mu\nu} C_{\mu n}^* C_{\nu n'} \tilde{v}_{\mu\nu} Parameters ---------- C_nM The (transposed) LCAO coefficients :math:`C_{\mu n}` vt_MM The contribution to :math:`\tilde{v}_{\mu\nu}` on domain Returns ------- The contribution to :math:`\widetilde{W}_{nn'}` on this domain """ Wt_nn = C_nM @ vt_MM @ C_nM.T return Wt_nn
[docs] def calc_vt_nn_pblas(dist_C_nM, dist_vt_MM, dist_vt_nn): r""" See :func:`calc_vt_nn`. Parameters ---------- dist_C_nM LCAO coefficients, in block cyclic form dist_vt_MM The contribution to :math:\tilde{v}_{\mu\nu}` on domain, in block cyclic form dist_vt_nn Buffer in block cyclic form, into which the contribution to :math:`\widetilde{W}_{nn}` on this domain will be written """ dist_buf_nM = dist_C_nM.new() # buf_nM <- C_nM @ vt_MM dist_C_nM.dist.multiply(1.0, dist_C_nM, 'N', dist_vt_MM, 'N', 0.0, dist_buf_nM, symmetric=False) # v_nn <- buf_nM @ C_nM.T dist_C_nM.dist.multiply(1.0, dist_buf_nM, 'N', dist_C_nM, 'T', 0.0, dist_vt_nn, symmetric=False)
[docs] def calc_correction(correction_on_atoms: list[int] | NDArray[np.float64], P_ani: dict[int, NDArray[np.float64]] | ArrayDict, dS_aii: dict[int, NDArray[np.float64]], Nn: int, out: NDArray[np.float64] | None = None) -> NDArray[np.float64]: r""" Calculate the PAW correction to KS wave function overlaps on this domain. .. math:: c_{nn'} = \sum_a \left<\tilde{\psi}_n | \tilde{p}_i^a\right> \Delta S_{ij}^a \left<\tilde{p}_j^a | \tilde{\psi}_{n'}\right>, where the sum over atoms only includes a selection of atoms. Parameters ---------- correction_on_atoms Which atoms to loop over P_ani The matrix of projectors :math:`\left<\tilde{p}_i^a | \tilde{\psi}_{n}\right>` dS_aii :math:`\Delta S_{ij}^a` Nn Global number of bands out If this is specified the result is appended to here Returns ------- :math:`c_{nn'}'` """ if out is None: corr_nn = np.zeros((Nn, Nn)) else: assert out.shape == (Nn, Nn) corr_nn = out for a in P_ani.keys(): if a not in correction_on_atoms: continue corr_nn += P_ani[a] @ dS_aii[a] @ P_ani[a].T return corr_nn