Coverage for tests/integration/density_matrices/distributed/test_frequency.py: 96%
52 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-08-01 16:57 +0000
« prev ^ index » next coverage.py v7.9.1, created at 2025-08-01 16:57 +0000
1from __future__ import annotations
3import pytest
4import numpy as np
7@pytest.mark.bigdata
8@pytest.mark.parametrize('test_system', ['Na8', 'Ag8'])
9def test_density_matrix_frho_against_wave_functions(ksd_fname, frho_dname, wfs_fname):
10 """ Test that we can use the 'distributed' class to compute density matrices
11 in frequency space from a wave function file. Tests that the following works:
13 - Wave functions are read in time and transformed
14 into the Kohn-Sham basis (KohnShamRhoWfsReader).
15 Each MPI rank holds the same part of the density
16 matrix at different times.
17 - The MPI ranks exchange data so that each rank now
18 holds a smaller part of the density matrix for
19 contiguous times (AlltoallvTimeDistributor).
20 - The matrices are elementwise Fourier transformed
21 into frequency space (FourierTransformer).
22 - Everything is collected to a large buffer on the
23 root rank (collect_on_root).
25 Compare against the density matrices in frequency space read from the
26 FrequencyDensityMatrix file.
28 Reading with different strides etc is tested in unit tests with mock data.
29 """
30 from gpaw.lcaotddft.ksdecomposition import KohnShamDecomposition
31 from gpaw.tddft.units import eV_to_au
32 from gpaw.mpi import world
34 from rhodent.density_matrices.readers.numpy import FrequencyDensityMatrixReader
35 from rhodent.density_matrices.distributed.frequency import FourierTransformer
36 from rhodent.utils import add_fake_kpts
38 ksd = KohnShamDecomposition(filename=ksd_fname)
39 add_fake_kpts(ksd)
40 imin, imax, amin, amax = ksd.ialims()
42 # Array of frequencies for frho file
43 frho_freq_w = np.arange(1.0, 2.01, 0.05)
45 perturbation = {'name': 'SincPulse', 'strength': 1e-6,
46 'cutoff_freq': 4, 'time0': 5, 'relative_t0': True}
48 # Set up the KohnShamRhoWfsReader, AlltoallvTimeDistributor, and FourierTransformer
49 rho_nn_fft = FourierTransformer.from_parameters(
50 wfs_fname=wfs_fname,
51 ksd=ksd_fname,
52 perturbation=perturbation,
53 yield_re=True,
54 yield_im=True,
55 filter_frequencies=frho_freq_w * eV_to_au)
57 # Read and Fourier transform, collect on the root rank
58 wfs_buffer = rho_nn_fft.collect_on_root()
60 frho_fmt = str(frho_dname / 'w{freq:05.2f}-{reim}.npy')
61 reader = FrequencyDensityMatrixReader(frho_fmt, ksd_fname, frho_freq_w, real=True, imag=True, log=rho_nn_fft.log)
63 if world.rank > 0:
64 # Let the root rank test
65 assert wfs_buffer is None
66 return
68 # Create an indentical, zero-initialized buffer
69 frho_buffer = wfs_buffer.new()
70 frho_buffer.zero_buffers(real=True, imag=True, derivative_order_s=[0])
72 # Read files, fill buffer
73 for w, freq in enumerate(frho_freq_w):
74 for real in [True, False]:
75 rho_ia = reader.read(freq, real)
76 frho_buffer[w]._get_data(real=real, derivative=0)[:] = rho_ia
78 # Normalize by the perturbation. The fourier_transformer data is already normalized
79 frho_buffer.real[:] /= 1e-5
80 frho_buffer.imag[:] /= 1e-5
82 # Take average of absolutes across frequencies
83 frho_avg_real_ia = np.abs(frho_buffer.real).mean(axis=2)
84 wfs_avg_real_ia = np.abs(wfs_buffer.real).mean(axis=2)
85 frho_avg_imag_ia = np.abs(frho_buffer.imag).mean(axis=2)
86 wfs_avg_imag_ia = np.abs(wfs_buffer.imag).mean(axis=2)
88 atol = frho_avg_real_ia.max() * 0.0001
89 # Filter out almost zero elements (makes debugging easier)
90 # Use frho since it has zeros above the diagonals, unlike wfs
91 filter_ia = frho_avg_real_ia > atol
92 numel = filter_ia.size
93 numnonzero = filter_ia.sum()
94 assert numnonzero > 0.05 * numel, f'too few elements nonzero ({numnonzero}) compared to {numel}'
95 frho_avg_real_ia *= filter_ia
96 wfs_avg_real_ia *= filter_ia
98 filter_ia = frho_avg_imag_ia > atol
99 numel = filter_ia.size
100 numnonzero = filter_ia.sum()
101 assert numnonzero > 0.05 * numel, f'too few elements nonzero ({numnonzero}) compared to {numel}'
102 frho_avg_imag_ia *= filter_ia
103 wfs_avg_imag_ia *= filter_ia
105 # Make sure that roughly the same elements are large
106 atol = frho_avg_real_ia.max() * 0.01
107 np.testing.assert_allclose(frho_avg_real_ia, wfs_avg_real_ia, atol=atol, rtol=0.2)
108 np.testing.assert_allclose(frho_avg_imag_ia, wfs_avg_imag_ia, atol=atol, rtol=0.2)