Coverage for tests/unittests/density_matrices/distributed/test_distr_frequency.py: 74%

84 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-08-01 16:57 +0000

1from __future__ import annotations 

2 

3import os 

4import pytest 

5import numpy as np 

6 

7from gpaw.tddft.units import fs_to_au, eV_to_au 

8from gpaw.mpi import world 

9from rhodent.density_matrices.buffer import DensityMatrixBuffer 

10from rhodent.density_matrices.distributed.time import AlltoallvTimeDistributor, RhoParameters 

11from rhodent.density_matrices.distributed.frequency import FourierTransformer 

12 

13 

14@pytest.mark.parametrize('test_system', ['Na8']) 

15def test_fourier_frequency_broadening(mock_ks_rho_reader, ksd_fname): 

16 """ Test that 0 and almost 0 frequency broadening give the same results 

17 

18 This is necessary because an additional ifft->fft is done in the latter case. 

19 Testing with a small system is enough.""" 

20 os.environ.pop('RHODENT_REDISTRIBUTE_MAXSIZE', None) 

21 

22 freq_w = np.array([20, 21, 22]) 

23 

24 # Set up mock KS density matrices reader 

25 rho_chunk_reader = mock_ks_rho_reader(filter_times=np.linspace(0, 30, 300) * fs_to_au) 

26 # Set up the pulse convolver 

27 parameters = RhoParameters.from_ksd(rho_chunk_reader.ksd, striden1=30, striden2=30) 

28 time_distributor = AlltoallvTimeDistributor(rho_chunk_reader, parameters) 

29 

30 def calculate(frequency_broadening: float = 0): 

31 fourier_transformer = FourierTransformer(time_distributor, 

32 perturbation={'name': 'deltakick', 'strength': 1e-5}, 

33 frequency_broadening=frequency_broadening, 

34 filter_frequencies=freq_w * eV_to_au) 

35 

36 # Collect on root. This calculates different chunks on different ranks 

37 # and gathers and groups the results on the root rank 

38 buffer = fourier_transformer.collect_on_root() 

39 return buffer 

40 

41 # Compare no broadening against very little broadening 

42 ref_buffer = calculate() 

43 test_buffer = calculate(1e-10) 

44 

45 if world.rank == 0: 

46 for test_array_iaw, ref_array_iaw in zip(test_buffer._iter_buffers(), 

47 ref_buffer._iter_buffers()): 

48 np.testing.assert_allclose(test_array_iaw, ref_array_iaw) 

49 

50 # Slightly more broadening 

51 test_buffer = calculate(1e-5) 

52 

53 if world.rank == 0: 

54 for test_array_iaw, ref_array_iaw in zip(test_buffer._iter_buffers(), 

55 ref_buffer._iter_buffers()): 

56 with pytest.raises(AssertionError): 

57 np.testing.assert_allclose(test_array_iaw, ref_array_iaw) 

58 

59 np.testing.assert_allclose(test_array_iaw, ref_array_iaw, atol=1e-4, rtol=0.1) 

60 

61 

62@pytest.mark.parametrize('maxsize', [5000, None]) 

63@pytest.mark.parametrize('test_system', ['Na8', 'Ag8']) 

64def test_fourier_transformer_redistribute_max_sizes(mock_ks_rho_reader, ksd_fname, maxsize): 

65 if maxsize is None: 

66 os.environ.pop('RHODENT_REDISTRIBUTE_MAXSIZE', None) 

67 else: 

68 os.environ['RHODENT_REDISTRIBUTE_MAXSIZE'] = str(maxsize) 

69 

70 freq_w = np.array([20, 21, 22]) 

71 

72 # Set up mock KS density matrices reader 

73 rho_chunk_reader = mock_ks_rho_reader(filter_times=np.linspace(0, 30, 300) * fs_to_au) 

74 # Set up the pulse convolver 

75 parameters = RhoParameters.from_ksd(rho_chunk_reader.ksd, striden1=30, striden2=30) 

76 time_distributor = AlltoallvTimeDistributor(rho_chunk_reader, parameters) 

77 fourier_transformer = FourierTransformer(time_distributor, 

78 perturbation={'name': 'deltakick', 'strength': 1e-5}, 

79 filter_frequencies=freq_w * eV_to_au) 

80 

81 # Collect on root. This calculates different chunks on different ranks 

82 # and gathers and groups the results on the root rank 

83 ref_full_dm = fourier_transformer.collect_on_root() 

84 

