#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Sub-module implementing different methods for the calculation of energy decay
curves (EDCs) from measured or simulated room impulse responses (RIRs).
"""
import numpy as np
from matplotlib import pyplot as plt
from pyrato import dsp
import warnings
import pyfar as pf
[docs]
def subtract_noise_from_squared_rir(data, noise_level='auto'):
"""Subtract the noise power from a squared room impulse response.
Parameters
----------
data : pyfar.TimeData
The squared room impulse response.
noise_level : str or numpy.ndarray, float, optional
The noise power for each channel. The default is 'auto', which will
try to estimate the noise power from the room impulse response.
Returns
-------
pyfar.TimeData
The squared room impulse response after noise power subtraction.
"""
subtracted = _subtract_noise_from_squared_rir(
data.time, noise_level=noise_level)
return pf.TimeData(subtracted, data.times, comment=data.comment)
def _subtract_noise_from_squared_rir(data, noise_level='auto'):
"""Subtract the noise power from a squared room impulse response.
Parameters
----------
data : ndarray, double
The squared room impulse response with dimension ``(..., n_samples)``
noise_level: ndarray, double OR string
If ``'auto'``, the noise level is calculated based on the last 10
percent of the RIR. Otherwise specify manually for each channel
as array.
Returns
-------
result : ndarray, double
The noise-subtracted RIR.
"""
if noise_level == "auto":
noise_level = dsp._estimate_noise_energy(
data,
interval=[0.9, 1.0])
return (data.T - noise_level).T
[docs]
def schroeder_integration(room_impulse_response, is_energy=False):
r"""Calculate the Schroeder integral of a room impulse response.
The result is the energy decay curve for the given room impulse response
after [#]_.
.. math::
\langle e^2(t) \rangle = \int_{t}^{\infty} h^2(\tau)
\mathrm{d} \tau
Parameters
----------
room_impulse_response : pyfar.Signal, pyfar.TimeData
The room impulse response.
is_energy : boolean, optional
Whether the input represents energy data or sound pressure values.
By default, this is set to ``False``. If set to ``True``,
the variable :math:`h(\tau)` is assumed to represent energy and
the power two is omitted from the given equation.
Returns
-------
energy_decay_curve : pyfar.TimeData
The energy decay curve
References
----------
.. [#] M. R. Schroeder, "New Method of Measuring Reverberation Time,"
The Journal of the Acoustical Society of America, vol. 37, no. 6,
pp. 1187-1187, 1965.
.. [#] ISO 3382, Acoustics — Measurement of the reverberation
time of rooms with reference to other acoustical parameters.
Notes
-----
No preprocessing of the impulse response is performed. In particular,
this function does not:
- detect or align the direct sound (impulse response start),
- compensate for measurement noise,
- truncate the impulse response at the noise floor.
The full length of the input signal is squared and integrated.
For measurements in accordance with ISO 3382 [#]_, appropriate
preprocessing (:func:`pyrato.edc.energy_decay_curve_truncation`) of the
impulse response
(time alignment, band-pass filtering, and noise handling) must be
performed before calling this
function.
Example
-------
Calculate the Schroeder integral of a simulated RIR and plot.
.. plot::
>>> import numpy as np
>>> import pyfar as pf
>>> import pyrato as ra
>>> from pyrato.analytic import rectangular_room_rigid_walls
...
>>> L = np.array([8, 5, 3])/10
>>> source_pos = np.array([5, 3, 1.2])/10
>>> receiver_pos = np.array([1, 1, 1.2])/10
>>> rir, _ = rectangular_room_rigid_walls(
... L, source_pos, receiver_pos,
... reverberation_time=1, max_freq=1e3, n_samples=2**16,
... speed_of_sound=343.9)
>>> edc = ra.edc.schroeder_integration(rir)
>>> pf.plot.time(rir/np.abs(rir.time).max(), dB=True, label='RIR')
>>> ax = pf.plot.time(
... edc/edc.time[..., 0], dB=True, log_prefix=10, label='EDC')
>>> ax.set_ylim(-65, 5)
>>> ax.legend()
"""
edc = _schroeder_integration(
room_impulse_response.time, is_energy=is_energy)
return pf.TimeData(edc, room_impulse_response.times)
def _schroeder_integration(impulse_response, is_energy=False):
r"""Calculate the Schroeder integral of a room impulse response.
The result is the energy decay curve for the given room impulse
response [#]_.
.. math::
\langle e^2(t) \rangle = \int_{t}^{\infty} h^2(\tau)
\mathrm{d} \tau
This is a private function for use with numpy arrays.
Parameters
----------
impulse_response : ndarray, double
Room impulse response as array
is_energy: boolean
If `True` the input data is proportional to energy density; if `False`
the input data is proportional to sound pressure.
Returns
-------
energy_decay_curve : ndarray, double
The energy decay curve
"""
if not is_energy:
data = np.abs(impulse_response)**2
else:
data = impulse_response.copy()
ndim = data.ndim
data = np.atleast_2d(data)
energy_decay_curve = np.flip(
np.nancumsum(np.flip(data, axis=-1), axis=-1), axis=-1)
if ndim < energy_decay_curve.ndim:
energy_decay_curve = np.squeeze(energy_decay_curve)
return energy_decay_curve
[docs]
def energy_decay_curve_truncation(
data,
freq='broadband',
noise_level='auto',
is_energy=False,
time_shift=True,
channel_independent=False,
normalize=True,
threshold=15,
plot=False):
r"""Truncated Schroeder integration.
The integration is truncated at the intersection with measurement noise,
which is estimated from the room impulse response using Lundeby's
algorithm. An initial estimation of the noise power is performed on the
last 10 percent of the impulse response, if no value is passed. Using the
default parameters, the function is compliant with ISO 3382-1:2009 [#]_.
.. math::
E_(t) = \int_t^{t_{i}} h^2(\tau) d\tau
.. note::
The EDC contains ``np.nan`` values if the intersection time could not
be computed due to a low signal-to-noise ration. In this case a warning
is raised as well.
Parameters
----------
data : pyfar.Signal
The room impulse response.
freq: integer OR string
The frequency band. If set to 'broadband',
the time window of the Lundeby-algorithm will not be set in dependence
of frequency.
noise_level: ndarray, double OR string
If not specified, the noise level is calculated based on the last 10
percent of the RIR. Otherwise specify manually for each channel
as array.
is_energy: boolean
If `True` the input data is proportional to energy density; if `False`
the input data is proportional to sound pressure.
time_shift : boolean
Defines, if the silence at beginning of the RIR should be removed.
channel_independent : boolean
Defines, if the time shift and normalization is done
channel-independently or not.
normalize : boolean
Defines, if the energy decay curve should be normalized in the end
or not.
threshold : float
Defines a peak-signal-to-noise ratio based threshold in dB for final
truncation of the EDC. Values below the sum of the threshold level and
the peak-signal-to-noise ratio in dB are discarded. The default is
15 dB, which is in correspondence with ISO 3382-1:2009.
plot: Boolean
Specifies, whether the results should be visualized or not.
Returns
-------
pyfar.TimeData
Returns the noise compensated EDC.
References
----------
.. [#] International Organization for Standardization, “EN ISO 3382-1:2009
Acoustics - Measurement of room acoustic parameters,” 2009.
Examples
--------
Plot the RIR and the EDC calculated truncating the integration at the
intersection time.
.. plot::
>>> import numpy as np
>>> import pyfar as pf
>>> import pyrato as ra
>>> from pyrato.analytic import rectangular_room_rigid_walls
...
>>> L = np.array([8, 5, 3])/10
>>> source_pos = np.array([5, 3, 1.2])/10
>>> receiver_pos = np.array([1, 1, 1.2])/10
>>> rir, _ = rectangular_room_rigid_walls(
... L, source_pos, receiver_pos,
... reverberation_time=1, max_freq=1e3, n_samples=2**16,
... speed_of_sound=343.9)
>>> rir = rir/rir.time.max()
...
>>> awgn = pf.signals.noise(
... rir.n_samples, rms=rir.time.max()*10**(-50/20),
... sampling_rate=rir.sampling_rate)
>>> rir = rir + awgn
>>> edc = ra.edc.energy_decay_curve_truncation(rir)
...
>>> ax = pf.plot.time(rir, dB=True, label='RIR')
>>> pf.plot.time(edc, dB=True, log_prefix=10, label='EDC')
>>> ax.set_ylim(-65, 5)
>>> ax.legend()
"""
# flatten to allow signals with cdim > 1
shape = data.time.shape
data = data.flatten()
energy_data = dsp._preprocess_rir(
data,
is_energy=is_energy,
shift=time_shift,
channel_independent=channel_independent)
n_samples = data.n_samples
intersection_time = intersection_time_lundeby(
energy_data,
freq=freq,
initial_noise_power=noise_level,
is_energy=True,
time_shift=False,
channel_independent=False,
plot=False,
failure_policy='warning')[0]
energy_decay_curve = np.zeros([*data.cshape, n_samples])
for ch in np.ndindex(data.cshape):
if np.isnan(intersection_time[ch]):
energy_decay_curve[ch] = np.nan
continue
intersection_time_idx = np.rint(
intersection_time[ch] * data.sampling_rate)
psnr = dsp.peak_signal_to_noise_ratio(
data[ch], noise_level, is_energy=is_energy)
trunc_level = 10*np.log10((psnr)) - threshold
energy_decay_curve[
ch, :int(intersection_time_idx)] = \
_schroeder_integration(
energy_data.time[
ch, :int(intersection_time_idx)],
is_energy=True)
energy_decay_curve[ch] = _threshold_energy_decay_curve(
energy_decay_curve[ch], trunc_level)
if normalize:
energy_decay_curve = _normalize_edc(
energy_decay_curve, channel_independent)
edc = pf.TimeData(
energy_decay_curve.reshape(shape), data.times, comment=data.comment)
if plot:
ax = pf.plot.time(data, dB=True, label='RIR')
pf.plot.time(edc, dB=True, log_prefix=10, label='EDC')
ax.set_ylim(-65, 5)
ax.legend()
return edc
[docs]
def energy_decay_curve_lundeby(
data,
freq='broadband',
noise_level='auto',
is_energy=False,
time_shift=True,
channel_independent=False,
normalize=True,
plot=False):
r"""Truncated Schroeder integration with additive truncation compensation.
Lundeby et al. [#]_ proposed a correction term to for the truncation
error. The missing signal energy, caused by limiting the integration time,
is estimated under the assumption of a single exponential late decay
process.
.. math::
E(t) = \int_t^{t_i} h^2(\tau) d\tau + C
C = p_{i}^2 \cdot T_\mathrm{late} \cdot \frac{1}{6 \cdot ln(10)}
.. note::
The EDC contains ``np.nan`` values if the intersection time could not
be computed due to a low signal-to-noise ration. In this case a warning
is raised as well.
Parameters
----------
data : pyfar.Signal
The room impulse response.
freq: integer OR string
The frequency band. If set to 'broadband',
the time window of the Lundeby-algorithm will not be set in dependence
of frequency.
noise_level: ndarray, double OR string
If not specified, the noise level is calculated based on the last 10
percent of the RIR. Otherwise specify manually for each channel
as array.
is_energy: boolean
If `True` the input data is proportional to energy density; if `False`
the input data is proportional to sound pressure.
time_shift : bool
If `True`, the delay of the RIR is removed.
channel_independent : boolean
Defines, if the time shift and normalizsation is done
channel-independently or not.
normalize : boolean
Defines, if the energy decay curve should be normalized in the end
or not.
plot: Boolean
Specifies, whether the results should be visualized or not.
Returns
-------
pyfar.TimeData
Returns the noise compensated energy decay curve.
References
----------
.. [#] A. Lundeby, T. E. Vigran, H. Bietz, and M. Vorländer,
“Uncertainties of Measurements in Room Acoustics,”
Acta Acust. United Ac., vol. 81, no. 4, pp. 344-355, Jul. 1995.
Examples
--------
Plot the room impulse response and the energy decay curve calculated
after Lundeby.
.. plot::
>>> import numpy as np
>>> import pyfar as pf
>>> import pyrato as ra
>>> from pyrato.analytic import rectangular_room_rigid_walls
...
>>> L = np.array([8, 5, 3])/10
>>> source_pos = np.array([5, 3, 1.2])/10
>>> receiver_pos = np.array([1, 1, 1.2])/10
>>> rir, _ = rectangular_room_rigid_walls(
... L, source_pos, receiver_pos,
... reverberation_time=1, max_freq=1e3, n_samples=2**16,
... speed_of_sound=343.9)
>>> rir = rir/rir.time.max()
...
>>> awgn = pf.signals.noise(
... rir.n_samples, rms=rir.time.max()*10**(-50/20),
... sampling_rate=rir.sampling_rate)
>>> rir = rir + awgn
>>> edc = ra.edc.energy_decay_curve_lundeby(rir)
...
>>> ax = pf.plot.time(rir, dB=True, label='RIR')
>>> pf.plot.time(edc, dB=True, log_prefix=10, label='EDC')
>>> ax.set_ylim(-65, 5)
>>> ax.legend()
"""
# flatten to allow signals with cdim > 1
shape = data.time.shape
data = data.flatten()
energy_data = dsp._preprocess_rir(
data,
is_energy=is_energy,
shift=time_shift,
channel_independent=channel_independent)
n_samples = energy_data.n_samples
sampling_rate = data.sampling_rate
intersection_time, late_reverberation_time, noise_estimation = \
intersection_time_lundeby(
energy_data,
freq=freq,
initial_noise_power=noise_level,
is_energy=True,
time_shift=False,
channel_independent=False,
plot=False,
failure_policy='warning')
time_vector = data.times
energy_decay_curve = np.zeros([*data.cshape, n_samples])
for ch in np.ndindex(data.cshape):
if np.isnan(intersection_time[ch]):
energy_decay_curve[ch] = np.nan
continue
intersection_time_idx = np.argmin(
np.abs(time_vector - intersection_time[ch]))
p_square_at_intersection = noise_estimation[ch]
# Calculate correction term according to DIN EN ISO 3382
# TO-DO: check reference!
correction = (p_square_at_intersection
* late_reverberation_time[ch]
* (1 / (6*np.log(10)))
* sampling_rate)
energy_decay_curve[ch, :intersection_time_idx] = \
_schroeder_integration(
energy_data.time[ch, :intersection_time_idx],
is_energy=True)
energy_decay_curve[ch] += correction
energy_decay_curve[ch, intersection_time_idx:] = np.nan
if normalize:
energy_decay_curve = _normalize_edc(
energy_decay_curve, channel_independent)
edc = pf.TimeData(
energy_decay_curve.reshape(shape), data.times, comment=data.comment)
if plot:
ax = pf.plot.time(data, dB=True, label='RIR')
pf.plot.time(edc, dB=True, log_prefix=10, label='EDC')
ax.set_ylim(-65, 5)
ax.legend()
return edc
[docs]
def energy_decay_curve_chu(
data,
noise_level='auto',
is_energy=False,
time_shift=True,
channel_independent=False,
normalize=True,
threshold=10,
plot=False):
r"""Schroeder integration with prior subtraction of average noise power.
Chu [#]_ proposed to subtract the average power of the additive measurement
noise from the energy impulse response prior to applying the Schroeder
integral. The noise power $N_{est}$ is estimated from the last 10 percent
of the room impulse response if not given by the user.
.. math::
E(t) = \int_t^{t_{IR}} (h^2(\tau) - N_{est}) d\tau
Parameters
----------
data : pyfar.Signal
The room impulse response
noise_level: ndarray, float or string
If set to 'auto', the noise power is calculated based on the last 10
percent of the RIR. Otherwise it needs to be given for each individual
channel of the input. The default is 'auto'.
is_energy: boolean
If `True` the input data is proportional to energy density; if `False`
the input data is proportional to sound pressure.
time_shift : bool
If `True`, the delay of the RIR is removed.
channel_independent : boolean
Defines, if the time shift and normalization is done
channel-independently or not.
normalize : bool
If `True`, the energy decay curve is normalized.
threshold : float, None
Defines a peak-signal-to-noise ratio based threshold in dB for final
truncation of the EDC. Values below the sum of the threshold level and
the peak-signal-to-noise ratio in dB are discarded. The default is
10 dB. If `None`, the decay curve will not be truncated further.
plot: Boolean
Specifies, whether the results should be visualized or not.
Returns
-------
pyfar.TimeData
Returns the noise compensated edc.
References
----------
.. [#] W. T. Chu. “Comparison of reverberation measurements using
Schroeder's impulse method and decay-curve averaging method”.
In: Journal of the Acoustical Society of America 63.5 (1978),
pp. 1444-1450.
Examples
--------
.. plot::
>>> import numpy as np
>>> import pyfar as pf
>>> import pyrato as ra
>>> from pyrato.analytic import rectangular_room_rigid_walls
...
>>> L = np.array([8, 5, 3])/10
>>> source_pos = np.array([5, 3, 1.2])/10
>>> receiver_pos = np.array([1, 1, 1.2])/10
>>> rir, _ = rectangular_room_rigid_walls(
... L, source_pos, receiver_pos,
... reverberation_time=1, max_freq=1e3, n_samples=2**16,
... speed_of_sound=343.9)
...
>>> awgn = pf.signals.noise(
... rir.n_samples, rms=rir.time.max()*10**(-40/20),
... sampling_rate=rir.sampling_rate)
>>> rir = rir + awgn
>>> edc = ra.edc.energy_decay_curve_chu(rir)
...
>>> pf.plot.time(rir/np.abs(rir.time).max(), dB=True, label='RIR')
>>> ax = pf.plot.time(
... edc/edc.time[..., 0], dB=True, log_prefix=10, label='EDC')
>>> ax.set_ylim(-65, 5)
>>> ax.legend()
"""
# flatten to allow signals with cdim > 1
shape = data.cshape
data = data.flatten()
energy_data = dsp._preprocess_rir(
data,
is_energy=is_energy,
shift=time_shift,
channel_independent=channel_independent)
subtracted = subtract_noise_from_squared_rir(
energy_data,
noise_level=noise_level)
edc = schroeder_integration(subtracted, is_energy=True)
if normalize:
edc.time = _normalize_edc(
edc.time, channel_independent)
mask = edc.time <= 2*np.finfo(float).eps
if np.any(mask):
first_zero = np.nanargmax(mask, axis=-1)
for ch in np.ndindex(edc.cshape):
edc.time[ch, first_zero[ch]:] = np.nan
if threshold is not None:
psnr = dsp.peak_signal_to_noise_ratio(
data, noise_level, is_energy=is_energy)
trunc_levels = 10*np.log10((psnr)) - threshold
edc = threshold_energy_decay_curve(edc, trunc_levels)
edc = edc.reshape(shape)
if plot:
plt.figure(figsize=(15, 3))
pf.plot.use('light')
plt.subplot(131)
pf.plot.time(energy_data, dB=True, log_prefix=10)
plt.ylabel('Squared IR in dB')
plt.subplot(132)
pf.plot.time(subtracted, dB=True, log_prefix=10)
plt.ylabel('Noise subtracted RIR in dB')
plt.subplot(133)
pf.plot.time(edc, dB=True, log_prefix=10)
plt.ylabel('EDC in dB')
return edc
[docs]
def energy_decay_curve_chu_lundeby(
data,
freq='broadband',
noise_level='auto',
is_energy=False,
time_shift=True,
channel_independent=False,
normalize=True,
plot=False):
r"""Combination of Chu's and Lundeby's methods.
The average power $N_{set}$ of the additive measurement noise is subtracted
from the energy impulse response prior to applying the Schroeder integral.
The Schroeder integration is truncated at the intersection with the noise
floor and the truncation error compensated based on the assumption of a
single exponential late decay process.
For a more detailed description of the method, see [#]_.
.. math::
E_(t) = \int_t^{t_i} (h^2(\tau) - N_{est}) d\tau + C
C = p_{i}^2 \cdot T_\mathrm{late} \cdot \frac{1}{6 \cdot ln(10)}
.. note::
The EDC contains ``np.nan`` values if the intersection time could not
be computed due to a low signal-to-noise ration. In this case a warning
is raised as well.
Parameters
----------
data : pyfar.Signal
The room impulse response.
freq: integer OR string
The frequency band. If set to 'broadband',
the time window of the Lundeby-algorithm will not be set in dependence
of frequency.
noise_level: ndarray, double OR string
If not specified, the noise level is calculated based on the last 10
percent of the RIR. Otherwise specify manually for each channel
as array.
is_energy: boolean
If `True` the input data is proportional to energy density; if `False`
the input data is proportional to sound pressure.
time_shift : boolean
Defines, if the silence at beginning of the RIR should be removed.
channel_independent : boolean
Defines, if the time shift and normalization is done
channel-independently or not.
normalize : boolean
Defines, if the energy decay curve should be normalized in the end
or not.
plot: Boolean
Specifies, whether the results should be visualized or not.
Returns
-------
pyfar.TimeData
Returns the noise compensated edc.
References
----------
.. [#] M. Guski and M. Vorländer, “Comparison of Noise Compensation
Methods for Room Acoustic Impulse Response Evaluations,” Acta
Acust. United Ac., vol. 100, pp. 320-327, 2014, doi: 10/f5vxx2.
Examples
--------
Calculate and plot the EDC using a combination of Chu's and Lundeby's
methods.
.. plot::
>>> import numpy as np
>>> import pyfar as pf
>>> import pyrato as ra
>>> from pyrato.analytic import rectangular_room_rigid_walls
...
>>> L = np.array([8, 5, 3])/10
>>> source_pos = np.array([5, 3, 1.2])/10
>>> receiver_pos = np.array([1, 1, 1.2])/10
>>> rir, _ = rectangular_room_rigid_walls(
... L, source_pos, receiver_pos,
... reverberation_time=1, max_freq=1e3, n_samples=2**16,
... speed_of_sound=343.9)
>>> rir = rir/rir.time.max()
...
>>> awgn = pf.signals.noise(
... rir.n_samples, rms=rir.time.max()*10**(-50/20),
... sampling_rate=rir.sampling_rate)
>>> rir = rir + awgn
>>> edc = ra.edc.energy_decay_curve_chu_lundeby(rir)
...
>>> ax = pf.plot.time(rir, dB=True, label='RIR')
>>> pf.plot.time(edc, dB=True, log_prefix=10, label='EDC')
>>> ax.set_ylim(-65, 5)
>>> ax.legend()
"""
# flatten to allow signals with cdim > 1
shape = data.time.shape
data = data.flatten()
energy_data = dsp._preprocess_rir(
data,
is_energy=is_energy,
shift=time_shift,
channel_independent=channel_independent)
n_samples = energy_data.n_samples
subtraction = subtract_noise_from_squared_rir(
energy_data,
noise_level=noise_level)
intersection_time, late_reverberation_time, noise_level = \
intersection_time_lundeby(
energy_data,
freq=freq,
initial_noise_power=noise_level,
is_energy=True,
time_shift=False,
channel_independent=False,
plot=False,
failure_policy='warning')
time_vector = data.times
energy_decay_curve = np.zeros([*data.cshape, n_samples])
for ch in np.ndindex(data.cshape):
if np.isnan(intersection_time[ch]):
energy_decay_curve[ch] = np.nan
continue
intersection_time_idx = np.argmin(np.abs(
time_vector - intersection_time[ch]))
if isinstance(noise_level, str) and noise_level == 'auto':
p_square_at_intersection = dsp.estimate_noise_energy(
energy_data.time[ch], is_energy=True)
else:
p_square_at_intersection = noise_level[ch]
# calculate correction term according to DIN EN ISO 3382
correction = (p_square_at_intersection
* late_reverberation_time[ch]
* (1 / (6*np.log(10)))
* data.sampling_rate)
energy_decay_curve[ch, :intersection_time_idx] = \
_schroeder_integration(
subtraction.time[ch, :intersection_time_idx],
is_energy=True)
energy_decay_curve[ch] += correction
energy_decay_curve[ch, intersection_time_idx:] = np.nan
if normalize:
energy_decay_curve = _normalize_edc(
energy_decay_curve, channel_independent)
edc = pf.TimeData(
energy_decay_curve.reshape(shape), data.times, comment=data.comment)
if plot:
ax = pf.plot.time(data, dB=True, label='RIR')
pf.plot.time(edc, dB=True, log_prefix=10, label='EDC')
ax.set_ylim(-65, 5)
ax.legend()
return edc
[docs]
def intersection_time_lundeby(
data,
freq='broadband',
initial_noise_power='auto',
is_energy=False,
time_shift=False,
channel_independent=False,
plot=False,
failure_policy='error'):
"""Calculate the intersection time between impulse response and noise.
This function uses the algorithm after Lundeby et al. [#]_ to calculate
the intersection time, lundeby reverberation time, and noise level
estimation.
Parameters
----------
data : pyfar.Signal
The room impulse response
freq: integer OR string
The frequency band. If set to 'broadband',
the time window of the Lundeby-algorithm will not be set in dependence
of frequency.
initial_noise_power: ndarray, double OR string
If ``'auto'``, the noise level is calculated based on the last 10
percent of the RIR. Otherwise specify manually for each channel
as array.
is_energy: boolean
If `True` the input data is proportional to energy density; if `False`
the input data is proportional to sound pressure.
time_shift : boolean
Defines, if the silence at beginning of the RIR should be removed.
channel_independent : boolean
Defines, if the time shift and normalization is done
channel-independently or not.
plot: Boolean
Specifies, whether the results should be visualized or not.
failure_policy : string, optional
Specifies how failures in detecting the Lundby parameters (see return
values below) are handled.
- ``'error'``: raises an error and the computation is terminated.
- ``'warning'``: raises a warning and returns NaN values. This is
useful when computing parameters for multi-channel input, e.g., in
octave bands.
The default is ``'error'``.
Returns
-------
intersection_time: ndarray, float
Returns the Lundeby intersection time for each channel.
reverberation_time: ndarray, float
Returns the Lundeby reverberation time for each channel.
noise_level: ndarray, float
Returns the noise level estimation for each channel.
References
----------
.. [#] Lundeby, Virgran, Bietz and Vorlaender - Uncertainties of
Measurements in Room Acoustics - ACUSTICA Vol. 81 (1995)
Examples
--------
Estimate the intersection time :math:`T_i` and plot the RIR and the
estimated noise power.
.. plot::
>>> import numpy as np
>>> import pyfar as pf
>>> import pyrato as ra
>>> from pyrato.analytic import rectangular_room_rigid_walls
...
>>> L = np.array([8, 5, 3])/10
>>> source_pos = np.array([5, 3, 1.2])/10
>>> receiver_pos = np.array([1, 1, 1.2])/10
>>> rir, _ = rectangular_room_rigid_walls(
... L, source_pos, receiver_pos,
... reverberation_time=1, max_freq=1e3, n_samples=2**16,
... speed_of_sound=343.9)
>>> rir = rir/np.abs(rir.time).max()
...
>>> awgn = pf.signals.noise(
... rir.n_samples, rms=rir.time.max()*10**(-40/20),
... sampling_rate=rir.sampling_rate)
>>> rir = rir + awgn
>>> inter_time, _, noise_power = ra.edc.intersection_time_lundeby(rir)
...
>>> ax = pf.plot.time(rir, dB=True, label='RIR')
>>> ax.axvline(inter_time, c='k', linestyle='--', label='$T_i$')
>>> ax.axhline(
... 10*np.log10(noise_power), c='k', linestyle=':', label='Noise')
>>> ax.set_ylim(-65, 5)
>>> ax.legend()
"""
# Define constants:
# time intervals per 10 dB decay. Lundeby: 3...10
n_intervals_per_10dB = 5
# end of regression 5 ... 10 dB
dB_above_noise = 10
# Dynamic range 10 ... 20 dB
use_dyn_range_for_regression = 20
energy_data = dsp._preprocess_rir(
data,
is_energy=is_energy,
shift=time_shift,
channel_independent=channel_independent)
if isinstance(data, pf.Signal):
sampling_rate = data.sampling_rate
elif isinstance(data, pf.TimeData):
sampling_rate = np.round(1/np.diff(data.times).mean(), decimals=4)
energy_data = energy_data.time
if freq == "broadband":
# broadband: use 30 ms windows sizes
freq_dependent_window_time = 0.03
else:
freq_dependent_window_time = (800/freq+10) / 1000
# (1) SMOOTH
time_window_data, time_vector_window, time_vector = dsp._smooth_rir(
energy_data, sampling_rate, freq_dependent_window_time)
# (2) ESTIMATE NOISE
if initial_noise_power == 'auto':
noise_estimation = dsp._estimate_noise_energy(energy_data)
else:
noise_estimation = initial_noise_power.copy()
# (3) REGRESSION
reverberation_time = np.zeros(data.cshape, data.time.dtype)
noise_level = np.zeros(data.cshape, data.time.dtype)
intersection_time = np.zeros(data.cshape, data.time.dtype)
noise_peak_level = np.zeros(data.cshape, data.time.dtype)
for ch in np.ndindex(data.cshape):
output = _intersection_time_lundby(
time_window_data[ch], noise_estimation[ch], energy_data[ch],
time_vector_window, dB_above_noise, n_intervals_per_10dB,
use_dyn_range_for_regression, sampling_rate, ch, failure_policy)
if output is None:
reverberation_time[ch] = np.nan
noise_level[ch] = np.nan
intersection_time[ch] = np.nan
else:
slope, noise_estimation_current_channel, crossing_point, \
time_window_data_current_channel, idx_last_10_percent, \
idx_10dB_below_crosspoint, regression_time, regression_values, \
preliminary_crossing_point, time_vector_window_current_channel \
= output
reverberation_time[ch] = -60/slope[1]
noise_level[ch] = noise_estimation_current_channel
intersection_time[ch] = crossing_point
noise_peak_level[ch] = 10 * np.log10(
np.nanmax(time_window_data_current_channel[int(np.nanmin(
[idx_last_10_percent, idx_10dB_below_crosspoint])):]))
if plot:
if np.prod(data.cshape) > 1:
raise ValueError(
'The plot can only be done for single channel input')
if output is None:
raise ValueError('The plot can not be done because the estimation '
'failed (see warnings for more information)')
plt.figure(figsize=(15, 3))
plt.subplot(131)
max_data_db = np.nanmax(10*np.log10(energy_data))
plt.plot(time_vector, 10*np.log10(energy_data.T))
plt.xlabel('Time [s]')
plt.ylabel('Squared RIR [dB]')
plt.ylim(bottom=max_data_db-80+5, top=max_data_db+5)
plt.grid(True)
plt.subplot(132)
plt.plot(time_vector_window, 10*np.log10(time_window_data.T))
plt.xlabel('Time [s]')
plt.ylabel('Smoothened RIR [dB]')
plt.plot(
time_vector_window[-int(np.round(time_window_data.shape[-1]/10))],
10*np.log10(noise_estimation[0]),
marker='o',
label='noise estimation',
linestyle='--',
color='C1')
plt.axhline(
10*np.log10(noise_estimation[0]),
color='C1',
linestyle='--')
plt.plot(
regression_time,
regression_values,
marker='o',
color='C2',
label='regression')
plt.plot(
preliminary_crossing_point,
10*np.log10(noise_estimation[ch]),
marker='o',
color='C3',
label='preliminary crosspoint')
plt.legend()
plt.grid(True)
plt.subplot(133)
# TO-DO: plot all channels?
plt.plot(
time_vector_window_current_channel,
10*np.log10(time_window_data_current_channel))
plt.xlabel('Time [s]')
plt.ylabel('Final EDC Estimation [dB]')
plt.plot(
time_vector_window_current_channel[int(idx_10dB_below_crosspoint)],
10*np.log10(noise_estimation[0]),
marker='o',
color='C1',
label='Noise')
plt.axhline(
10*np.log10(noise_estimation[0]),
color='C1',
linestyle='--')
plt.plot(
crossing_point,
10*np.log10(noise_estimation[0]),
marker='o',
color='C3',
label='Final crosspoint',
linestyle='--')
plt.axvline(
reverberation_time[0],
label=('$T_{60} = '+str(np.round(reverberation_time[0], 3)) + '$'),
color='k')
plt.axvline(
intersection_time[0],
label=('Lundeby intersection time'),
color='C3',
linestyle='--')
plt.tight_layout()
plt.legend()
plt.grid(True)
return intersection_time, reverberation_time, noise_level
def _intersection_time_lundby(
time_window_data, noise_estimation, energy_data,
time_vector_window, dB_above_noise, n_intervals_per_10dB,
use_dyn_range_for_regression, sampling_rate, ch, failure_policy):
"""
Private function to handle the channel-wise processing for
`intersection_time_lundby`.
Parameters
----------
time_window_data : np.ndarray
The smoothed RIR obtained from ``pyrato.dsp._smooth_rir``.
noise_estimation : float
The energy of the background noise.
energy_data : np.ndarray
Data returned by ``pyrato.dsp._preprocess_rir``.
time_vector_window : ndarray
The time vector of the smoothed data obtained from
``pyrato.dsp._smooth_rir``.
dB_above_noise : float
End-point of the regression in dB.
n_intervals_per_10dB : float
Time intervals per 10 dB decay.
use_dyn_range_for_regression : float
Dynamic range for regression in dB.
sampling_rate : float, int
Sampling rate in Hz.
ch : multidimensional index
Channel index used for verbose warning messages
failure_policy : str
Specifies how failures in detecting the Lundby parameters (see return
values below) are handled.
- ``'error'``: raises an error and the computation is terminated.
- ``'warning'``: raises a warning and returns NaN values. This is
useful when computing parameters for multi-channel input, e.g., in
octave bands.
Returns
-------
``None``if the parameter estimation failed, otherwise:
slope : np.ndarray
Coefficients [intercept, slope] of the regression line.
noise_estimation_current_channel : float
Refined background noise energy for the channel.
crossing_point : float
Intersection time between decay line and noise floor in seconds.
time_window_data_current_channel** : np.ndarray
Smoothed energy decay curve at final iteration.
idx_last_10_percent : float
Index at 90% of the time window length.
idx_10dB_below_crosspoint : float
Index 10 dB below the crossing point.
regression_time : np.ndarray
Start and stop times of the regression window in seconds.
regression_values : np.ndarray
Start and stop values of the regression line in dB.
preliminary_crossing_point : float
Initial crossing point estimate before iteration.
time_vector_window_current_channel : np.ndarray
Time vector corresponding to the smoothed data.
"""
time_window_data_current_channel = time_window_data
start_idx = np.nanargmax(time_window_data_current_channel, axis=-1)
try:
stop_idx = (np.argwhere(10*np.log10(
time_window_data_current_channel[start_idx+1:-1]) >
(10*np.log10(noise_estimation) +
dB_above_noise))[-1, 0] + start_idx)
except IndexError as e:
message = f'Regression failed for channel {ch} due to low SNR.'
if failure_policy == 'error':
raise ValueError(message) from e
else:
warnings.warn(message, stacklevel=2)
return None
dyn_range = np.diff(10*np.log10(np.take(
time_window_data_current_channel, [start_idx, stop_idx])))
if (stop_idx == start_idx) or dyn_range > -5:
message = f'Regression failed for channel {ch} due to low SNR.'
if failure_policy == 'error':
raise ValueError(message)
else:
warnings.warn(message, stacklevel=2)
return None
# regression_matrix*slope = edc
regression_matrix = np.vstack((np.ones(
[stop_idx-start_idx]), time_vector_window[start_idx:stop_idx]))
slope = np.linalg.lstsq(
regression_matrix.T,
10*np.log10(time_window_data_current_channel[start_idx:stop_idx]),
rcond=None)[0]
if slope[1] == 0 or np.any(np.isnan(slope)):
message = (
f'Regression failed for channel {ch}. '
'Remove preceeding delay or check the SNR')
if failure_policy == 'error':
raise ValueError(message)
else:
warnings.warn(message, stacklevel=2)
return None
regression_time = np.array(
[time_vector_window[start_idx], time_vector_window[stop_idx]])
regression_values = np.array(
[10*np.log10(time_window_data[start_idx]),
(10*np.log10(time_window_data[start_idx])
+ slope[1]*time_vector_window[stop_idx])])
# (4) PRELIMINARY CROSSING POINT
crossing_point = \
(10*np.log10(noise_estimation) - slope[0]) / slope[1]
preliminary_crossing_point = crossing_point
# (5) NEW LOCAL TIME INTERVAL LENGTH
n_blocks_in_decay = (np.diff(
10*np.log10(np.take(
time_window_data_current_channel, [start_idx, stop_idx])))[0]
/ -10 * n_intervals_per_10dB)
n_samples_per_block = np.round(np.diff(np.take(
time_vector_window,
[start_idx, stop_idx]))[0] / n_blocks_in_decay * sampling_rate)
window_time = n_samples_per_block/sampling_rate
# (6) AVERAGE
time_window_data_current_channel, \
time_vector_window_current_channel, \
time_vector_current_channel = dsp._smooth_rir(
energy_data, sampling_rate, window_time)
time_window_data_current_channel = np.squeeze(
time_window_data_current_channel)
idx_max = np.nanargmax(time_window_data_current_channel)
# high start value to enter while-loop
old_crossing_point = 11+crossing_point
loop_counter = 0
while True:
# (7) ESTIMATE BACKGROUND LEVEL
corresponding_decay = 10 # 5...10 dB
idx_last_10_percent = np.round(
time_window_data_current_channel.shape[-1]*0.9)
t_block = n_samples_per_block / sampling_rate
rel_decay = corresponding_decay / slope[1]
idx_10dB_below_crosspoint = np.nanmax(
np.r_[1, np.round(((crossing_point - rel_decay) / t_block))])
noise_estimation_current_channel = np.nanmean(
time_window_data_current_channel[int(np.nanmin(
[idx_last_10_percent, idx_10dB_below_crosspoint])):])
# (8) ESTIMATE LATE DECAY SLOPE
try:
start_idx_loop = np.argwhere(10*np.log10(
time_window_data_current_channel[idx_max:]) < (
10*np.log10(noise_estimation_current_channel)
+ dB_above_noise
+ use_dyn_range_for_regression))[0, 0] + idx_max
except TypeError:
start_idx_loop = 0
try:
stop_idx_loop = np.argwhere(10*np.log10(
time_window_data_current_channel[start_idx_loop+1:]) < (
10*np.log10(noise_estimation_current_channel)
+ dB_above_noise))[0, 0] + start_idx_loop
except IndexError as e:
message = f'Regression failed for channel {ch} due to low SNR.'
if failure_policy == 'error':
raise ValueError(message) from e
else:
warnings.warn(message, stacklevel=2)
return None
# regression_matrix*slope = edc
regression_matrix = np.vstack((np.ones(
[stop_idx_loop-start_idx_loop]),
time_vector_window_current_channel[
start_idx_loop:stop_idx_loop]))
slope = np.linalg.lstsq(
regression_matrix.T,
(10*np.log10(time_window_data_current_channel[
start_idx_loop:stop_idx_loop])),
rcond=None)[0]
if slope[1] >= 0:
message = (
f'Regression failed for channel {ch}. '
'Remove preceeding delay or check the SNR')
if failure_policy == 'error':
raise ValueError(message)
else:
warnings.warn(message, stacklevel=2)
return None
# (9) FIND CROSSPOINT
old_crossing_point = crossing_point
crossing_point = ((10*np.log10(noise_estimation_current_channel)
- slope[0]) / slope[1])
loop_counter = loop_counter + 1
if (np.abs(old_crossing_point-crossing_point) < 0.01):
break
if loop_counter > 30:
# TO-DO: Paper says 5 iterations are sufficient in all cases!
warnings.warn(
"Lundeby algorithm was terminated after 30 iterations.",
stacklevel=2)
break
return (slope, noise_estimation_current_channel, crossing_point,
time_window_data_current_channel, idx_last_10_percent,
idx_10dB_below_crosspoint, regression_time, regression_values,
preliminary_crossing_point, time_vector_window_current_channel)
def _threshold_energy_decay_curve(energy_decay_curve, threshold_level):
"""
Threshold an energy decay curve.
Parameters
----------
energy_decay_curve : array like
The energy decay curve
threshold_level : float
The threshold level in dB. The data below the threshold level are set
to ``numpy.nan`` values.
Returns
-------
energy_decay_curve : numpy.ndarray
The thresholded energy decay curve
"""
edc = np.atleast_2d(energy_decay_curve)
threshold_level = np.atleast_2d(threshold_level)
e = edc.T[0]
t = e / 10**(threshold_level/10)
mask = edc.T < np.broadcast_to(t, edc.T.shape)
edc[mask.T] = np.nan
return edc
[docs]
def threshold_energy_decay_curve(energy_decay_curve, threshold):
"""
Threshold an energy decay curve.
Parameters
----------
energy_decay_curve : pyfar.TimeData
The energy decay curve
threshold : float
The threshold level in dB. The data below the threshold level are set
to ``numpy.nan`` values.
Returns
-------
energy_decay_curve : pyfar.TimeData
The thresholded energy decay curve
"""
return pf.TimeData(
_threshold_energy_decay_curve(energy_decay_curve.time, threshold),
energy_decay_curve.times,
energy_decay_curve.comment)
def _normalize_edc(energy_decay_curve, channel_independent):
"""
Helper function to normalize the energy decay curve.
Parameters
----------
energy_decay_curve : numpy array
EDC of shape `(..., n_samples)`.
channel_independent : boolean
Defines, if the time shift and normalization is done
channel-independently or not.
Returns
-------
energy_decay_curve : numpy array
The normalized EDC
"""
# ignore runtime warnings due to NaN values in edc. More informative
# warnings are raised by `intersection_time_lundeby` and everything
# that happens below is by intention
with warnings.catch_warnings():
warnings.filterwarnings(
"ignore", "All-NaN slice encountered", RuntimeWarning)
# Normalize the EDC...
if not channel_independent:
# by the max across all channels
energy_decay_curve = \
np.divide(energy_decay_curve[..., :],
np.nanmax(energy_decay_curve))
else:
# by the max of each individual channel
energy_decay_curve = \
energy_decay_curve[..., :] \
/ np.nanmax(energy_decay_curve, axis=-1, keepdims=True)
return energy_decay_curve