Coverage for tests/unittests/density_matrices/readers/test_numpy.py: 100%
126 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-08-01 16:57 +0000
« prev ^ index » next coverage.py v7.9.1, created at 2025-08-01 16:57 +0000
1from __future__ import annotations
3from pathlib import Path
4import numpy as np
5import pytest
7from gpaw.mpi import world, broadcast
8from rhodent.utils import create_pulse
9from rhodent.density_matrices.readers.numpy import FrequencyDensityMatrixReader, TimeDensityMatrixReader
10from rhodent.utils import get_gaussian_pulse_values
13def rho_p_from_rho_ia(ksd, rho_ia):
14 """ Transform from non-ravelled form to ravelled form.
16 """
17 ia_p = ksd.ia_p
18 imin, amin = ia_p.min(axis=0)
19 i_p, a_p = ia_p.T
20 rho_p = rho_ia[i_p - imin, a_p - amin]
21 return rho_p
24def legacy_write_to_disk_time(density_matrices,
25 pulserho_fmt: str):
26 """ Legacy implementation of writing to disk,
27 where density matrices are stored in ravelled form (single index for
28 electron-hole pairs).
30 parameters
31 ----------
32 pulserho_fmt
33 formatting string for the density matrices saved to disk.
35 example:
37 * pulserho_fmt = `pulserho/t{time:09.1f}{tag}.npy`.
39 accepts variables
41 * `{time}` - time in units of as.
42 * `{tag}` - derivative tag, `''`, `'-Iomega'`, or `'-omega2'`.
43 * `{pulsefreq}` - pulse frequency in units of ev.
44 * `{pulsefwhm}` - pulse fwhm in units of fs.
45 """
46 nlocaltot = len(density_matrices.local_work_plan)
48 tags_keys = [(tag, key) for s, (tag, key) in enumerate(
49 [('', 'rho_ia'), ('-Iomega', 'drho_ia'), ('-omega2', 'ddrho_ia')]) if s in density_matrices.derivative_order_s]
51 # iterate over density matrices on all ranks
52 for ndone, (work, dm) in enumerate(density_matrices, 1):
53 avg = density_matrices.log.elapsed('read')/ndone
54 estrem = avg * (nlocaltot - ndone)
55 density_matrices.log(
56 f'obtained density matrix {ndone:4.0f}/{nlocaltot:4.0f} on this rank. '
57 f'avg: {avg:10.3f}s, estimated remaining: {estrem:10.3f}s', who='writer', flush=True)
58 for tag, key in tags_keys:
59 fname_kw = dict(time=work.time, tag=tag, **get_gaussian_pulse_values(work.pulse))
60 fpath = Path(pulserho_fmt.format(**fname_kw))
61 rho_p = rho_p_from_rho_ia(density_matrices.ksd, getattr(dm, key))
62 if density_matrices.calc_comm.rank == 0:
63 assert isinstance(rho_p, np.ndarray)
64 fpath.parent.mkdir(parents=True, exist_ok=True)
65 if fpath.suffix == '.npz':
66 # write numpy archive
67 np.savez(str(fpath), rho_p=rho_p)
68 else:
69 # save numpy binary file
70 np.save(str(fpath), rho_p)
71 density_matrices.log(f'written {fpath}', who='writer', flush=True)
72 world.barrier()
75def legacy_write_to_disk_frequencies(density_matrices,
76 frho_fmt: str):
77 """ Legacy implementation of writing to disk,
78 where density matrices are stored in ravelled form (single index for
79 electron-hole pairs).
81 Parameters
82 ----------
83 frho_fmt
84 Formatting string for the density matrices saved to disk.
86 Example:
88 * frho_fmt = ``frho/w{freq:05.2f}-{reim}.npy``.
90 Accepts variables:
92 * ``{freq}`` - Frequency in units of eV.
93 * ``{reim}`` - ``'Re'`` or ``'Im'`` for Fourier transform of real/imaginary
94 part of density matrix.
95 """
96 nlocaltot = len(density_matrices.local_work_plan)
98 # Iterate over density matrices on all ranks
99 for ndone, (work, dm) in enumerate(density_matrices, 1):
100 avg = density_matrices.log.elapsed('read')/ndone
101 estrem = avg * (nlocaltot - ndone)
102 density_matrices.log(
103 f'Obtained density matrix {ndone:4.0f}/{nlocaltot:4.0f} on this rank. '
104 f'Avg: {avg:10.3f}s, estimated remaining: {estrem:10.3f}s', who='Writer', flush=True)
105 fname_kw = dict(freq=work.freq, reim=work.reim)
106 fpath = Path(frho_fmt.format(**fname_kw))
107 rho_p = rho_p_from_rho_ia(density_matrices.ksd, dm.rho_ia)
108 if density_matrices.ksd.only_ia:
109 # Twice the rho is saved by the KohnShamDecomposition transform
110 rho_p *= 2
111 if density_matrices.calc_comm.rank == 0:
112 assert isinstance(rho_p, np.ndarray)
113 fpath.parent.mkdir(parents=True, exist_ok=True)
114 if fpath.suffix == '.npz':
115 # Write numpy archive
116 np.savez(str(fpath), rho_p=rho_p)
117 else:
118 # Save numpy binary file
119 np.save(str(fpath), rho_p)
120 density_matrices.log(f'Written {fpath}', who='Writer', flush=True)
121 world.barrier()
124@pytest.mark.parametrize('fmt', [
125 'pulserho_pf{pulsefreq:.2f}/t{time:09.1f}{tag}.npy',
126 'pulserho_pf{pulsefreq:.2f}/t{time:09.1f}{tag}.npz',
127 ])
128@pytest.mark.parametrize('test_system', ['Na8', 'Ag8'])
129def test_read_write_time(tmp_path, mock_response, ksd_fname, fmt):
130 tmp_path = broadcast(tmp_path if world.rank == 0 else None) # Make sure that the temp path is the same
132 times = np.arange(10e3, 30e3, 3e3)
133 pulses = [create_pulse(pf) for pf in [3.3, 4.0]]
134 kwargs = dict(times=times, pulses=pulses, derivative_order_s=[0, 1, 2])
136 response = mock_response()
138 # Write
139 pulserho_fmt = str(tmp_path / fmt)
140 response.write_in_time(pulserho_fmt, **kwargs)
142 # Set up reader, with slightly different files. The correct files should
143 # still be found on disk as they are the closest.
144 reader = TimeDensityMatrixReader(pulserho_fmt,
145 ksd=ksd_fname,
146 filter_times=times + 0.04e3,
147 pulses=pulses,
148 derivative_order_s=[0, 1, 2])
149 np.testing.assert_almost_equal(times, reader.times)
151 # Compare against density matrices reference
152 for work, ref_dm in response.iterate_density_matrices_in_time(**kwargs):
153 for derivative, attr in zip([0, 1, 2], ['rho_ia', 'drho_ia', 'ddrho_ia']):
154 test_rho = reader.read(work.time, work.pulse, derivative)
155 ref_rho = getattr(ref_dm, attr)
156 # Mask wrong side of diagonal in mock response
157 np.testing.assert_allclose(test_rho, ref_rho, err_msg=f'{work} {derivative}')
160@pytest.mark.parametrize('fmt', [
161 'pulserho_pf{pulsefreq:.2f}/t{time:09.1f}{tag}.npy',
162 'pulserho_pf{pulsefreq:.2f}/t{time:09.1f}{tag}.npz',
163 ])
164@pytest.mark.parametrize('test_system', ['Na8', 'Ag8'])
165def test_read_legacy_write_time(tmp_path, mock_response, ksd_fname, fmt):
166 """ Make sure that the density matrices written the legacy way can still be read. """
167 tmp_path = broadcast(tmp_path if world.rank == 0 else None) # Make sure that the temp path is the same
169 times = [12e3, 14e3]
170 pulses = [create_pulse(pf) for pf in [3.3, 4.0]]
171 kwargs = dict(times=times, pulses=pulses, derivative_order_s=[0, 1, 2])
173 response = mock_response()
175 # Write
176 pulserho_fmt = str(tmp_path / fmt)
177 legacy_write_to_disk_time(response._get_time_density_matrices(**kwargs), pulserho_fmt)
179 # Set up reader, with slightly different files. The correct files should
180 # still be found on disk as they are the closest.
181 reader = TimeDensityMatrixReader(pulserho_fmt,
182 ksd=ksd_fname,
183 filter_times=times,
184 pulses=pulses,
185 derivative_order_s=[0, 1, 2])
186 np.testing.assert_almost_equal(times, reader.times)
188 # Calculate occupation number difference
189 f_n = reader.ksd.occ_un[0]
190 imin, imax, amin, amax = reader.ksd.ialims()
191 f_ia = f_n[imin:imax+1, None] - f_n[None, amin:amax+1]
193 # Calculate mask
194 min_occdiff = min(reader.ksd.f_p)
195 mask_ia = f_ia >= min_occdiff
197 # Compare against density matrices reference
198 for work, ref_dm in response.iterate_density_matrices_in_time(**kwargs):
199 for derivative, attr in zip([0, 1, 2], ['rho_ia', 'drho_ia', 'ddrho_ia']):
200 test_rho = reader.read(work.time, work.pulse, derivative)
201 ref_rho = getattr(ref_dm, attr) * mask_ia
202 # Mask wrong side of diagonal in mock response
203 np.testing.assert_allclose(test_rho, ref_rho, err_msg=f'{work} {derivative}')
206@pytest.mark.parametrize('fmt', [
207 'frho/w{freq:05.2f}-{reim}.npy',
208 'frho/w{freq:05.2f}-{reim}.npz'
209 ])
210@pytest.mark.parametrize('test_system', ['Na8', 'Ag8'])
211def test_read_write_freq(tmp_path, mock_response, ksd_fname, fmt):
212 tmp_path = broadcast(tmp_path if world.rank == 0 else None) # Make sure that the temp path is the same
214 frequencies = np.arange(1, 5, 0.3)
215 kwargs = dict(frequencies=frequencies)
217 response = mock_response()
219 # Write
220 frho_fmt = str(tmp_path / fmt)
221 response.write_in_frequency(frho_fmt, **kwargs)
223 # Set up reader
224 reader = FrequencyDensityMatrixReader(
225 frho_fmt,
226 ksd=ksd_fname,
227 filter_frequencies=frequencies + 0.1)
228 np.testing.assert_almost_equal(frequencies, reader.frequencies)
230 # Compare against density matrices reference
231 for work, ref_dm in response.iterate_density_matrices_in_frequency(**kwargs):
232 test_rho = reader.read(work.freq, work.real)
233 ref_rho = ref_dm.rho_ia
234 # Mask wrong side of diagonal in mock response
235 np.testing.assert_allclose(test_rho, ref_rho, err_msg=f'{work}')
238@pytest.mark.parametrize('fmt', [
239 'frho/w{freq:05.2f}-{reim}.npy',
240 'frho/w{freq:05.2f}-{reim}.npz'
241 ])
242@pytest.mark.parametrize('test_system', ['Na8', 'Ag8'])
243def test_read_legacy_write_freq(tmp_path, mock_response, ksd_fname, fmt):
244 """ Make sure that the density matrices written the legacy way can still be read. """
245 tmp_path = broadcast(tmp_path if world.rank == 0 else None) # Make sure that the temp path is the same
247 frequencies = [1.1, 1.3]
248 kwargs = dict(frequencies=frequencies)
250 response = mock_response()
252 # Write
253 frho_fmt = str(tmp_path / fmt)
254 legacy_write_to_disk_frequencies(response._get_frequency_density_matrices(**kwargs), frho_fmt)
256 # Set up reader
257 reader = FrequencyDensityMatrixReader(
258 frho_fmt,
259 ksd=ksd_fname,
260 filter_frequencies=frequencies)
261 np.testing.assert_almost_equal(frequencies, reader.frequencies)
263 # Calculate occupation number difference
264 f_n = reader.ksd.occ_un[0]
265 imin, imax, amin, amax = reader.ksd.ialims()
266 f_ia = f_n[imin:imax+1, None] - f_n[None, amin:amax+1]
268 # Calculate mask
269 min_occdiff = min(reader.ksd.f_p)
270 mask_ia = f_ia >= min_occdiff
272 # Compare against density matrices reference
273 for work, ref_dm in response.iterate_density_matrices_in_frequency(**kwargs):
274 test_rho = reader.read(work.freq, work.real)
275 ref_rho = ref_dm.rho_ia * mask_ia
276 # Mask wrong side of diagonal in mock response
277 np.testing.assert_allclose(test_rho, ref_rho, err_msg=f'{work}')