# -*- coding: utf-8 -*-
"""Signal processing related functions."""
import warnings
import pyfar as pf
import numpy as np
[docs]
def find_impulse_response_maximum(
impulse_response,
threshold=20,
noise_energy='auto'):
"""Find the maximum of an impulse response as argmax(h(t)).
Performs an initial SNR check according to a defined threshold level in dB.
Parameters
----------
impulse_response : ndarray, double
The impulse response
threshold : double, optional
Threshold SNR value in dB
noise_energy : float, str, optional
If ``'auto'``, the noise level is calculated based on the last 10
percent of the RIR. Otherwise specify manually for each channel
as array.
Returns
-------
max_sample : int
Sample at which the impulse response starts
Note
----
The function tries to estimate the SNR in the IR based on the signal energy
in the last 10 percent of the IR.
"""
ir_squared = np.abs(impulse_response.time)**2
mask_start = int(0.9*ir_squared.shape[-1])
if noise_energy == 'auto':
mask = np.arange(mask_start, ir_squared.shape[-1])
noise = np.mean(np.take(ir_squared, mask, axis=-1), axis=-1)
else:
noise = noise_energy
max_sample = np.argmax(ir_squared, axis=-1)
max_value = np.max(ir_squared, axis=-1)
if np.any(max_value < 10**(threshold/10) * noise) or \
np.any(max_sample > mask_start):
warnings.warn(
"The SNR seems lower than the specified threshold value. Check "
"if this is a valid impulse response with sufficient SNR.",
stacklevel=2)
return max_sample
[docs]
def estimate_noise_energy(
data,
interval=[0.9, 1.0],
is_energy=False):
"""Estimate the noise power of additive noise in impulse responses.
The noise power is distributed from an interval in which the additive
noise is assumed to be larger than the impulse response data.
Parameters
----------
data : pyfar.Signal
The impulse response.
interval : tuple, float
Defines the interval of the RIR to be evaluated for the estimation.
The interval is relative to the length of the RIR ``0 = 0%, 1=100%``.
By default ``(0.9, 1.0)``.
is_energy : bool
Defines if the data is already squared.
Returns
-------
noise_energy : numpy.ndarray[float]
The energy of the background noise,shaped according to the channel
shape of the input Signal.
"""
energy_data = _preprocess_rir(
data,
is_energy=is_energy,
shift=False,
channel_independent=False)
return _estimate_noise_energy(energy_data.time, interval=interval)
def _estimate_noise_energy(
energy_data,
interval=[0.9, 1.0]):
"""Estimate the noise power of additive noise in impulse responses.
Private function for use with numpy arrays.
Parameters
----------
energy_data: np.array
The room impulse response with shape ``(..., n_samples)``.
interval : tuple, float
Defines the interval of the RIR to be evaluated for the estimation.
The interval is relative to the length of the RIR ``0 = 0%, 1=100%``.
By default ``(0.9, 1.0)``.
Returns
-------
noise_energy : float
The energy of the background noise.
"""
if np.any(energy_data) < 0:
raise ValueError("Energy is negative, check your input signal.")
region_start_idx = int(energy_data.shape[-1]*interval[0])
region_end_idx = int(energy_data.shape[-1]*interval[1])
mask = np.arange(region_start_idx, region_end_idx)
noise_energy = np.nanmean(np.take(energy_data, mask, axis=-1), axis=-1)
return noise_energy
def _smooth_rir(
data,
sampling_rate,
smooth_block_length=0.075):
"""Smoothens the RIR by averaging the data in an specified interval.
Parameters
----------
data : ndarray, double
The room impulse response with dimension ``(..., n_samples)``.
sampling_rate: integer
Defines the sampling rate of the room impulse response.
smooth_block_length : double
Defines the block-length of the smoothing algorithm in seconds.
Returns
-------
time_window_data : ndarray, double
The smoothed RIR.
time_vector_window : ndarray, double
The respective time vector fitting the smoothed data.
time_vector : ndarray, double
The time vector fitting the original data.
"""
cshape = data.shape[:-1]
data = np.atleast_2d(data)
n_samples = data.shape[-1]
n_samples_nan = np.count_nonzero(np.isnan(data), axis=-1)
n_samples_per_block = int(np.round(smooth_block_length * sampling_rate, 0))
n_blocks = np.asarray(
np.floor((n_samples-n_samples_nan)/n_samples_per_block),
dtype=int)
# average data in blocks
n_blocks_min = int(np.min(n_blocks))
n_samples_actual = int(n_blocks_min*n_samples_per_block)
reshaped_array = np.reshape(
data[..., :n_samples_actual],
(*cshape, n_blocks_min, n_samples_per_block))
time_window_data = np.mean(reshaped_array, axis=-1)
# Use average time instances corresponding to the average energy level
# instead of time for the first sample of the block
time_vector_window = \
((0.5+np.arange(0, n_blocks_min)) * n_samples_per_block/sampling_rate)
# Use the time corresponding to the sampling of the original data
time_vector = (np.arange(0, n_samples))/sampling_rate
return time_window_data, time_vector_window, time_vector
def _preprocess_rir(
data,
is_energy=False,
shift=False,
channel_independent=False):
"""Preprocess the room impulse response for further processing.
- Square data
- Shift the RIR to the first sample of the array, compensating for the
delay of the time of arrival of the direct sound. The time shift is
performed as a non-cyclic shift, adding numpy.nan values in the end
of the RIR corresponding to the number of samples the data is
shifted by.
- The time shift can be done channel-independent or not.
Parameters
----------
data : ndarray, double
The room impulse response with dimension (..., n_samples).
is_energy : boolean
Defines, if the data is already squared.
shift : boolean
Defines, if the silence at beginning of the RIR should be removed.
channel_independent : boolean
Defines, if the time shift is done channel-independent or not.
Returns
-------
energy_data : ndarray, double
The preprocessed RIR
"""
times = data.times
n_channels = np.prod(data.cshape)
if shift:
rir_start_idx = pf.dsp.find_impulse_response_start(data)
if channel_independent and not n_channels == 1:
shift_samples = -rir_start_idx
else:
min_shift = np.amin(rir_start_idx)
shift_samples = np.asarray(
-min_shift * np.ones(data.cshape), dtype=int)
result = pf.dsp.time_shift(
data, shift_samples, mode='linear', pad_value=np.nan)
else:
result = data
if not is_energy:
energy_data = np.abs(result.time)**2
else:
energy_data = result.time.copy()
energy_data = pf.TimeData(energy_data, times)
return energy_data
[docs]
def peak_signal_to_noise_ratio(
impulse_response,
noise_power='auto',
is_energy=False):
"""Calculate the peak-signal-to-noise-ratio of an impulse response.
Parameters
----------
impulse_response : pyfar.Signal
The impulse response
noise_power : float, str, optional
The noise power. The default is 'auto', in which case the noise power
is estimated from the last 10 % of the impulse response.
is_energy : bool, optional
Defines if the impulse response is already squared. If set to True,
the function assumes that the input is already energy data, otherwise
it squares the input data. The default is False.
Returns
-------
float, array-like
The estimated peak-signal-to-noise-ratio for each channel of the
impulse response.
"""
data = impulse_response.time
if is_energy is False:
data = data**2
if noise_power == 'auto':
noise_power = _estimate_noise_energy(data)
return np.max(data, axis=-1) / noise_power