Coverage for tests/unittests/writers/test_hcdist_writers.py: 97%
211 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-08-01 16:57 +0000
« prev ^ index » next coverage.py v7.9.1, created at 2025-08-01 16:57 +0000
1from __future__ import annotations
3import pytest
4import numpy as np
5from json import loads
7from gpaw.mpi import world
8from rhodent.calculators.hotcarriers import HotCarriersCalculator
9from rhodent.utils import create_pulse
12def compare(leftdata, rightdata, leftkeys, rightkeys):
13 for leftkey, rightkey in zip(leftkeys, rightkeys):
14 np.testing.assert_allclose(leftdata[leftkey], rightdata[rightkey],
15 err_msg=f'left: {leftkey} - right: {rightkey}')
18def compare_known(leftdata, rightdata, leftkeys, rightkeys, reference):
19 # Loose tolerance to allow for energy and time grids which are written out with less precision
20 for leftkey, rightkey, ref in zip(leftkeys, rightkeys, reference):
21 np.testing.assert_allclose(leftdata[leftkey], rightdata[rightkey], atol=1e-5,
22 err_msg=f'left: {leftkey} - right: {rightkey}')
23 np.testing.assert_allclose(leftdata[leftkey], ref, atol=1e-5,
24 err_msg=f'{leftkey} != reference')
27def read_header(fname, truncate=140):
28 """ Return header lines and the first line of data """
29 def reader():
30 with open(fname) as fp:
31 for line in fp:
32 yield line[:truncate].rstrip('\n')
33 if not line.startswith('#'):
34 return
36 return list(reader())
39def parse_projections(header):
40 if isinstance(header, str):
41 header_lines = header.split('\n')
42 else:
43 header_lines = header
45 begin = -1
46 for i, line in enumerate(header_lines):
47 if 'Atomic projections' in line:
48 begin = i+1
49 break
50 if begin == -1:
51 raise ValueError('No projections')
52 projections = []
53 for i, line in enumerate(header_lines[begin:]):
54 if line[0] != '#':
55 # We have parsed the entire header
56 break
57 try:
58 num, projstr = line[1:].split(':')
59 num = int(num)
60 assert num == i, line
61 proj_atoms = loads(projstr)
62 assert isinstance(proj_atoms, list)
63 assert all(isinstance(a, int) for a in proj_atoms)
64 projections.append(proj_atoms)
65 except ValueError:
66 # Format does not match, we are at the next part of the header
67 break
68 return projections
71@pytest.mark.parametrize('test_system', ['Na55'])
72@pytest.mark.parametrize('only_one_pulse', [True, False])
73def test_hcdist_avg_formats(tmp_path, mock_voronoi, mock_response, only_one_pulse):
74 time_t = np.linspace(0, 30, 7) # In fs
76 # Set up mock objects
77 voronoi = mock_voronoi(atom_projections=[[0, 1, 2], [3]])
78 pulse = create_pulse(4.0)
80 calc_kwargs = dict(response=mock_response(),
81 voronoi=None,
82 times=time_t * 1e3,
83 pulses=[pulse],
84 energies_occ=np.linspace(-5, 1, 10),
85 energies_unocc=np.linspace(-1, 5, 10),
86 sigma=0.05)
87 calc = HotCarriersCalculator(**calc_kwargs)
88 kwargs = dict(only_one_pulse=only_one_pulse)
90 # Calculate with and without Voronoi
91 calc.calculate_distributions_and_write(str(tmp_path / 'hcdist.npz'), **kwargs)
92 calc.calculate_distributions_and_write(str(tmp_path / 'hcdist.dat'), **kwargs)
94 calc_kwargs['voronoi'] = voronoi
95 calc = HotCarriersCalculator(**calc_kwargs)
96 calc.calculate_distributions_and_write(str(tmp_path / 'hcdist_voronoi.npz'), **kwargs)
97 calc.calculate_distributions_and_write(str(tmp_path / 'hcdist_voronoi.dat'), **kwargs)
99 if world.rank != 0:
100 return
102 # Check header contents of text files
103 header = '\n'.join(read_header(tmp_path / 'hcdist.dat'))
104 assert 'relative to Fermi level' in header
105 assert 'Averaged for' in header
106 if only_one_pulse:
107 assert 'Response to pulse' in header
108 header = '\n'.join(read_header(tmp_path / 'hcdist_voronoi.dat'))
109 assert 'relative to Fermi level' in header
110 assert 'Averaged for' in header
111 np.testing.assert_equal(voronoi.atom_projections, parse_projections(header))
112 if only_one_pulse:
113 assert 'Response to pulse' in header
115 # Verify without Voronoi
116 data_dat = np.loadtxt(tmp_path / 'hcdist.dat')
117 data_npz = np.load(tmp_path / 'hcdist.npz')
119 if only_one_pulse:
120 np.testing.assert_allclose(data_npz['pulsefreq'], 4.0)
121 np.testing.assert_allclose(data_npz['pulsefwhm'], 5.0)
122 assert data_dat.shape[1] == 4
123 np.testing.assert_allclose(data_npz['avgtime_t'], time_t)
124 np.testing.assert_allclose(data_npz['sigma'], calc_kwargs['sigma'])
125 compare_known(data_npz, data_dat.T,
126 ['energy_o', 'energy_u'], [0, 1],
127 [calc_kwargs['energies_occ'], calc_kwargs['energies_unocc']])
129 compare(data_npz, data_dat.T, ['hcdist_o', 'hcdist_u'], [2, 3])
130 hcdist_o = data_npz['hcdist_o']
131 hcdist_u = data_npz['hcdist_u']
133 # Verify with Voronoi
134 data_dat = np.loadtxt(tmp_path / 'hcdist_voronoi.dat')
135 data_npz = np.load(tmp_path / 'hcdist_voronoi.npz')
137 if only_one_pulse:
138 np.testing.assert_allclose(data_npz['pulsefreq'], 4.0)
139 np.testing.assert_allclose(data_npz['pulsefwhm'], 5.0)
140 assert data_dat.shape[1] == 4 + 2 * len(voronoi)
141 np.testing.assert_allclose(data_npz['avgtime_t'], time_t)
142 np.testing.assert_allclose(data_npz['sigma'], calc_kwargs['sigma'])
143 compare_known(data_npz, data_dat.T,
144 ['energy_o', 'energy_u', 'hcdist_o', 'hcdist_u'], [0, 1, 2, 3],
145 [calc_kwargs['energies_occ'], calc_kwargs['energies_unocc'], hcdist_o, hcdist_u])
147 compare(data_npz, data_dat.T, ['hcdist_proj_Io', 'hcdist_proj_Iu'], [(slice(4, None, 2)), slice(5, None, 2)])
150@pytest.mark.parametrize('test_system', ['Na55'])
151def test_hcdist_formats(tmp_path, mock_response, mock_voronoi):
152 time_t = np.linspace(0, 30, 7) # In fs
153 pulse = create_pulse(4.0)
155 # Set up mock objects
156 voronoi = mock_voronoi(atom_projections=[[0, 1, 2], [3]])
157 calc_kwargs = dict(response=mock_response(),
158 voronoi=None,
159 times=time_t * 1e3,
160 pulses=[pulse],
161 energies_occ=np.linspace(-5, 1, 10),
162 energies_unocc=np.linspace(-1, 5, 10),
163 sigma=0.05)
164 calc = HotCarriersCalculator(**calc_kwargs)
165 kwargs = dict(only_one_pulse=True, average_times=False)
167 # Calculate with and without Voronoi
168 calc.calculate_distributions_and_write(str(tmp_path / 'hcdist.npz'), **kwargs)
169 calc.calculate_distributions_and_write(str(tmp_path / 'hcdist.dat'), **kwargs)
171 calc_kwargs['voronoi'] = voronoi
172 calc = HotCarriersCalculator(**calc_kwargs)
173 calc.calculate_distributions_and_write(str(tmp_path / 'hcdist_voronoi.npz'), **kwargs)
174 calc.calculate_distributions_and_write(str(tmp_path / 'hcdist_voronoi.dat'), **kwargs)
176 if world.rank != 0:
177 return
178 # Check header contents of text files
179 header = '\n'.join(read_header(tmp_path / 'hcdist.dat'))
180 assert 'relative to Fermi level' in header
181 header = '\n'.join(read_header(tmp_path / 'hcdist_voronoi.dat'))
182 assert 'relative to Fermi level' in header
183 np.testing.assert_equal(voronoi.atom_projections, parse_projections(header))
185 # The format of the text files should be
186 # (i) Energy grids: first two columns
187 # (ii) Total distributions: next two columns
188 # (iii) Projected distributions: next 2 * len(voronoi) columns
189 # Repeat (ii)-(iii) for as many times as there are
191 # Verify without Voronoi
192 data_dat = np.loadtxt(tmp_path / 'hcdist.dat')
193 data_npz = np.load(tmp_path / 'hcdist.npz')
195 assert data_dat.shape[1] == 2 + 2 * (len(time_t))
196 np.testing.assert_allclose(data_npz['time_t'], time_t)
197 np.testing.assert_allclose(data_npz['sigma'], calc_kwargs['sigma'])
198 compare_known(data_npz, data_dat.T,
199 ['energy_o', 'energy_u'], [0, 1],
200 [calc_kwargs['energies_occ'], calc_kwargs['energies_unocc']])
202 compare(data_npz, data_dat.T, ['hcdist_to', 'hcdist_tu'], [(slice(2, None, 2)), slice(3, None, 2)])
203 hcdist_to = data_npz['hcdist_to']
204 hcdist_tu = data_npz['hcdist_tu']
206 # Verify with Voronoi
207 data_dat = np.loadtxt(tmp_path / 'hcdist_voronoi.dat')
208 data_npz = np.load(tmp_path / 'hcdist_voronoi.npz')
210 assert data_dat.shape[1] == 2 + 2 * (len(voronoi) + 1) * len(time_t)
211 np.testing.assert_allclose(data_npz['time_t'], time_t)
212 np.testing.assert_allclose(data_npz['sigma'], calc_kwargs['sigma'])
213 compare_known(data_npz, data_dat.T,
214 ['energy_o', 'energy_u', 'hcdist_to', 'hcdist_tu'],
215 [0, 1, (slice(2, None, 2 * (len(voronoi) + 1))), slice(3, None, 2 * (len(voronoi) + 1))],
216 [calc_kwargs['energies_occ'], calc_kwargs['energies_unocc'], hcdist_to, hcdist_tu])
218 # Drop energy_grids and reshape into energy, time, total + voronoi
219 hcdist_otI = data_dat[:, 2::2].reshape((-1, len(time_t), len(voronoi) + 1))
220 hcdist_utI = data_dat[:, 3::2].reshape((-1, len(time_t), len(voronoi) + 1))
221 # Drop the totals and reshape into time, voronoi, energy
222 hcdist_tIo = hcdist_otI[:, :, 1:].swapaxes(0, 1).swapaxes(1, 2)
223 hcdist_tIu = hcdist_utI[:, :, 1:].swapaxes(0, 1).swapaxes(1, 2)
224 # Verify
225 np.testing.assert_allclose(data_npz['hcdist_proj_tIo'], hcdist_tIo)
226 np.testing.assert_allclose(data_npz['hcdist_proj_tIu'], hcdist_tIu)
229@pytest.mark.parametrize('test_system', ['Na55'])
230def test_hctime_formats(tmp_path, mock_response, mock_voronoi):
231 time_t = np.linspace(0, 30, 12) # In fs
232 pulse = create_pulse(4.0)
234 # Set up mock objects
235 voronoi = mock_voronoi(atom_projections=[[0, 1, 2], [3]])
236 calc_kwargs = dict(response=mock_response(),
237 voronoi=None,
238 times=time_t * 1e3,
239 pulses=[pulse],
240 energies_occ=[],
241 energies_unocc=[],
242 sigma=None)
243 calc = HotCarriersCalculator(**calc_kwargs)
245 # Calculate with and without Voronoi
246 calc.calculate_totals_by_time_and_write(str(tmp_path / 'hctime.npz'))
247 calc.calculate_totals_by_time_and_write(str(tmp_path / 'hctime.dat'))
249 calc_kwargs['voronoi'] = voronoi
250 calc = HotCarriersCalculator(**calc_kwargs)
251 calc.calculate_totals_by_time_and_write(str(tmp_path / 'hctime_voronoi.npz'))
252 calc.calculate_totals_by_time_and_write(str(tmp_path / 'hctime_voronoi.dat'))
254 if world.rank != 0:
255 return
256 # Check header contents of text files
257 header = '\n'.join(read_header(tmp_path / 'hctime.dat'))
258 header = '\n'.join(read_header(tmp_path / 'hctime_voronoi.dat'))
259 np.testing.assert_equal(voronoi.atom_projections, parse_projections(header))
261 # Verify without Voronoi
262 data_dat = np.loadtxt(tmp_path / 'hctime.dat')
263 data_npz = np.load(tmp_path / 'hctime.npz')
265 assert data_dat.shape[1] == 3
266 np.testing.assert_allclose(data_npz['time_t'], time_t)
267 np.testing.assert_allclose(data_npz['time_t'], data_dat[:, 0], atol=1e-5)
268 compare_known(data_npz, data_dat.T, ['time_t'], [0], [time_t])
269 compare(data_npz, data_dat.T, ['sumocc_t', 'sumunocc_t'], [1, 2])
270 sumocc_t = data_npz['sumocc_t']
271 sumunocc_t = data_npz['sumunocc_t']
273 # Verify with Voronoi
274 data_dat = np.loadtxt(tmp_path / 'hctime_voronoi.dat')
275 data_npz = np.load(tmp_path / 'hctime_voronoi.npz')
277 assert data_dat.shape[1] == 3 + 2 * len(voronoi)
278 np.testing.assert_allclose(data_npz['time_t'], time_t)
279 compare_known(data_npz, data_dat.T,
280 ['time_t', 'sumocc_t', 'sumunocc_t'], [0, 1, 2],
281 [time_t, sumocc_t, sumunocc_t])
283 np.testing.assert_allclose(data_npz['sumocc_proj_tI'], data_dat[:, 3::2])
284 np.testing.assert_allclose(data_npz['sumunocc_proj_tI'], data_dat[:, 4::2])
287@pytest.mark.parametrize('test_system', ['Na55'])
288def test_hcpulse_formats(tmp_path, mock_response, mock_voronoi):
289 time_t = np.linspace(0, 30, 3) # In fs
290 pfs = [3.2, 3.3]
291 fwhms = [5.0, 6.0]
292 pulses = [create_pulse(pf, fwhm) for pf, fwhm in zip(pfs, fwhms)]
294 # Set up mock objects
295 voronoi = mock_voronoi(atom_projections=[[0, 1, 2], [3]])
296 calc_kwargs = dict(response=mock_response(),
297 voronoi=None,
298 times=time_t * 1e3,
299 pulses=pulses,
300 energies_occ=[],
301 energies_unocc=[],
302 sigma=None)
303 calc = HotCarriersCalculator(**calc_kwargs)
305 # Calculate with and without Voronoi
306 calc.calculate_totals_by_pulse_and_write(str(tmp_path / 'hcpulse.npz'))
307 calc.calculate_totals_by_pulse_and_write(str(tmp_path / 'hcpulse.dat'))
309 calc_kwargs['voronoi'] = voronoi
310 calc = HotCarriersCalculator(**calc_kwargs)
311 calc.calculate_totals_by_pulse_and_write(str(tmp_path / 'hcpulse_voronoi.npz'))
312 calc.calculate_totals_by_pulse_and_write(str(tmp_path / 'hcpulse_voronoi.dat'))
314 if world.rank != 0:
315 return
316 # Check header contents of text files
317 header = '\n'.join(read_header(tmp_path / 'hcpulse.dat'))
318 assert 'Averaged for' in header
319 header = '\n'.join(read_header(tmp_path / 'hcpulse_voronoi.dat'))
320 assert 'Averaged for' in header
321 np.testing.assert_equal(voronoi.atom_projections, parse_projections(header))
323 # Verify without Voronoi
324 data_dat = np.loadtxt(tmp_path / 'hcpulse.dat')
325 data_npz = np.load(tmp_path / 'hcpulse.npz')
327 assert data_dat.shape[1] == 4
328 np.testing.assert_allclose(data_npz['avgtime_t'], time_t)
329 compare_known(data_npz, data_dat.T, ['pulsefreq_p', 'pulsefwhm_p'], [0, 1], [pfs, fwhms])
330 compare(data_npz, data_dat.T, ['sumocc_p', 'sumunocc_p'], [2, 3])
331 sumocc_p = data_npz['sumocc_p']
332 sumunocc_p = data_npz['sumunocc_p']
334 # Verify with Voronoi
335 data_dat = np.loadtxt(tmp_path / 'hcpulse_voronoi.dat')
336 data_npz = np.load(tmp_path / 'hcpulse_voronoi.npz')
338 assert data_dat.shape[1] == 4 + 2 * len(voronoi)
339 np.testing.assert_allclose(data_npz['avgtime_t'], time_t)
340 compare_known(data_npz, data_dat.T,
341 ['pulsefreq_p', 'pulsefwhm_p', 'sumocc_p', 'sumunocc_p'],
342 [0, 1, 2, 3], [pfs, fwhms, sumocc_p, sumunocc_p])
344 np.testing.assert_allclose(data_npz['sumocc_proj_pI'], data_dat[:, 4::2])
345 np.testing.assert_allclose(data_npz['sumunocc_proj_pI'], data_dat[:, 5::2])