Coverage for tests/unittests/writers/test_hcdist_writers.py: 97%

211 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-08-01 16:57 +0000

1from __future__ import annotations 

2 

3import pytest 

4import numpy as np 

5from json import loads 

6 

7from gpaw.mpi import world 

8from rhodent.calculators.hotcarriers import HotCarriersCalculator 

9from rhodent.utils import create_pulse 

10 

11 

12def compare(leftdata, rightdata, leftkeys, rightkeys): 

13 for leftkey, rightkey in zip(leftkeys, rightkeys): 

14 np.testing.assert_allclose(leftdata[leftkey], rightdata[rightkey], 

15 err_msg=f'left: {leftkey} - right: {rightkey}') 

16 

17 

18def compare_known(leftdata, rightdata, leftkeys, rightkeys, reference): 

19 # Loose tolerance to allow for energy and time grids which are written out with less precision 

20 for leftkey, rightkey, ref in zip(leftkeys, rightkeys, reference): 

21 np.testing.assert_allclose(leftdata[leftkey], rightdata[rightkey], atol=1e-5, 

22 err_msg=f'left: {leftkey} - right: {rightkey}') 

23 np.testing.assert_allclose(leftdata[leftkey], ref, atol=1e-5, 

24 err_msg=f'{leftkey} != reference') 

25 

26 

27def read_header(fname, truncate=140): 

28 """ Return header lines and the first line of data """ 

29 def reader(): 

30 with open(fname) as fp: 

31 for line in fp: 

32 yield line[:truncate].rstrip('\n') 

33 if not line.startswith('#'): 

34 return 

35 

36 return list(reader()) 

37 

38 

39def parse_projections(header): 

40 if isinstance(header, str): 

41 header_lines = header.split('\n') 

42 else: 

43 header_lines = header 

44 

45 begin = -1 

46 for i, line in enumerate(header_lines): 

47 if 'Atomic projections' in line: 

48 begin = i+1 

49 break 

50 if begin == -1: 

51 raise ValueError('No projections') 

52 projections = [] 

53 for i, line in enumerate(header_lines[begin:]): 

54 if line[0] != '#': 

55 # We have parsed the entire header 

56 break 

57 try: 

58 num, projstr = line[1:].split(':') 

59 num = int(num) 

60 assert num == i, line 

61 proj_atoms = loads(projstr) 

62 assert isinstance(proj_atoms, list) 

63 assert all(isinstance(a, int) for a in proj_atoms) 

64 projections.append(proj_atoms) 

65 except ValueError: 

66 # Format does not match, we are at the next part of the header 

67 break 

68 return projections 

69 

70 

71@pytest.mark.parametrize('test_system', ['Na55']) 

72@pytest.mark.parametrize('only_one_pulse', [True, False]) 

73def test_hcdist_avg_formats(tmp_path, mock_voronoi, mock_response, only_one_pulse): 

74 time_t = np.linspace(0, 30, 7) # In fs 

75 

76 # Set up mock objects 

77 voronoi = mock_voronoi(atom_projections=[[0, 1, 2], [3]]) 

78 pulse = create_pulse(4.0) 

79 

80 calc_kwargs = dict(response=mock_response(), 

81 voronoi=None, 

82 times=time_t * 1e3, 

83 pulses=[pulse], 

84 energies_occ=np.linspace(-5, 1, 10), 

85 energies_unocc=np.linspace(-1, 5, 10), 

86 sigma=0.05) 

87 calc = HotCarriersCalculator(**calc_kwargs) 

88 kwargs = dict(only_one_pulse=only_one_pulse) 

89 

90 # Calculate with and without Voronoi 

91 calc.calculate_distributions_and_write(str(tmp_path / 'hcdist.npz'), **kwargs) 

92 calc.calculate_distributions_and_write(str(tmp_path / 'hcdist.dat'), **kwargs) 

93 

94 calc_kwargs['voronoi'] = voronoi 

95 calc = HotCarriersCalculator(**calc_kwargs) 

96 calc.calculate_distributions_and_write(str(tmp_path / 'hcdist_voronoi.npz'), **kwargs) 

97 calc.calculate_distributions_and_write(str(tmp_path / 'hcdist_voronoi.dat'), **kwargs) 

98 

99 if world.rank != 0: 

100 return 

101 

102 # Check header contents of text files 

103 header = '\n'.join(read_header(tmp_path / 'hcdist.dat')) 

104 assert 'relative to Fermi level' in header 

105 assert 'Averaged for' in header 

106 if only_one_pulse: 

107 assert 'Response to pulse' in header 

108 header = '\n'.join(read_header(tmp_path / 'hcdist_voronoi.dat')) 

109 assert 'relative to Fermi level' in header 

110 assert 'Averaged for' in header 