85 freq_w = fourier_transformer.freq_w 

86 parameters = fourier_transformer.rho_nn_reader._parameters 

87 myw = fourier_transformer.my_work() 

88 

89 # Redistribute the data so that different ranks now have different times 

90 # This is usually done by the FrequencyDensityMatrices 

91 full_dm = fourier_transformer.redistribute() 

92 

93 # Loop over the results and sum them to the root rank 

94 nnshape = parameters.full_nnshape 

95 test_dm = DensityMatrixBuffer(nnshape=nnshape, xshape=(len(freq_w), ), dtype=complex) 

96 test_dm.zero_buffers(real=fourier_transformer.yield_re, 

97 imag=fourier_transformer.yield_im, 

98 derivative_order_s=[0]) 

99 

100 for array_iaw, part_array_iaw in zip(test_dm._iter_buffers(), 

101 full_dm._iter_buffers()): 

102 array_iaw[..., myw] = part_array_iaw[..., :len(myw)] 

103 world.sum(array_iaw, 0) 

104 

105 if world.rank != 0: 

106 return 

107 

108 # Check that data is the same 

109 for test_array_iaw, ref_array_iaw in zip(test_dm._iter_buffers(), 

110 ref_full_dm._iter_buffers()): 

111 np.testing.assert_allclose(test_array_iaw, ref_array_iaw) 

112 

113 

114@pytest.mark.parametrize('test_system', ['Na8', 'Ag8']) 

115@pytest.mark.parametrize('striden1', [5, 30, 100]) 

116@pytest.mark.parametrize('result_on_ranks', [ 

117 [0, 1], [0, 2], [0, 1, 3], [1, 2], [3, 2], [3, 1] 

118]) 

119def test_fourier_transformer_redistribute_result_on_ranks(mock_ks_rho_reader, ksd_fname, result_on_ranks, striden1): 

120 if max(result_on_ranks) >= world.size: 

121 pytest.skip('World size not compatible with suggested ranks for distribution') 

122 

123 freq_w = np.linspace(0, 30, 20) 

124 

125 # Set up mock KS density matrices reader 

126 rho_chunk_reader = mock_ks_rho_reader(filter_times=freq_w * fs_to_au) 

127 # Set up the pulse convolver 

128 parameters = RhoParameters.from_ksd(rho_chunk_reader.ksd, striden1=striden1, striden2=striden1) 

129 time_distributor = AlltoallvTimeDistributor(rho_chunk_reader, parameters) 

130 fourier_transformer = FourierTransformer(time_distributor, 

131 perturbation={'name': 'deltakick', 'strength': 1e-5}, 

132 filter_frequencies=freq_w * eV_to_au) 

133 

134 # Collect on root. This calculates different chunks on different ranks 

135 # and gathers and groups the results on the root rank 

136 ref_full_dm = fourier_transformer.collect_on_root() 

137 

138 # Set up with different ranks collecting the data 

139 fourier_transformer = FourierTransformer(time_distributor, 

140 perturbation={'name': 'deltakick', 'strength': 1e-5}, 

141 filter_frequencies=freq_w * eV_to_au, 

142 result_on_ranks=result_on_ranks) 

143 freq_w = fourier_transformer.freq_w 

144 parameters = fourier_transformer.rho_nn_reader._parameters 

145 myw = fourier_transformer.my_work() 

146 

147 # Redistribute the data so that different ranks now have different times 

148 # This is usually done by the FrequencyDensityMatrices 

149 full_dm = fourier_transformer.redistribute() 

150 

151 # Loop over the results and sum them to the root rank 

152 nnshape = parameters.full_nnshape 

153 test_dm = DensityMatrixBuffer(nnshape=nnshape, xshape=(len(freq_w), ), dtype=complex) 

154 test_dm.zero_buffers(real=fourier_transformer.yield_re, 

155 imag=fourier_transformer.yield_im, 

156 derivative_order_s=[0]) 

157 

158 for array_iaw, part_array_iaw in zip(test_dm._iter_buffers(), 

159 full_dm._iter_buffers()): 

160 array_iaw[..., myw] = part_array_iaw[..., :len(myw)] 

161 world.sum(array_iaw, 0) 

162 

163 if world.rank != 0: 

164 return 

165 

166 # Check that data is the same 

167 for test_array_iaw, ref_array_iaw in zip(test_dm._iter_buffers(), 

168 ref_full_dm._iter_buffers()): 

169 np.testing.assert_allclose(test_array_iaw, ref_array_iaw)