Coverage for tests/unittests/density_matrices/readers/test_numpy.py: 100%

126 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-08-01 16:57 +0000

1from __future__ import annotations 

2 

3from pathlib import Path 

4import numpy as np 

5import pytest 

6 

7from gpaw.mpi import world, broadcast 

8from rhodent.utils import create_pulse 

9from rhodent.density_matrices.readers.numpy import FrequencyDensityMatrixReader, TimeDensityMatrixReader 

10from rhodent.utils import get_gaussian_pulse_values 

11 

12 

13def rho_p_from_rho_ia(ksd, rho_ia): 

14 """ Transform from non-ravelled form to ravelled form. 

15 

16 """ 

17 ia_p = ksd.ia_p 

18 imin, amin = ia_p.min(axis=0) 

19 i_p, a_p = ia_p.T 

20 rho_p = rho_ia[i_p - imin, a_p - amin] 

21 return rho_p 

22 

23 

24def legacy_write_to_disk_time(density_matrices, 

25 pulserho_fmt: str): 

26 """ Legacy implementation of writing to disk, 

27 where density matrices are stored in ravelled form (single index for 

28 electron-hole pairs). 

29 

30 parameters 

31 ---------- 

32 pulserho_fmt 

33 formatting string for the density matrices saved to disk. 

34 

35 example: 

36 

37 * pulserho_fmt = `pulserho/t{time:09.1f}{tag}.npy`. 

38 

39 accepts variables 

40 

41 * `{time}` - time in units of as. 

42 * `{tag}` - derivative tag, `''`, `'-Iomega'`, or `'-omega2'`. 

43 * `{pulsefreq}` - pulse frequency in units of ev. 

44 * `{pulsefwhm}` - pulse fwhm in units of fs. 

45 """ 

46 nlocaltot = len(density_matrices.local_work_plan) 

47 

48 tags_keys = [(tag, key) for s, (tag, key) in enumerate( 

49 [('', 'rho_ia'), ('-Iomega', 'drho_ia'), ('-omega2', 'ddrho_ia')]) if s in density_matrices.derivative_order_s] 

50 

51 # iterate over density matrices on all ranks 

52 for ndone, (work, dm) in enumerate(density_matrices, 1): 

53 avg = density_matrices.log.elapsed('read')/ndone 

54 estrem = avg * (nlocaltot - ndone) 

55 density_matrices.log( 

56 f'obtained density matrix {ndone:4.0f}/{nlocaltot:4.0f} on this rank. ' 

57 f'avg: {avg:10.3f}s, estimated remaining: {estrem:10.3f}s', who='writer', flush=True) 

58 for tag, key in tags_keys: 

59 fname_kw = dict(time=work.time, tag=tag, **get_gaussian_pulse_values(work.pulse)) 

60 fpath = Path(pulserho_fmt.format(**fname_kw)) 

61 rho_p = rho_p_from_rho_ia(density_matrices.ksd, getattr(dm, key)) 

62 if density_matrices.calc_comm.rank == 0: 

63 assert isinstance(rho_p, np.ndarray) 

64 fpath.parent.mkdir(parents=True, exist_ok=True) 

65 if fpath.suffix == '.npz': 

66 # write numpy archive 

67 np.savez(str(fpath), rho_p=rho_p) 

68 else: 

69 # save numpy binary file 

70 np.save(str(fpath), rho_p) 

71 density_matrices.log(f'written {fpath}', who='writer', flush=True) 

72 world.barrier() 

73 

74 

75def legacy_write_to_disk_frequencies(density_matrices, 

76 frho_fmt: str): 

77 """ Legacy implementation of writing to disk, 

78 where density matrices are stored in ravelled form (single index for 

79 electron-hole pairs). 

80 

81 Parameters 

82 ---------- 

83 frho_fmt 

84 Formatting string for the density matrices saved to disk. 

85 

86 Example: 

87 

88 * frho_fmt = ``frho/w{freq:05.2f}-{reim}.npy``. 

89 

90 Accepts variables: 

91 

92 * ``{freq}`` - Frequency in units of eV. 

93 * ``{reim}`` - ``'Re'`` or ``'Im'`` for Fourier transform of real/imaginary 

94 part of density matrix. 

95 """ 

96 nlocaltot = len(density_matrices.local_work_plan) 

97 

98 # Iterate over density matrices on all ranks 

99 for ndone, (work, dm) in enumerate(density_matrices, 1): 

100 avg = density_matrices.log.elapsed('read')/ndone 

101 estrem = avg * (nlocaltot - ndone) 

