Coverage for rhodent/writers/hcdist.py: 85%

176 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-08-01 16:57 +0000

1from __future__ import annotations 

2 

3from typing import cast 

4import numpy as np 

5from numpy.typing import NDArray 

6from gpaw.mpi import world 

7 

8from ..density_matrices.time import ConvolutionDensityMatrices 

9from .writer import (Writer, ResultsCollector, TimeAverageResultsCollector, 

10 PulseConvolutionAverageResultsCollector) 

11from ..utils import get_gaussian_pulse_values 

12 

13 

14class HotCarriersWriter(Writer): 

15 

16 """ Calculate hot-carrier totals, and optionally broadened hot-carrier energy distributions 

17 

18 Parameters 

19 ---------- 

20 collector 

21 ResultsCollector object 

22 only_one_pulse 

23 False if the resulting outputs should have one dimension corresponding 

24 to different pulses. True if there should be no such dimension. If True, 

25 then the calculator must only hold one pulse. 

26 """ 

27 

28 def __init__(self, 

29 collector: ResultsCollector, 

30 only_one_pulse: bool): 

31 super().__init__(collector) 

32 self.only_one_pulse = only_one_pulse 

33 if only_one_pulse: 

34 if isinstance(self.density_matrices, ConvolutionDensityMatrices): 

35 assert len(self.density_matrices.pulses) == 1, 'Only one pulse allowed' 

36 else: 

37 assert isinstance(self.density_matrices, ConvolutionDensityMatrices) 

38 

39 @property 

40 def common_arrays(self) -> dict[str, NDArray[np.float64] | NDArray[np.int64] | int | float]: 

41 common = super().common_arrays 

42 assert isinstance(self.density_matrices, ConvolutionDensityMatrices) 

43 

44 if isinstance(self.collector, (TimeAverageResultsCollector, PulseConvolutionAverageResultsCollector)): 

45 # Averages over time are taken 

46 common['avgtime_t'] = self.density_matrices.times * 1e-3 

47 else: 

48 common['time_t'] = self.density_matrices.times * 1e-3 

49 

50 if isinstance(self.density_matrices, ConvolutionDensityMatrices): 

51 # If pulses are Gaussian pulses, then get dictionaries of 'pulsefreq' and 'pulsefwhm' 

52 pulsedicts = [get_gaussian_pulse_values(pulse) for pulse in self.density_matrices.pulses] 

53 

54 try: 

55 pulsefreqs = [d['pulsefreq'] for d in pulsedicts] 

56 pulsefwhms = [d['pulsefwhm'] for d in pulsedicts] 

57 

58 if self.only_one_pulse: 

59 common['pulsefreq'] = pulsefreqs[0] 

60 common['pulsefwhm'] = pulsefwhms[0] 

61 else: 

62 common['pulsefreq_p'] = np.array(pulsefreqs) 

63 common['pulsefwhm_p'] = np.array(pulsefwhms) 

64 except KeyError: 

65 # Not GaussianPulses 

66 pass 

67 

68 if self.calc.sigma is not None: 

69 # There is an energy grid 

70 common['sigma'] = self.calc.sigma 

71 common['energy_o'] = np.array(self.calc.energies_occ) 

72 common['energy_u'] = np.array(self.calc.energies_unocc) 

73 

74 return common 

75 

76 

77def write_hot_carrier_distributions(out_fname: str, 

78 writer: HotCarriersWriter): 

79 zerostr = 'relative to Fermi level' 

80 average_times = isinstance(writer.collector, (TimeAverageResultsCollector, PulseConvolutionAverageResultsCollector)) 

81 

82 energies_occ = writer.calc.energies_occ 

83 energies_unocc = writer.calc.energies_unocc 

84 if len(energies_occ) == 0 and len(energies_unocc) == 0: 

85 raise ValueError('Either occupied or unoccupied energies grid must be given') 

86 

87 _data = dict(**writer.common_arrays) 

88 _data.update(writer.calculate_data()._data) 

89 

90 if world.rank > 0: 

91 return 

92 

93 pulsefreq = _data.pop('pulsefreq', None) 

94 pulsefwhm = _data.pop('pulsefwhm', None) 

95 

96 data = cast(dict[str, NDArray[np.float64]], _data) 

97 

98 # Set up array to be written in the data file. 

99 # Rows are energies, columns are projections and/or times 

100 nI = len(writer.voronoi) 

101 ne = max(len(energies_occ), len(energies_unocc)) # Longest length of energies 

102 

103 eh_labels = [] 

104 if len(energies_occ) > 0: 

105 # Compute hole distributions 

106 eh_labels.append('H') 

107 if len(energies_unocc) > 0: 

108 # Compute electron distributions 

109 eh_labels.append('E') 

110 

111 if average_times: 

112 nt = 1 

113 else: 

114 nt = len(data['time_t']) 

