# -*- coding: utf-8 -*-
"""Signal processing related functions."""
import warnings
import numpy as np
import scipy.signal as spsignal
[docs]def find_impulse_response_start(
impulse_response,
threshold=20,
noise_energy='auto'):
"""Find the first sample of an impulse response in a accordance with the
ISO standard ISO 3382 [#]_. The start sample is identified as the first sample
that varies significantly from the noise floor but still has a level of
at least 20 dB below the maximum of the impulse response. The function
further tries to consider oscillations before the time below the threshold
value.
Parameters
----------
impulse_response : ndarray, double
The impulse response
threshold : double, optional
Threshold according to ISO3382 in dB
Returns
-------
start_sample : int
Sample at which the impulse response starts
Note
----
The function tries to estimate the SNR in the IR based on the signal energy
in the last 10 percent of the IR.
References
----------
.. [#] ISO 3382-1:2009-10, Acoustics - Measurement of the reverberation
time of rooms with reference to other acoustical parameters. pp. 22
"""
ir_squared = np.abs(impulse_response)**2
mask_start = int(0.9*ir_squared.shape[-1])
if noise_energy == 'auto':
mask = np.arange(mask_start, ir_squared.shape[-1])
noise = np.mean(np.take(ir_squared, mask, axis=-1), axis=-1)
else:
noise = noise_energy
max_sample = np.argmax(ir_squared, axis=-1)
max_value = np.max(ir_squared, axis=-1)
if np.any(max_value < 10**(threshold/10) * noise) or \
np.any(max_sample > mask_start):
raise ValueError("The SNR is lower than the defined threshold. Check \
if this is a valid impulse response with sufficient SNR.")
start_sample_shape = max_sample.shape
n_samples = ir_squared.shape[-1]
ir_squared = np.reshape(ir_squared, (-1, n_samples))
n_channels = ir_squared.shape[0]
max_sample = np.reshape(max_sample, n_channels)
max_value = np.reshape(max_value, n_channels)
start_sample = max_sample.copy()
for idx in range(n_channels):
# Only look for the start sample if the maximum index is bigger than 0
if start_sample[idx] > 0:
ir_before_max = ir_squared[idx, :max_sample[idx]+1] \
/ max_value[idx]
# Last value before peak lower than the peak/threshold
idx_last_below_thresh = np.argwhere(
ir_before_max < 10**(-threshold/10))
if idx_last_below_thresh.size > 0:
start_sample[idx] = idx_last_below_thresh[-1]
else:
start_sample[idx] = 0
warnings.warn(
'No values below threshold found before the maximum value,\
defaulting to 0')
idx_6dB_above_threshold = np.argwhere(
ir_before_max[:start_sample[idx]+1] >
10**((-threshold+6)/10))
if idx_6dB_above_threshold.size > 0:
idx_6dB_above_threshold = int(idx_6dB_above_threshold[0])
tmp = np.argwhere(
ir_before_max[:idx_6dB_above_threshold+1] <
10**(-threshold/10))
if tmp.size == 0:
start_sample[idx] = 0
warnings.warn(
'Oscillations detected in the impulse response. \
No clear starting sample found, defaulting to 0')
else:
start_sample[idx] = tmp[-1]
start_sample = np.reshape(start_sample, start_sample_shape)
return np.squeeze(start_sample)
[docs]def find_impulse_response_maximum(
impulse_response,
threshold=20,
noise_energy='auto'):
"""Find the maximum of an impulse response as argmax(h(t)).
Performs an initial SNR check according to a defined threshold level in dB.
Parameters
----------
impulse_response : ndarray, double
The impulse response
threshold : double, optional
Threshold SNR value in dB
Returns
-------
max_sample : int
Sample at which the impulse response starts
Note
----
The function tries to estimate the SNR in the IR based on the signal energy
in the last 10 percent of the IR.
"""
ir_squared = np.abs(impulse_response)**2
mask_start = int(0.9*ir_squared.shape[-1])
if noise_energy == 'auto':
mask = np.arange(mask_start, ir_squared.shape[-1])
noise = np.mean(np.take(ir_squared, mask, axis=-1), axis=-1)
else:
noise = noise_energy
max_sample = np.argmax(ir_squared, axis=-1)
max_value = np.max(ir_squared, axis=-1)
if np.any(max_value < 10**(threshold/10) * noise) or \
np.any(max_sample > mask_start):
raise ValueError("The SNR is lower than the defined threshold. Check \
if this is a valid impulse response with sufficient SNR.")
return np.squeeze(max_sample)
[docs]def time_shift(signal, n_samples_shift, circular_shift=True, keepdims=False):
"""Shift a signal in the time domain by n samples.
This function will perform a circular shift by default, inherently
assuming that the signal is periodic. Use the option `circular_shift=False`
to pad with nan values instead.
Notes
-----
This function is primarily intended to be used when processing impulse
responses.
Parameters
----------
signal : ndarray, float
Signal to be shifted
n_samples_shift : integer
Number of samples by which the signal should be shifted. A negative
number of samples will result in a left-shift, while a positive
number of samples will result in a right shift of the signal.
circular_shift : bool, True
Perform a circular or non-circular shift. If a non-circular shift is
performed, the data will be padded with nan values at the respective
beginning or ending of the data, corresponding to the number of samples
the data is shifted.
keepdims : bool, False
Do not squeeze the data before returning.
Returns
-------
shifted_signal : ndarray, float
Shifted input signal
"""
n_samples_shift = np.asarray(n_samples_shift, dtype=int)
if np.any(signal.shape[-1] < n_samples_shift):
msg = "Shifting by more samples than length of the signal."
if circular_shift:
warnings.warn(msg, UserWarning)
else:
raise ValueError(msg)
signal = np.atleast_2d(signal)
n_samples = signal.shape[-1]
signal_shape = signal.shape
signal = np.reshape(signal, (-1, n_samples))
n_channels = np.prod(signal.shape[:-1])
if n_samples_shift.size == 1:
n_samples_shift = np.broadcast_to(n_samples_shift, n_channels)
elif n_samples_shift.size == n_channels:
n_samples_shift = np.reshape(n_samples_shift, n_channels)
else:
raise ValueError("The number of shift samples has to match the number \
of signal channels.")
shifted_signal = signal.copy()
for channel in range(n_channels):
shifted_signal[channel, :] = \
np.roll(
shifted_signal[channel, :],
n_samples_shift[channel],
axis=-1)
if not circular_shift:
if n_samples_shift[channel] < 0:
# index is negative, so index will reference from the
# end of the array
shifted_signal[channel, n_samples_shift[channel]:] = np.nan
else:
# index is positive, so index will reference from the
# start of the array
shifted_signal[channel, :n_samples_shift[channel]] = np.nan
shifted_signal = np.reshape(shifted_signal, signal_shape)
if not keepdims:
shifted_signal = np.squeeze(shifted_signal)
return shifted_signal
[docs]def center_frequencies_octaves():
"""Return the octave center frequencies according to the IEC 61260:1:2014
standard.
Returns
-------
frequencies : ndarray, float
Octave center frequencies
"""
nominal = np.array([31.5, 63, 125, 250, 500, 1e3,
2e3, 4e3, 8e3, 16e3], dtype=np.float)
indices = _frequency_indices(nominal, 1)
exact = exact_center_frequencies_fractional_octaves(indices, 1)
return nominal, exact
[docs]def center_frequencies_third_octaves():
"""Return the third octave center frequencies according
to the ICE 61260:1:2014 standard.
Returns
-------
frequencies : ndarray, float
third octave center frequencies
"""
nominal = np.array([
25, 31.5, 40, 50, 63, 80, 100, 125, 160,
200, 250, 315, 400, 500, 630, 800, 1000,
1250, 1600, 2000, 2500, 3150, 4000, 5000,
6300, 8000, 10000, 12500, 16000, 20000], dtype=np.float)
indices = _frequency_indices(nominal, 3)
exact = exact_center_frequencies_fractional_octaves(indices, 3)
return nominal, exact
def exact_center_frequencies_fractional_octaves(indices, num_fractions):
"""Returns the exact center frequencies for fractional octave bands
according to the IEC 61260:1:2014 standard.
octave ratio
.. G = 10^{3/10}
center frequencies
.. f_m = f_r G^{x/b}
.. f_m = f_e G^{(2x+1)/(2b)}
where b is the number of octave fractions, f_r is the reference frequency
chosen as 1000Hz and x is the index of the frequency band.
Parameters
----------
indices : array
The indices for which the center frequencies are calculated.
num_fractions : 1, 3
The number of octave fractions. 1 returns octave center frequencies,
3 returns third octave center frequencies.
Returns
-------
frequencies : ndarray, float
center frequencies of the fractional octave bands
"""
reference_freq = 1e3
octave_ratio = 10**(3/10)
iseven = np.mod(num_fractions, 2) == 0
if ~iseven:
exponent = (indices/num_fractions)
else:
exponent = ((2*indices + 1) / num_fractions / 2)
return reference_freq * octave_ratio**exponent
def _frequency_indices(frequencies, num_fractions):
"""Return the indices for fractional octave filters.
Parameters
----------
frequencies : array
The nominal frequencies for which the indices for exact center
frequency calculation are to be calculated.
num_fractions : 1, 3
Number of fractional bands
Returns
-------
indices : array
The indices for exact center frequency calculation.
"""
reference_freq = 1e3
octave_ratio = 10**(3/10)
iseven = np.mod(num_fractions, 2) == 0
if ~iseven:
indices = np.around(
num_fractions * np.log(frequencies/reference_freq)
/ np.log(octave_ratio))
else:
indices = np.around(
2.0*num_fractions *
np.log(frequencies/reference_freq) / np.log(octave_ratio) - 1)/2
return indices
[docs]def filter_fractional_octave_bands(
signal, samplingrate, num_fractions,
freq_range=(20.0, 20e3), order=6):
"""Apply a fractional octave filter to a signal.
Filter bank implementation using second order sections of butterworth
filters for increased numeric accuracy and stability.
Parameters
----------
signal : ndarray
input signal to be filtered
samplingrate : integer
samplingrate of the signal
num_fractions : integer
number of octave fractions
order : integer, optional
order of the butterworth filter
Returns
-------
signal_filtered : ndarray
Signal filtered into fractional octave bands. The array has a new axis
with dimension corresponding to the number of frequency bands:
[num_fractions, *signal.shape]
"""
if num_fractions not in (1, 3):
raise ValueError("This currently supports only octave and third \
octave band filters.")
octave_ratio = 10**(3/10)
if num_fractions == 1:
nominal, exact = center_frequencies_octaves()
else:
nominal, exact = center_frequencies_third_octaves()
mask_frequencies = (nominal > freq_range[0]) & (nominal < freq_range[1])
nominal = nominal[mask_frequencies]
exact = exact[mask_frequencies]
signal_out_shape = (exact.size,) + signal.shape
signal_out = np.broadcast_to(signal, signal_out_shape).copy()
for band in range(exact.size):
freq_upper = exact[band] * octave_ratio**(1/2/num_fractions)
freq_lower = exact[band] * octave_ratio**(-1/2/num_fractions)
# normalize interval such that the Nyquist frequency is 1
Wn = np.array([freq_lower, freq_upper]) / samplingrate * 2
# in case the upper frequency limit is above Nyquist, use a highpass
if Wn[-1] > 1:
warnings.warn('Your upper frequency limit [{}] is above the \
Nyquist frequency. Using a highpass filter instead of a \
bandpass'.format(freq_upper))
Wn = Wn[0]
btype = 'highpass'
else:
btype = 'bandpass'
sos = spsignal.butter(order, Wn, btype=btype, output='sos')
signal_out[band, :] = spsignal.sosfilt(sos, signal)
return signal_out