Source code for rhodent.writers.hcdist

from __future__ import annotations

import sys
from typing import cast
import numpy as np
from numpy.typing import NDArray
from gpaw.mpi import world
from gpaw.tddft.units import au_to_eV, au_to_fs

from ..calculators.hotcarriers import HotCarriersCalculator
from ..density_matrices.time import (ConvolutionDensityMatrices, TimeDensityMatrices)
from ..voronoi import EmptyVoronoiWeights, VoronoiWeights
from .writer import (Writer, ResultsCollector, TimeAverageResultsCollector,
                     TimeResultsCollector,
                     PulseConvolutionAverageResultsCollector)


[docs] class HotCarriersWriter(Writer): """ Calculate hot-carrier totals, and optionally broadened hot-carrier energy distributions Parameters ---------- collector ResultsCollector object only_one_pulse False if the resulting outputs should have one dimension corresponding to different pulses. True if there should be no such dimension. If True, then the calculator must only hold one pulse. """ def __init__(self, collector: ResultsCollector, only_one_pulse: bool): super().__init__(collector) self.only_one_pulse = only_one_pulse if only_one_pulse: if isinstance(self.density_matrices, ConvolutionDensityMatrices): assert len(self.density_matrices.pulses) == 1, 'Only one pulse allowed' else: assert isinstance(self.density_matrices, ConvolutionDensityMatrices) @property def common_arrays(self) -> dict[str, NDArray[np.float64] | int | float]: common = super().common_arrays assert isinstance(self.density_matrices, (TimeDensityMatrices, ConvolutionDensityMatrices)) if isinstance(self.collector, (TimeAverageResultsCollector, PulseConvolutionAverageResultsCollector)): # Averages over time are taken common['avgtime_t'] = self.density_matrices.times * 1e-3 else: common['time_t'] = self.density_matrices.times * 1e-3 if isinstance(self.density_matrices, ConvolutionDensityMatrices): # Frequency (eV) pulsefreqs = [pulse.omega0 * au_to_eV for pulse in self.density_matrices.pulses] # FWHM in time domain (fs) pulsefwhms = [1 / pulse.sigma * (2 * np.sqrt(2 * np.log(2))) * au_to_fs for pulse in self.density_matrices.pulses] if self.only_one_pulse: common['pulsefreq'] = pulsefreqs[0] common['pulsefwhm'] = pulsefwhms[0] else: common['pulsefreq_p'] = np.array(pulsefreqs) common['pulsefwhm_p'] = np.array(pulsefwhms) if self.calc.sigma is not None: # There is an energy grid common['sigma'] = self.calc.sigma common['energy_o'] = np.array(self.calc.energies_occ) common['energy_u'] = np.array(self.calc.energies_unocc) return common
[docs] def calculate_hcdist_and_save_by_filename(out_fname: str, **kwargs): """ Calculate broadened hot-carrier energy distributions, optionally averaged over time The file format of the resulting data file is inferred from the file name Parameters ---------- out_fname File name of the resulting data file density_matrices Collection of density matrices in the time domain voronoi Optional Voronoi weights object. If given, then the hot-carrier distributions are additonally projected according to the weights. energies_occ Energy grid in eV for occupied levels (hole carriers). If given, hole distributions are computed and saved. energies_unocc Energy grid in eV for unoccupied levels (excited electrons). If given, electron distributions are computed and saved. sigma Gaussian broadening width in eV for the broadened distributions. average_times If true, an average over the given times will be taken. If false, then hot-carrier distributions are computed separately over the times, and each output is written separately for each time """ if out_fname[-4:] == '.npz': calculate_hcdist_and_save_npz(out_fname=out_fname, **kwargs) elif out_fname[-4:] == '.dat': calculate_hcdist_and_save_dat(out_fname=out_fname, **kwargs) else: print(f'output-file must have ending .npz or .dat, is {out_fname}') sys.exit(1)
[docs] def calculate_hcsweeppulse_and_save_by_filename(out_fname: str, **kwargs): """ Calculate the number of generated hot carriers, projected on groups of atoms, for a list of pulses The file format of the resulting data file is inferred from the file name Parameters ---------- out_fname File name of data file where data is to be saved density_matrices Object that gives the density matrix in the time domain voronoi Voronoi weights object """ if out_fname[-4:] == '.npz': calculate_hcsweeppulse_and_save_npz(out_fname=out_fname, **kwargs) elif out_fname[-4:] == '.dat': calculate_hcsweeppulse_and_save_dat(out_fname=out_fname, **kwargs) else: print(f'output-file must have ending .npz or .dat, is {out_fname}') sys.exit(1)
[docs] def calculate_hcsweeptime_and_save_by_filename(out_fname: str, **kwargs): """ Calculate the number of generated hot carriers, projected on groups of atoms, for a list of times The ground state including all unoccupied states and KohnShamDecomposition file are loaded. The delta-kick response density matrix in frequency space is loaded, convoluted with a pulse in frequency space, and inverse Fourier transformed to real time. Then number of generated HCs are computed for each of the atom projections. The file format of the resulting data file is inferred from the file name Parameters ---------- out_fname File name of data file where data is to be saved density_matrices Object that gives the density matrix in the time domain voronoi Voronoi weights object """ if out_fname[-4:] == '.npz': calculate_hcsweeptime_and_save_npz(out_fname=out_fname, **kwargs) elif out_fname[-4:] == '.dat': calculate_hcsweeptime_and_save_dat(out_fname=out_fname, **kwargs) else: print(f'output-file must have ending .npz or .dat, is {out_fname}') sys.exit(1)
[docs] def calculate_hcdist_and_save_dat(out_fname: str, density_matrices: TimeDensityMatrices | ConvolutionDensityMatrices, voronoi: VoronoiWeights | None = None, energies_occ: list[float] | NDArray[np.float64] = [], energies_unocc: list[float] | NDArray[np.float64] = [], *, sigma: float, average_times: bool = True): """ Calculate broadened hot-carrier energy distributions, optionally averaged over time HC distributions are saved in a text file Parameters ---------- out_fname File name of the resulting data file density_matrices Collection of density matrices in the time domain voronoi Optional Voronoi weights object. If given, then the hot-carrier distributions are additonally projected according to the weights. energies_occ Energy grid in eV for occupied levels (hole carriers). If given, hole distributions are computed and saved. energies_unocc Energy grid in eV for unoccupied levels (excited electrons). If given, electron distributions are computed and saved. sigma Gaussian broadening width in eV for the broadened distributions. average_times If true, an average over the given times will be taken. If false, then hot-carrier distributions are computed separately over the times, and each output is written separately for each time """ zerostr = 'relative to Fermi level' if voronoi is None: voronoi = EmptyVoronoiWeights() # Calculate if len(energies_occ) == 0 and len(energies_unocc) == 0: raise ValueError('Either occupied or unoccupied energies grid must be given') calc = HotCarriersCalculator(density_matrices=density_matrices, voronoi=voronoi, energies_occ=energies_occ, energies_unocc=energies_unocc, sigma=sigma, ) calc_kwargs = dict(yield_total_hcdists=True, yield_proj_hcdists=True) cls = TimeAverageResultsCollector if average_times else TimeResultsCollector writer = HotCarriersWriter(cls(calc, calc_kwargs), only_one_pulse=True) _data = dict(**writer.common_arrays) _data.update(writer.calculate_data()._data) if world.rank > 0: return pulsefreq = _data.pop('pulsefreq', None) pulsefwhm = _data.pop('pulsefwhm', None) data = cast(dict[str, NDArray[np.float64]], _data) # Set up array to be written in the data file. # Rows are energies, columns are projections and/or times nI = len(voronoi) ne = max(len(energies_occ), len(energies_unocc)) # Longest length of energies eh_labels = [] if len(energies_occ) > 0: # Compute hole distributions eh_labels.append('H') if len(energies_unocc) > 0: # Compute electron distributions eh_labels.append('E') if average_times: nt = 1 else: nt = len(data['time_t']) ncolspertime = len(eh_labels) * (1 + nI) savedata = np.full((ne, len(eh_labels) + nt * ncolspertime), np.nan) savedata_by_times = [savedata[:, col:col + ncolspertime] for col in range(len(eh_labels), savedata.shape[1], ncolspertime)] # Set up format string fmt = len(eh_labels) * ['%15.6f'] + nt * ncolspertime * ['%18.8e'] # Set up header contents header_lines = [f'Hot carrier (H=hole, E=electron) distributions {zerostr}'] if pulsefreq is not None: header_lines.append(f'Response to pulse with frequency {pulsefreq:.2f}eV, ' f'FWHM {pulsefwhm:.2f}fs') if average_times: avgtimes = data['avgtime_t'] header_lines.append(f'Averaged for {len(avgtimes)} times between ' f'{avgtimes[0]:.1f}fs-{avgtimes[-1]:.1f}fs') else: times = data['time_t'] header_lines.append(f'Computed for the following {len(times)} times (in fs)') header_lines += [f' {time:.4f}' for t, time in enumerate(times)] if nI > 0: header_lines.append('Atomic projections') header_lines += [f' {i:4.0f}: {str(proj)}' for i, proj in enumerate(voronoi.atom_projections)] header_lines.append(f'Gaussian folding, Width {sigma:.4f}eV') desc_entries = ([f'{label} energy (eV)' for label in eh_labels] + [f'Total {label} (1/eV)' for label in eh_labels] + [f'Proj {s} {i:2.0f} (1/eV)' for i in range(nI) for s in eh_labels]) desc_entries = ([f'{s:>15}' for s in desc_entries[:len(eh_labels)]] + [f'{s:>18}' for s in desc_entries[len(eh_labels):]]) desc_entries[0] = desc_entries[0][2:] # Remove two spaces to account for '# ' if not average_times: desc_entries.append(' ... repeated for next times') header_lines.append(' '.join(desc_entries)) # Write the data to the array if average_times: if len(eh_labels) == 2: # Computed both electron and hole distributions savedata[:len(energies_occ), 0] = energies_occ savedata[:len(energies_unocc), 1] = energies_unocc savedata[:len(energies_occ), 2] = data['hcdist_o'] savedata[:len(energies_unocc), 3] = data['hcdist_u'] if nI > 0: savedata[:len(energies_occ), 4::2] = data['hcdist_proj_Io'].T savedata[:len(energies_unocc), 5::2] = data['hcdist_proj_Iu'].T elif 'H' in eh_labels: # Only hole distributions savedata[:, 0] = energies_occ savedata[:, 1] = data['hcdist_o'] if nI > 0: savedata[:, 2:] = data['hcdist_proj_Io'].T else: # Only electron distributions savedata[:, 0] = energies_unocc savedata[:, 1] = data['hcdist_u'] if nI > 0: savedata[:, 1:] = data['hcdist_proj_Iu'].T else: if len(eh_labels) == 2: # Computed both electron and hole distributions savedata[:len(energies_occ), 0] = energies_occ savedata[:len(energies_unocc), 1] = energies_unocc for t, sdata in enumerate(savedata_by_times): sdata[:len(energies_occ), 0] = data['hcdist_to'][t] sdata[:len(energies_unocc), 1] = data['hcdist_tu'][t] if nI > 0: sdata[:len(energies_occ), 2::2] = data['hcdist_proj_tIo'][t].T sdata[:len(energies_unocc), 3::2] = data['hcdist_proj_tIu'][t].T elif 'H' in eh_labels: # Only hole distributions savedata[:, 0] = energies_occ for t, sdata in enumerate(savedata_by_times): sdata[:, 0] = data['hcdist_to'][t] if nI > 0: sdata[:, 1:] = data['hcdist_proj_tIo'][t].T else: # Only electron distributions savedata[:, 0] = energies_unocc for t, sdata in enumerate(savedata_by_times): sdata[:, 0] = data['hcdist_tu'][t] if nI > 0: sdata[:, 1:] = data['hcdist_proj_tIu'][t].T np.savetxt(out_fname, savedata, fmt, header='\n'.join(header_lines)) print(f'Written {out_fname}', flush=True)
[docs] def calculate_hcsweeppulse_and_save_dat(out_fname: str, density_matrices: ConvolutionDensityMatrices, voronoi: VoronoiWeights | None): """ Calculate the number of generated hot carriers, projected on groups of atoms, for a list of pulses HC distributions are saved in a text file Parameters ---------- out_fname File name of data file where data is to be saved density_matrices Object that gives the density matrix in the time domain voronoi Voronoi weights object """ if voronoi is None: voronoi = EmptyVoronoiWeights() Np = len(density_matrices.pulses) calc = HotCarriersCalculator(density_matrices=density_matrices, voronoi=voronoi, energies_occ=[], energies_unocc=[], sigma=None, ) calc_kwargs = dict(yield_total_hcdists=False, yield_proj_hcdists=False) collector = PulseConvolutionAverageResultsCollector(calc, calc_kwargs) writer = HotCarriersWriter(collector, only_one_pulse=False) data = dict(**writer.common_arrays) data.update(writer.calculate_data()._data) if world.rank > 0: return nI = len(voronoi) avgtimes = data['avgtime_t'] assert isinstance(avgtimes, np.ndarray) savedata = np.full((Np, 2*(2+nI)), np.nan) savedata[:, 0] = data['pulsefreq_p'] savedata[:, 1] = data['pulsefwhm_p'] savedata[:, 2] = data['sumocc_p'] savedata[:, 3] = data['sumunocc_p'] savedata[:, 4::2] = data['sumocc_proj_pI'] savedata[:, 5::2] = data['sumunocc_proj_pI'] projectionsstr = '\n'.join([f' {i:4.0f}: {str(proj)}' for i, proj in enumerate(voronoi.atom_projections)]) desc_entries = (['Pulse freq (eV)', 'Pulse FWHM (fs)', 'Total H', 'Total E'] + [f'Proj {s} {i:2.0f}' for i in range(nI) for s in 'HE']) desc_entries = ([f'{s:>17}' for s in desc_entries[:2]] + [f'{s:>15}' for s in desc_entries[2:]]) desc_entries[0] = desc_entries[0][2:] # Remove two spaces to account for '# ' header = (f'Hot carrier (H=hole, E=electron) numbers\n' f'Averaged for {len(avgtimes)} times between ' f'{avgtimes[0]:.1f}fs-{avgtimes[-1]:.1f}fs\n' 'Atomic projections:\n' f'{projectionsstr}\n' f'{" ".join(desc_entries)}') fmt = 2*['%17.6f'] + (2*(nI + 1))*['%15.8e'] np.savetxt(out_fname, savedata, fmt, header=header) print(f'Written {out_fname}', flush=True)
[docs] def calculate_hcsweeptime_and_save_dat(out_fname: str, density_matrices: TimeDensityMatrices | ConvolutionDensityMatrices, voronoi: VoronoiWeights | None): """ Calculate the number of generated hot carriers, projected on groups of atoms, for a list of times The ground state including all unoccupied states and KohnShamDecomposition file are loaded. The delta-kick response density matrix in frequency space is loaded, convoluted with a pulse in frequency space, and inverse Fourier transformed to real time. Then number of generated HCs are computed for each of the atom projections. HC distributions are saved in a text file Parameters ---------- out_fname File name of data file where data is to be saved density_matrices Object that gives the density matrix in the time domain voronoi Voronoi weights object """ if voronoi is None: voronoi = EmptyVoronoiWeights() Nt = len(density_matrices.times) calc = HotCarriersCalculator(density_matrices=density_matrices, voronoi=voronoi, energies_occ=[], energies_unocc=[], sigma=None, ) calc_kwargs = dict(yield_total_hcdists=False, yield_proj_hcdists=False) collector = TimeResultsCollector(calc, calc_kwargs) writer = HotCarriersWriter(collector, only_one_pulse=True) data = dict(**writer.common_arrays) data.update(writer.calculate_data()._data) if world.rank > 0: return nI = len(voronoi) savedata = np.full((Nt, 1 + 2*(1+nI)), np.nan) savedata[:, 0] = data['time_t'] savedata[:, 1] = data['sumocc_t'] savedata[:, 2] = data['sumunocc_t'] savedata[:, 3::2] = data['sumocc_proj_tI'] savedata[:, 4::2] = data['sumunocc_proj_tI'] projectionsstr = '\n'.join([f' {i:4.0f}: {str(proj)}' for i, proj in enumerate(voronoi.atom_projections)]) desc_entries = (['Time (fs)', 'Total H', 'Total E'] + [f'Proj {s} {i:2.0f}' for i in range(nI) for s in 'HE']) desc_entries = ([f'{s:>17}' for s in desc_entries[:1]] + [f'{s:>15}' for s in desc_entries[1:]]) desc_entries[0] = desc_entries[0][2:] # Remove two spaces to account for '# ' header_lines = ['Hot carrier (H=hole, E=electron) numbers\n'] if isinstance(density_matrices, ConvolutionDensityMatrices): header_lines += [f'Response to pulse with frequency {data["pulsefreq"]:.2f}eV, ' f'FWHM {data["pulsefwhm"]:.2f}fs'] header_lines += ['Atomic projections:', f'{projectionsstr}\n', ' '.join(desc_entries)] header = '\n'.join(header_lines) fmt = ['%17.6f'] + (2*(nI + 1))*['%15.8e'] np.savetxt(out_fname, savedata, fmt, header=header) print(f'Written {out_fname}', flush=True)
[docs] def calculate_hcdist_and_save_npz(out_fname: str, density_matrices: TimeDensityMatrices | ConvolutionDensityMatrices, voronoi: VoronoiWeights | None, energies_occ: list[float] | NDArray[np.float64] = [], energies_unocc: list[float] | NDArray[np.float64] = [], *, sigma: float, average_times: bool = True): """ Calculate broadened hot-carrier energy distributions, optionally averaged over time HC distributions are saved in a compressed numpy archive Parameters ---------- out_fname File name of the resulting data file density_matrices Collection of density matrices in the time domain voronoi Optional Voronoi weights object. If given, then the hot-carrier distributions are additonally projected according to the weights. energies_occ Energy grid in eV for occupied levels (hole carriers). If given, hole distributions are computed and saved. energies_unocc Energy grid in eV for unoccupied levels (excited electrons). If given, electron distributions are computed and saved. sigma Gaussian broadening width in eV for the broadened distributions. average_times If true, an average over the given times will be taken. If false, then hot-carrier distributions are computed separately over the times, and each output is written separately for each time """ if voronoi is None: voronoi = EmptyVoronoiWeights() if len(energies_occ) == 0 and len(energies_unocc) == 0: raise ValueError('Either occupied or unoccupied energies grid must be given') calc = HotCarriersCalculator(density_matrices=density_matrices, voronoi=voronoi, energies_occ=energies_occ, energies_unocc=energies_unocc, sigma=sigma, ) calc_kwargs = dict(yield_total_hcdists=True, yield_proj_hcdists=True) cls = TimeAverageResultsCollector if average_times else TimeResultsCollector writer = HotCarriersWriter(cls(calc, calc_kwargs), only_one_pulse=True) writer.calculate_and_save_npz(out_fname)
[docs] def calculate_hcsweeppulse_and_save_npz(out_fname: str, density_matrices: ConvolutionDensityMatrices, voronoi: VoronoiWeights | None): """ Calculate the number of generated hot carriers, projected on groups of atoms, for a list of pulses HC distributions are saved in a text file Parameters ---------- out_fname File name of data file where data is to be saved density_matrices Object that gives the density matrix in the time domain voronoi Voronoi weights object """ if voronoi is None: voronoi = EmptyVoronoiWeights() calc = HotCarriersCalculator(density_matrices=density_matrices, voronoi=voronoi, energies_occ=[], energies_unocc=[], sigma=None, ) calc_kwargs = dict(yield_total_hcdists=False, yield_proj_hcdists=False) collector = PulseConvolutionAverageResultsCollector(calc, calc_kwargs) writer = HotCarriersWriter(collector, only_one_pulse=False) writer.calculate_and_save_npz(out_fname)
[docs] def calculate_hcsweeptime_and_save_npz(out_fname: str, density_matrices: TimeDensityMatrices | ConvolutionDensityMatrices, voronoi: VoronoiWeights | None): """ Calculate the number of generated hot carriers, projected on groups of atoms, for a list of times The ground state including all unoccupied states and KohnShamDecomposition file are loaded. The delta-kick response density matrix in frequency space is loaded, convoluted with a pulse in frequency space, and inverse Fourier transformed to real time. Then number of generated HCs are computed for each of the atom projections. HC distributions are saved in a text file Parameters ---------- out_fname File name of data file where data is to be saved density_matrices Object that gives the density matrix in the time domain voronoi Voronoi weights object """ if voronoi is None: voronoi = EmptyVoronoiWeights() calc = HotCarriersCalculator(density_matrices=density_matrices, voronoi=voronoi, energies_occ=[], energies_unocc=[], sigma=None, ) calc_kwargs = dict(yield_total_hcdists=False, yield_proj_hcdists=False) collector = TimeResultsCollector(calc, calc_kwargs) writer = HotCarriersWriter(collector, only_one_pulse=True) writer.calculate_and_save_npz(out_fname)