115 

116 ncolspertime = len(eh_labels) * (1 + nI) 

117 

118 savedata = np.full((ne, len(eh_labels) + nt * ncolspertime), np.nan) 

119 savedata_by_times = [savedata[:, col:col + ncolspertime] 

120 for col in range(len(eh_labels), savedata.shape[1], ncolspertime)] 

121 

122 # Set up format string 

123 fmt = len(eh_labels) * ['%15.6f'] + nt * ncolspertime * ['%18.8e'] 

124 

125 # Set up header contents 

126 header_lines = [f'Hot carrier (H=hole, E=electron) distributions {zerostr}'] 

127 

128 if pulsefreq is not None: 

129 header_lines.append(f'Response to pulse with frequency {pulsefreq:.2f}eV, ' 

130 f'FWHM {pulsefwhm:.2f}fs') 

131 

132 if average_times: 

133 avgtimes = data['avgtime_t'] 

134 header_lines.append(f'Averaged for {len(avgtimes)} times between ' 

135 f'{avgtimes[0]:.1f}fs-{avgtimes[-1]:.1f}fs') 

136 else: 

137 times = data['time_t'] 

138 header_lines.append(f'Computed for the following {len(times)} times (in units of fs)') 

139 header_lines += [f' {time:.4f}' for t, time in enumerate(times)] 

140 

141 if nI > 0: 

142 header_lines.append('Atomic projections') 

143 header_lines += [f' {i:4.0f}: {str(proj)}' for i, proj in enumerate(writer.voronoi.atom_projections)] 

144 

145 header_lines.append(f'Gaussian folding, Width {writer.calc.sigma:.4f}eV') 

146 desc_entries = ([f'{label} energy (eV)' for label in eh_labels] + 

147 [f'Total {label} (1/eV)' for label in eh_labels] + 

148 [f'Proj {s} {i:2.0f} (1/eV)' for i in range(nI) for s in eh_labels]) 

149 desc_entries = ([f'{s:>15}' for s in desc_entries[:len(eh_labels)]] + 

150 [f'{s:>18}' for s in desc_entries[len(eh_labels):]]) 

151 desc_entries[0] = desc_entries[0][2:] # Remove two spaces to account for '# ' 

152 if not average_times: 

153 desc_entries.append(' ... repeated for next times') 

154 header_lines.append(' '.join(desc_entries)) 

155 

156 # Write the data to the array 

157 if average_times: 

158 if len(eh_labels) == 2: 

159 # Computed both electron and hole distributions 

160 savedata[:len(energies_occ), 0] = energies_occ 

161 savedata[:len(energies_unocc), 1] = energies_unocc 

162 

163 savedata[:len(energies_occ), 2] = data['hcdist_o'] 

164 savedata[:len(energies_unocc), 3] = data['hcdist_u'] 

165 

166 if nI > 0: 

167 savedata[:len(energies_occ), 4::2] = data['hcdist_proj_Io'].T 

168 savedata[:len(energies_unocc), 5::2] = data['hcdist_proj_Iu'].T 

169 elif 'H' in eh_labels: 

170 # Only hole distributions 

171 savedata[:, 0] = energies_occ 

172 savedata[:, 1] = data['hcdist_o'] 

173 

174 if nI > 0: 

175 savedata[:, 2:] = data['hcdist_proj_Io'].T 

176 else: 

177 # Only electron distributions 

178 savedata[:, 0] = energies_unocc 

179 savedata[:, 1] = data['hcdist_u'] 

180 

181 if nI > 0: 

182 savedata[:, 1:] = data['hcdist_proj_Iu'].T 

183 else: 

184 if len(eh_labels) == 2: 

185 # Computed both electron and hole distributions 

186 savedata[:len(energies_occ), 0] = energies_occ 

187 savedata[:len(energies_unocc), 1] = energies_unocc 

188 

189 for t, sdata in enumerate(savedata_by_times): 

190 sdata[:len(energies_occ), 0] = data['hcdist_to'][t] 

191 sdata[:len(energies_unocc), 1] = data['hcdist_tu'][t] 

192 

193 if nI > 0: 

194 sdata[:len(energies_occ), 2::2] = data['hcdist_proj_tIo'][t].T 

195 sdata[:len(energies_unocc), 3::2] = data['hcdist_proj_tIu'][t].T 

196 elif 'H' in eh_labels: 

197 # Only hole distributions 

198 savedata[:, 0] = energies_occ 

199 

200 for t, sdata in enumerate(savedata_by_times): 

201 sdata[:, 0] = data['hcdist_to'][t] 

202 

203 if nI > 0: 

204 sdata[:, 1:] = data['hcdist_proj_tIo'][t].T 

205 else: 

206 # Only electron distributions 

207 savedata[:, 0] = energies_unocc 

208 

209 for t, sdata in enumerate(savedata_by_times): 

