Coverage for tests/unittests/density_matrices/distributed/test_distr_frequency.py: 74%
84 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-08-01 16:57 +0000
« prev ^ index » next coverage.py v7.9.1, created at 2025-08-01 16:57 +0000
1from __future__ import annotations
3import os
4import pytest
5import numpy as np
7from gpaw.tddft.units import fs_to_au, eV_to_au
8from gpaw.mpi import world
9from rhodent.density_matrices.buffer import DensityMatrixBuffer
10from rhodent.density_matrices.distributed.time import AlltoallvTimeDistributor, RhoParameters
11from rhodent.density_matrices.distributed.frequency import FourierTransformer
14@pytest.mark.parametrize('test_system', ['Na8'])
15def test_fourier_frequency_broadening(mock_ks_rho_reader, ksd_fname):
16 """ Test that 0 and almost 0 frequency broadening give the same results
18 This is necessary because an additional ifft->fft is done in the latter case.
19 Testing with a small system is enough."""
20 os.environ.pop('RHODENT_REDISTRIBUTE_MAXSIZE', None)
22 freq_w = np.array([20, 21, 22])
24 # Set up mock KS density matrices reader
25 rho_chunk_reader = mock_ks_rho_reader(filter_times=np.linspace(0, 30, 300) * fs_to_au)
26 # Set up the pulse convolver
27 parameters = RhoParameters.from_ksd(rho_chunk_reader.ksd, striden1=30, striden2=30)
28 time_distributor = AlltoallvTimeDistributor(rho_chunk_reader, parameters)
30 def calculate(frequency_broadening: float = 0):
31 fourier_transformer = FourierTransformer(time_distributor,
32 perturbation={'name': 'deltakick', 'strength': 1e-5},
33 frequency_broadening=frequency_broadening,
34 filter_frequencies=freq_w * eV_to_au)
36 # Collect on root. This calculates different chunks on different ranks
37 # and gathers and groups the results on the root rank
38 buffer = fourier_transformer.collect_on_root()
39 return buffer
41 # Compare no broadening against very little broadening
42 ref_buffer = calculate()
43 test_buffer = calculate(1e-10)
45 if world.rank == 0:
46 for test_array_iaw, ref_array_iaw in zip(test_buffer._iter_buffers(),
47 ref_buffer._iter_buffers()):
48 np.testing.assert_allclose(test_array_iaw, ref_array_iaw)
50 # Slightly more broadening
51 test_buffer = calculate(1e-5)
53 if world.rank == 0:
54 for test_array_iaw, ref_array_iaw in zip(test_buffer._iter_buffers(),
55 ref_buffer._iter_buffers()):
56 with pytest.raises(AssertionError):
57 np.testing.assert_allclose(test_array_iaw, ref_array_iaw)
59 np.testing.assert_allclose(test_array_iaw, ref_array_iaw, atol=1e-4, rtol=0.1)
62@pytest.mark.parametrize('maxsize', [5000, None])
63@pytest.mark.parametrize('test_system', ['Na8', 'Ag8'])
64def test_fourier_transformer_redistribute_max_sizes(mock_ks_rho_reader, ksd_fname, maxsize):
65 if maxsize is None:
66 os.environ.pop('RHODENT_REDISTRIBUTE_MAXSIZE', None)
67 else:
68 os.environ['RHODENT_REDISTRIBUTE_MAXSIZE'] = str(maxsize)
70 freq_w = np.array([20, 21, 22])
72 # Set up mock KS density matrices reader
73 rho_chunk_reader = mock_ks_rho_reader(filter_times=np.linspace(0, 30, 300) * fs_to_au)
74 # Set up the pulse convolver
75 parameters = RhoParameters.from_ksd(rho_chunk_reader.ksd, striden1=30, striden2=30)
76 time_distributor = AlltoallvTimeDistributor(rho_chunk_reader, parameters)
77 fourier_transformer = FourierTransformer(time_distributor,
78 perturbation={'name': 'deltakick', 'strength': 1e-5},
79 filter_frequencies=freq_w * eV_to_au)
81 # Collect on root. This calculates different chunks on different ranks
82 # and gathers and groups the results on the root rank
83 ref_full_dm = fourier_transformer.collect_on_root()
85 freq_w = fourier_transformer.freq_w
86 parameters = fourier_transformer.rho_nn_reader._parameters
87 myw = fourier_transformer.my_work()
89 # Redistribute the data so that different ranks now have different times
90 # This is usually done by the FrequencyDensityMatrices
91 full_dm = fourier_transformer.redistribute()
93 # Loop over the results and sum them to the root rank
94 nnshape = parameters.full_nnshape
95 test_dm = DensityMatrixBuffer(nnshape=nnshape, xshape=(len(freq_w), ), dtype=complex)
96 test_dm.zero_buffers(real=fourier_transformer.yield_re,
97 imag=fourier_transformer.yield_im,
98 derivative_order_s=[0])
100 for array_iaw, part_array_iaw in zip(test_dm._iter_buffers(),
101 full_dm._iter_buffers()):
102 array_iaw[..., myw] = part_array_iaw[..., :len(myw)]
103 world.sum(array_iaw, 0)
105 if world.rank != 0:
106 return
108 # Check that data is the same
109 for test_array_iaw, ref_array_iaw in zip(test_dm._iter_buffers(),
110 ref_full_dm._iter_buffers()):
111 np.testing.assert_allclose(test_array_iaw, ref_array_iaw)
114@pytest.mark.parametrize('test_system', ['Na8', 'Ag8'])
115@pytest.mark.parametrize('striden1', [5, 30, 100])
116@pytest.mark.parametrize('result_on_ranks', [
117 [0, 1], [0, 2], [0, 1, 3], [1, 2], [3, 2], [3, 1]
118])
119def test_fourier_transformer_redistribute_result_on_ranks(mock_ks_rho_reader, ksd_fname, result_on_ranks, striden1):
120 if max(result_on_ranks) >= world.size:
121 pytest.skip('World size not compatible with suggested ranks for distribution')
123 freq_w = np.linspace(0, 30, 20)
125 # Set up mock KS density matrices reader
126 rho_chunk_reader = mock_ks_rho_reader(filter_times=freq_w * fs_to_au)
127 # Set up the pulse convolver
128 parameters = RhoParameters.from_ksd(rho_chunk_reader.ksd, striden1=striden1, striden2=striden1)
129 time_distributor = AlltoallvTimeDistributor(rho_chunk_reader, parameters)
130 fourier_transformer = FourierTransformer(time_distributor,
131 perturbation={'name': 'deltakick', 'strength': 1e-5},
132 filter_frequencies=freq_w * eV_to_au)
134 # Collect on root. This calculates different chunks on different ranks
135 # and gathers and groups the results on the root rank
136 ref_full_dm = fourier_transformer.collect_on_root()
138 # Set up with different ranks collecting the data
139 fourier_transformer = FourierTransformer(time_distributor,
140 perturbation={'name': 'deltakick', 'strength': 1e-5},
141 filter_frequencies=freq_w * eV_to_au,
142 result_on_ranks=result_on_ranks)
143 freq_w = fourier_transformer.freq_w
144 parameters = fourier_transformer.rho_nn_reader._parameters
145 myw = fourier_transformer.my_work()
147 # Redistribute the data so that different ranks now have different times
148 # This is usually done by the FrequencyDensityMatrices
149 full_dm = fourier_transformer.redistribute()
151 # Loop over the results and sum them to the root rank
152 nnshape = parameters.full_nnshape
153 test_dm = DensityMatrixBuffer(nnshape=nnshape, xshape=(len(freq_w), ), dtype=complex)
154 test_dm.zero_buffers(real=fourier_transformer.yield_re,
155 imag=fourier_transformer.yield_im,
156 derivative_order_s=[0])
158 for array_iaw, part_array_iaw in zip(test_dm._iter_buffers(),
159 full_dm._iter_buffers()):
160 array_iaw[..., myw] = part_array_iaw[..., :len(myw)]
161 world.sum(array_iaw, 0)
163 if world.rank != 0:
164 return
166 # Check that data is the same
167 for test_array_iaw, ref_array_iaw in zip(test_dm._iter_buffers(),
168 ref_full_dm._iter_buffers()):
169 np.testing.assert_allclose(test_array_iaw, ref_array_iaw)