102 density_matrices.log( 

103 f'Obtained density matrix {ndone:4.0f}/{nlocaltot:4.0f} on this rank. ' 

104 f'Avg: {avg:10.3f}s, estimated remaining: {estrem:10.3f}s', who='Writer', flush=True) 

105 fname_kw = dict(freq=work.freq, reim=work.reim) 

106 fpath = Path(frho_fmt.format(**fname_kw)) 

107 rho_p = rho_p_from_rho_ia(density_matrices.ksd, dm.rho_ia) 

108 if density_matrices.ksd.only_ia: 

109 # Twice the rho is saved by the KohnShamDecomposition transform 

110 rho_p *= 2 

111 if density_matrices.calc_comm.rank == 0: 

112 assert isinstance(rho_p, np.ndarray) 

113 fpath.parent.mkdir(parents=True, exist_ok=True) 

114 if fpath.suffix == '.npz': 

115 # Write numpy archive 

116 np.savez(str(fpath), rho_p=rho_p) 

117 else: 

118 # Save numpy binary file 

119 np.save(str(fpath), rho_p) 

120 density_matrices.log(f'Written {fpath}', who='Writer', flush=True) 

121 world.barrier() 

122 

123 

124@pytest.mark.parametrize('fmt', [ 

125 'pulserho_pf{pulsefreq:.2f}/t{time:09.1f}{tag}.npy', 

126 'pulserho_pf{pulsefreq:.2f}/t{time:09.1f}{tag}.npz', 

127 ]) 

128@pytest.mark.parametrize('test_system', ['Na8', 'Ag8']) 

129def test_read_write_time(tmp_path, mock_response, ksd_fname, fmt): 

130 tmp_path = broadcast(tmp_path if world.rank == 0 else None) # Make sure that the temp path is the same 

131 

132 times = np.arange(10e3, 30e3, 3e3) 

133 pulses = [create_pulse(pf) for pf in [3.3, 4.0]] 

134 kwargs = dict(times=times, pulses=pulses, derivative_order_s=[0, 1, 2]) 

135 

136 response = mock_response() 

137 

138 # Write 

139 pulserho_fmt = str(tmp_path / fmt) 

140 response.write_in_time(pulserho_fmt, **kwargs) 

141 

142 # Set up reader, with slightly different files. The correct files should 

143 # still be found on disk as they are the closest. 

144 reader = TimeDensityMatrixReader(pulserho_fmt, 

145 ksd=ksd_fname, 

146 filter_times=times + 0.04e3, 

147 pulses=pulses, 

148 derivative_order_s=[0, 1, 2]) 

149 np.testing.assert_almost_equal(times, reader.times) 

150 

151 # Compare against density matrices reference 

152 for work, ref_dm in response.iterate_density_matrices_in_time(**kwargs): 

153 for derivative, attr in zip([0, 1, 2], ['rho_ia', 'drho_ia', 'ddrho_ia']): 

154 test_rho = reader.read(work.time, work.pulse, derivative) 

155 ref_rho = getattr(ref_dm, attr) 

156 # Mask wrong side of diagonal in mock response 

157 np.testing.assert_allclose(test_rho, ref_rho, err_msg=f'{work} {derivative}') 

158 

159 

160@pytest.mark.parametrize('fmt', [ 

161 'pulserho_pf{pulsefreq:.2f}/t{time:09.1f}{tag}.npy', 

162 'pulserho_pf{pulsefreq:.2f}/t{time:09.1f}{tag}.npz', 

163 ]) 

164@pytest.mark.parametrize('test_system', ['Na8', 'Ag8']) 

165def test_read_legacy_write_time(tmp_path, mock_response, ksd_fname, fmt): 

166 """ Make sure that the density matrices written the legacy way can still be read. """ 

167 tmp_path = broadcast(tmp_path if world.rank == 0 else None) # Make sure that the temp path is the same 

168 

169 times = [12e3, 14e3] 

170 pulses = [create_pulse(pf) for pf in [3.3, 4.0]] 

171 kwargs = dict(times=times, pulses=pulses, derivative_order_s=[0, 1, 2]) 

172 

173 response = mock_response() 

174 

175 # Write 

176 pulserho_fmt = str(tmp_path / fmt) 

177 legacy_write_to_disk_time(response._get_time_density_matrices(**kwargs), pulserho_fmt) 

178 

179 # Set up reader, with slightly different files. The correct files should 

180 # still be found on disk as they are the closest. 

181 reader = TimeDensityMatrixReader(pulserho_fmt, 

182 ksd=ksd_fname, 

183 filter_times=times, 

184 pulses=pulses, 

185 derivative_order_s=[0, 1, 2]) 