111 np.testing.assert_equal(voronoi.atom_projections, parse_projections(header)) 

112 if only_one_pulse: 

113 assert 'Response to pulse' in header 

114 

115 # Verify without Voronoi 

116 data_dat = np.loadtxt(tmp_path / 'hcdist.dat') 

117 data_npz = np.load(tmp_path / 'hcdist.npz') 

118 

119 if only_one_pulse: 

120 np.testing.assert_allclose(data_npz['pulsefreq'], 4.0) 

121 np.testing.assert_allclose(data_npz['pulsefwhm'], 5.0) 

122 assert data_dat.shape[1] == 4 

123 np.testing.assert_allclose(data_npz['avgtime_t'], time_t) 

124 np.testing.assert_allclose(data_npz['sigma'], calc_kwargs['sigma']) 

125 compare_known(data_npz, data_dat.T, 

126 ['energy_o', 'energy_u'], [0, 1], 

127 [calc_kwargs['energies_occ'], calc_kwargs['energies_unocc']]) 

128 

129 compare(data_npz, data_dat.T, ['hcdist_o', 'hcdist_u'], [2, 3]) 

130 hcdist_o = data_npz['hcdist_o'] 

131 hcdist_u = data_npz['hcdist_u'] 

132 

133 # Verify with Voronoi 

134 data_dat = np.loadtxt(tmp_path / 'hcdist_voronoi.dat') 

135 data_npz = np.load(tmp_path / 'hcdist_voronoi.npz') 

136 

137 if only_one_pulse: 

138 np.testing.assert_allclose(data_npz['pulsefreq'], 4.0) 

139 np.testing.assert_allclose(data_npz['pulsefwhm'], 5.0) 

140 assert data_dat.shape[1] == 4 + 2 * len(voronoi) 

141 np.testing.assert_allclose(data_npz['avgtime_t'], time_t) 

142 np.testing.assert_allclose(data_npz['sigma'], calc_kwargs['sigma']) 

143 compare_known(data_npz, data_dat.T, 

144 ['energy_o', 'energy_u', 'hcdist_o', 'hcdist_u'], [0, 1, 2, 3], 

145 [calc_kwargs['energies_occ'], calc_kwargs['energies_unocc'], hcdist_o, hcdist_u]) 

146 

147 compare(data_npz, data_dat.T, ['hcdist_proj_Io', 'hcdist_proj_Iu'], [(slice(4, None, 2)), slice(5, None, 2)]) 

148 

149 

150@pytest.mark.parametrize('test_system', ['Na55']) 

151def test_hcdist_formats(tmp_path, mock_response, mock_voronoi): 

152 time_t = np.linspace(0, 30, 7) # In fs 

153 pulse = create_pulse(4.0) 

154 

155 # Set up mock objects 

156 voronoi = mock_voronoi(atom_projections=[[0, 1, 2], [3]]) 

157 calc_kwargs = dict(response=mock_response(), 

158 voronoi=None, 

159 times=time_t * 1e3, 

160 pulses=[pulse], 

161 energies_occ=np.linspace(-5, 1, 10), 

162 energies_unocc=np.linspace(-1, 5, 10), 

163 sigma=0.05) 

164 calc = HotCarriersCalculator(**calc_kwargs) 

165 kwargs = dict(only_one_pulse=True, average_times=False) 

166 

167 # Calculate with and without Voronoi 

168 calc.calculate_distributions_and_write(str(tmp_path / 'hcdist.npz'), **kwargs) 

169 calc.calculate_distributions_and_write(str(tmp_path / 'hcdist.dat'), **kwargs) 

170 

171 calc_kwargs['voronoi'] = voronoi 

172 calc = HotCarriersCalculator(**calc_kwargs) 

173 calc.calculate_distributions_and_write(str(tmp_path / 'hcdist_voronoi.npz'), **kwargs) 

174 calc.calculate_distributions_and_write(str(tmp_path / 'hcdist_voronoi.dat'), **kwargs) 

175 

176 if world.rank != 0: 

177 return 

178 # Check header contents of text files 

179 header = '\n'.join(read_header(tmp_path / 'hcdist.dat')) 

180 assert 'relative to Fermi level' in header 

181 header = '\n'.join(read_header(tmp_path / 'hcdist_voronoi.dat')) 

182 assert 'relative to Fermi level' in header 

183 np.testing.assert_equal(voronoi.atom_projections, parse_projections(header)) 

184 

185 # The format of the text files should be 

186 # (i) Energy grids: first two columns 

187 # (ii) Total distributions: next two columns 

188 # (iii) Projected distributions: next 2 * len(voronoi) columns 

189 # Repeat (ii)-(iii) for as many times as there are 

190 

191 # Verify without Voronoi 

192 data_dat = np.loadtxt(tmp_path / 'hcdist.dat') 

