Coverage for tests/unittests/writers/test_density_writers.py: 81%

85 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-08-01 16:57 +0000

1from __future__ import annotations 

2 

3import pytest 

4import numpy as np 

5from ase.io import read 

6from ase.io.ulm import Reader 

7 

8from gpaw.mpi import world, broadcast 

9from rhodent.calculators.density import DensityCalculator 

10from rhodent.utils import create_pulse 

11 

12 

13def load_cube(cube_fmt, which_list, times=None, freqs=None): 

14 cube_fmt = str(cube_fmt) 

15 d = {} 

16 assert (times is None) ^ (freqs is None) 

17 collectlabel = 't' if freqs is None else 'w' 

18 for which in which_list: 

19 key = 'rho' if which == 'induced' else 'occ_rho' if which == 'holes' else 'unocc_rho' 

20 if freqs is None: 

21 filenames = [cube_fmt.format(which=which, time=time) for time in times] 

22 else: 

23 filenames = [cube_fmt.format(which=which, freq=freq) for freq in freqs] 

24 data = [read(fname, format='cube', parallel=False, read_data=True, full_output=True)['data'] 

25 for fname in filenames] 

26 d[f'{key}_{collectlabel}g'] = np.array(data) 

27 

28 return d 

29 

30 

31def load_ulm(fname): 

32 with Reader(fname) as reader: 

33 return reader.asdict() 

34 

35 

36def compare(leftdata, rightdata, keys, **kw): 

37 for key in keys: 

38 np.testing.assert_allclose(leftdata[key], rightdata[key], err_msg=key, **kw) 

39 

40 

41@pytest.mark.bigdata 

42@pytest.mark.parametrize('test_system', ['2H2']) 

43def test_density_time(tmp_path, gpw_fname, mock_response): 

44 tmp_path = broadcast(tmp_path if world.rank == 0 else None) # Make sure that the temp path is the same 

45 

46 time_t = np.linspace(0, 30, 7) # In fs 

47 

48 # Set up mock objects 

49 pulse = create_pulse(4.0) 

50 calc = DensityCalculator(response=mock_response(), 

51 times=time_t * 1e3, 

52 pulses=[pulse], 

53 gpw_file=gpw_fname) 

54 

55 which = ['induced', 'electrons', 'holes'] 

56 calc.calculate_and_write(str(tmp_path / 'density.npz'), which=which) 

57 calc.calculate_and_write(str(tmp_path / 'density.ulm'), which=which) 

58 calc.calculate_and_write(str(tmp_path / '{which}_density_t{time:09.1f}.cube'), which=which) 

59 times = calc.times 

60 

61 if world.rank != 0: 

62 return 

63 

64 data_npz = np.load(tmp_path / 'density.npz') 

65 data_ulm = load_ulm(tmp_path / 'density.ulm') 

66 data_cube = load_cube(tmp_path / '{which}_density_t{time:09.1f}.cube', which, times=times) 

67 

68 compare(data_npz, data_ulm, ['rho_tg', 'occ_rho_tg', 'unocc_rho_tg']) 

69 compare(data_npz, data_cube, ['rho_tg', 'occ_rho_tg', 'unocc_rho_tg'], atol=1e-10) 

70 

71 

72@pytest.mark.bigdata 

73@pytest.mark.parametrize('test_system', ['2H2']) 

74def test_density_frequency(tmp_path, gpw_fname, mock_response): 

75 tmp_path = broadcast(tmp_path if world.rank == 0 else None) # Make sure that the temp path is the same 

76 

77 freq_w = np.linspace(2, 6, 7) # In units of eV 

78 

79 # Set up mock objects 

80 calc = DensityCalculator(response=mock_response(perturbation={'name': 'deltakick', 'strength': 1e-5}), 

81 frequencies=freq_w, 

82 gpw_file=gpw_fname) 

83 

84 which = 'induced' 

85 calc.calculate_and_write(str(tmp_path / 'density.npz'), which=which) 

86 calc.calculate_and_write(str(tmp_path / 'density.ulm'), which=which) 

87 calc.calculate_and_write(str(tmp_path / '{which}_density_w{freq:05.2f}.cube'), which=which) 

88 freqs = calc.frequencies 

89 

90 if world.rank != 0: 

91 return 

92 

93 data_npz = np.load(tmp_path / 'density.npz') 

94 data_ulm = load_ulm(tmp_path / 'density.ulm') 

95 data_cube = load_cube(tmp_path / '{which}_density_w{freq:05.2f}.cube', ['induced'], freqs=freqs) 

96 

97 compare(data_npz, data_ulm, ['rho_wg']) 

98 compare(data_npz, data_cube, ['rho_wg'], atol=1e-10) 

99 

100 

101@pytest.mark.bigdata 

102@pytest.mark.parametrize('test_system', ['2H2']) 

103def test_density_calc_size(tmp_path, gpw_fname, mock_response): 

104 tmp_path = broadcast(tmp_path if world.rank == 0 else None) # Make sure that the temp path is the same 

105 

106 time_t = np.linspace(0, 30, 7) # In fs 

107 

108 if world.size == 1: 

109 pytest.skip('Parallel only test') 

110 

111 pulse = create_pulse(4.0) 

112 which = ['induced', 'electrons', 'holes'] 

113 

114 # Calculate on calc size 1 

115 calc = DensityCalculator(response=mock_response(), 

116 times=time_t * 1e3, 

117 pulses=[pulse], 

118 gpw_file=gpw_fname) 

119 

120 calc.calculate_and_write(str(tmp_path / 'ref_density.npz'), which=which) 

121 

122 if world.rank == 0: 

123 data_ref_npz = np.load(tmp_path / 'ref_density.npz') 

124 

125 # Calculate on other calc size 

126 for calc_size in range(2, world.size + 1): 

127 if world.size % calc_size != 0: 

128 continue 

129 

130 calc = DensityCalculator(response=mock_response(calc_size=calc_size), 

131 times=time_t * 1e3, 

132 pulses=[pulse], 

133 gpw_file=gpw_fname) 

134 

135 calc.calculate_and_write(str(tmp_path / 'density.npz'), which=which) 

136 

137 if world.rank == 0: 

138 

139 data_npz = np.load(tmp_path / 'density.npz') 

140 compare(data_ref_npz, data_npz, ['rho_tg', 'occ_rho_tg', 'unocc_rho_tg'])