from __future__ import annotations
import numpy as np
from numpy.typing import NDArray
from typing import Generator, Sequence
from ase.units import Bohr
from gpaw import GPAW
from gpaw.grid_descriptor import GridDescriptor
from gpaw.lcaotddft.densitymatrix import get_density
from .base import BaseObservableCalculator
from ..typing import GPAWCalculator
from ..density_matrices.base import BaseDensityMatrices, WorkMetadata
from ..voronoi import VoronoiWeights
from ..utils import ArrayIsOnRootRank, DistributedArray, ResultKeys, Result
class DensityCalculator(BaseObservableCalculator):
r""" Obtain induced density from frequency or pulse response density matrices.
The induced density (i.e. the density minus the ground state density) is to first
order given by
.. math::
\delta n(\boldsymbol{r}) = -2 \sum_{ia}^\text{eh}
n_{ia}(\boldsymbol{r}) \mathrm{Re}\:\delta\rho_{ia}
plus PAW corrections, where :math:`n_{ia}(\boldsymbol{r})` is the density of
ground state Kohn-Sham pair :math:`ia`
.. math::
n_{ia}(\boldsymbol{r}) = \psi^{(0)}_i(\boldsymbol{r}) \psi^{(0)}_a(\boldsymbol{r}).
Filename of GPAW ground state file
Object that gives the density matrix in the time or freqency domain
Filters for occupied states (holes). Provide a list of tuples (low, high)
to compute the density of holes with energies within the interval low-high.
Filters for unoccupied states (electrons). Provide a list of tuples (low, high)
to compute the density of excited electrons with energies within the interval low-high.
def __init__(self,
gpw_file: str,
density_matrices: BaseDensityMatrices,
filter_occ: Sequence[tuple[float, float]] = [],
filter_unocc: Sequence[tuple[float, float]] = []):
self._density_matrices = density_matrices
self._energies_occ = np.array([0])
self._energies_unocc = np.array([0])
self._sigma = 0
self._occ_filters = [self._build_single_filter('o', low, high) for low, high in filter_occ]
self._unocc_filters = [self._build_single_filter('u', low, high) for low, high in filter_unocc]
self._calc = GPAW(gpw_file, txt=None, communicator=self.calc_comm,
parallel={'domain': self.calc_comm.size})
msg = f'Loaded and init GPAW in {self.log.elapsed("load_gpaw"):.1f}s'
self.calc.initialize_positions() # Initialize in order to calculate density
msg += f'+{self.log.elapsed("init_gpaw"):.1f}s'
if self.calc_comm.rank == 0:
self.ksd.density = self.calc.density
def gdshape(self) -> tuple[int, int, int]:
""" Shape of the real space grid """
shape = tuple(int(N) - 1 for N in self.N_c)
return shape # type: ignore
def gd(self) -> GridDescriptor:
""" Real space grid """
return self.ksd.density.finegd
def N_c(self) -> NDArray[np.int_]:
""" Number of points in each Cartesian direction of the grid """
def cell_cv(self) -> NDArray[np.float64]:
""" Cell vectors """
return * Bohr
def voronoi(self) -> VoronoiWeights:
""" Voronoi weights object """
raise NotImplementedError
def occ_filters(self) -> list[slice]:
""" List of energy filters for occupied states """
return self._occ_filters
def unocc_filters(self) -> list[slice]:
""" List of energy filters for unoccupied states """
return self._unocc_filters
def calc(self) -> GPAWCalculator:
""" GPAW calculator instance """
return self._calc
def get_result_keys(self) -> ResultKeys:
noccf = len(self.occ_filters)
nunoccf = len(self.unocc_filters)
resultkeys = ResultKeys()
resultkeys.add_key('rho_g', self.gdshape)
resultkeys.add_key('occ_rho_g', self.gdshape)
resultkeys.add_key('unocc_rho_g', self.gdshape)
resultkeys.add_key('occ_rho_rows_fg', (noccf, ) + self.gdshape)
resultkeys.add_key('unocc_rho_rows_fg', (nunoccf, ) + self.gdshape)
resultkeys.add_key('occ_rho_diag_fg', (noccf, ) + self.gdshape)
resultkeys.add_key('unocc_rho_diag_fg', (nunoccf, ) + self.gdshape)
return resultkeys
def _find_limit(self,
lim: float) -> int:
""" Find the first eigenvalue larger than lim
Threshold value in eV
The index of the first eigenvalue larger than lim.
Returns len(eig_n) if lim is larger than all eigenvalues
if lim > self.eig_n[-1]:
return len(self.eig_n)
return int(np.argmax(self.eig_n > lim))
def _build_single_filter(self,
key: str,
low: float,
high: float) -> slice:
imin, imax, amin, amax = self.ksd.ialims()
if key == 'o':
nlow = min(self._find_limit(low), imax) - imin
nhigh = min(self._find_limit(high), imax) - imin
elif key == 'u':
nlow = min(self._find_limit(low), amax) - amin
nhigh = min(self._find_limit(high), amax) - amin
raise RuntimeError(f'Unknown key {key}. Key must be "o" or "u"')
return slice(nlow, nhigh)
def get_density(self,
rho_ia: DistributedArray,
n1: slice,
n2: slice,
fltn1: slice | NDArray[np.bool_] = slice(None),
fltn2: slice | NDArray[np.bool_] = slice(None),
u: int = 0) -> DistributedArray:
""" Calculate a real space density from a density matrix in the Kohn-Sham basis.
Distributed array with the density in real space on the root rank
C0_nM = self.ksd.C0_unM[u]
nM = C0_nM.shape[1]
if self.calc_comm.rank == 0:
assert np.issubdtype(rho_ia.dtype, float)
# Filter
rho_ia = rho_ia[fltn1][:, fltn2]
C0_n1M = C0_nM[n1][fltn1]
C0_n2M = C0_nM[n2][fltn2]
# Sum for sanity check
total = np.trace(rho_ia)
# Transform to LCAO basis
rho_MM = C0_n1M.T @ rho_ia @ C0_n2M.conj()
rho_MM = 0.5 * (rho_MM + rho_MM.T)
assert isinstance(rho_ia, ArrayIsOnRootRank)
rho_MM = np.zeros((nM, nM), dtype=float)
self.calc_comm.broadcast(rho_MM, 0)
msg = f'Transformed DM and constructed density in {self.log.elapsed("transform_dm"):.1f}s'
rho_g = get_density(rho_MM, self.calc.wfs, self.calc.density, u=u)
msg += f'+{self.log.elapsed("get_density"):.1f}s'
if self.calc_comm.rank == 0:
integ =
if self.calc_comm.rank == 0:
rerr = np.abs(integ - total) / total
if False:
self.log(f'Relative error: {rerr}')
big_rho_g =
if self.calc_comm.rank == 0:
return big_rho_g
return ArrayIsOnRootRank()
def icalculate(self) -> Generator[tuple[WorkMetadata, Result], None, None]:
""" Iteratively calculate results. The results include the total induced density,
and the densities of electrons and holes, optionally decomposed by `filter_occ`
and `filter_unocc`.
Tuple (work, result) on the root rank of the calculation communicator:
An object representing the metadata (time, frequency or pulse) for the work done
Object containg the calculation results for this time, frequency or pulse
Yields nothing on non-root ranks of the calculation communicator.
# Iterate over the pulses and times, or frequencies
for work, dm in self.density_matrices:
Q_ia = dm.Q_ia
P_ia = dm.P_ia
rho_ia = dm.rho_ia # Read these now, so non calc_comm root ranks can continue
# Holes
M_ii = 0.5 * (Q_ia @ Q_ia.T + P_ia @ P_ia.T)
# Electrons
M_aa = 0.5 * (Q_ia.T @ Q_ia + P_ia.T @ P_ia)
imin, imax, amin, amax = self.ksd.ialims()
islice = slice(imin, imax + 1)
aslice = slice(amin, amax + 1)
rho_g = self.get_density(rho_ia.real, islice, aslice)
occ_rho_g = self.get_density(M_ii, islice, islice)
unocc_rho_g = self.get_density(M_aa, aslice, aslice)
occ_rho_rows_fg = np.array([self.get_density(M_ii, islice, islice, fltn1=flt)
for flt in self.occ_filters])
occ_rho_diag_fg = np.array([self.get_density(M_ii, islice, islice, fltn1=flt, fltn2=flt)
for flt in self.occ_filters])
unocc_rho_rows_fg = np.array([self.get_density(M_aa, aslice, aslice, fltn1=flt)
for flt in self.unocc_filters])
unocc_rho_diag_fg = np.array([self.get_density(M_aa, aslice, aslice, fltn1=flt, fltn2=flt)
for flt in self.unocc_filters])
if dm.rank > 0:
result = Result()
result['rho_g'] = rho_g * Bohr ** -3
result['occ_rho_g'] = occ_rho_g * Bohr ** -3
result['unocc_rho_g'] = unocc_rho_g * Bohr ** -3
result['occ_rho_rows_fg'] = occ_rho_rows_fg * Bohr ** -3
result['occ_rho_diag_fg'] = occ_rho_diag_fg * Bohr ** -3
result['unocc_rho_rows_fg'] = unocc_rho_rows_fg * Bohr ** -3
result['unocc_rho_diag_fg'] = unocc_rho_diag_fg * Bohr ** -3
self.log(f'Calculated density in {self.log.elapsed("calculate"):.2f}s '
f'for {work.desc}', flush=True)
yield work, result
if self.calc_comm.rank == 0:
self.log('Finished calculating density contributions', flush=True)