193 data_npz = np.load(tmp_path / 'hcdist.npz') 

194 

195 assert data_dat.shape[1] == 2 + 2 * (len(time_t)) 

196 np.testing.assert_allclose(data_npz['time_t'], time_t) 

197 np.testing.assert_allclose(data_npz['sigma'], calc_kwargs['sigma']) 

198 compare_known(data_npz, data_dat.T, 

199 ['energy_o', 'energy_u'], [0, 1], 

200 [calc_kwargs['energies_occ'], calc_kwargs['energies_unocc']]) 

201 

202 compare(data_npz, data_dat.T, ['hcdist_to', 'hcdist_tu'], [(slice(2, None, 2)), slice(3, None, 2)]) 

203 hcdist_to = data_npz['hcdist_to'] 

204 hcdist_tu = data_npz['hcdist_tu'] 

205 

206 # Verify with Voronoi 

207 data_dat = np.loadtxt(tmp_path / 'hcdist_voronoi.dat') 

208 data_npz = np.load(tmp_path / 'hcdist_voronoi.npz') 

209 

210 assert data_dat.shape[1] == 2 + 2 * (len(voronoi) + 1) * len(time_t) 

211 np.testing.assert_allclose(data_npz['time_t'], time_t) 

212 np.testing.assert_allclose(data_npz['sigma'], calc_kwargs['sigma']) 

213 compare_known(data_npz, data_dat.T, 

214 ['energy_o', 'energy_u', 'hcdist_to', 'hcdist_tu'], 

215 [0, 1, (slice(2, None, 2 * (len(voronoi) + 1))), slice(3, None, 2 * (len(voronoi) + 1))], 

216 [calc_kwargs['energies_occ'], calc_kwargs['energies_unocc'], hcdist_to, hcdist_tu]) 

217 

218 # Drop energy_grids and reshape into energy, time, total + voronoi 

219 hcdist_otI = data_dat[:, 2::2].reshape((-1, len(time_t), len(voronoi) + 1)) 

220 hcdist_utI = data_dat[:, 3::2].reshape((-1, len(time_t), len(voronoi) + 1)) 

221 # Drop the totals and reshape into time, voronoi, energy 

222 hcdist_tIo = hcdist_otI[:, :, 1:].swapaxes(0, 1).swapaxes(1, 2) 

223 hcdist_tIu = hcdist_utI[:, :, 1:].swapaxes(0, 1).swapaxes(1, 2) 

224 # Verify 

225 np.testing.assert_allclose(data_npz['hcdist_proj_tIo'], hcdist_tIo) 

226 np.testing.assert_allclose(data_npz['hcdist_proj_tIu'], hcdist_tIu) 

227 

228 

229@pytest.mark.parametrize('test_system', ['Na55']) 

230def test_hctime_formats(tmp_path, mock_response, mock_voronoi): 

231 time_t = np.linspace(0, 30, 12) # In fs 

232 pulse = create_pulse(4.0) 

233 

234 # Set up mock objects 

235 voronoi = mock_voronoi(atom_projections=[[0, 1, 2], [3]]) 

236 calc_kwargs = dict(response=mock_response(), 

237 voronoi=None, 

238 times=time_t * 1e3, 

239 pulses=[pulse], 

240 energies_occ=[], 

241 energies_unocc=[], 

242 sigma=None) 

243 calc = HotCarriersCalculator(**calc_kwargs) 

244 

245 # Calculate with and without Voronoi 

246 calc.calculate_totals_by_time_and_write(str(tmp_path / 'hctime.npz')) 

247 calc.calculate_totals_by_time_and_write(str(tmp_path / 'hctime.dat')) 

248 

249 calc_kwargs['voronoi'] = voronoi 

250 calc = HotCarriersCalculator(**calc_kwargs) 

251 calc.calculate_totals_by_time_and_write(str(tmp_path / 'hctime_voronoi.npz')) 

252 calc.calculate_totals_by_time_and_write(str(tmp_path / 'hctime_voronoi.dat')) 

253 

254 if world.rank != 0: 

255 return 

256 # Check header contents of text files 

257 header = '\n'.join(read_header(tmp_path / 'hctime.dat')) 

258 header = '\n'.join(read_header(tmp_path / 'hctime_voronoi.dat')) 

259 np.testing.assert_equal(voronoi.atom_projections, parse_projections(header)) 

260 

261 # Verify without Voronoi 

262 data_dat = np.loadtxt(tmp_path / 'hctime.dat') 

263 data_npz = np.load(tmp_path / 'hctime.npz') 

264 

265 assert data_dat.shape[1] == 3 

266 np.testing.assert_allclose(data_npz['time_t'], time_t) 

267 np.testing.assert_allclose(data_npz['time_t'], data_dat[:, 0], atol=1e-5) 

