Coverage for tests/unittests/density_matrices/distributed/test_distr_pulse.py: 64%
66 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-08-01 16:57 +0000
« prev ^ index » next coverage.py v7.9.1, created at 2025-08-01 16:57 +0000
1from __future__ import annotations
3import os
4import pytest
5import numpy as np
8from gpaw.tddft.units import fs_to_au
9from gpaw.mpi import world
10from rhodent.density_matrices.buffer import DensityMatrixBuffer
11from rhodent.density_matrices.distributed.time import AlltoallvTimeDistributor, RhoParameters
12from rhodent.density_matrices.distributed.pulse import PulseConvolver
13from rhodent.utils import create_pulse
@pytest.mark.parametrize('maxsize', [5000, None])
@pytest.mark.parametrize('test_system', ['Na8', 'Ag8'])
def test_pulse_convolver_redistribute_max_sizes(mock_ks_rho_reader, ksd_fname, maxsize):
    """Compare ``PulseConvolver.redistribute()`` against ``collect_on_root()``.

    The comparison is run with the ``RHODENT_REDISTRIBUTE_MAXSIZE``
    environment variable set to ``str(maxsize)``, or with the variable
    removed when ``maxsize`` is ``None``. Whatever value the variable had
    before the test is put back afterwards, so the setting does not leak
    into tests that run later in the same process.
    """
    env_name = 'RHODENT_REDISTRIBUTE_MAXSIZE'
    previous_value = os.environ.get(env_name)
    if maxsize is None:
        os.environ.pop(env_name, None)
    else:
        os.environ[env_name] = str(maxsize)

    try:
        pulses = [create_pulse(pf, 5.0, 10.0) for pf in [3.0, 4.0]]
        time_t = np.array([20, 21, 22])

        # Set up mock KS density matrices reader
        rho_chunk_reader = mock_ks_rho_reader(filter_times=np.linspace(0, 30, 300) * fs_to_au)
        # Set up the pulse convolver
        parameters = RhoParameters.from_ksd(rho_chunk_reader.ksd, striden1=30, striden2=30)
        time_distributor = AlltoallvTimeDistributor(rho_chunk_reader, parameters)
        pulse_convolver = PulseConvolver(time_distributor,
                                         pulses=pulses,
                                         perturbation={'name': 'deltakick', 'strength': 1e-5},
                                         filter_times=time_t * fs_to_au,
                                         derivative_order_s=[0, 1, 2])

        # Collect on root. This calculates different chunks on different ranks
        # and gathers and groups the results on the root rank
        ref_full_dm = pulse_convolver.collect_on_root()

        # From here on, use the times and parameters as reported by the
        # convolver itself rather than the values passed in above
        time_t = pulse_convolver.time_t
        parameters = pulse_convolver.rho_nn_reader._parameters
        myt = pulse_convolver.my_work()

        # Redistribute the data so that different ranks now have different times
        # This is usually done by the ConvolutionDensityMatrices
        full_dm = pulse_convolver.redistribute()

        # Loop over the results and sum them to the root rank
        nnshape = parameters.full_nnshape
        test_dm = DensityMatrixBuffer(nnshape=nnshape, xshape=(len(pulses), len(time_t)), dtype=float)
        test_dm.zero_buffers(real=pulse_convolver.yield_re,
                             imag=pulse_convolver.yield_im,
                             derivative_order_s=pulse_convolver.derivative_order_s)

        for array_iapt, part_array_iapt in zip(test_dm._iter_buffers(),
                                               full_dm._iter_buffers()):
            # Place this rank's share at its own indices along the last axis;
            # the remaining entries stay zero so the sum assembles all ranks
            array_iapt[..., myt] = part_array_iapt[..., :len(myt)]
            world.sum(array_iapt, 0)

        if world.rank != 0:
            return

        # Check that data is the same
        for test_array_iapt, ref_array_iapt in zip(test_dm._iter_buffers(),
                                                   ref_full_dm._iter_buffers()):
            np.testing.assert_allclose(test_array_iapt, ref_array_iapt)
    finally:
        # Restore the environment exactly as it was found, also on the
        # early return above and when an assertion fails
        if previous_value is None:
            os.environ.pop(env_name, None)
        else:
            os.environ[env_name] = previous_value
