Compare commits

...

3 Commits

5 changed files with 388 additions and 6 deletions

180
beat.py Normal file
View File

@@ -0,0 +1,180 @@
import numpy as np
class SsfZxing:
"""
Find beats in a Sum Slope Function by detecting threshold crossings.
See Zong et al, from CINC 2003.
"""
t_holdoff = 0.1 #: hold-off period in sec (ignore zxings after initial rise)
# these two depend on each other.
t_range = 0.024 #: rise amplitude range in sec: +/- around transition, we check the rise amplitude. about 2*sw_sec but nb. 0.008 sec steps in fs/D rate
sw_sec = 0.04 #: upslope width in sec (for SSF function)
ssf_rel_thres = 3 #: magic number from Zong 2003, to scale mean SSF amplitude
ssf_rel_rise = 0.8 #: minimum rise of SSF edge (from foot to peak) relative to 'ssf_th'
def __init__(self): pass
def _ssf_det_zxings(self, fs, ssf):
"""detect threshold crossings in 'ssf' signal."""
i_holdoff = int(self.t_holdoff * fs)
# TODO: check if we need lowpass instead of mean for 'ssf_th'
ssf_th = self.ssf_rel_thres * np.mean(ssf)
# threshold crossing
ssf_pk = np.pad((ssf > ssf_th).astype(int), (0,1))
ssf_pks = np.pad(ssf_pk[:-1], (1,0))
ssf_z = (ssf_pk - ssf_pks) == 1
# holdoff
for i in np.arange(ssf_z.shape[0] - i_holdoff):
if ssf_z[i]:
ssf_z[i+1:i+1+i_holdoff] = 0
# rise amplitude filter (check in 2-3 samples [0.024 sec or so] and compare vs. threshold)
i_range = int(self.t_range * fs)
for i in np.arange(i_range, ssf_z.shape[0]-i_range-1):
if ssf_z[i]:
rise = ssf[i+i_range] - ssf[i-i_range]
if rise < ssf_th * self.ssf_rel_rise:
ssf_z[i] = 0
ssf_z[-i_range:] = 0 # force-zero the bounds where we cannot check the amplitude rise
ssf_z[:i_range] = 0 # force-zero the bounds where we cannot check the amplitude rise
ssf_zxings = np.where(ssf_z)[0]
# only integer-index resolution (no interpolation)
return ssf_zxings
def _ssf_function(self, fs, y):
"""sum-slope function."""
sw = int(self.sw_sec*fs)
duk = np.clip(np.diff(np.pad(y, (1,0))), a_min=0, a_max=np.inf) # left-looking window
#ssf = np.convolve(duk, slope_filter, mode='same') # centered window (acausal!)
duks = np.pad(np.cumsum(duk), (0, sw))
duks_r = np.roll(duks, sw)
ssf = (duks - duks_r)[:-sw]
return ssf
def get_mae_dist(ibis):
"""make triangular wave between beats, representing absolute beat placement error."""
dist = np.zeros(np.sum(ibis)+1)
# fill with distances
p_i, i = 0, 0
for ibi in ibis:
i += ibi
l_c = ibi // 2
dist[p_i:p_i+l_c+ibi%2] = np.arange(l_c+ibi%2)
dist[p_i+l_c+ibi%2:i] = 1 + np.arange(ibi-l_c-ibi%2)[::-1]
p_i = i
return dist
def get_mae_err_1(fs, freq, phase, act_ibis, debug=False):
"""
compute beat placement error between zero crossings of given sine wave (estimate)
and actually detected beats. computes 'dist' from 'act_ibis' (direction 1).
"""
# sin(2 pi freq n / fs + phase) ... 0, pi, 2*pi, ...
# 2 pi freq n / fs + phase = k * pi
# 2 freq n / fs + phase/pi = k (= 0, 1, 2, ...)
# n = (k - phase/pi) * fs / (2 freq)
# k_max = 2 freq n_max / fs + phase/pi
N = np.sum(act_ibis)+1
phase %= 2*np.pi
k_max = 2 * freq * N / fs + phase/np.pi + 1
i_est_zxing = np.round((np.arange(k_max) - phase/np.pi) * fs / (2 * freq)).astype(int)
est_ibis = np.diff(i_est_zxing)
dist = get_mae_dist(est_ibis)
assert np.sum(est_ibis) > np.sum(act_ibis)
if debug:
plt.figure(figsize=(8,2))
plt.plot(np.sin(2 * np.pi * freq * np.arange(N) / fs + phase))
plt.stem(np.cumsum(act_ibis), np.ones(act_ibis.shape[0]), markerfmt='r')
plt.stem(i_est_zxing, np.ones(i_est_zxing.shape[0]), markerfmt='y')
mae_errs = dist[np.cumsum(act_ibis)]
return np.sum(mae_errs)
def get_mae_err_2(fs, freq, phase, act_ibis, debug=False):
"""
compute beat placement error between zero crossings of given sine wave (estimate)
and actually detected beats. computes 'dist' from est_zxings (direction 2)
"""
dist = np.pad(get_mae_dist(act_ibis), (0,1))
dist[-1] = 1 # sentinel for rounding errors in np.round()
# sin(2 pi freq n / fs + phase) ... 0, pi, 2*pi, ...
# 2 pi freq n / fs + phase = k * pi
# 2 freq n / fs + phase/pi = k (= 0, 1, 2, ...)
# n = (k - phase/pi) * fs / (2 freq)
# k_max = 2 freq n_max / fs + phase/pi
N = np.sum(act_ibis)+1
phase %= 2*np.pi
k_max = 2 * freq * N / fs + phase/np.pi
i_est_zxing = np.round((np.arange(k_max) - phase/np.pi) * fs / (2 * freq)).astype(int)
if debug:
plt.figure(figsize=(8,2))
plt.plot(np.sin(2 * np.pi * freq * np.arange(N) / fs + phase))
plt.stem(np.cumsum(act_ibis), np.ones(act_ibis.shape[0]), markerfmt='r')
plt.stem(i_est_zxing, np.ones(i_est_zxing.shape[0]), markerfmt='y')
mae_errs = dist[i_est_zxing]
return np.sum(mae_errs)
def get_mae_err(fs, freq, phase, act_ibis, debug=False):
"""
compute beat placement error between zero crossings of given sine wave
and actually detected beats.
"""
mae = 0
# we compute both directions, to properly handle "missing beats"
# (in direction 1, an optimal solution is "fully dense", "freq = fs/2", because each 'act_beat' will align to the next index)
# (in direction 2, an optimal solution is "fully sparse", "freq = 1/L", because those are the only 'est_beats' which are aligned)
mae += get_mae_err_1(fs, freq, phase, act_ibis, debug)
mae += get_mae_err_2(fs, freq, phase, act_ibis, debug)
return mae
class RegularBeatFinder:
"""
Optimize a beat frequency by placing a regular beat over the detected beats.
Always finds a beat within f1..f2 range,
ignoring whether the detected beat is an upper harmonic.
"""
num_freqs = 200 #: number of freq steps to evaluate
range_f1 = 0.5 #: lowest detection frequency in Hz
range_f2 = 4.0 #: highest detection frequency in Hz
def __init__(self): pass
def find_beat(self, fs, ssf_zxings, debug_fe=False, debug_i=None):
"""Find the optimal beat frequency."""
act_ibis = np.diff(ssf_zxings)
# evaluate mean absolute errors for all frequencies
freqs, freq_errs = self._get_opt_ibi_freq_2(fs, act_ibis, debug_i)
if debug_fe:
plt.figure(figsize=(8,2))
plt.plot(freqs, freq_errs)
# get optimal frequency
i_freq = np.argmin(freq_errs)
# compute normalized error (mean beat placement error in samples)
N = np.sum(act_ibis)+1
k_max = 2 * freqs[i_freq] * N / fs # + phase/np.pi
norm_err = freq_errs[i_freq] / k_max
#
return freqs[i_freq], norm_err
def freq_to_est_ibis(self, fs, freq, N):
phase = 0
k_max = 2 * freq * N / fs + phase/np.pi
i_est_zxing = np.round((np.arange(k_max) - phase/np.pi) * fs / (2 * freq)).astype(int)
return np.diff(i_est_zxing)
def _get_opt_ibi_freq_2(self, fs, act_ibis, debug_i=None):
"""get mean absolute errors for a range of frequencies."""
assert self.range_f2 < fs/8, "it is at most useful to go until f = fs/8" # (and even then, get_mae_dist() triangle will not be very useful)
t_freqs = np.linspace(self.range_f1, self.range_f2, self.num_freqs)
fe = np.zeros(self.num_freqs)
for i, f in enumerate(t_freqs):
fe[i] = get_mae_err(fs, f, 0.0, act_ibis)
if debug_i is not None:
# plot colors:
# y: estimate
# r: actual
get_mae_err(fs, t_freqs[debug_i], 0.0, act_ibis, debug=True)
return t_freqs, fe

