Coverage for tests/unittests/density_matrices/distributed/test_distr_pulse.py: 64%

66 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-08-01 16:57 +0000

1from __future__ import annotations 

2 

3import os 

4import pytest 

5import numpy as np 

6 

7 

8from gpaw.tddft.units import fs_to_au 

9from gpaw.mpi import world 

10from rhodent.density_matrices.buffer import DensityMatrixBuffer 

11from rhodent.density_matrices.distributed.time import AlltoallvTimeDistributor, RhoParameters 

12from rhodent.density_matrices.distributed.pulse import PulseConvolver 

13from rhodent.utils import create_pulse 

14 

15 

def _check_redistribute_matches_collect_on_root(mock_ks_rho_reader):
    """Assert that redistributed pulse-convolved data equals the root-collected data.

    Builds a PulseConvolver on top of an AlltoallvTimeDistributor fed by the
    mock reader, obtains a reference via ``collect_on_root()`` and compares it
    with the result of ``redistribute()`` after summing the per-rank pieces
    onto rank 0. Only rank 0 performs the final comparison.

    Parameters
    ----------
    mock_ks_rho_reader
        Fixture factory; called with ``filter_times`` to create the mock
        KS density matrices reader.
    """
    pulses = [create_pulse(pf, 5.0, 10.0) for pf in [3.0, 4.0]]
    time_t = np.array([20, 21, 22])

    # Set up mock KS density matrices reader
    rho_chunk_reader = mock_ks_rho_reader(filter_times=np.linspace(0, 30, 300) * fs_to_au)
    # Set up the pulse convolver
    parameters = RhoParameters.from_ksd(rho_chunk_reader.ksd, striden1=30, striden2=30)
    time_distributor = AlltoallvTimeDistributor(rho_chunk_reader, parameters)
    pulse_convolver = PulseConvolver(time_distributor,
                                     pulses=pulses,
                                     perturbation={'name': 'deltakick', 'strength': 1e-5},
                                     filter_times=time_t * fs_to_au,
                                     derivative_order_s=[0, 1, 2])

    # Collect on root. This calculates different chunks on different ranks
    # and gathers and groups the results on the root rank
    ref_full_dm = pulse_convolver.collect_on_root()

    # From here on, use the times and parameters as stored by the convolver
    time_t = pulse_convolver.time_t
    parameters = pulse_convolver.rho_nn_reader._parameters
    myt = pulse_convolver.my_work()

    # Redistribute the data so that different ranks now have different times
    # This is usually done by the ConvolutionDensityMatrices
    full_dm = pulse_convolver.redistribute()

    # Loop over the results and sum them to the root rank
    nnshape = parameters.full_nnshape
    test_dm = DensityMatrixBuffer(nnshape=nnshape, xshape=(len(pulses), len(time_t)), dtype=float)
    test_dm.zero_buffers(real=pulse_convolver.yield_re,
                         imag=pulse_convolver.yield_im,
                         derivative_order_s=pulse_convolver.derivative_order_s)

    for array_iapt, part_array_iapt in zip(test_dm._iter_buffers(),
                                           full_dm._iter_buffers()):
        # Place this rank's share at the indices it is responsible for; the
        # remaining entries stay zero so that the sum assembles the full array
        array_iapt[..., myt] = part_array_iapt[..., :len(myt)]
        world.sum(array_iapt, 0)

    if world.rank != 0:
        return

    # Check that data is the same
    for test_array_iapt, ref_array_iapt in zip(test_dm._iter_buffers(),
                                               ref_full_dm._iter_buffers()):
        np.testing.assert_allclose(test_array_iapt, ref_array_iapt)


