from __future__ import annotations
from pathlib import Path
import numpy as np
from numpy.typing import NDArray
from typing import Any, Generator, Sequence, Collection
from ase.units import Bohr
from gpaw import GPAW
from gpaw.grid_descriptor import GridDescriptor
from gpaw.lcaotddft.densitymatrix import get_density
from .base import BaseObservableCalculator
from ..typing import GPAWCalculator
from ..density_matrices.base import WorkMetadata
from ..density_matrices.frequency import FrequencyDensityMatrixMetadata
from ..density_matrices.time import ConvolutionDensityMatrixMetadata
from ..perturbation import PerturbationLike
from ..response import BaseResponse
from ..typing import ArrayIsOnRootRank, DistributedArray
from ..utils import ResultKeys, Result, get_gaussian_pulse_values
[docs]
class DensityCalculator(BaseObservableCalculator):
r""" Calculate densities in the time or frequency domain.
The induced density (i.e. the density minus the ground state density) is to first
order given by
.. math::
\delta n(\boldsymbol{r}) = -2 \sum_{ia}^\text{eh}
n_{ia}(\boldsymbol{r}) \mathrm{Re}\:\delta\rho_{ia}
plus PAW corrections, where :math:`n_{ia}(\boldsymbol{r})` is the density of
ground state Kohn-Sham pair :math:`ia`
.. math::
n_{ia}(\boldsymbol{r}) = \psi^{(0)}_i(\boldsymbol{r}) \psi^{(0)}_a(\boldsymbol{r}).
In the time domain, electrons and holes densities can be computed.
.. math::
\begin{align}
n^\text{holes}(\boldsymbol{r}) &= \sum_{ii'}
n_{ii'}(\boldsymbol{r}) \delta\rho_{ii'} \\
n^\text{electrons}(\boldsymbol{r}) &= \sum_{aa'}
n_{aa'}(\boldsymbol{r}) \delta\rho_{aa'}.
\end{align}
Refer to the documentation of
:class:`HotCarriersCalculator <rhodent.calculators.HotCarriersCalculator>` for definitions
of :math:`\delta\rho_{ii'}` and :math:`\delta\rho_{aa'}`.
Parameters
----------
gpw_file
Filename of GPAW ground state file.
response
Response object.
filter_occ
Filters for occupied states (holes). Provide a list of tuples (low, high)
to compute the density of holes with energies within the interval low-high.
filter_unocc
Filters for unoccupied states (electrons). Provide a list of tuples (low, high)
to compute the density of excited electrons with energies within the interval low-high.
times
Compute densities in the time domain, for these times (or as close to them as possible).
In units of as.
May not be used together with :attr:`frequencies` or :attr:`frequency_broadening`.
pulses
Compute densities in the time domain, in response to these pulses.
If none, then no pulse convolution is performed.
May not be used together with :attr:`frequencies` or :attr:`frequency_broadening`.
frequencies
Compute densities in the frequency domain, for these frequencies. In units of eV.
May not be used together with :attr:`times` or :attr:`pulses`.
frequency_broadening
Compute densities in the frequency domain, with Gaussian broadening of this width.
In units of eV.
May not be used together with :attr:`times` or :attr:`pulses`.
"""
def __init__(self,
gpw_file: str,
response: BaseResponse,
filter_occ: Sequence[tuple[float, float]] = [],
filter_unocc: Sequence[tuple[float, float]] = [],
*,
times: list[float] | NDArray[np.float64] | None = None,
pulses: Collection[PerturbationLike] | None = None,
frequencies: list[float] | NDArray[np.float64] | None = None,
frequency_broadening: float = 0,
):
super().__init__(response=response,
times=times,
pulses=pulses,
frequencies=frequencies,
frequency_broadening=frequency_broadening)
self._occ_filters = [self._build_single_filter('o', low, high) for low, high in filter_occ]
self._unocc_filters = [self._build_single_filter('u', low, high) for low, high in filter_unocc]
self.log.start('load_gpaw')
self._calc = GPAW(gpw_file, txt=None, communicator=self.calc_comm,
parallel={'domain': self.calc_comm.size})
msg = f'Loaded/initialized GPAW in {self.log.elapsed("load_gpaw"):.1f}'
self.log.start('init_gpaw')
self.calc.initialize_positions() # Initialize in order to calculate density
msg += f'/{self.log.elapsed("init_gpaw"):.1f} s'
if self.calc_comm.rank == 0:
self.log_parallel(msg)
self.ksd.density = self.calc.density
@property
def gdshape(self) -> tuple[int, int, int]:
""" Shape of the real space grid.
"""
shape = tuple(int(N) - 1 for N in self.N_c)
return shape # type: ignore
@property
def gd(self) -> GridDescriptor:
""" Real space grid. """
return self.ksd.density.finegd
@property
def N_c(self) -> NDArray[np.int_]:
""" Number of points in each Cartesian direction of the grid.
"""
return self.gd.N_c
@property
def cell_cv(self) -> NDArray[np.float64]:
""" Cell vectors. """
return self.gd.cell_cv * Bohr
@property
def occ_filters(self) -> list[slice]:
""" List of energy filters for occupied states. """
return self._occ_filters
@property
def unocc_filters(self) -> list[slice]:
""" List of energy filters for unoccupied states. """
return self._unocc_filters
@property
def calc(self) -> GPAWCalculator:
""" GPAW calculator instance. """
return self._calc
[docs]
def get_result_keys(self,
yield_total: bool = True,
yield_electrons: bool = False,
yield_holes: bool = False) -> ResultKeys:
noccf = len(self.occ_filters)
nunoccf = len(self.unocc_filters)
if (yield_electrons or yield_holes) and not self._is_time_density_matrices:
raise ValueError('Electron or hole densities can only be computed in the time domain.')
resultkeys = ResultKeys()
if yield_total:
resultkeys.add_key('rho_g', self.gdshape)
if yield_holes:
resultkeys.add_key('occ_rho_g', self.gdshape)
if noccf > 0:
resultkeys.add_key('occ_rho_rows_fg', (noccf, ) + self.gdshape)
resultkeys.add_key('occ_rho_diag_fg', (noccf, ) + self.gdshape)
if yield_electrons:
resultkeys.add_key('unocc_rho_g', self.gdshape)
if nunoccf > 0:
resultkeys.add_key('unocc_rho_rows_fg', (nunoccf, ) + self.gdshape)
resultkeys.add_key('unocc_rho_diag_fg', (nunoccf, ) + self.gdshape)
return resultkeys
@property
def _need_derivatives_real_imag(self) -> tuple[list[int], bool, bool]:
# Time domain: We only need the real part of the density matrix.
# Frequency domain: We need the (complex) Fourier transform of
# the real part of the density matrix.
return ([0], True, False)
def _find_limit(self,
lim: float) -> int:
""" Find the first eigenvalue larger than :attr:`lim`.
Parameters
----------
lim
Threshold value in units of eV.
Returns
-------
The index of the first eigenvalue larger than :attr:`lim`.
Returns ``len(eig_n)`` if :attr:`lim` is larger than all eigenvalues.
"""
if lim > self.eig_n[-1]:
return len(self.eig_n)
return int(np.argmax(self.eig_n > lim))
def _build_single_filter(self,
key: str,
low: float,
high: float) -> slice:
imin, imax, amin, amax = self.ksd.ialims()
if key == 'o':
nlow = min(self._find_limit(low), imax) - imin
nhigh = min(self._find_limit(high), imax) - imin
elif key == 'u':
nlow = min(self._find_limit(low), amax) - amin
nhigh = min(self._find_limit(high), amax) - amin
else:
raise RuntimeError(f'Unknown key {key}. Key must be "o" or "u"')
return slice(nlow, nhigh)
[docs]
def get_density(self,
rho_nn: DistributedArray,
nn_indices: str,
fltn1: slice | NDArray[np.bool_] = slice(None),
fltn2: slice | NDArray[np.bool_] = slice(None),
u: int = 0) -> DistributedArray:
r""" Calculate a real space density from a density matrix in the Kohn-Sham basis.
Parameters
----------
rho_nn
Density matrix :math:`\delta\rho_{ia}`, :math:`\delta\rho_{ii'}`, or
:math:`\delta\rho_{aa'}`.
nn_indices
Indices describing the density matrices :attr:`rho_nn`. One of
- `ia` for induced density :math:`\delta\rho_{ia'}`.
- `ii` for holes density :math:`\delta\rho_{ii'}`.
- `aa` for electrons density :math:`\delta\rho_{aa'}`.
flt_n1
Filter selecting rows of the density matrix.
flt_n2
Filter selecting columns of the density matrix.
u
Kpoint index.
Returns
-------
Distributed array with the density in real space on the root rank.
"""
imin, imax, amin, amax = self.ksd.ialims()
if nn_indices not in ['ia', 'ii', 'aa']:
raise ValueError(f'Parameter nn_indices must be either "ia", "ii" or "aa". Is {nn_indices}.')
n1 = slice(imin, imax + 1) if nn_indices[0] == 'i' else slice(amin, amax + 1)
n2 = slice(imin, imax + 1) if nn_indices[1] == 'i' else slice(amin, amax + 1)
# Distribute rho_nn
self.log.start('transform_dm')
if self.calc_comm.rank == 0:
assert np.issubdtype(rho_nn.dtype, float)
rho_nn = np.ascontiguousarray(rho_nn)
else:
assert isinstance(rho_nn, ArrayIsOnRootRank)
rho_nn = np.zeros((n1.stop - n1.start, n2.stop - n2.start), dtype=float)
self.calc_comm.broadcast(rho_nn, 0)
# Sum for sanity check
total = np.trace(rho_nn)
C0_nM = self.ksd.C0_unM[u]
nM = C0_nM.shape[1]
# Construct a slice of work to be done by this calc_comm rank
strideM = (nM + self.calc_comm.size - 1) // self.calc_comm.size
sliceM = slice(strideM * self.calc_comm.rank, strideM * (self.calc_comm.rank + 1))
# Filter
rho_nn = rho_nn[fltn1][:, fltn2]
C0_n1M = C0_nM[n1][fltn1]
C0_n2M = C0_nM[n2][fltn2]
# Transform to LCAO basis:
# Perform right matrix multiplication in parallel
rho_nM = np.zeros(rho_nn.shape[0:1] + (nM, ), dtype=float)
rho_nM[:, sliceM] = rho_nn @ C0_n2M[:, sliceM].conj()
self.calc_comm.sum(rho_nM)
# Perform left matrix multiplication in parallel
rho_MM = np.zeros((nM, nM), dtype=float)
rho_MM[sliceM, :] = C0_n1M[:, sliceM].T @ rho_nM
self.calc_comm.sum(rho_MM)
# Symmetrize
rho_MM = 0.5 * (rho_MM + rho_MM.T)
msg = f'Transformed DM and constructed density in {self.log.elapsed("transform_dm"):.1f}s'
self.log.start('get_density')
rho_g = get_density(rho_MM, self.calc.wfs, self.calc.density, u=u)
msg += f'+{self.log.elapsed("get_density"):.1f}s'
if self.calc_comm.rank == 0:
self.log_parallel(msg, flush=True)
integ = self.gd.integrate(rho_g)
if self.calc_comm.rank == 0:
rerr = np.abs(integ - total) / total
if False:
self.log_parallel(f'Relative error: {rerr}')
big_rho_g = self.gd.collect(rho_g)
if self.calc_comm.rank == 0:
return big_rho_g
else:
return ArrayIsOnRootRank()
[docs]
def icalculate(self,
yield_total: bool = True,
yield_electrons: bool = False,
yield_holes: bool = False) -> Generator[tuple[WorkMetadata, Result], None, None]:
""" Iteratively calculate results.
Parameters
----------
yield_total
The results should include the total induced density.
yield_holes
The results should include the holes densities, optionally decomposed by `filter_occ`.
yield_electrons
The results should include the electrons densities, optionally decomposed by `filter_unocc`.
Yields
------
Tuple (work, result) on the root rank of the calculation communicator. \
Does not yield on non-root ranks of the calculation communicator.
work
An object representing the metadata (time, frequency or pulse) for the work done.
result
Object containg the calculation results for this time, frequency or pulse.
"""
noccf = len(self.occ_filters)
nunoccf = len(self.unocc_filters)
if (yield_electrons or yield_holes) and not self._is_time_density_matrices:
raise ValueError('Electron or hole densities can only be computed in the time domain.')
# Iterate over the pulses and times, or frequencies
for work, dm in self.density_matrices:
# Read these now, so non calc_comm root ranks can continue
if yield_electrons or yield_holes:
Q_ia = dm.Q_ia
P_ia = dm.P_ia
if self._is_time_density_matrices:
# Real part contributes to density
rho_ia = dm.rho_ia.real
else:
# Imaginary part gives absorption contribution
rho_ia = -dm.rho_ia.imag
self.log.start('calculate')
# Non-root ranks on calc_comm will write empty arrays to result, but will not be yielded
result = Result()
if yield_total:
result['rho_g'] = self.get_density(rho_ia.real, 'ia') * Bohr ** -3
if yield_holes:
# Holes
M_ii = 0.5 * (Q_ia @ Q_ia.T + P_ia @ P_ia.T)
result['occ_rho_g'] = self.get_density(M_ii, 'ii') * Bohr ** -3
if noccf > 0:
result['occ_rho_rows_fg'] = np.array([self.get_density(M_ii, 'ii', fltn1=flt)
for flt in self.occ_filters]) * Bohr ** -3
result['occ_rho_diag_fg'] = np.array([self.get_density(M_ii, 'ii', fltn1=flt, fltn2=flt)
for flt in self.occ_filters]) * Bohr ** -3
if yield_electrons:
# Electrons
M_aa = 0.5 * (Q_ia.T @ Q_ia + P_ia.T @ P_ia)
result['unocc_rho_g'] = self.get_density(M_aa, 'aa') * Bohr ** -3
if nunoccf > 0:
result['unocc_rho_rows_fg'] = np.array([self.get_density(M_aa, 'aa', fltn1=flt)
for flt in self.unocc_filters]) * Bohr ** -3
result['unocc_rho_diag_fg'] = np.array([self.get_density(M_aa, 'aa', fltn1=flt, fltn2=flt)
for flt in self.unocc_filters]) * Bohr ** -3
if dm.rank > 0:
continue
self.log_parallel(f'Calculated density in {self.log.elapsed("calculate"):.2f}s '
f'for {work.desc}', flush=True)
yield work, result
if self.calc_comm.rank == 0:
self.log_parallel('Finished calculating density contributions', flush=True)
[docs]
def calculate_and_write(self,
out_fname: str,
which: str | list[str] = 'induced',
write_extra: dict[str, Any] = dict()):
""" Calculate density contributions.
Densities are saved in a numpy archive, ULM file or cube file depending on
whether the file extension is ``.npz``, ``.ulm``, or ``.cube``.
If the file extension is ``.cube`` then :attr:`out_fname` is taken to be a format string
that takes the following attributes.
Accepts variables
* `{time}` - Time in units of as (time domain only).
* `{freq}` - Frequency in units of eV (frequency domain only).
* `{which}` - The :attr:`which` argument.
* `{pulsefreq}` - Pulse frequency in units of eV (time domain only).
* `{pulsefwhm}` - Pulse FWHM in units of fs (time domain only).
Examples:
* out_fname = `{which}_density_t{time:09.1f}.cube`.
* out_fname = `{which}_density_w{freq:05.2f}.cube`.
Parameters
----------
out_fname
File name of the resulting data file.
which
String, or list of strings specifying the types of density to compute:
* `induced` - Induced density.
* `holes` - Holes density (only allowed in the time domain).
* `electrons` - Electrons density (only allowed in the time domain).
write_extra
Dictionary of extra key-value pairs to write to the data file.
"""
from ..writers.density import DensityWriter, write_density
from ..writers.writer import FrequencyResultsCollector, TimeResultsCollector
cls = TimeResultsCollector if self._is_time_density_matrices else FrequencyResultsCollector
if isinstance(which, str):
which = [which]
for which_key in which:
if which_key in ['holes', 'electrons'] and not self._is_time_density_matrices:
raise ValueError(f'Option which={which_key} not allowed in the frequency domain.')
if which_key not in ['induced', 'holes', 'electrons']:
raise ValueError(f'Option which={which} not recognized. '
'Must be one of: induced, holes, electrons')
calc_kwargs = dict(yield_total='induced' in which,
yield_holes='holes' in which,
yield_electrons='electrons' in which)
keys = {'induced': 'rho_g',
'holes': 'occ_rho_g',
'electrons': 'unocc_rho_g'}
out_fname = str(out_fname)
if out_fname.endswith('.npz'):
exclude = ['occ_rho_rows_fg', 'occ_rho_diag_fg', 'unocc_rho_rows_fg', 'unocc_rho_diag_fg']
writer = DensityWriter(cls(self, calc_kwargs=calc_kwargs, exclude=exclude))
writer.calculate_and_save_npz(out_fname=out_fname, write_extra=write_extra)
elif out_fname.endswith('.ulm'):
exclude = ['rho_g', 'occ_rho_rows_fg', 'occ_rho_diag_fg', 'unocc_rho_rows_fg', 'unocc_rho_diag_fg']
writer = DensityWriter(cls(self, calc_kwargs=calc_kwargs, exclude=exclude))
writer.calculate_and_save_ulm(out_fname=out_fname, write_extra=write_extra)
elif out_fname.endswith('.cube'):
atoms = self.calc.atoms
for work, res in self.icalculate(**calc_kwargs):
if self.calc_comm.rank > 0:
continue
for which_key in which:
key = keys[which_key]
fname_kw: dict[str, float | str] = dict(which=which_key)
data = res[key]
if self._is_time_density_matrices:
assert isinstance(work, ConvolutionDensityMatrixMetadata)
fname_kw.update(time=work.time, **get_gaussian_pulse_values(work.pulse))
else:
assert isinstance(work, FrequencyDensityMatrixMetadata)
fname_kw.update(freq=work.freq)
fpath = Path(out_fname.format(**fname_kw))
write_density(str(fpath), atoms, data)
self.log_parallel(f'Written {fpath}', flush=True)
else:
raise ValueError(f'output-file must have ending .npz or .ulm, is {out_fname}')