View File

@@ -4,3 +4,5 @@ authomatic
requests requests
numpy numpy
scipy scipy
scikit-learn
hsh-signal

View File

@@ -13,8 +13,8 @@
import numpy as np import numpy as np
from numpy.fft import fft from numpy.fft import fft
from scipy.signal import fftconvolve
from scipy.io import wavfile from hsh_signal.signal import lowpass_fft
def viterbi_highest_frequency_path_vectorized(Scp2, jump_penalty=2.0, use_log_amplitude=True): def viterbi_highest_frequency_path_vectorized(Scp2, jump_penalty=2.0, use_log_amplitude=True):
Scp2 = np.asarray(Scp2, dtype=float) Scp2 = np.asarray(Scp2, dtype=float)
@@ -103,14 +103,21 @@ class BassAnalyzer:
wavelet_win_sec = 0.175 wavelet_win_sec = 0.175
k_omega, k_nu = 0.12, 5.0 #: adapt scaling to get 'reasonable' frequency range (for pop bass, e.g. 18..1145 Hz, but that range strongly depends on the actual song's 'pt' shortest interval 'B') k_omega, k_nu = 0.12, 5.0 #: adapt scaling to get 'reasonable' frequency range (for pop bass, e.g. 18..1145 Hz, but that range strongly depends on the actual song's 'pt' shortest interval 'B')
viterbi_jump_penalty = 5000.0 viterbi_jump_penalty = 5000.0
Wp_force = None
I_force = None
def __init__(self, fs, sig): def __init__(self, fs, sig, Wp_force=None):
""" """
:param fs: sampling rate :param fs: sampling rate
:param sig: audio signal normalized to [-1,1] :param sig: audio signal normalized to [-1,1]
""" """
self.D = int(self.shift_sec * fs) #: spectrogram step self.D = int(self.shift_sec * fs) #: spectrogram step
self.Wp = int(np.round(self.wavelet_win_sec * fs / self.W) * self.W) # wavelet window - make it an integer multiple of FFT window if self.Wp_force:
self.Wp = self.Wp_force
elif Wp_force:
self.Wp = Wp_force
else:
self.Wp = int(np.round(self.wavelet_win_sec * fs / self.W) * self.W) # wavelet window - make it an integer multiple of FFT window
self.U = self.Wp // self.W # ratio self.U = self.Wp // self.W # ratio
self.f = np.pad(sig, (self.W//2, self.W//2-1)) #: signal padded (W-FFT to determine scalogram parameters) self.f = np.pad(sig, (self.W//2, self.W//2-1)) #: signal padded (W-FFT to determine scalogram parameters)
@@ -162,7 +169,10 @@ class BassAnalyzer:
g = np.abs(Spf) # (M x W) g = np.abs(Spf) # (M x W)
g_bar = np.mean(g, axis=1) # (M) g_bar = np.mean(g, axis=1) # (M)
# TODO: check if 'A' needs to be a smooth signal slowly varying over time, not a const. # TODO: check if 'A' needs to be a smooth signal slowly varying over time, not a const.
A = np.mean(g_bar) # amplitude cutoff for pulse train #A = np.mean(g_bar) # amplitude cutoff for pulse train
ip = int(fs)
g_bar_l = lowpass_fft(np.pad(g_bar, (ip, ip), mode='edge'), fps=fs, cf=0.5, tw=0.05)[ip:-ip]
A = g_bar_l
# compute transitions (pulse train) # compute transitions (pulse train)
pt = (g_bar > A).astype(int) # pulse train pt = (g_bar > A).astype(int) # pulse train
@@ -220,7 +230,10 @@ class BassAnalyzer:
T = (Lp - Wp) / fsp # un-padded sample count -> time length T = (Lp - Wp) / fsp # un-padded sample count -> time length
#omega = p * T / B # width parameter of Gabor wavelet #omega = p * T / B # width parameter of Gabor wavelet
#nu = B / (p * T) # frequency parameter of Gabor wavelet #nu = B / (p * T) # frequency parameter of Gabor wavelet
I = int(np.log2(p**2 * T**2 / (delta * B**2)) - 3/2) # number of octaves if self.I_force:
I = self.I_force
else:
I = int(np.log2(p**2 * T**2 / (delta * B**2)) - 3/2) # number of octaves
J = int(256 / I) # number of voices per octave J = int(256 / I) # number of voices per octave
r = np.linspace(0, I*J-1, I*J) r = np.linspace(0, I*J-1, I*J)
@@ -271,3 +284,71 @@ class BassAnalyzer:
def _viterbi_ampl(self, Scp2, path): def _viterbi_ampl(self, Scp2, path):
max_amplitudes = np.array([np.abs(Scp2[i, path[i]]) for i in range(Scp2.shape[0])]) max_amplitudes = np.array([np.abs(Scp2[i, path[i]]) for i in range(Scp2.shape[0])])
return max_amplitudes return max_amplitudes
class GuitarAnalyzer:
"""
Rhythm analysis from songs.
Provides a beat amplitude signal from the audio signal.
Performs short-time Fourier Transform on the signal,
then returns the f1..f2 band power for each window.
For low-frequency instruments like percussion,
use BassAnalyzer instead.
"""
W = 1024 #: window size (must be even, so that right padding W/2-1 works)
shift_sec = 0.008 #: window shift in sec ('delta_tau') between subsequent windows
target_band_f1 = 800.0 #: lower bound of target freq band in Hz
target_band_f2 = 4300.0 #: upper bound of target freq band in Hz
def __init__(self, fs, sig):
"""
:param fs: sampling rate
:param sig: audio signal normalized to [-1,1]
"""
self.f = np.pad(sig, (self.W//2, self.W//2-1)) #: signal padded (W-FFT to determine scalogram parameters)
self.fs = fs
self.D = int(self.shift_sec * fs) #: spectrogram step
self.L = self.f.shape[0]
self.M = (self.L-self.W) // self.D + 1 #: number of time steps
def spectrogram_power_amplitudes(self):
"""
Compute spectrogram power from a target frequency range.
NOTE: downsampled from the original 'fs'.
:returns: (fsd, sig): sampling rate, amplitude signal
"""
Spf = self._spectrogram()
ampl = self._spectrogram_power(Spf)
return ampl
def _spectrogram_power(self, Spf):
# Spf
# fs, W
fs, W = self.fs, self.W
k1, k2 = int(self.target_band_f1/fs*W), int(self.target_band_f2/fs*W) # freq band range in W-FFT
#
# spectrum power in f1..f2 bands
#
#hp_slice = highpass(np.sum(np.abs(Spf_slice[:, k1:k2]), axis=1), fps=fs/Dp, cf=2.0, tw=0.2)
hp_slice = np.sum(np.abs(Spf[:, k1:k2]), axis=1)-np.mean(np.sum(np.abs(Spf[:, k1:k2]), axis=1))
return hp_slice
def _spectrogram(self):
"""W-FFT (STFTs) to determine scalogram parameters"""
# *f
# M, W, D
f = self.f
M, W, D = self.M, self.W, self.D
#
# compute spectrogram: 'Spf' (M x W) <- from 'f'
#
iwss = np.linspace(W//2, W//2 + (M-1)*D, M, dtype=int) # 'D'-spaced start time indices of windows on 's'
Spf = np.zeros((M, W), dtype=np.complex128)
for i, iw in zip(range(M), iwss):
iws, iwe = iw-W//2, iw+W//2
Spf[i,:] = fft(f[iws:iwe])
return Spf

63
segmenter.py Normal file
View File

@@ -0,0 +1,63 @@
import numpy as np
from sklearn.cluster import KMeans
def median_filter(a, w):
ap = np.pad(a, (w//2, w//2), mode='edge')
o = np.zeros(a.shape[0])
for i in np.arange(a.shape[0]):
sl = ap[i:i+w]
o[i] = np.median(sl)
return o
class Segmenter:
seg_win_size_sec = 4.0 #: window size for stat. measures for segmentation, in sec
seg_win_step_sec = 1.0 #: step for segmentation, in sec
n_clusters = 8 #: clusters for KMeans algorithm
seg_filt_win_sec = 20.0 #: median filter width for smoothing segments
def __init__(self): pass
def get_segment_boundaries(self, fs, guitar):
"""split the spectral power signal 'guitar' into stochastically similar segments."""
segment_ids = self.get_segments(fs, guitar)
stxs = np.diff(segment_ids) != 0
i_stxs = np.where(stxs)[0]
return i_stxs
def get_segments(self, fs, guitar):
"""split the spectral power signal 'guitar' into stochastically similar segments."""
seg_filt_win = int(self.seg_filt_win_sec / self.seg_win_step_sec)
seg_guitar_data = self._sig_stochastics(fs, guitar)
X = np.vstack((
seg_guitar_data[:,0]*1.4,
np.sqrt(np.sum(seg_guitar_data[:,1:]**2, axis=1))
)).T
# cluster by stochastic characteristics
kmeans = KMeans(n_clusters=self.n_clusters, random_state=0, n_init="auto").fit(X)
segment_ids = np.floor(median_filter(kmeans.labels_, seg_filt_win)).astype(int)
# up-sample segment id assignment
iidx = np.linspace(0, segment_ids.shape[0], guitar.shape[0], endpoint=False).astype(int)
return segment_ids[iidx]
def _sig_stochastics(self, fs, y):
"""compute the stochastic moments of the signal. normalized."""
seg_win_size = int(self.seg_win_size_sec * fs)
seg_win_step = int(self.seg_win_step_sec * fs)
#
seg_y_data = np.zeros((y.shape[0] // seg_win_step, 4))
y_pad = np.pad(y, (seg_win_size // 2, seg_win_size // 2))
y_0_max, y_0_mean = np.max(y), np.mean(y)
y_1_max = np.max(np.mean((y - y_0_mean)**2))
y_2_max = np.max(np.mean(np.abs((y - y_0_mean)**3)))
y_3_max = np.max(np.mean((y - y_0_mean)**4))
wo = int(self.seg_win_size_sec/self.seg_win_step_sec)
for i in np.arange(wo//2, y.shape[0] // seg_win_step - wo//2):
i_c = int((i+0.5)*seg_win_step)
y_slice = y_pad[i_c-seg_win_size//2:i_c+seg_win_size//2]
mean = np.mean(y_slice)
seg_y_data[i,0] = mean / y_0_max
seg_y_data[i,1] = np.mean((y_slice - mean)**2) / y_1_max
seg_y_data[i,2] = np.mean(np.abs((y_slice - mean)**3)) / y_2_max / 2
seg_y_data[i,3] = np.mean((y_slice - mean)**4) / y_3_max / 4
#
return seg_y_data

56
sqi.py Normal file
View File

@@ -0,0 +1,56 @@
import numpy as np
def gauss(N, mu, sigma):
x = np.arange(N)
norm = sigma * np.sqrt(2*np.pi)
return np.exp(-1/2 * (x - mu)**2 / sigma**2) / norm
def shift(N, n, x):
"""shift the center of the signal 'x' to time index 'n'."""
y = np.zeros(N)
xl = x.shape[0]
s = n - xl // 2
x_bi, x_ei = np.clip(xl // 2 - n, a_min=0, a_max=xl), np.clip(N + xl - xl // 2 - n - xl % 2, a_min=0, a_max=xl)
y_bi, y_ei = np.clip(n - xl // 2, a_min=0, a_max=N), np.clip(n + xl // 2 + xl % 2, a_min=0, a_max=N)
y[y_bi:y_ei] = x[x_bi:x_ei]
return y
class SigQuality:
"""
Compute the Signal-to-Noise Ratio of beats
"""
gauss_beat_template_win_sec = 0.25542 #: gauss window width (as compared to beats in ssf function)
gauss_beat_template_sigma_sec = 0.027 #: gauss bump half-width parameter (as compared to beats in ssf function)
#gauss_amplitude = 2.0
def __init__(self): pass
def get_snr(self, fs, ssf, ssf_threshold, ssf_zxings):
"""Compute the Signal-to-Noise Ratio of beats, based on SSF function and detected beat locations."""
sigma = fs * self.gauss_beat_template_sigma_sec
W = int(fs * self.gauss_beat_template_win_sec)
gb = gauss(W, W//2, sigma)
# place gaussians on estimated beat locations
ssf_est = np.zeros(ssf.shape[0])
for i in ssf_zxings:
ssf_est += shift(ssf.shape[0], i, gb)
ssf_est /= gb[W//2] # normalize amplitude to 1.0
ssf_est = np.roll(ssf_est, int(sigma)) # shift to right (beat loc = gauss beginning, not center)
#sqi_ref = ssf_est * (self.gauss_amplitude * ssf_threshold) # set amplitude
# penalty term = (where there should be no amplitude, according to gaussians)
sqi_pen = 1.0 - ssf_est
# sqi = ratio of signal energy in goal vs. in noise
sqi_goal = np.sum(ssf_est * (ssf**2))
sqi_noise = np.sum(sqi_pen * (ssf**2))
# noise is everywhere, while signal is only around detected peaks - correct for this.
goal_density = np.mean(np.clip(2*sigma / np.diff(ssf_zxings), a_min=0, a_max=1))
sqi_goal /= goal_density
sqi = 10 * (np.log10(sqi_goal) - np.log10(sqi_noise))
return sqi