@pytest.mark.parametrize('maxsize', [5000, None])
@pytest.mark.parametrize('test_system', ['Na8', 'Ag8'])
def test_pulse_convolver_redistribute_max_sizes(mock_ks_rho_reader, ksd_fname, maxsize):
    """Check redistribute() against collect_on_root() with and without a max size.

    The environment variable ``RHODENT_REDISTRIBUTE_MAXSIZE`` is set to
    ``str(maxsize)``, or removed when ``maxsize`` is None, for the duration
    of the test. NOTE(review): the variable is presumably read by rhodent
    during redistribution; that code is not visible here.

    The previous state of the variable is restored afterwards, so that the
    setting neither leaks into other tests run in the same process nor
    discards a value exported by the user.
    """
    env_key = 'RHODENT_REDISTRIBUTE_MAXSIZE'
    previous_value = os.environ.get(env_key)

    try:
        if maxsize is None:
            os.environ.pop(env_key, None)
        else:
            os.environ[env_key] = str(maxsize)

        _check_redistribute_matches_collect_on_root(mock_ks_rho_reader)
    finally:
        # Restore the environment exactly as it was found, also when the
        # check fails or returns early on non-root ranks
        if previous_value is None:
            os.environ.pop(env_key, None)
        else:
            os.environ[env_key] = previous_value

69 

70 

@pytest.mark.parametrize('test_system', ['Na8', 'Ag8'])
@pytest.mark.parametrize('striden1', [5, 30, 100])
@pytest.mark.parametrize('result_on_ranks', [
    [0, 1], [0, 2], [0, 1, 3], [1, 2], [3, 2], [3, 1]
    ])
def test_pulse_convolver_redistribute_result_on_ranks(mock_ks_rho_reader, ksd_fname, result_on_ranks, striden1):
    """Check redistribute() with explicit ``result_on_ranks`` against collect_on_root().

    A reference is computed with a default PulseConvolver through
    ``collect_on_root()``. A second PulseConvolver, constructed with the
    given ``result_on_ranks``, redistributes the data; the per-rank pieces
    are summed onto rank 0 and compared to the reference there.

    The test is skipped when any requested rank does not exist in ``world``.
    """
    if max(result_on_ranks) >= world.size:
        pytest.skip('World size not compatible with suggested ranks for distribution')

    pulse_list = [create_pulse(frequency, 5.0, 10.0) for frequency in [3.0, 4.0]]
    requested_times = np.linspace(0, 30, 20)

    # Mock reader of KS density matrices, and the time distributor on top of it
    reader = mock_ks_rho_reader(filter_times=requested_times * fs_to_au)
    rho_parameters = RhoParameters.from_ksd(reader.ksd, striden1=striden1, striden2=striden1)
    distributor = AlltoallvTimeDistributor(reader, rho_parameters)

    # Keyword arguments shared by the reference and the tested convolver
    common_kwargs = {
        'rho_nn_reader': distributor,
        'pulses': pulse_list,
        'perturbation': {'name': 'deltakick', 'strength': 1e-5},
        'filter_times': requested_times * fs_to_au,
        'derivative_order_s': [0, 1, 2],
    }

    # Reference: different chunks are calculated on different ranks, then
    # gathered and grouped on the root rank
    reference_convolver = PulseConvolver(**common_kwargs)
    reference_dm = reference_convolver.collect_on_root()

    # Convolver under test: other ranks collect the data
    convolver = PulseConvolver(result_on_ranks=result_on_ranks, **common_kwargs)
    convolver_times = convolver.time_t
    convolver_parameters = convolver.rho_nn_reader._parameters
    my_times = convolver.my_work()

    # Redistribute so that different ranks hold different times
    # (usually done by the ConvolutionDensityMatrices)
    redistributed_dm = convolver.redistribute()

    # Buffer for the full result, zeroed so that summing assembles it on root
    gathered_dm = DensityMatrixBuffer(nnshape=convolver_parameters.full_nnshape,
                                      xshape=(len(pulse_list), len(convolver_times)),
                                      dtype=float)
    gathered_dm.zero_buffers(real=convolver.yield_re,
                             imag=convolver.yield_im,
                             derivative_order_s=convolver.derivative_order_s)

    buffer_pairs = zip(gathered_dm._iter_buffers(), redistributed_dm._iter_buffers())
    for gathered_array, local_array in buffer_pairs:
        n_mine = len(my_times)
        gathered_array[..., my_times] = local_array[..., :n_mine]
        world.sum(gathered_array, 0)

    # Only the root rank holds the summed data, so only it compares
    if world.rank == 0:
        buffer_pairs = zip(gathered_dm._iter_buffers(), reference_dm._iter_buffers())
        for gathered_array, reference_array in buffer_pairs:
            np.testing.assert_allclose(gathered_array, reference_array)