210 sdata[:, 0] = data['hcdist_tu'][t] 

211 

212 if nI > 0: 

213 sdata[:, 1:] = data['hcdist_proj_tIu'][t].T 

214 

215 np.savetxt(out_fname, savedata, fmt, header='\n'.join(header_lines)) 

216 writer.calc.log(f'Written {out_fname}', who='Calculator', flush=True) 

217 

218 

219def write_hot_carrier_totals_by_pulse(out_fname: str, 

220 writer: HotCarriersWriter): 

221 

222 Np = len(writer.calc.pulses) 

223 data = dict(**writer.common_arrays) 

224 data.update(writer.calculate_data()._data) 

225 

226 if world.rank > 0: 

227 return 

228 

229 nI = len(writer.voronoi) 

230 avgtimes = data['avgtime_t'] 

231 assert isinstance(avgtimes, np.ndarray) 

232 

233 savedata = np.full((Np, 2*(2+nI)), np.nan) 

234 savedata[:, 0] = data['pulsefreq_p'] 

235 savedata[:, 1] = data['pulsefwhm_p'] 

236 savedata[:, 2] = data['sumocc_p'] 

237 savedata[:, 3] = data['sumunocc_p'] 

238 savedata[:, 4::2] = data['sumocc_proj_pI'] 

239 savedata[:, 5::2] = data['sumunocc_proj_pI'] 

240 

241 projectionsstr = '\n'.join([f' {i:4.0f}: {str(proj)}' 

242 for i, proj in enumerate(writer.voronoi.atom_projections)]) 

243 desc_entries = (['Pulse freq (eV)', 'Pulse FWHM (fs)', 'Total H', 'Total E'] + 

244 [f'Proj {s} {i:2.0f}' for i in range(nI) for s in 'HE']) 

245 desc_entries = ([f'{s:>17}' for s in desc_entries[:2]] + 

246 [f'{s:>15}' for s in desc_entries[2:]]) 

247 desc_entries[0] = desc_entries[0][2:] # Remove two spaces to account for '# ' 

248 

249 header = (f'Hot carrier (H=hole, E=electron) numbers\n' 

250 f'Averaged for {len(avgtimes)} times between ' 

251 f'{avgtimes[0]:.1f}fs-{avgtimes[-1]:.1f}fs\n' 

252 'Atomic projections:\n' 

253 f'{projectionsstr}\n' 

254 f'{" ".join(desc_entries)}') 

255 fmt = 2*['%17.6f'] + (2*(nI + 1))*['%15.8e'] 

256 np.savetxt(out_fname, savedata, fmt, header=header) 

257 writer.calc.log(f'Written {out_fname}', who='Calculator', flush=True) 

258 

259 

260def write_hot_carrier_totals_by_time(out_fname: str, 

261 writer: HotCarriersWriter): 

262 Nt = len(writer.calc.times) 

263 data = dict(**writer.common_arrays) 

264 data.update(writer.calculate_data()._data) 

265 

266 if world.rank > 0: 

267 return 

268 

269 nI = len(writer.voronoi) 

270 

271 savedata = np.full((Nt, 1 + 2*(1+nI)), np.nan) 

272 savedata[:, 0] = data['time_t'] 

273 savedata[:, 1] = data['sumocc_t'] 

274 savedata[:, 2] = data['sumunocc_t'] 

275 savedata[:, 3::2] = data['sumocc_proj_tI'] 

276 savedata[:, 4::2] = data['sumunocc_proj_tI'] 

277 

278 projectionsstr = '\n'.join([f' {i:4.0f}: {str(proj)}' 

279 for i, proj in enumerate(writer.voronoi.atom_projections)]) 

280 desc_entries = (['Time (fs)', 'Total H', 'Total E'] + 

281 [f'Proj {s} {i:2.0f}' for i in range(nI) for s in 'HE']) 

282 desc_entries = ([f'{s:>17}' for s in desc_entries[:1]] + 

283 [f'{s:>15}' for s in desc_entries[1:]]) 

284 desc_entries[0] = desc_entries[0][2:] # Remove two spaces to account for '# ' 

285 

286 header_lines = ['Hot carrier (H=hole, E=electron) numbers\n'] 

287 if 'pulsefreq' in data: 

288 header_lines += [f'Response to pulse with frequency {data["pulsefreq"]:.2f}eV, ' 

289 f'FWHM {data["pulsefwhm"]:.2f}fs'] 

290 header_lines += ['Atomic projections:', 

291 f'{projectionsstr}\n', 

292 ' '.join(desc_entries)] 

293 header = '\n'.join(header_lines) 

294 fmt = ['%17.6f'] + (2*(nI + 1))*['%15.8e'] 

295 np.savetxt(out_fname, savedata, fmt, header=header) 

296 writer.calc.log(f'Written {out_fname}', who='Calculator', flush=True)