from __future__ import annotations
from abc import ABC, abstractmethod
from typing import Any, Iterator, Sequence, Union
import numpy as np
from numpy.typing import NDArray
from gpaw import GPAW
from gpaw.analyse.wignerseitz import wignerseitz
from gpaw.io import Writer
from gpaw.mpi import broadcast, world
from gpaw.utilities.tools import tri2full
from .utils import Logger, parulmopen, ParallelMatrix
from .typing import Communicator, GPAWCalculator
AtomProjectionsType = Sequence[Union[list[int], NDArray[np.float64]]] # | breaks on py3.9
[docs]
class VoronoiWeights(ABC):
""" Abstract base class for Voronoi weights in Kohn-Sham basis. """
def __init__(self,
comm: Communicator | None = None,
log: Logger | None = None):
if comm is None:
comm = world
if log is None:
log = Logger()
self._comm = comm
self._log = log
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, tb):
pass
def __len__(self) -> int:
""" Number of projections. """
return self.nproj
@abstractmethod
def __iter__(self) -> Iterator[NDArray[np.float64] | None]:
""" Iteratively yield Voronoi weights for each projection.
Yields
------
Matrix of Voronoi weights on root rank, ``None`` on other ranks.
"""
raise NotImplementedError
def __str__(self) -> str:
lines = [f'{self.__class__.__name__} for ground state with {self.nn} bands',
f'{self.nproj} projections:']
for i, atoms in enumerate(self.atom_projections):
atomsstr = str(atoms)
if len(atomsstr) > 50:
atomsstr = atomsstr[:47] + '...'
lines.append(f' #{i:<3.0f}: On atoms {atomsstr}')
return '\n'.join(lines)
@property
def root(self) -> bool:
""" Whether this rank is the root rank. """
return self.comm.rank == 0
@property
def log(self) -> Logger:
""" Logger object. """
return self._log
@property
@abstractmethod
def atom_projections(self) -> AtomProjectionsType:
""" Atom projections. """
raise NotImplementedError
@property
@abstractmethod
def nn(self) -> int:
""" Number of bands. """
raise NotImplementedError
@property
def nproj(self) -> int:
""" Number of projections. """
return len(self.atom_projections)
@property
def comm(self) -> Communicator:
""" MPI Communicator. """
return self._comm
@property
@abstractmethod
def saved_fields(self) -> dict[str, Any]:
""" Saved data fields associated with the object.
If this object is read from a ULM file there might be some extra
data in that file. Return that data, or an empty dict if there is none.
"""
raise NotImplementedError
[docs]
class VoronoiWeightCalculator(VoronoiWeights):
r"""Calculate Voronoi weights in the basis of ground state Kohn-Sham orbitals.
For each atomic projection, calculates
.. math::
W_{nn'}
= \left<\psi_n | \hat{w} | \psi_{n'}\right>
= \left<\tilde{\psi}_n | \hat{w} | \tilde{\psi}_{n'}\right>
+ \sum_{aij} \left<\tilde{\psi}_n | \tilde{p}_i^a\right>
\Delta S_{ij}^a \left<\tilde{p}_i^a | \tilde{\psi}_{n'}\right>
where the operator :math:`\hat{w} = w(\vec{r})` is 1 in the
Voronoi region of atoms and 0 outside.
The sum over atoms is restricted to atoms in the region :math:`w(\vec{r}) = 1`.
The PAW projectors
.. math::
P_{ni}^a = \left<\tilde{p}_i^a | \tilde{\psi}_n \right>
and overlap matrix
.. math::
\Delta S_{ij}^a
= \left<\phi_i^a|\phi_j^a\right>
- \left<\tilde{\phi}_i^a|\tilde{\phi}_j^a\right>
are obtained from the GPAW calculator and the smooth weight matrix is obtained by
a basis change of the smooth LCAO weights
.. math::
\left<\tilde{\psi}_n | \hat{w} | \tilde{\psi}_{n'}\right> =
\sum_{\mu\nu} C_{n\mu} C_{n'\nu}
\left<\tilde{\phi}_\mu | \hat{w} | \tilde{\phi}_{\nu}\right>.
Parameters
----------
voronoi_lcao
Object that calculates or loads the LCAO weights from file.
"""
_voronoi_lcao: VoronoiLCAOWeights
def __init__(self,
voronoi_lcao: VoronoiLCAOWeights):
assert isinstance(voronoi_lcao, VoronoiLCAOWeights)
self._voronoi_lcao = voronoi_lcao
super().__init__(comm=voronoi_lcao.comm, log=voronoi_lcao.log)
def __iter__(self) -> Iterator[NDArray[np.float64] | None]:
for proj_atoms, weight_MM in zip(self.atom_projections, self.voronoi_lcao):
nM = self.voronoi_lcao.nM
nn = self.voronoi_lcao.nn
# Transform to KS basis C_nM @ weight_MM @ C_nM.T
w_MM = ParallelMatrix((nM, nM), np.float64, comm=self.comm,
array=weight_MM)
C_nM = ParallelMatrix((nn, nM), np.float64, comm=self.comm,
array=self.voronoi_lcao.C_nM)
_v_nn = C_nM @ w_MM @ C_nM.T
self.log('Transformed weights to KS basis', flush=True, who='Voronoi', rank=0)
# Calculate PAW corrections on the root rank
if self.root:
dS_aii = self.voronoi_lcao.dS_aii
P_ani = self.voronoi_lcao.P_ani
v_nn = _v_nn.array
assert dS_aii is not None
assert P_ani is not None
for a, P_ni in P_ani.items():
if a not in proj_atoms:
continue
v_nn += P_ni @ dS_aii[a] @ P_ni.T
self.log('Added PAW corrections to weights', flush=True, who='Voronoi', rank=0)
yield v_nn
else:
yield None
@property
def voronoi_lcao(self) -> VoronoiLCAOWeights:
return self._voronoi_lcao
@property
def atom_projections(self) -> AtomProjectionsType:
return self.voronoi_lcao.atom_projections
@property
def nn(self) -> int:
return self.voronoi_lcao.nn
@property
def nM(self) -> int:
return self.voronoi_lcao.nM
@property
def comm(self) -> Communicator:
return self.voronoi_lcao.comm
@property
def saved_fields(self) -> dict[str, Any]:
return dict()
[docs]
def calculate_and_write(self,
out_fname: str,
write_extra: dict[str, Any] = dict()):
""" Calculate the Voronoi weights in the Kohn-Sham basis and write to file.
The weights are saved in a numpy archive if the file extension is ``.npz``
or in a ULM file if the file extension is ``.ulm``.
Parameters
----------
out_fname
File name of the resulting data file.
write_extra
Dictionary of additional data written to the file.
"""
to_be_written = dict()
if world.rank == 0:
to_be_written.update(self.saved_fields)
to_be_written.update(write_extra)
if out_fname.endswith('.npz'):
# Calculate weights
weight_inn = list(self)
# Write on root ranks
if self.root:
return
to_be_written['atom_projections'] = atom_projections_to_numpy(self.atom_projections)
np.savez(out_fname, weight_inn=np.array(weight_inn), **to_be_written)
elif out_fname.endswith('.ulm'):
with Writer(out_fname, world, mode='w', tag='Voronoi') as writer:
writer.write(version=1)
writer.write('atom_projections', self.atom_projections)
writer.write(**to_be_written)
writer.add_array('weight_inn', (self.nproj, self.nn, self.nn), dtype=float)
# Calculate weights
for weight_nn in self:
# Write on root (DummyWriter on other ranks)
writer.fill(weight_nn)
else:
raise ValueError(f'output-file must have ending .npz or .ulm, is {out_fname}')
self.log(f'Weights written to {out_fname}', flush=True, who='Voronoi', rank=0)
world.barrier()
[docs]
@classmethod
def from_gpw(cls,
atom_projections: AtomProjectionsType,
gpw_file: str,
voronoi_grid: VoronoiGrid | None | str = None,
comm: Communicator | None = None) -> VoronoiWeightCalculator:
"""
Set up calculation from GPAW file.
Parameters
----------
atom_projections
List of atom groups. Each atom group is a list of integers (of any length).
gpw_file
Filename of GPAW ground state file.
voronoi_grid
Voronoi grid, or ``None`` to calculate it, or a filename to read it from file.
comm
Communicator.
"""
voronoi_lcao = VoronoiLCAOWeightCalculator(
atom_projections=atom_projections,
gpw_file=gpw_file,
comm=comm)
return cls(voronoi_lcao)
[docs]
@classmethod
def from_lcao_file(cls,
voronoi_lcao_file: str,
comm: Communicator | None = None) -> VoronoiWeightCalculator:
"""
Set up calculation from file of already computed weights in LCAO basis.
Parameters
----------
voronoi_lcao_file
Filename of file with Voronoi weights in LCAO basis.
comm
Communicator.
"""
voronoi_lcao = VoronoiLCAOReader(voronoi_lcao_file, comm=comm)
return cls(voronoi_lcao)
[docs]
class VoronoiReader(VoronoiWeights):
""" Read Voronoi weights from ulm file.
Parameters
----------
ulm_fname
Filename.
comm
GPAW MPI communicator object. Defaults to world.
"""
_nn: int
_atom_projections: AtomProjectionsType
def __init__(self,
ulm_fname: str,
comm: Communicator | None = None):
super().__init__(comm=comm)
self.ulm_fname = ulm_fname
self.reader = parulmopen(self.ulm_fname, self.comm)
# Read size
if self.root:
weight_inn = self.reader.proxy('weight_inn')
assert weight_inn.dtype is np.dtype(float)
assert weight_inn.shape[1] == weight_inn.shape[2]
atom_projections = self.reader.atom_projections
nn = weight_inn.shape[1]
brdcast = (atom_projections, nn)
else:
brdcast = None
brdcast = broadcast(brdcast, root=0, comm=self.comm)
self._atom_projections, self._nn = brdcast # type: ignore
@property
def atom_projections(self) -> AtomProjectionsType:
return self._atom_projections
@property
def nn(self) -> int:
return self._nn
def __exit__(self, exc_type, exc_value, tb):
self.reader.close()
def __iter__(self) -> Iterator[NDArray[np.float64] | None]:
for i in range(len(self)):
if self.root:
weight_nn = self.reader.proxy('weight_inn', i)[:]
else:
weight_nn = None
yield weight_nn
@property
def saved_fields(self) -> dict[str, Any]:
""" Saved data fields associated with the object
If this object is read from a ULM file there might be some extra
data in that file. Return that data, or an empty dict if there is none.
"""
data = {key: getattr(self.reader, key) for key in self.reader.keys()
if key not in ['weight_inn', 'atom_projections']}
return data
[docs]
class EmptyVoronoiWeights(VoronoiWeights):
""" Object representing a lack of weights. """
def __iter__(self):
return
yield
@property
def atom_projections(self):
return []
@property
def nn(self):
return 0
@property
def saved_fields(self):
return {}
[docs]
class VoronoiLCAOWeights(ABC):
""" Abstract base class for Voronoi weights in LCAO basis. """
_atom_projections: AtomProjectionsType
_comm: Communicator
_log: Logger
_dS_aii: Any
_P_ani: Any
_C_nM: Any
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, tb):
pass
def __len__(self) -> int:
""" Return the number of projections. """
return self.nproj
@abstractmethod
def __iter__(self) -> Iterator[NDArray[np.float64] | None]:
""" Iteratively yield Voronoi weights in the LCAO basis for each projection.
Yields
------
Matrix of Voronoi weights on root rank, ``None`` on other ranks.
"""
raise NotImplementedError
@property
def log(self) -> Logger:
return self._log
[docs]
def log_parallel(self, *args, **kwargs) -> Logger:
""" Log message with communicator information. """
return self._log(*args, **kwargs, comm=self.comm, who='Voronoi')
@property
@abstractmethod
def nn(self) -> int:
""" Number of bands. """
raise NotImplementedError
@property
@abstractmethod
def nM(self) -> int:
""" Number of basis functions. """
raise NotImplementedError
@property
def nproj(self) -> int:
""" Number of atomic projections """
return len(self.atom_projections)
@property
def atom_projections(self) -> AtomProjectionsType:
""" List of atom projections. """
return self._atom_projections
@property
def calc(self) -> GPAWCalculator | None:
""" GPAW calculator instance. """
return None
@property
def comm(self) -> Communicator:
""" MPI Communicator. """
return self._comm
@property
def root(self) -> bool:
""" Whether this rank is the root rank. """
return self.comm.rank == 0
@property
def C_nM(self) -> NDArray[np.float64] | None:
""" LCAO wave function coefficients on root rank, ``None`` on other ranks. """
return self._C_nM if self.root else None
@property
def P_ani(self) -> dict[int, NDArray[np.float64]] | None:
r""" PAW projectors :math:`P_{ni}^a` on the root rank, ``None`` on other ranks.
.. math::
P_{ni}^a = \left<\tilde{p}_i^a | \tilde{\psi}_n \right>
"""
return self._P_ani if self.root else None
@property
def dS_aii(self) -> dict[int, NDArray[np.float64]] | None:
r""" Overlap matrix PAW corrections :math:`\Delta S_{ij}^a` on the root rank, ``None`` on other ranks.
.. math::
\Delta S_{ij}^a
= \left<\phi_i^a|\phi_j^a\right>
- \left<\tilde{\phi}_i^a|\tilde{\phi}_j^a\right>
"""
return self._dS_aii if self.root else None
@property
@abstractmethod
def saved_fields(self) -> dict[str, Any]:
""" Saved data fields associated with the object.
If this object is read from a ULM file there might be some extra
data in that file. Return that data, or an empty dict if there is none.
"""
raise NotImplementedError
@property
def arrays(self) -> dict[str, Any]:
if self.comm.rank != 0:
return {}
arrays = dict(P_ani=self.P_ani,
C_nM=self.C_nM,
dS_aii=self.dS_aii)
return arrays
[docs]
class VoronoiLCAOReader(VoronoiLCAOWeights):
""" Read Voronoi weights in the LCAO basis from ULM file.
Parameters
----------
ulm_fname
Filename.
comm
GPAW MPI communicator object. Defaults to world.
"""
def __init__(self,
ulm_fname: str,
comm=None):
self.ulm_fname = ulm_fname
if comm is None:
comm = world
self._log = Logger()
self._comm = comm
self.reader = parulmopen(self.ulm_fname, self.comm)
self._dS_aii: dict[int, NDArray[np.float64]] | None
self._P_ani: dict[int, NDArray[np.float64]] | None
self._C_nM = NDArray[np.float64] | None
# Read arrays on root rank
if self.root:
weight_iMM = self.reader.proxy('weight_iMM')
assert weight_iMM.dtype is np.dtype(float)
assert weight_iMM.shape[1] == weight_iMM.shape[2]
self._dS_aii = self.reader.dS_aii
self._P_ani = self.reader.P_ani
self._C_nM = self.reader.C_nM[:]
atom_projections = self.reader.atom_projections
brdcast = (atom_projections, ) + self._C_nM.shape
else:
brdcast = None
self._C_nM = None
self._P_ani = None
self._dS_aii = None
# Broadcast atom projections and number of bands and basis functions
brdcast = broadcast(brdcast, root=0, comm=self.comm)
self._atom_projections, self._nn, self._nM = brdcast # type: ignore
def __exit__(self, exc_type, exc_value, tb):
self.reader.close()
@property
def nn(self) -> int:
return self._nn
@property
def nM(self) -> int:
return self._nM
def __iter__(self) -> Iterator[NDArray[np.float64]]:
for i in range(len(self)):
if self.root:
weight_MM = self.reader.proxy('weight_iMM', i)[:]
else:
weight_MM = None
yield weight_MM
@property
def saved_fields(self) -> dict[str, Any]:
data = {key: getattr(self.reader, key) for key in self.reader.keys()
if key not in ['weight_iMM', 'atom_projections',
'dS_aii', 'P_ani', 'C_nM']}
return data
[docs]
class VoronoiLCAOWeightCalculator(VoronoiLCAOWeights):
r"""Calculate Voronoi weights in LCAO basis.
For each atomic projection, calculates
.. math::
\tilde{v}_{\mu\nu}
= \left<\tilde{\phi}_{\mu} | \hat{w} | \tilde{\phi}_{\nu}\right>
= \int w(\vec{r}) \tilde{\phi}_{\mu}^*(\vec{r}) \tilde{\phi}_{\nu}(\vec{r}) d\vec{r}
where the operator :math:`\hat{w} = w(\vec{r})` is 1 in the
Voronoi region of the atomic projections and 0 outside, and :math:`\tilde{\phi}_{\nu}` are
the smooth LCAO basis functions.
Parameters
----------
atom_projections
List of atom groups. Each atom group is a list of integers (of any length).
gpw_file
Filename of GPAW ground state file.
voronoi_grid
Voronoi grid, or ``None`` to calculate it, or a filename to read it from file.
domain
Domain size.
comm
Communicator.
"""
_calc: GPAWCalculator
def __init__(self,
atom_projections: AtomProjectionsType,
gpw_file: str,
voronoi_grid: VoronoiGrid | None | str = None,
domain: int = -1,
comm: Communicator | None = None):
assert all([isinstance(proj_atoms, list) or isinstance(proj_atoms, np.ndarray)
for proj_atoms in atom_projections])
self._atom_projections = atom_projections
if comm is None:
comm = world
self._comm = comm
self._log = Logger()
if voronoi_grid is None:
voronoi_grid = VoronoiGridCalculator()
if isinstance(voronoi_grid, str):
voronoi_grid = VoronoiGridReader(voronoi_grid)
self.voronoi_grid = voronoi_grid
if domain == -1:
domain = self.comm.size
# Set up GPAW calculator
calc = GPAW(gpw_file, txt=None, communicator=self.comm, parallel={'domain': domain})
calc.initialize_positions()
self.log('Loaded and initialized GPAW', rank=0, flush=True, who='Voronoi')
self._calc = calc
# Collect wave functions, projectors and overlap to the root rank
C_nM = calc.wfs.collect_array('C_nM', k=0, s=0)
P_ani = proj_as_dict_on_master(self.calc.wfs.kpt_u[0].projections, 0, self.nn)
dS_aii = {a: setup.dO_ii for a, setup in enumerate(calc.wfs.setups)}
self._C_nM = C_nM if self.root else None
self._P_ani = P_ani if self.root else None
self._dS_aii = dS_aii if self.root else None
@property
def nn(self) -> int:
return self.calc.wfs.bd.nbands
@property
def nM(self) -> int:
return self.calc.wfs.setups.nao
@property
def calc(self) -> GPAWCalculator:
return self._calc
def __iter__(self) -> Iterator[NDArray[np.float64] | None]:
# Get the Voronoi grid
a_G = self.voronoi_grid.a_G(self.calc, self.log)
for proj_atoms in self.atom_projections:
w_G = np.where(np.isin(a_G, proj_atoms), 1.0, 0.0)
if self.calc.comms['b'].rank != 0:
# The band comm ranks compute the same information
return None
# Calculate the weights on the domain communicator
weight_MM = self.calc.wfs.basis_functions.calculate_potential_matrices(w_G)[0]
tri2full(weight_MM)
# Sum to the root rank
self.calc.comms['d'].sum(weight_MM, root=0)
self.log(f'Computed LCAO weights for projection {proj_atoms}', rank=0, flush=True, who='Voronoi')
if self.root:
yield weight_MM
else:
yield None
@property
def saved_fields(self) -> dict[str, Any]:
return dict()
[docs]
def calculate_and_write(self,
out_fname: str,
write_extra: dict[str, Any] = dict()):
""" Calculate the Voronoi weights in the LCAO basis and write to file.
The weights are saved in a numpy archive if the file extension is ``.npz``
or in a ULM file if the file extension is ``.ulm``.
Parameters
----------
out_fname
File name of the resulting data file.
write_extra
Dictionary of additional data written to the file.
"""
to_be_written = dict()
if world.rank == 0:
to_be_written.update(self.saved_fields)
to_be_written.update(self.arrays)
to_be_written.update(write_extra)
if out_fname.endswith('.npz'):
# Calculate weights
weight_iMM = list(self)
# Write on root ranks
if self.root:
return
to_be_written['atom_projections'] = atom_projections_to_numpy(self.atom_projections)
np.savez(out_fname, weight_iMM=np.array(weight_iMM), **to_be_written)
elif out_fname.endswith('.ulm'):
with Writer(out_fname, self.comm, mode='w', tag='Voronoi') as writer:
writer.write(version=1)
writer.write('atom_projections', self.atom_projections)
writer.write(**to_be_written)
writer.add_array('weight_iMM', (self.nproj, self.nM, self.nM), dtype=float)
# Calculate weights
for weight_MM in self:
# Write on root (DummyWriter on other ranks)
writer.fill(weight_MM)
else:
raise ValueError(f'output-file must have ending .npz or .ulm, is {out_fname}')
self.log(f'Written weights in LCAO basis to {out_fname}', flush=True, who='Voronoi', rank=0)
world.barrier()
[docs]
class VoronoiGrid(ABC):
[docs]
@abstractmethod
def a_G(self,
calc: GPAWCalculator,
log: Logger) -> NDArray[np.int_]:
""" Voronoi grid (on the coarse grid of the GPAW calculator)
distributed on the domain communicator.
Each element in the grid is an integer corresponding to the closest atom.
Parameters
----------
GPAW calculator.
"""
raise NotImplementedError
[docs]
def write(self,
filename: str,
calc: GPAWCalculator,
log: Logger):
""" Write grid to file.
Parameters
----------
filename
Filename.
calc
GPAW calculator.
"""
filename = str(filename)
# Calculate grid and collect to root
big_a_G = calc.density.gd.collect(self.a_G(calc, log))
if filename.endswith('.npz'):
np.savez_compressed(filename, a_G=big_a_G)
else:
np.save(filename, big_a_G)
log(f'Written Voronoi grid to {filename}', who='Voronoi', flush=True, rank=0)
[docs]
class VoronoiGridReader(VoronoiGrid):
""" Read Voronoi grid from file. """
def __init__(self,
filename: str):
self.filename = filename
[docs]
def a_G(self,
calc: GPAWCalculator,
log: Logger) -> NDArray[np.int_]:
# Read the grid on the domain communicator root
domain_comm = calc.comms['d']
if domain_comm.rank == 0:
if self.filename.endswith('.npz'):
files = np.load(self.filename)
big_a_G = files['a_G']
else:
big_a_G = np.load(self.filename)
assert big_a_G.dtype == np.int16
# Distribute grid across domain communicator
gd = calc.density.gd
a_G = gd.zeros(dtype=np.int16)
gd.distribute(big_a_G if domain_comm.rank == 0 else None, a_G)
log('Loaded Voronoi grid', rank=0, flush=True, who='Voronoi')
return a_G
[docs]
class VoronoiGridCalculator(VoronoiGrid):
""" Calculate the Voronoi grid. """
[docs]
def a_G(self,
calc: GPAWCalculator,
log: Logger) -> NDArray[np.int_]:
log('Computing Voronoi grid', rank=0, flush=True, who='Voronoi')
atoms = calc.get_atoms()
a_G = wignerseitz(calc.density.gd, atoms)
a_G = a_G.astype(np.int16)
log('Computed Voronoi grid', rank=0, flush=True, who='Voronoi')
return a_G
def atom_projections_to_numpy(atom_projections: AtomProjectionsType) -> NDArray[np.int_]:
Ni = len(atom_projections)
if Ni == 0:
Nj = 0
else:
Nj = max([len(proj_atoms) for proj_atoms in atom_projections])
atom_projections_ij = np.full((Ni, Nj), -1, dtype=int)
for i, proj_atoms in enumerate(atom_projections):
na = len(proj_atoms)
atom_projections_ij[i, :na] = proj_atoms
return atom_projections_ij
[docs]
def proj_as_dict_on_master(proj, n1: int, n2: int) -> dict[int, NDArray[np.float64]]:
""" Collect the projectors to a dictionary on the root rank."""
# In newer versions of GPAW this is proj.as_dict_on_master
P_nI = proj.collect()
if P_nI is None:
return {}
I1 = 0
P_ani = {}
for a, ni in enumerate(proj.nproj_a):
I2 = I1 + ni
P_ani[a] = P_nI[n1:n2, I1:I2]
I1 = I2
return P_ani