186 np.testing.assert_almost_equal(times, reader.times) 

187 

188 # Calculate occupation number difference 

189 f_n = reader.ksd.occ_un[0] 

190 imin, imax, amin, amax = reader.ksd.ialims() 

191 f_ia = f_n[imin:imax+1, None] - f_n[None, amin:amax+1] 

192 

193 # Calculate mask 

194 min_occdiff = min(reader.ksd.f_p) 

195 mask_ia = f_ia >= min_occdiff 

196 

197 # Compare against density matrices reference 

198 for work, ref_dm in response.iterate_density_matrices_in_time(**kwargs): 

199 for derivative, attr in zip([0, 1, 2], ['rho_ia', 'drho_ia', 'ddrho_ia']): 

200 test_rho = reader.read(work.time, work.pulse, derivative) 

201 ref_rho = getattr(ref_dm, attr) * mask_ia 

202 # Mask wrong side of diagonal in mock response 

203 np.testing.assert_allclose(test_rho, ref_rho, err_msg=f'{work} {derivative}') 

204 

205 

206@pytest.mark.parametrize('fmt', [ 

207 'frho/w{freq:05.2f}-{reim}.npy', 

208 'frho/w{freq:05.2f}-{reim}.npz' 

209 ]) 

210@pytest.mark.parametrize('test_system', ['Na8', 'Ag8']) 

211def test_read_write_freq(tmp_path, mock_response, ksd_fname, fmt): 

212 tmp_path = broadcast(tmp_path if world.rank == 0 else None) # Make sure that the temp path is the same 

213 

214 frequencies = np.arange(1, 5, 0.3) 

215 kwargs = dict(frequencies=frequencies) 

216 

217 response = mock_response() 

218 

219 # Write 

220 frho_fmt = str(tmp_path / fmt) 

221 response.write_in_frequency(frho_fmt, **kwargs) 

222 

223 # Set up reader 

224 reader = FrequencyDensityMatrixReader( 

225 frho_fmt, 

226 ksd=ksd_fname, 

227 filter_frequencies=frequencies + 0.1) 

228 np.testing.assert_almost_equal(frequencies, reader.frequencies) 

229 

230 # Compare against density matrices reference 

231 for work, ref_dm in response.iterate_density_matrices_in_frequency(**kwargs): 

232 test_rho = reader.read(work.freq, work.real) 

233 ref_rho = ref_dm.rho_ia 

234 # Mask wrong side of diagonal in mock response 

235 np.testing.assert_allclose(test_rho, ref_rho, err_msg=f'{work}') 

236 

237 

238@pytest.mark.parametrize('fmt', [ 

239 'frho/w{freq:05.2f}-{reim}.npy', 

240 'frho/w{freq:05.2f}-{reim}.npz' 

241 ]) 

242@pytest.mark.parametrize('test_system', ['Na8', 'Ag8']) 

243def test_read_legacy_write_freq(tmp_path, mock_response, ksd_fname, fmt): 

244 """ Make sure that the density matrices written the legacy way can still be read. """ 

245 tmp_path = broadcast(tmp_path if world.rank == 0 else None) # Make sure that the temp path is the same 

246 

247 frequencies = [1.1, 1.3] 

248 kwargs = dict(frequencies=frequencies) 

249 

250 response = mock_response() 

251 

252 # Write 

253 frho_fmt = str(tmp_path / fmt) 

254 legacy_write_to_disk_frequencies(response._get_frequency_density_matrices(**kwargs), frho_fmt) 

255 

256 # Set up reader 

257 reader = FrequencyDensityMatrixReader( 

258 frho_fmt, 

259 ksd=ksd_fname, 

260 filter_frequencies=frequencies) 

261 np.testing.assert_almost_equal(frequencies, reader.frequencies) 

262 

263 # Calculate occupation number difference 

264 f_n = reader.ksd.occ_un[0] 

265 imin, imax, amin, amax = reader.ksd.ialims() 

266 f_ia = f_n[imin:imax+1, None] - f_n[None, amin:amax+1] 

267 

268 # Calculate mask 

269 min_occdiff = min(reader.ksd.f_p) 

270 mask_ia = f_ia >= min_occdiff 

271 

272 # Compare against density matrices reference 

273 for work, ref_dm in response.iterate_density_matrices_in_frequency(**kwargs): 

274 test_rho = reader.read(work.freq, work.real) 

275 ref_rho = ref_dm.rho_ia * mask_ia 

276 # Mask wrong side of diagonal in mock response 

277 np.testing.assert_allclose(test_rho, ref_rho, err_msg=f'{work}')