Coverage for tests/unittests/writers/test_density_writers.py: 81%
85 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-08-01 16:57 +0000
« prev ^ index » next coverage.py v7.9.1, created at 2025-08-01 16:57 +0000
1from __future__ import annotations
3import pytest
4import numpy as np
5from ase.io import read
6from ase.io.ulm import Reader
8from gpaw.mpi import world, broadcast
9from rhodent.calculators.density import DensityCalculator
10from rhodent.utils import create_pulse
13def load_cube(cube_fmt, which_list, times=None, freqs=None):
14 cube_fmt = str(cube_fmt)
15 d = {}
16 assert (times is None) ^ (freqs is None)
17 collectlabel = 't' if freqs is None else 'w'
18 for which in which_list:
19 key = 'rho' if which == 'induced' else 'occ_rho' if which == 'holes' else 'unocc_rho'
20 if freqs is None:
21 filenames = [cube_fmt.format(which=which, time=time) for time in times]
22 else:
23 filenames = [cube_fmt.format(which=which, freq=freq) for freq in freqs]
24 data = [read(fname, format='cube', parallel=False, read_data=True, full_output=True)['data']
25 for fname in filenames]
26 d[f'{key}_{collectlabel}g'] = np.array(data)
28 return d
31def load_ulm(fname):
32 with Reader(fname) as reader:
33 return reader.asdict()
36def compare(leftdata, rightdata, keys, **kw):
37 for key in keys:
38 np.testing.assert_allclose(leftdata[key], rightdata[key], err_msg=key, **kw)
41@pytest.mark.bigdata
42@pytest.mark.parametrize('test_system', ['2H2'])
43def test_density_time(tmp_path, gpw_fname, mock_response):
44 tmp_path = broadcast(tmp_path if world.rank == 0 else None) # Make sure that the temp path is the same
46 time_t = np.linspace(0, 30, 7) # In fs
48 # Set up mock objects
49 pulse = create_pulse(4.0)
50 calc = DensityCalculator(response=mock_response(),
51 times=time_t * 1e3,
52 pulses=[pulse],
53 gpw_file=gpw_fname)
55 which = ['induced', 'electrons', 'holes']
56 calc.calculate_and_write(str(tmp_path / 'density.npz'), which=which)
57 calc.calculate_and_write(str(tmp_path / 'density.ulm'), which=which)
58 calc.calculate_and_write(str(tmp_path / '{which}_density_t{time:09.1f}.cube'), which=which)
59 times = calc.times
61 if world.rank != 0:
62 return
64 data_npz = np.load(tmp_path / 'density.npz')
65 data_ulm = load_ulm(tmp_path / 'density.ulm')
66 data_cube = load_cube(tmp_path / '{which}_density_t{time:09.1f}.cube', which, times=times)
68 compare(data_npz, data_ulm, ['rho_tg', 'occ_rho_tg', 'unocc_rho_tg'])
69 compare(data_npz, data_cube, ['rho_tg', 'occ_rho_tg', 'unocc_rho_tg'], atol=1e-10)
72@pytest.mark.bigdata
73@pytest.mark.parametrize('test_system', ['2H2'])
74def test_density_frequency(tmp_path, gpw_fname, mock_response):
75 tmp_path = broadcast(tmp_path if world.rank == 0 else None) # Make sure that the temp path is the same
77 freq_w = np.linspace(2, 6, 7) # In units of eV
79 # Set up mock objects
80 calc = DensityCalculator(response=mock_response(perturbation={'name': 'deltakick', 'strength': 1e-5}),
81 frequencies=freq_w,
82 gpw_file=gpw_fname)
84 which = 'induced'
85 calc.calculate_and_write(str(tmp_path / 'density.npz'), which=which)
86 calc.calculate_and_write(str(tmp_path / 'density.ulm'), which=which)
87 calc.calculate_and_write(str(tmp_path / '{which}_density_w{freq:05.2f}.cube'), which=which)
88 freqs = calc.frequencies
90 if world.rank != 0:
91 return
93 data_npz = np.load(tmp_path / 'density.npz')
94 data_ulm = load_ulm(tmp_path / 'density.ulm')
95 data_cube = load_cube(tmp_path / '{which}_density_w{freq:05.2f}.cube', ['induced'], freqs=freqs)
97 compare(data_npz, data_ulm, ['rho_wg'])
98 compare(data_npz, data_cube, ['rho_wg'], atol=1e-10)
101@pytest.mark.bigdata
102@pytest.mark.parametrize('test_system', ['2H2'])
103def test_density_calc_size(tmp_path, gpw_fname, mock_response):
104 tmp_path = broadcast(tmp_path if world.rank == 0 else None) # Make sure that the temp path is the same
106 time_t = np.linspace(0, 30, 7) # In fs
108 if world.size == 1:
109 pytest.skip('Parallel only test')
111 pulse = create_pulse(4.0)
112 which = ['induced', 'electrons', 'holes']
114 # Calculate on calc size 1
115 calc = DensityCalculator(response=mock_response(),
116 times=time_t * 1e3,
117 pulses=[pulse],
118 gpw_file=gpw_fname)
120 calc.calculate_and_write(str(tmp_path / 'ref_density.npz'), which=which)
122 if world.rank == 0:
123 data_ref_npz = np.load(tmp_path / 'ref_density.npz')
125 # Calculate on other calc size
126 for calc_size in range(2, world.size + 1):
127 if world.size % calc_size != 0:
128 continue
130 calc = DensityCalculator(response=mock_response(calc_size=calc_size),
131 times=time_t * 1e3,
132 pulses=[pulse],
133 gpw_file=gpw_fname)
135 calc.calculate_and_write(str(tmp_path / 'density.npz'), which=which)
137 if world.rank == 0:
139 data_npz = np.load(tmp_path / 'density.npz')
140 compare(data_ref_npz, data_npz, ['rho_tg', 'occ_rho_tg', 'unocc_rho_tg'])