@pytest.mark.parametrize('test_system', ['Na8', 'Ag8'])
@pytest.mark.parametrize('striden1', [5, 30, 100])
@pytest.mark.parametrize('result_on_ranks', [
    [0, 1], [0, 2], [0, 1, 3], [1, 2], [3, 2], [3, 1]
    ])
def test_pulse_convolver_redistribute_result_on_ranks(mock_ks_rho_reader, ksd_fname, result_on_ranks, striden1):
    """Compare ``redistribute()`` with ``result_on_ranks`` to ``collect_on_root()``.

    A reference result is collected on the root rank from a convolver built
    without ``result_on_ranks``. A second convolver, built with
    ``result_on_ranks``, redistributes its data; the pieces are summed to
    the root rank and compared with the reference. The test is skipped when
    any requested rank does not exist in ``world``.
    """
    highest_rank = max(result_on_ranks)
    if not highest_rank < world.size:
        pytest.skip('World size not compatible with suggested ranks for distribution')

    pulses = [create_pulse(first_arg, 5.0, 10.0) for first_arg in (3.0, 4.0)]
    time_t = np.linspace(0, 30, 20)

    # Mock reader of KS density matrices
    rho_chunk_reader = mock_ks_rho_reader(filter_times=time_t * fs_to_au)

    # Time distributor wrapping the reader, same stride for both indices
    parameters = RhoParameters.from_ksd(rho_chunk_reader.ksd,
                                        striden1=striden1,
                                        striden2=striden1)
    time_distributor = AlltoallvTimeDistributor(rho_chunk_reader, parameters)

    # Keyword arguments shared by both convolvers
    pulse_kwargs = {
        'rho_nn_reader': time_distributor,
        'pulses': pulses,
        'perturbation': {'name': 'deltakick', 'strength': 1e-5},
        'filter_times': time_t * fs_to_au,
        'derivative_order_s': [0, 1, 2],
    }

    # Reference: different chunks are calculated on different ranks and
    # the results are gathered and grouped on the root rank
    reference_convolver = PulseConvolver(**pulse_kwargs)
    ref_full_dm = reference_convolver.collect_on_root()

    # Convolver under test, with the requested ranks holding the result
    pulse_convolver = PulseConvolver(result_on_ranks=result_on_ranks, **pulse_kwargs)
    time_t = pulse_convolver.time_t
    parameters = pulse_convolver.rho_nn_reader._parameters
    my_indices = pulse_convolver.my_work()
    n_mine = len(my_indices)

    # Redistribute so that different ranks hold different times; this is
    # usually done by the ConvolutionDensityMatrices
    full_dm = pulse_convolver.redistribute()

    # Zeroed buffer that receives the sum over ranks on the root
    test_dm = DensityMatrixBuffer(nnshape=parameters.full_nnshape,
                                  xshape=(len(pulses), len(time_t)),
                                  dtype=float)
    test_dm.zero_buffers(real=pulse_convolver.yield_re,
                         imag=pulse_convolver.yield_im,
                         derivative_order_s=pulse_convolver.derivative_order_s)

    buffer_pairs = zip(test_dm._iter_buffers(), full_dm._iter_buffers())
    for summed_buffer, local_buffer in buffer_pairs:
        summed_buffer[..., my_indices] = local_buffer[..., :n_mine]
        world.sum(summed_buffer, 0)

    # Only the root rank holds the summed data, so only it compares
    if world.rank == 0:
        compared_pairs = zip(test_dm._iter_buffers(), ref_full_dm._iter_buffers())
        for summed_buffer, reference_buffer in compared_pairs:
            np.testing.assert_allclose(summed_buffer, reference_buffer)