Coverage for rhodent/writers/hcdist.py: 85%
176 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-08-01 16:57 +0000
« prev ^ index » next coverage.py v7.9.1, created at 2025-08-01 16:57 +0000
1from __future__ import annotations
3from typing import cast
4import numpy as np
5from numpy.typing import NDArray
6from gpaw.mpi import world
8from ..density_matrices.time import ConvolutionDensityMatrices
9from .writer import (Writer, ResultsCollector, TimeAverageResultsCollector,
10 PulseConvolutionAverageResultsCollector)
11from ..utils import get_gaussian_pulse_values
14class HotCarriersWriter(Writer):
16 """ Calculate hot-carrier totals, and optionally broadened hot-carrier energy distributions
18 Parameters
19 ----------
20 collector
21 ResultsCollector object
22 only_one_pulse
23 False if the resulting outputs should have one dimension corresponding
24 to different pulses. True if there should be no such dimension. If True,
25 then the calculator must only hold one pulse.
26 """
28 def __init__(self,
29 collector: ResultsCollector,
30 only_one_pulse: bool):
31 super().__init__(collector)
32 self.only_one_pulse = only_one_pulse
33 if only_one_pulse:
34 if isinstance(self.density_matrices, ConvolutionDensityMatrices):
35 assert len(self.density_matrices.pulses) == 1, 'Only one pulse allowed'
36 else:
37 assert isinstance(self.density_matrices, ConvolutionDensityMatrices)
39 @property
40 def common_arrays(self) -> dict[str, NDArray[np.float64] | NDArray[np.int64] | int | float]:
41 common = super().common_arrays
42 assert isinstance(self.density_matrices, ConvolutionDensityMatrices)
44 if isinstance(self.collector, (TimeAverageResultsCollector, PulseConvolutionAverageResultsCollector)):
45 # Averages over time are taken
46 common['avgtime_t'] = self.density_matrices.times * 1e-3
47 else:
48 common['time_t'] = self.density_matrices.times * 1e-3
50 if isinstance(self.density_matrices, ConvolutionDensityMatrices):
51 # If pulses are Gaussian pulses, then get dictionaries of 'pulsefreq' and 'pulsefwhm'
52 pulsedicts = [get_gaussian_pulse_values(pulse) for pulse in self.density_matrices.pulses]
54 try:
55 pulsefreqs = [d['pulsefreq'] for d in pulsedicts]
56 pulsefwhms = [d['pulsefwhm'] for d in pulsedicts]
58 if self.only_one_pulse:
59 common['pulsefreq'] = pulsefreqs[0]
60 common['pulsefwhm'] = pulsefwhms[0]
61 else:
62 common['pulsefreq_p'] = np.array(pulsefreqs)
63 common['pulsefwhm_p'] = np.array(pulsefwhms)
64 except KeyError:
65 # Not GaussianPulses
66 pass
68 if self.calc.sigma is not None:
69 # There is an energy grid
70 common['sigma'] = self.calc.sigma
71 common['energy_o'] = np.array(self.calc.energies_occ)
72 common['energy_u'] = np.array(self.calc.energies_unocc)
74 return common
77def write_hot_carrier_distributions(out_fname: str,
78 writer: HotCarriersWriter):
79 zerostr = 'relative to Fermi level'
80 average_times = isinstance(writer.collector, (TimeAverageResultsCollector, PulseConvolutionAverageResultsCollector))
82 energies_occ = writer.calc.energies_occ
83 energies_unocc = writer.calc.energies_unocc
84 if len(energies_occ) == 0 and len(energies_unocc) == 0:
85 raise ValueError('Either occupied or unoccupied energies grid must be given')
87 _data = dict(**writer.common_arrays)
88 _data.update(writer.calculate_data()._data)
90 if world.rank > 0:
91 return
93 pulsefreq = _data.pop('pulsefreq', None)
94 pulsefwhm = _data.pop('pulsefwhm', None)
96 data = cast(dict[str, NDArray[np.float64]], _data)
98 # Set up array to be written in the data file.
99 # Rows are energies, columns are projections and/or times
100 nI = len(writer.voronoi)
101 ne = max(len(energies_occ), len(energies_unocc)) # Longest length of energies
103 eh_labels = []
104 if len(energies_occ) > 0:
105 # Compute hole distributions
106 eh_labels.append('H')
107 if len(energies_unocc) > 0:
108 # Compute electron distributions
109 eh_labels.append('E')
111 if average_times:
112 nt = 1
113 else:
114 nt = len(data['time_t'])
116 ncolspertime = len(eh_labels) * (1 + nI)
118 savedata = np.full((ne, len(eh_labels) + nt * ncolspertime), np.nan)
119 savedata_by_times = [savedata[:, col:col + ncolspertime]
120 for col in range(len(eh_labels), savedata.shape[1], ncolspertime)]
122 # Set up format string
123 fmt = len(eh_labels) * ['%15.6f'] + nt * ncolspertime * ['%18.8e']
125 # Set up header contents
126 header_lines = [f'Hot carrier (H=hole, E=electron) distributions {zerostr}']
128 if pulsefreq is not None:
129 header_lines.append(f'Response to pulse with frequency {pulsefreq:.2f}eV, '
130 f'FWHM {pulsefwhm:.2f}fs')
132 if average_times:
133 avgtimes = data['avgtime_t']
134 header_lines.append(f'Averaged for {len(avgtimes)} times between '
135 f'{avgtimes[0]:.1f}fs-{avgtimes[-1]:.1f}fs')
136 else:
137 times = data['time_t']
138 header_lines.append(f'Computed for the following {len(times)} times (in units of fs)')
139 header_lines += [f' {time:.4f}' for t, time in enumerate(times)]
141 if nI > 0:
142 header_lines.append('Atomic projections')
143 header_lines += [f' {i:4.0f}: {str(proj)}' for i, proj in enumerate(writer.voronoi.atom_projections)]
145 header_lines.append(f'Gaussian folding, Width {writer.calc.sigma:.4f}eV')
146 desc_entries = ([f'{label} energy (eV)' for label in eh_labels] +
147 [f'Total {label} (1/eV)' for label in eh_labels] +
148 [f'Proj {s} {i:2.0f} (1/eV)' for i in range(nI) for s in eh_labels])
149 desc_entries = ([f'{s:>15}' for s in desc_entries[:len(eh_labels)]] +
150 [f'{s:>18}' for s in desc_entries[len(eh_labels):]])
151 desc_entries[0] = desc_entries[0][2:] # Remove two spaces to account for '# '
152 if not average_times:
153 desc_entries.append(' ... repeated for next times')
154 header_lines.append(' '.join(desc_entries))
156 # Write the data to the array
157 if average_times:
158 if len(eh_labels) == 2:
159 # Computed both electron and hole distributions
160 savedata[:len(energies_occ), 0] = energies_occ
161 savedata[:len(energies_unocc), 1] = energies_unocc
163 savedata[:len(energies_occ), 2] = data['hcdist_o']
164 savedata[:len(energies_unocc), 3] = data['hcdist_u']
166 if nI > 0:
167 savedata[:len(energies_occ), 4::2] = data['hcdist_proj_Io'].T
168 savedata[:len(energies_unocc), 5::2] = data['hcdist_proj_Iu'].T
169 elif 'H' in eh_labels:
170 # Only hole distributions
171 savedata[:, 0] = energies_occ
172 savedata[:, 1] = data['hcdist_o']
174 if nI > 0:
175 savedata[:, 2:] = data['hcdist_proj_Io'].T
176 else:
177 # Only electron distributions
178 savedata[:, 0] = energies_unocc
179 savedata[:, 1] = data['hcdist_u']
181 if nI > 0:
182 savedata[:, 1:] = data['hcdist_proj_Iu'].T
183 else:
184 if len(eh_labels) == 2:
185 # Computed both electron and hole distributions
186 savedata[:len(energies_occ), 0] = energies_occ
187 savedata[:len(energies_unocc), 1] = energies_unocc
189 for t, sdata in enumerate(savedata_by_times):
190 sdata[:len(energies_occ), 0] = data['hcdist_to'][t]
191 sdata[:len(energies_unocc), 1] = data['hcdist_tu'][t]
193 if nI > 0:
194 sdata[:len(energies_occ), 2::2] = data['hcdist_proj_tIo'][t].T
195 sdata[:len(energies_unocc), 3::2] = data['hcdist_proj_tIu'][t].T
196 elif 'H' in eh_labels:
197 # Only hole distributions
198 savedata[:, 0] = energies_occ
200 for t, sdata in enumerate(savedata_by_times):
201 sdata[:, 0] = data['hcdist_to'][t]
203 if nI > 0:
204 sdata[:, 1:] = data['hcdist_proj_tIo'][t].T
205 else:
206 # Only electron distributions
207 savedata[:, 0] = energies_unocc
209 for t, sdata in enumerate(savedata_by_times):
210 sdata[:, 0] = data['hcdist_tu'][t]
212 if nI > 0:
213 sdata[:, 1:] = data['hcdist_proj_tIu'][t].T
215 np.savetxt(out_fname, savedata, fmt, header='\n'.join(header_lines))
216 writer.calc.log(f'Written {out_fname}', who='Calculator', flush=True)
219def write_hot_carrier_totals_by_pulse(out_fname: str,
220 writer: HotCarriersWriter):
222 Np = len(writer.calc.pulses)
223 data = dict(**writer.common_arrays)
224 data.update(writer.calculate_data()._data)
226 if world.rank > 0:
227 return
229 nI = len(writer.voronoi)
230 avgtimes = data['avgtime_t']
231 assert isinstance(avgtimes, np.ndarray)
233 savedata = np.full((Np, 2*(2+nI)), np.nan)
234 savedata[:, 0] = data['pulsefreq_p']
235 savedata[:, 1] = data['pulsefwhm_p']
236 savedata[:, 2] = data['sumocc_p']
237 savedata[:, 3] = data['sumunocc_p']
238 savedata[:, 4::2] = data['sumocc_proj_pI']
239 savedata[:, 5::2] = data['sumunocc_proj_pI']
241 projectionsstr = '\n'.join([f' {i:4.0f}: {str(proj)}'
242 for i, proj in enumerate(writer.voronoi.atom_projections)])
243 desc_entries = (['Pulse freq (eV)', 'Pulse FWHM (fs)', 'Total H', 'Total E'] +
244 [f'Proj {s} {i:2.0f}' for i in range(nI) for s in 'HE'])
245 desc_entries = ([f'{s:>17}' for s in desc_entries[:2]] +
246 [f'{s:>15}' for s in desc_entries[2:]])
247 desc_entries[0] = desc_entries[0][2:] # Remove two spaces to account for '# '
249 header = (f'Hot carrier (H=hole, E=electron) numbers\n'
250 f'Averaged for {len(avgtimes)} times between '
251 f'{avgtimes[0]:.1f}fs-{avgtimes[-1]:.1f}fs\n'
252 'Atomic projections:\n'
253 f'{projectionsstr}\n'
254 f'{" ".join(desc_entries)}')
255 fmt = 2*['%17.6f'] + (2*(nI + 1))*['%15.8e']
256 np.savetxt(out_fname, savedata, fmt, header=header)
257 writer.calc.log(f'Written {out_fname}', who='Calculator', flush=True)
260def write_hot_carrier_totals_by_time(out_fname: str,
261 writer: HotCarriersWriter):
262 Nt = len(writer.calc.times)
263 data = dict(**writer.common_arrays)
264 data.update(writer.calculate_data()._data)
266 if world.rank > 0:
267 return
269 nI = len(writer.voronoi)
271 savedata = np.full((Nt, 1 + 2*(1+nI)), np.nan)
272 savedata[:, 0] = data['time_t']
273 savedata[:, 1] = data['sumocc_t']
274 savedata[:, 2] = data['sumunocc_t']
275 savedata[:, 3::2] = data['sumocc_proj_tI']
276 savedata[:, 4::2] = data['sumunocc_proj_tI']
278 projectionsstr = '\n'.join([f' {i:4.0f}: {str(proj)}'
279 for i, proj in enumerate(writer.voronoi.atom_projections)])
280 desc_entries = (['Time (fs)', 'Total H', 'Total E'] +
281 [f'Proj {s} {i:2.0f}' for i in range(nI) for s in 'HE'])
282 desc_entries = ([f'{s:>17}' for s in desc_entries[:1]] +
283 [f'{s:>15}' for s in desc_entries[1:]])
284 desc_entries[0] = desc_entries[0][2:] # Remove two spaces to account for '# '
286 header_lines = ['Hot carrier (H=hole, E=electron) numbers\n']
287 if 'pulsefreq' in data:
288 header_lines += [f'Response to pulse with frequency {data["pulsefreq"]:.2f}eV, '
289 f'FWHM {data["pulsefwhm"]:.2f}fs']
290 header_lines += ['Atomic projections:',
291 f'{projectionsstr}\n',
292 ' '.join(desc_entries)]
293 header = '\n'.join(header_lines)
294 fmt = ['%17.6f'] + (2*(nI + 1))*['%15.8e']
295 np.savetxt(out_fname, savedata, fmt, header=header)
296 writer.calc.log(f'Written {out_fname}', who='Calculator', flush=True)