Source code for pyrato.edc

#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
    The edc_noise_handling module provides various methods for noise
    compensation of room impulse responses.
"""

import numpy as np
from matplotlib import pyplot as plt
from pyrato import dsp
import warnings
import pyfar as pf


def subtract_noise_from_squared_rir(data, noise_level='auto'):
    """Subtract the noise power from a squared room impulse response.

    Parameters
    ----------
    data : pyfar.TimeData
        The squared room impulse response.
    noise_level : str or numpy.ndarray, float, optional
        The noise power for each channel. The default is 'auto', which will
        try to estimate the noise power from the room impulse response.

    Returns
    -------
    pyfar.TimeData
        The squared room impulse response after noise power subtraction.

    """
    subtracted = _subtract_noise_from_squared_rir(
        data.time, noise_level=noise_level)

    return pf.TimeData(subtracted, data.times, comment=data.comment)


def _subtract_noise_from_squared_rir(data, noise_level='auto'):
    """Subtract the noise power from a squared room impulse response.

    Parameters
    ----------
    data : ndarray, double
        The squared room impulse response with dimension [..., n_samples]

    Returns
    -------
    result : ndarray, double
        The noise-subtracted RIR.

    """
    if noise_level == "auto":
        noise_level = dsp._estimate_noise_energy(
            data,
            interval=[0.9, 1.0])
    return (data.T - noise_level).T


[docs] def schroeder_integration(room_impulse_response, is_energy=False): r"""Calculate the Schroeder integral of a room impulse response [#]_. The result is the energy decay curve for the given room impulse response. .. math: \langle e^2(t) \rangle = N\cdot \int_{t}^{\infty} h^2(\tau) \mathrm{d} \tau Parameters ---------- room_impulse_response : pyfar.Signal Room impulse response as array is_energy : boolean, optional Whether the input represents energy data or sound pressure values. Returns ------- energy_decay_curve : pyfar.TimeData The energy decay curve Note ---- This function does not apply any compensation of measurement noise and integrates the full length of the input signal. It should only be used if no measurement noise or artifacts are present in the data. References ---------- .. [#] M. R. Schroeder, “New Method of Measuring Reverberation Time,” The Journal of the Acoustical Society of America, vol. 37, no. 6, pp. 1187-1187, 1965. Example ------- Calculate the Schroeder integral of a simulated RIR and plot. .. plot:: >>> import numpy as np >>> import pyfar as pf >>> import pyrato as ra >>> from pyrato.analytic import rectangular_room_rigid_walls ... >>> L = np.array([8, 5, 3])/10 >>> source_pos = np.array([5, 3, 1.2])/10 >>> receiver_pos = np.array([1, 1, 1.2])/10 >>> rir, _ = rectangular_room_rigid_walls( ... L, source_pos, receiver_pos, ... reverberation_time=1, max_freq=1e3, n_samples=2**16, ... speed_of_sound=343.9) >>> edc = ra.schroeder_integration(rir) >>> pf.plot.time(rir/np.abs(rir.time).max(), dB=True, label='RIR') >>> ax = pf.plot.time( ... edc/edc.time[..., 0], dB=True, log_prefix=10, label='EDC') >>> ax.set_ylim(-65, 5) >>> ax.legend() """ edc = _schroeder_integration( room_impulse_response.time, is_energy=is_energy) return pf.TimeData(edc, room_impulse_response.times)
def _schroeder_integration(impulse_response, is_energy=False): r"""Calculate the Schroeder integral of a room impulse response [#]_. The result is the energy decay curve for the given room impulse response. .. math: \langle e^2(t) \rangle = N\cdot \int_{t}^{\infty} h^2(\tau) \mathrm{d} \tau Parameters ---------- impulse_response : ndarray, double Room impulse response as array is_energy : boolean, optional Whether the input represents energy data or sound pressure values. Returns ------- energy_decay_curve : ndarray, double The energy decay curve References ---------- .. [#] M. R. Schroeder, “New Method of Measuring Reverberation Time,” The Journal of the Acoustical Society of America, vol. 37, no. 6, pp. 1187-1187, 1965. """ if not is_energy: data = np.abs(impulse_response)**2 else: data = impulse_response.copy() ndim = data.ndim data = np.atleast_2d(data) energy_decay_curve = np.fliplr(np.nancumsum(np.fliplr(data), axis=-1)) if ndim < energy_decay_curve.ndim: energy_decay_curve = np.squeeze(energy_decay_curve) return energy_decay_curve
[docs] def energy_decay_curve_truncation( data, freq='broadband', noise_level='auto', is_energy=False, time_shift=True, channel_independent=False, normalize=True, threshold=15, plot=False): """ This function truncates a given room impulse response by the intersection time after Lundeby and calculates the energy decay curve. Parameters ---------- data : pyfar.Signal The room impulse response. freq: integer OR string The frequency band. If set to 'broadband', the time window of the Lundeby-algorithm will not be set in dependence of frequency. noise_level: ndarray, double OR string If not specified, the noise level is calculated based on the last 10 percent of the RIR. Otherwise specify manually for each channel as array. is_energy: boolean Defines, if the data is already squared. time_shift : boolean Defines, if the silence at beginning of the RIR should be removed. channel_independent : boolean Defines, if the time shift and normalization is done channel-independently or not. normalize : boolean Defines, if the energy decay curve should be normalized in the end or not. threshold : float Defines a peak-signal-to-noise ratio based threshold in dB for final truncation of the EDC. Values below the sum of the threshold level and the peak-signal-to-noise ratio in dB are discarded. The default is 15 dB, which is in correspondence with ISO 3382-1:2009 [#]_. plot: Boolean Specifies, whether the results should be visualized or not. Returns ------- pyfar.TimeData Returns the noise compensated EDC. Examples -------- Plot the RIR and the EDC calculated truncating the integration at the intersection time. .. plot:: >>> import numpy as np >>> import pyfar as pf >>> import pyrato as ra >>> from pyrato.analytic import rectangular_room_rigid_walls ... >>> L = np.array([8, 5, 3])/10 >>> source_pos = np.array([5, 3, 1.2])/10 >>> receiver_pos = np.array([1, 1, 1.2])/10 >>> rir, _ = rectangular_room_rigid_walls( ... L, source_pos, receiver_pos, ... reverberation_time=1, max_freq=1e3, n_samples=2**16, ... speed_of_sound=343.9) >>> rir = rir/rir.time.max() ... >>> awgn = pf.signals.noise( ... rir.n_samples, rms=rir.time.max()*10**(-50/20), ... sampling_rate=rir.sampling_rate) >>> rir = rir + awgn >>> edc = ra.energy_decay_curve_truncation(rir) ... >>> ax = pf.plot.time(rir, dB=True, label='RIR') >>> pf.plot.time(edc, dB=True, log_prefix=10, label='EDC') >>> ax.set_ylim(-65, 5) >>> ax.legend() References ---------- .. [#] International Organization for Standardization, “EN ISO 3382-1:2009 Acoustics - Measurement of room acoustic parameters,” 2009. """ energy_data = dsp.preprocess_rir( data, is_energy=is_energy, shift=time_shift, channel_independent=channel_independent) n_samples = data.n_samples intersection_time = intersection_time_lundeby( energy_data, freq=freq, initial_noise_power=noise_level, is_energy=True, time_shift=False, channel_independent=False, plot=False)[0] intersection_time_idx = np.rint(intersection_time * data.sampling_rate) psnr = dsp.peak_signal_to_noise_ratio( data, noise_level, is_energy=is_energy) trunc_levels = 10*np.log10((psnr)) - threshold energy_decay_curve = np.zeros([*data.cshape, n_samples]) for ch in np.ndindex(data.cshape): energy_decay_curve[ ch, :int(intersection_time_idx[ch])] = \ _schroeder_integration( energy_data.time[ ch, :int(intersection_time_idx[ch])], is_energy=True) energy_decay_curve = _truncate_energy_decay_curve( energy_decay_curve, trunc_levels) if normalize: # Normalize the EDC... if not channel_independent: # ...by the first element of each channel. energy_decay_curve = (energy_decay_curve.T / energy_decay_curve[..., 0]).T else: # ...by the maximum first element of each channel. max_start_value = np.amax(energy_decay_curve[..., 0]) energy_decay_curve /= max_start_value edc = pf.TimeData( energy_decay_curve, data.times, comment=data.comment) if plot: ax = pf.plot.time(data, dB=True, label='RIR') pf.plot.time(edc, dB=True, log_prefix=10, label='EDC') ax.set_ylim(-65, 5) ax.legend() return edc
[docs] def energy_decay_curve_lundeby( data, freq='broadband', noise_level='auto', is_energy=False, time_shift=True, channel_independent=False, normalize=True, plot=False): """Lundeby et al. [#]_ proposed a correction term to prevent the truncation error. The missing signal energy from truncation time to infinity is estimated and added to the truncated integral. Parameters ---------- data : pyfar.Signal The room impulse response. freq: integer OR string The frequency band. If set to 'broadband', the time window of the Lundeby-algorithm will not be set in dependence of frequency. noise_level: ndarray, double OR string If not specified, the noise level is calculated based on the last 10 percent of the RIR. Otherwise specify manually for each channel as array. is_energy: boolean Defines, if the data is already squared. time_shift : boolean Defines, if the silence at beginning of the RIR should be removed. channel_independent : boolean Defines, if the time shift and normalizsation is done channel-independently or not. normalize : boolean Defines, if the energy decay curve should be normalized in the end or not. plot: Boolean Specifies, whether the results should be visualized or not. Returns ------- pyfar.TimeData Returns the noise handeled edc. References ---------- .. [#] Lundeby, Virgran, Bietz and Vorlaender - Uncertainties of Measurements in Room Acoustics - ACUSTICA Vol. 81 (1995) Examples -------- Plot the RIR and the EDC calculated after Lundeby. .. plot:: >>> import numpy as np >>> import pyfar as pf >>> import pyrato as ra >>> from pyrato.analytic import rectangular_room_rigid_walls ... >>> L = np.array([8, 5, 3])/10 >>> source_pos = np.array([5, 3, 1.2])/10 >>> receiver_pos = np.array([1, 1, 1.2])/10 >>> rir, _ = rectangular_room_rigid_walls( ... L, source_pos, receiver_pos, ... reverberation_time=1, max_freq=1e3, n_samples=2**16, ... speed_of_sound=343.9) >>> rir = rir/rir.time.max() ... >>> awgn = pf.signals.noise( ... rir.n_samples, rms=rir.time.max()*10**(-50/20), ... sampling_rate=rir.sampling_rate) >>> rir = rir + awgn >>> edc = ra.energy_decay_curve_lundeby(rir) ... >>> ax = pf.plot.time(rir, dB=True, label='RIR') >>> pf.plot.time(edc, dB=True, log_prefix=10, label='EDC') >>> ax.set_ylim(-65, 5) >>> ax.legend() """ energy_data = dsp.preprocess_rir( data, is_energy=is_energy, shift=time_shift, channel_independent=channel_independent) n_samples = energy_data.n_samples sampling_rate = data.sampling_rate intersection_time, late_reverberation_time, noise_estimation = \ intersection_time_lundeby( energy_data, freq=freq, initial_noise_power=noise_level, is_energy=True, time_shift=False, channel_independent=False, plot=False) time_vector = data.times energy_decay_curve = np.zeros([*data.cshape, n_samples]) for ch in np.ndindex(data.cshape): intersection_time_idx = np.argmin( np.abs(time_vector - intersection_time[ch])) p_square_at_intersection = noise_estimation[ch] # Calculate correction term according to DIN EN ISO 3382 # TO-DO: check reference! correction = (p_square_at_intersection * late_reverberation_time[ch] * (1 / (6*np.log(10))) * sampling_rate) energy_decay_curve[ch, :intersection_time_idx] = \ _schroeder_integration( energy_data.time[ch, :intersection_time_idx], is_energy=True) energy_decay_curve[ch] += correction energy_decay_curve[ch, intersection_time_idx:] = np.nan if normalize: # Normalize the EDC... if not channel_independent: # ...by the first element of each channel. energy_decay_curve = ( energy_decay_curve.T / energy_decay_curve[..., 0]).T else: # ...by the maximum first element of each channel. max_start_value = np.amax(energy_decay_curve[..., 0]) energy_decay_curve /= max_start_value edc = pf.TimeData( energy_decay_curve, data.times, comment=data.comment) if plot: ax = pf.plot.time(data, dB=True, label='RIR') pf.plot.time(edc, dB=True, log_prefix=10, label='EDC') ax.set_ylim(-65, 5) ax.legend() return edc
[docs] def energy_decay_curve_chu( data, noise_level='auto', is_energy=False, time_shift=True, channel_independent=False, normalize=True, threshold=10, plot=False): """ Implementation of the "subtraction of noise"-method after Chu [#] The noise level is estimated and subtracted from the impulse response before backward integration. Parameters ---------- data : ndarray, double The room impulse response with dimension [..., n_samples] noise_level: ndarray, double OR string If not specified, the noise level is calculated based on the last 10 percent of the RIR. Otherwise specify manually for each channel as array. is_energy: boolean Defines, if the data is already squared. time_shift : boolean Defines, if the silence at beginning of the RIR should be removed. channel_independent : boolean Defines, if the time shift and normalizsation is done channel-independently or not. normalize : boolean Defines, if the energy decay curve should be normalized in the end or not. threshold : float, None Defines a peak-signal-to-noise ratio based threshold in dB for final truncation of the EDC. Values below the sum of the threshold level and the peak-signal-to-noise ratio in dB are discarded. The default is 10 dB. If `None`, the decay curve will not be truncated further. plot: Boolean Specifies, whether the results should be visualized or not. Returns ------- energy_decay_curve: ndarray, double Returns the noise handeled edc. References ---------- .. [#] W. T. Chu. “Comparison of reverberation measurements using Schroeder’s impulse method and decay-curve averaging method”. In: Journal of the Acoustical Society of America 63.5 (1978), pp. 1444–1450. Examples -------- .. plot:: >>> import numpy as np >>> import pyfar as pf >>> import pyrato as ra >>> from pyrato.analytic import rectangular_room_rigid_walls ... >>> L = np.array([8, 5, 3])/10 >>> source_pos = np.array([5, 3, 1.2])/10 >>> receiver_pos = np.array([1, 1, 1.2])/10 >>> rir, _ = rectangular_room_rigid_walls( ... L, source_pos, receiver_pos, ... reverberation_time=1, max_freq=1e3, n_samples=2**16, ... speed_of_sound=343.9) ... >>> awgn = pf.signals.noise( ... rir.n_samples, rms=rir.time.max()*10**(-40/20), ... sampling_rate=rir.sampling_rate) >>> rir = rir + awgn >>> edc = ra.energy_decay_curve_chu(rir) ... >>> pf.plot.time(rir/np.abs(rir.time).max(), dB=True, label='RIR') >>> ax = pf.plot.time( ... edc/edc.time[..., 0], dB=True, log_prefix=10, label='EDC') >>> ax.set_ylim(-65, 5) >>> ax.legend() """ energy_data = dsp.preprocess_rir( data, is_energy=is_energy, shift=time_shift, channel_independent=channel_independent) subtracted = subtract_noise_from_squared_rir( energy_data, noise_level=noise_level) edc = schroeder_integration(subtracted, is_energy=True) if normalize: # Normalize the EDC... if not channel_independent: # ...by the first element of each channel. edc.time = (edc.time.T / edc.time[..., 0]).T else: # ...by the maximum first element of all channels. max_start_value = np.amax(edc.time[..., 0]) edc.time /= max_start_value mask = edc.time <= 2*np.finfo(float).eps if np.any(mask): first_zero = np.nanargmax(mask, axis=-1) for ch in np.ndindex(edc.cshape): edc.time[ch, first_zero[ch]:] = np.nan if threshold is not None: psnr = dsp.peak_signal_to_noise_ratio( data, noise_level, is_energy=is_energy) trunc_levels = 10*np.log10((psnr)) - threshold edc = truncate_energy_decay_curve(edc, trunc_levels) if plot: plt.figure(figsize=(15, 3)) pf.plot.use('light') plt.subplot(131) pf.plot.time(energy_data, dB=True, log_prefix=10) plt.ylabel('Squared IR in dB') plt.subplot(132) pf.plot.time(subtracted, dB=True, log_prefix=10) plt.ylabel('Noise subtracted RIR in dB') plt.subplot(133) pf.plot.time(edc, dB=True, log_prefix=10) plt.ylabel('EDC in dB') return edc
[docs] def energy_decay_curve_chu_lundeby( data, freq='broadband', noise_level='auto', is_energy=False, time_shift=True, channel_independent=False, normalize=True, plot=False): """ This function combines Chu's and Lundeby's methods: The estimated noise level is subtracted before backward integration, the impulse response is truncated at the intersection time, and the correction for the truncation is applied [4]_, [5]_, [6]_ Parameters ---------- data : pyfar.Signal The room impulse response. freq: integer OR string The frequency band. If set to 'broadband', the time window of the Lundeby-algorithm will not be set in dependence of frequency. noise_level: ndarray, double OR string If not specified, the noise level is calculated based on the last 10 percent of the RIR. Otherwise specify manually for each channel as array. is_energy: boolean Defines, if the data is already squared. time_shift : boolean Defines, if the silence at beginning of the RIR should be removed. channel_independent : boolean Defines, if the time shift and normalizsation is done channel-independently or not. normalize : boolean Defines, if the energy decay curve should be normalized in the end or not. plot: Boolean Specifies, whether the results should be visualized or not. Returns ------- pyfar.TimeData Returns the noise handeled edc. References ---------- .. [4] Lundeby, Virgran, Bietz and Vorlaender - Uncertainties of Measurements in Room Acoustics - ACUSTICA Vol. 81 (1995) .. [5] W. T. Chu. “Comparison of reverberation measurements using Schroeder’s impulse method and decay-curve averaging method”. In: Journal of the Acoustical Society of America 63.5 (1978), pp. 1444–1450. .. [6] M. Guski, “Influences of external error sources on measurements of room acoustic parameters,” 2015. Examples -------- Calculate and plot the EDC using a combination of Chu's and Lundeby's methods. .. plot:: >>> import numpy as np >>> import pyfar as pf >>> import pyrato as ra >>> from pyrato.analytic import rectangular_room_rigid_walls ... >>> L = np.array([8, 5, 3])/10 >>> source_pos = np.array([5, 3, 1.2])/10 >>> receiver_pos = np.array([1, 1, 1.2])/10 >>> rir, _ = rectangular_room_rigid_walls( ... L, source_pos, receiver_pos, ... reverberation_time=1, max_freq=1e3, n_samples=2**16, ... speed_of_sound=343.9) >>> rir = rir/rir.time.max() ... >>> awgn = pf.signals.noise( ... rir.n_samples, rms=rir.time.max()*10**(-50/20), ... sampling_rate=rir.sampling_rate) >>> rir = rir + awgn >>> edc = ra.energy_decay_curve_chu_lundeby(rir) ... >>> ax = pf.plot.time(rir, dB=True, label='RIR') >>> pf.plot.time(edc, dB=True, log_prefix=10, label='EDC') >>> ax.set_ylim(-65, 5) >>> ax.legend() """ energy_data = dsp.preprocess_rir( data, is_energy=is_energy, shift=time_shift, channel_independent=channel_independent) n_samples = energy_data.n_samples subtraction = subtract_noise_from_squared_rir( energy_data, noise_level=noise_level) intersection_time, late_reverberation_time, noise_level = \ intersection_time_lundeby( energy_data, freq=freq, initial_noise_power=noise_level, is_energy=True, time_shift=False, channel_independent=False, plot=False) time_vector = data.times energy_decay_curve = np.zeros([*data.cshape, n_samples]) for ch in np.ndindex(data.cshape): intersection_time_idx = np.argmin(np.abs( time_vector - intersection_time[ch])) if type(noise_level) is str and noise_level == 'auto': p_square_at_intersection = dsp.estimate_noise_energy( energy_data.time[ch], is_energy=True) else: p_square_at_intersection = noise_level[ch] # calculate correction term according to DIN EN ISO 3382 correction = (p_square_at_intersection * late_reverberation_time[ch] * (1 / (6*np.log(10))) * data.sampling_rate) energy_decay_curve[ch, :intersection_time_idx] = \ _schroeder_integration( subtraction.time[ch, :intersection_time_idx], is_energy=True) energy_decay_curve[ch] += correction energy_decay_curve[ch, intersection_time_idx:] = np.nan if normalize: # Normalize the EDC... if not channel_independent: # ...by the first element of each channel. energy_decay_curve = (energy_decay_curve.T / energy_decay_curve[..., 0]).T else: # ...by the maximum first element of each channel. max_start_value = np.amax(energy_decay_curve[..., 0]) energy_decay_curve /= max_start_value edc = pf.TimeData( energy_decay_curve, data.times, comment=data.comment) if plot: ax = pf.plot.time(data, dB=True, label='RIR') pf.plot.time(edc, dB=True, log_prefix=10, label='EDC') ax.set_ylim(-65, 5) ax.legend() return edc
[docs] def intersection_time_lundeby( data, freq='broadband', initial_noise_power='auto', is_energy=False, time_shift=False, channel_independent=False, plot=False): """Calculate the intersection time between impulse response and noise. This function uses the algorithm after Lundeby et al. [#]_ to calculate the intersection time, lundeby reverberation time, and noise level estimation. Parameters ---------- data : pyfar.Signal The room impulse response freq: integer OR string The frequency band. If set to 'broadband', the time window of the Lundeby-algorithm will not be set in dependence of frequency. noise_level: ndarray, double OR string If not specified, the noise level is calculated based on the last 10 percent of the RIR. Otherwise specify manually for each channel as array. is_energy: boolean Defines, if the data is already squared. time_shift : boolean Defines, if the silence at beginning of the RIR should be removed. channel_independent : boolean Defines, if the time shift and normalizsation is done channel-independently or not. plot: Boolean Specifies, whether the results should be visualized or not. Returns ------- intersection_time: ndarray, float Returns the Lundeby intersection time for each channel. reverberation_time: ndarray, float Returns the Lundeby reverberation time for each channel. noise_level: ndarray, float Returns the noise level estimation for each channel. References ---------- .. [#] Lundeby, Virgran, Bietz and Vorlaender - Uncertainties of Measurements in Room Acoustics - ACUSTICA Vol. 81 (1995) Examples -------- Estimate the intersection time :math:`T_i` and plot the RIR and the estimated noise power. .. plot:: >>> import numpy as np >>> import pyfar as pf >>> import pyrato as ra >>> from pyrato.analytic import rectangular_room_rigid_walls ... >>> L = np.array([8, 5, 3])/10 >>> source_pos = np.array([5, 3, 1.2])/10 >>> receiver_pos = np.array([1, 1, 1.2])/10 >>> rir, _ = rectangular_room_rigid_walls( ... L, source_pos, receiver_pos, ... reverberation_time=1, max_freq=1e3, n_samples=2**16, ... speed_of_sound=343.9) >>> rir = rir/np.abs(rir.time).max() ... >>> awgn = pf.signals.noise( ... rir.n_samples, rms=rir.time.max()*10**(-40/20), ... sampling_rate=rir.sampling_rate) >>> rir = rir + awgn >>> inter_time, _, noise_power = ra.intersection_time_lundeby(rir) ... >>> ax = pf.plot.time(rir, dB=True, label='RIR') >>> ax.axvline(inter_time, c='k', linestyle='--', label='$T_i$') >>> ax.axhline( ... 10*np.log10(noise_power), c='k', linestyle=':', label='Noise') >>> ax.set_ylim(-65, 5) >>> ax.legend() """ # Define constants: # time intervals per 10 dB decay. Lundeby: 3...10 n_intervals_per_10dB = 5 # end of regression 5 ... 10 dB dB_above_noise = 10 # Dynamic range 10 ... 20 dB use_dyn_range_for_regression = 20 energy_data = dsp.preprocess_rir( data, is_energy=is_energy, shift=time_shift, channel_independent=channel_independent) if isinstance(data, pf.Signal): sampling_rate = data.sampling_rate elif isinstance(data, pf.TimeData) and not isinstance(data, pf.Signal): sampling_rate = np.round(1/np.diff(data.times).mean(), decimals=4) energy_data = energy_data.time if freq == "broadband": # broadband: use 30 ms windows sizes freq_dependent_window_time = 0.03 else: freq_dependent_window_time = (800/freq+10) / 1000 # (1) SMOOTH time_window_data, time_vector_window, time_vector = dsp._smooth_rir( energy_data, sampling_rate, freq_dependent_window_time) # (2) ESTIMATE NOISE if initial_noise_power == 'auto': noise_estimation = dsp._estimate_noise_energy(energy_data) else: noise_estimation = initial_noise_power.copy() # (3) REGRESSION reverberation_time = np.zeros(data.cshape, data.time.dtype) noise_level = np.zeros(data.cshape, data.time.dtype) intersection_time = np.zeros(data.cshape, data.time.dtype) noise_peak_level = np.zeros(data.cshape, data.time.dtype) for ch in np.ndindex(data.cshape): time_window_data_current_channel = time_window_data[ch] start_idx = np.nanargmax(time_window_data_current_channel, axis=-1) try: stop_idx = (np.argwhere(10*np.log10( time_window_data_current_channel[start_idx+1:-1]) > (10*np.log10(noise_estimation[ch]) + dB_above_noise))[-1, 0] + start_idx) except IndexError: raise Exception( 'Regression failed: Low SNR. Estimation terminated.') dyn_range = np.diff(10*np.log10(np.take( time_window_data_current_channel, [start_idx, stop_idx]))) if (stop_idx == start_idx) or dyn_range > -5: raise Exception( 'Regression failed: Low SNR. Estimation terminated.') # regression_matrix*slope = edc regression_matrix = np.vstack((np.ones( [stop_idx-start_idx]), time_vector_window[start_idx:stop_idx])) slope = np.linalg.lstsq( regression_matrix.T, 10*np.log10(time_window_data_current_channel[start_idx:stop_idx]), rcond=None)[0] if slope[1] == 0 or np.any(np.isnan(slope)): raise Exception( 'Regression failed: T would be Inf, setting to 0. \ Estimation terminated.') regression_time = np.array( [time_vector_window[start_idx], time_vector_window[stop_idx]]) regression_values = np.array( [10*np.log10(time_window_data[0, start_idx]), (10*np.log10(time_window_data[0, start_idx]) + slope[1]*time_vector_window[stop_idx])]) # (4) PRELIMINARY CROSSING POINT crossing_point = \ (10*np.log10(noise_estimation[ch]) - slope[0]) / slope[1] preliminary_crossing_point = crossing_point # (5) NEW LOCAL TIME INTERVAL LENGTH n_blocks_in_decay = (np.diff( 10*np.log10(np.take( time_window_data_current_channel, [start_idx, stop_idx]))) / -10 * n_intervals_per_10dB) n_samples_per_block = np.round(np.diff(np.take( time_vector_window, [start_idx, stop_idx])) / n_blocks_in_decay * sampling_rate) window_time = n_samples_per_block/sampling_rate # (6) AVERAGE time_window_data_current_channel, \ time_vector_window_current_channel, \ time_vector_current_channel = dsp._smooth_rir( energy_data[ch], sampling_rate, window_time) time_window_data_current_channel = np.squeeze( time_window_data_current_channel) idx_max = np.nanargmax(time_window_data_current_channel) # high start value to enter while-loop old_crossing_point = 11+crossing_point loop_counter = 0 while True: # (7) ESTIMATE BACKGROUND LEVEL corresponding_decay = 10 # 5...10 dB idx_last_10_percent = np.round( time_window_data_current_channel.shape[-1]*0.9) t_block = n_samples_per_block / sampling_rate rel_decay = corresponding_decay / slope[1] idx_10dB_below_crosspoint = np.nanmax( np.r_[1, np.round(((crossing_point - rel_decay) / t_block))]) noise_estimation_current_channel = np.nanmean( time_window_data_current_channel[int(np.nanmin( [idx_last_10_percent, idx_10dB_below_crosspoint])):]) # (8) ESTIMATE LATE DECAY SLOPE try: start_idx_loop = np.argwhere(10*np.log10( time_window_data_current_channel[idx_max:]) < ( 10*np.log10(noise_estimation_current_channel) + dB_above_noise + use_dyn_range_for_regression + idx_max))[0, 0] except TypeError: start_idx_loop = 0 try: stop_idx_loop = (np.argwhere(10*np.log10( time_window_data_current_channel[start_idx_loop+1:]) < ( 10*np.log10(noise_estimation_current_channel) + dB_above_noise))[0, 0] + start_idx_loop) except IndexError: raise Exception( 'Regression failed: Low SNR. Estimation terminated.') # regression_matrix*slope = edc regression_matrix = np.vstack((np.ones( [stop_idx_loop-start_idx_loop]), time_vector_window_current_channel[ start_idx_loop:stop_idx_loop])) slope = np.linalg.lstsq( regression_matrix.T, (10*np.log10(time_window_data_current_channel[ start_idx_loop:stop_idx_loop])), rcond=None)[0] if slope[1] >= 0: raise Exception( 'Regression did not work due, T would be Inf, \ setting to 0. Estimation was terminated.') # (9) FIND CROSSPOINT old_crossing_point = crossing_point crossing_point = ((10*np.log10(noise_estimation_current_channel) - slope[0]) / slope[1]) loop_counter = loop_counter + 1 if (np.abs(old_crossing_point-crossing_point) < 0.01): break if loop_counter > 30: # TO-DO: Paper says 5 iterations are sufficient in all cases! warnings.warn( "Lundeby algorithm was terminated after 30 iterations.") break reverberation_time[ch] = -60/slope[1] noise_level[ch] = noise_estimation_current_channel intersection_time[ch] = crossing_point noise_peak_level[ch] = 10 * np.log10( np.nanmax(time_window_data_current_channel[int(np.nanmin( [idx_last_10_percent, idx_10dB_below_crosspoint])):])) if plot: plt.figure(figsize=(15, 3)) plt.subplot(131) max_data_db = np.nanmax(10*np.log10(energy_data)) plt.plot(time_vector, 10*np.log10(energy_data.T)) plt.xlabel('Time [s]') plt.ylabel('Squared RIR [dB]') plt.ylim(bottom=max_data_db-80+5, top=max_data_db+5) plt.grid(True) plt.subplot(132) plt.plot(time_vector_window, 10*np.log10(time_window_data.T)) plt.xlabel('Time [s]') plt.ylabel('Smoothened RIR [dB]') plt.plot( time_vector_window[-int(np.round(time_window_data.shape[-1]/10))], 10*np.log10(noise_estimation[0]), marker='o', label='noise estimation', linestyle='--', color='C1') plt.axhline( 10*np.log10(noise_estimation[0]), color='C1', linestyle='--') plt.plot( regression_time, regression_values, marker='o', color='C2', label='regression') plt.plot( preliminary_crossing_point, 10*np.log10(noise_estimation[ch]), marker='o', color='C3', label='preliminary crosspoint') plt.legend() plt.grid(True) plt.subplot(133) # TO-DO: plot all channels? plt.plot( time_vector_window_current_channel, 10*np.log10(time_window_data_current_channel)) plt.xlabel('Time [s]') plt.ylabel('Final EDC Estimation [dB]') plt.plot( time_vector_window_current_channel[int(idx_10dB_below_crosspoint)], 10*np.log10(noise_estimation[0]), marker='o', color='C1', label='Noise') plt.axhline( 10*np.log10(noise_estimation[0]), color='C1', linestyle='--') plt.plot( crossing_point, 10*np.log10(noise_estimation[0]), marker='o', color='C3', label='Final crosspoint', linestyle='--') plt.axvline( reverberation_time[0], label=('$T_{60} = '+str(np.round(reverberation_time[0], 3)) + '$'), color='k') plt.axvline( intersection_time[0], label=('Lundeby intersection time'), color='C3', linestyle='--') plt.tight_layout() plt.legend() plt.grid(True) return intersection_time, reverberation_time, noise_level
def _truncate_energy_decay_curve(energy_decay_curve, threshold_level): edc = np.atleast_2d(energy_decay_curve) threshold_level = np.atleast_2d(threshold_level) e = edc.T[0] t = e / 10**(threshold_level/10) mask = edc.T < np.broadcast_to(t, edc.T.shape) edc[mask.T] = np.nan return edc def truncate_energy_decay_curve(energy_decay_curve, threshold): """Truncate an energy decay curve, discarding values below the threshold. Parameters ---------- energy_decay_curve : pyfar.TimeData The energy decay curve threshold : float The threshold level in dB. The data below the threshold level are set to numpy.nan values. """ return pf.TimeData( _truncate_energy_decay_curve(energy_decay_curve.time, threshold), energy_decay_curve.times, energy_decay_curve.comment)