268 compare_known(data_npz, data_dat.T, ['time_t'], [0], [time_t]) 

269 compare(data_npz, data_dat.T, ['sumocc_t', 'sumunocc_t'], [1, 2]) 

270 sumocc_t = data_npz['sumocc_t'] 

271 sumunocc_t = data_npz['sumunocc_t'] 

272 

273 # Verify with Voronoi 

274 data_dat = np.loadtxt(tmp_path / 'hctime_voronoi.dat') 

275 data_npz = np.load(tmp_path / 'hctime_voronoi.npz') 

276 

277 assert data_dat.shape[1] == 3 + 2 * len(voronoi) 

278 np.testing.assert_allclose(data_npz['time_t'], time_t) 

279 compare_known(data_npz, data_dat.T, 

280 ['time_t', 'sumocc_t', 'sumunocc_t'], [0, 1, 2], 

281 [time_t, sumocc_t, sumunocc_t]) 

282 

283 np.testing.assert_allclose(data_npz['sumocc_proj_tI'], data_dat[:, 3::2]) 

284 np.testing.assert_allclose(data_npz['sumunocc_proj_tI'], data_dat[:, 4::2]) 

285 

286 

287@pytest.mark.parametrize('test_system', ['Na55']) 

288def test_hcpulse_formats(tmp_path, mock_response, mock_voronoi): 

289 time_t = np.linspace(0, 30, 3) # In fs 

290 pfs = [3.2, 3.3] 

291 fwhms = [5.0, 6.0] 

292 pulses = [create_pulse(pf, fwhm) for pf, fwhm in zip(pfs, fwhms)] 

293 

294 # Set up mock objects 

295 voronoi = mock_voronoi(atom_projections=[[0, 1, 2], [3]]) 

296 calc_kwargs = dict(response=mock_response(), 

297 voronoi=None, 

298 times=time_t * 1e3, 

299 pulses=pulses, 

300 energies_occ=[], 

301 energies_unocc=[], 

302 sigma=None) 

303 calc = HotCarriersCalculator(**calc_kwargs) 

304 

305 # Calculate with and without Voronoi 

306 calc.calculate_totals_by_pulse_and_write(str(tmp_path / 'hcpulse.npz')) 

307 calc.calculate_totals_by_pulse_and_write(str(tmp_path / 'hcpulse.dat')) 

308 

309 calc_kwargs['voronoi'] = voronoi 

310 calc = HotCarriersCalculator(**calc_kwargs) 

311 calc.calculate_totals_by_pulse_and_write(str(tmp_path / 'hcpulse_voronoi.npz')) 

312 calc.calculate_totals_by_pulse_and_write(str(tmp_path / 'hcpulse_voronoi.dat')) 

313 

314 if world.rank != 0: 

315 return 

316 # Check header contents of text files 

317 header = '\n'.join(read_header(tmp_path / 'hcpulse.dat')) 

318 assert 'Averaged for' in header 

319 header = '\n'.join(read_header(tmp_path / 'hcpulse_voronoi.dat')) 

320 assert 'Averaged for' in header 

321 np.testing.assert_equal(voronoi.atom_projections, parse_projections(header)) 

322 

323 # Verify without Voronoi 

324 data_dat = np.loadtxt(tmp_path / 'hcpulse.dat') 

325 data_npz = np.load(tmp_path / 'hcpulse.npz') 

326 

327 assert data_dat.shape[1] == 4 

328 np.testing.assert_allclose(data_npz['avgtime_t'], time_t) 

329 compare_known(data_npz, data_dat.T, ['pulsefreq_p', 'pulsefwhm_p'], [0, 1], [pfs, fwhms]) 

330 compare(data_npz, data_dat.T, ['sumocc_p', 'sumunocc_p'], [2, 3]) 

331 sumocc_p = data_npz['sumocc_p'] 

332 sumunocc_p = data_npz['sumunocc_p'] 

333 

334 # Verify with Voronoi 

335 data_dat = np.loadtxt(tmp_path / 'hcpulse_voronoi.dat') 

336 data_npz = np.load(tmp_path / 'hcpulse_voronoi.npz') 

337 

338 assert data_dat.shape[1] == 4 + 2 * len(voronoi) 

339 np.testing.assert_allclose(data_npz['avgtime_t'], time_t) 

340 compare_known(data_npz, data_dat.T, 

341 ['pulsefreq_p', 'pulsefwhm_p', 'sumocc_p', 'sumunocc_p'], 

342 [0, 1, 2, 3], [pfs, fwhms, sumocc_p, sumunocc_p]) 

343 

344 np.testing.assert_allclose(data_npz['sumocc_proj_pI'], data_dat[:, 4::2]) 

345 np.testing.assert_allclose(data_npz['sumunocc_proj_pI'], data_dat[:, 5::2])