Source code for rhodent.writers.hcdist

from __future__ import annotations

from typing import cast
import numpy as np
from numpy.typing import NDArray
from gpaw.mpi import world

from ..density_matrices.time import ConvolutionDensityMatrices
from .writer import (Writer, ResultsCollector, TimeAverageResultsCollector,
                     PulseConvolutionAverageResultsCollector)
from ..utils import get_gaussian_pulse_values


[docs] class HotCarriersWriter(Writer): """ Calculate hot-carrier totals, and optionally broadened hot-carrier energy distributions Parameters ---------- collector ResultsCollector object only_one_pulse False if the resulting outputs should have one dimension corresponding to different pulses. True if there should be no such dimension. If True, then the calculator must only hold one pulse. """ def __init__(self, collector: ResultsCollector, only_one_pulse: bool): super().__init__(collector) self.only_one_pulse = only_one_pulse if only_one_pulse: if isinstance(self.density_matrices, ConvolutionDensityMatrices): assert len(self.density_matrices.pulses) == 1, 'Only one pulse allowed' else: assert isinstance(self.density_matrices, ConvolutionDensityMatrices) @property def common_arrays(self) -> dict[str, NDArray[np.float64] | int | float]: common = super().common_arrays assert isinstance(self.density_matrices, ConvolutionDensityMatrices) if isinstance(self.collector, (TimeAverageResultsCollector, PulseConvolutionAverageResultsCollector)): # Averages over time are taken common['avgtime_t'] = self.density_matrices.times * 1e-3 else: common['time_t'] = self.density_matrices.times * 1e-3 if isinstance(self.density_matrices, ConvolutionDensityMatrices): # If pulses are Gaussian pulses, then get dictionaries of 'pulsefreq' and 'pulsefwhm' pulsedicts = [get_gaussian_pulse_values(pulse) for pulse in self.density_matrices.pulses] try: pulsefreqs = [d['pulsefreq'] for d in pulsedicts] pulsefwhms = [d['pulsefwhm'] for d in pulsedicts] if self.only_one_pulse: common['pulsefreq'] = pulsefreqs[0] common['pulsefwhm'] = pulsefwhms[0] else: common['pulsefreq_p'] = np.array(pulsefreqs) common['pulsefwhm_p'] = np.array(pulsefwhms) except KeyError: # Not GaussianPulses pass if self.calc.sigma is not None: # There is an energy grid common['sigma'] = self.calc.sigma common['energy_o'] = np.array(self.calc.energies_occ) common['energy_u'] = np.array(self.calc.energies_unocc) return common
def hcdist_save_dat(out_fname: str, writer: HotCarriersWriter): zerostr = 'relative to Fermi level' average_times = isinstance(writer.collector, (TimeAverageResultsCollector, PulseConvolutionAverageResultsCollector)) energies_occ = writer.calc.energies_occ energies_unocc = writer.calc.energies_unocc if len(energies_occ) == 0 and len(energies_unocc) == 0: raise ValueError('Either occupied or unoccupied energies grid must be given') _data = dict(**writer.common_arrays) _data.update(writer.calculate_data()._data) if world.rank > 0: return pulsefreq = _data.pop('pulsefreq', None) pulsefwhm = _data.pop('pulsefwhm', None) data = cast(dict[str, NDArray[np.float64]], _data) # Set up array to be written in the data file. # Rows are energies, columns are projections and/or times nI = len(writer.voronoi) ne = max(len(energies_occ), len(energies_unocc)) # Longest length of energies eh_labels = [] if len(energies_occ) > 0: # Compute hole distributions eh_labels.append('H') if len(energies_unocc) > 0: # Compute electron distributions eh_labels.append('E') if average_times: nt = 1 else: nt = len(data['time_t']) ncolspertime = len(eh_labels) * (1 + nI) savedata = np.full((ne, len(eh_labels) + nt * ncolspertime), np.nan) savedata_by_times = [savedata[:, col:col + ncolspertime] for col in range(len(eh_labels), savedata.shape[1], ncolspertime)] # Set up format string fmt = len(eh_labels) * ['%15.6f'] + nt * ncolspertime * ['%18.8e'] # Set up header contents header_lines = [f'Hot carrier (H=hole, E=electron) distributions {zerostr}'] if pulsefreq is not None: header_lines.append(f'Response to pulse with frequency {pulsefreq:.2f}eV, ' f'FWHM {pulsefwhm:.2f}fs') if average_times: avgtimes = data['avgtime_t'] header_lines.append(f'Averaged for {len(avgtimes)} times between ' f'{avgtimes[0]:.1f}fs-{avgtimes[-1]:.1f}fs') else: times = data['time_t'] header_lines.append(f'Computed for the following {len(times)} times (in fs)') header_lines += [f' {time:.4f}' for t, time in enumerate(times)] if nI > 0: header_lines.append('Atomic projections') header_lines += [f' {i:4.0f}: {str(proj)}' for i, proj in enumerate(writer.voronoi.atom_projections)] header_lines.append(f'Gaussian folding, Width {writer.calc.sigma:.4f}eV') desc_entries = ([f'{label} energy (eV)' for label in eh_labels] + [f'Total {label} (1/eV)' for label in eh_labels] + [f'Proj {s} {i:2.0f} (1/eV)' for i in range(nI) for s in eh_labels]) desc_entries = ([f'{s:>15}' for s in desc_entries[:len(eh_labels)]] + [f'{s:>18}' for s in desc_entries[len(eh_labels):]]) desc_entries[0] = desc_entries[0][2:] # Remove two spaces to account for '# ' if not average_times: desc_entries.append(' ... repeated for next times') header_lines.append(' '.join(desc_entries)) # Write the data to the array if average_times: if len(eh_labels) == 2: # Computed both electron and hole distributions savedata[:len(energies_occ), 0] = energies_occ savedata[:len(energies_unocc), 1] = energies_unocc savedata[:len(energies_occ), 2] = data['hcdist_o'] savedata[:len(energies_unocc), 3] = data['hcdist_u'] if nI > 0: savedata[:len(energies_occ), 4::2] = data['hcdist_proj_Io'].T savedata[:len(energies_unocc), 5::2] = data['hcdist_proj_Iu'].T elif 'H' in eh_labels: # Only hole distributions savedata[:, 0] = energies_occ savedata[:, 1] = data['hcdist_o'] if nI > 0: savedata[:, 2:] = data['hcdist_proj_Io'].T else: # Only electron distributions savedata[:, 0] = energies_unocc savedata[:, 1] = data['hcdist_u'] if nI > 0: savedata[:, 1:] = data['hcdist_proj_Iu'].T else: if len(eh_labels) == 2: # Computed both electron and hole distributions savedata[:len(energies_occ), 0] = energies_occ savedata[:len(energies_unocc), 1] = energies_unocc for t, sdata in enumerate(savedata_by_times): sdata[:len(energies_occ), 0] = data['hcdist_to'][t] sdata[:len(energies_unocc), 1] = data['hcdist_tu'][t] if nI > 0: sdata[:len(energies_occ), 2::2] = data['hcdist_proj_tIo'][t].T sdata[:len(energies_unocc), 3::2] = data['hcdist_proj_tIu'][t].T elif 'H' in eh_labels: # Only hole distributions savedata[:, 0] = energies_occ for t, sdata in enumerate(savedata_by_times): sdata[:, 0] = data['hcdist_to'][t] if nI > 0: sdata[:, 1:] = data['hcdist_proj_tIo'][t].T else: # Only electron distributions savedata[:, 0] = energies_unocc for t, sdata in enumerate(savedata_by_times): sdata[:, 0] = data['hcdist_tu'][t] if nI > 0: sdata[:, 1:] = data['hcdist_proj_tIu'][t].T np.savetxt(out_fname, savedata, fmt, header='\n'.join(header_lines)) print(f'Written {out_fname}', flush=True) def hcsweeppulse_save_dat(out_fname: str, writer: HotCarriersWriter): Np = len(writer.calc.pulses) data = dict(**writer.common_arrays) data.update(writer.calculate_data()._data) if world.rank > 0: return nI = len(writer.voronoi) avgtimes = data['avgtime_t'] assert isinstance(avgtimes, np.ndarray) savedata = np.full((Np, 2*(2+nI)), np.nan) savedata[:, 0] = data['pulsefreq_p'] savedata[:, 1] = data['pulsefwhm_p'] savedata[:, 2] = data['sumocc_p'] savedata[:, 3] = data['sumunocc_p'] savedata[:, 4::2] = data['sumocc_proj_pI'] savedata[:, 5::2] = data['sumunocc_proj_pI'] projectionsstr = '\n'.join([f' {i:4.0f}: {str(proj)}' for i, proj in enumerate(writer.voronoi.atom_projections)]) desc_entries = (['Pulse freq (eV)', 'Pulse FWHM (fs)', 'Total H', 'Total E'] + [f'Proj {s} {i:2.0f}' for i in range(nI) for s in 'HE']) desc_entries = ([f'{s:>17}' for s in desc_entries[:2]] + [f'{s:>15}' for s in desc_entries[2:]]) desc_entries[0] = desc_entries[0][2:] # Remove two spaces to account for '# ' header = (f'Hot carrier (H=hole, E=electron) numbers\n' f'Averaged for {len(avgtimes)} times between ' f'{avgtimes[0]:.1f}fs-{avgtimes[-1]:.1f}fs\n' 'Atomic projections:\n' f'{projectionsstr}\n' f'{" ".join(desc_entries)}') fmt = 2*['%17.6f'] + (2*(nI + 1))*['%15.8e'] np.savetxt(out_fname, savedata, fmt, header=header) print(f'Written {out_fname}', flush=True) def hcsweeptime_save_dat(out_fname: str, writer: HotCarriersWriter): Nt = len(writer.calc.times) data = dict(**writer.common_arrays) data.update(writer.calculate_data()._data) if world.rank > 0: return nI = len(writer.voronoi) savedata = np.full((Nt, 1 + 2*(1+nI)), np.nan) savedata[:, 0] = data['time_t'] savedata[:, 1] = data['sumocc_t'] savedata[:, 2] = data['sumunocc_t'] savedata[:, 3::2] = data['sumocc_proj_tI'] savedata[:, 4::2] = data['sumunocc_proj_tI'] projectionsstr = '\n'.join([f' {i:4.0f}: {str(proj)}' for i, proj in enumerate(writer.voronoi.atom_projections)]) desc_entries = (['Time (fs)', 'Total H', 'Total E'] + [f'Proj {s} {i:2.0f}' for i in range(nI) for s in 'HE']) desc_entries = ([f'{s:>17}' for s in desc_entries[:1]] + [f'{s:>15}' for s in desc_entries[1:]]) desc_entries[0] = desc_entries[0][2:] # Remove two spaces to account for '# ' header_lines = ['Hot carrier (H=hole, E=electron) numbers\n'] if 'pulsefreq' in data: header_lines += [f'Response to pulse with frequency {data["pulsefreq"]:.2f}eV, ' f'FWHM {data["pulsefwhm"]:.2f}fs'] header_lines += ['Atomic projections:', f'{projectionsstr}\n', ' '.join(desc_entries)] header = '\n'.join(header_lines) fmt = ['%17.6f'] + (2*(nI + 1))*['%15.8e'] np.savetxt(out_fname, savedata, fmt, header=header) print(f'Written {out_fname}', flush=True)