Compare commits

..

3 Commits

4 changed files with 70 additions and 12 deletions

37
beat.py
View File

@@ -1,4 +1,5 @@
import numpy as np import numpy as np
import matplotlib.pyplot as plt # for debug only
class SsfZxing: class SsfZxing:
""" """
@@ -7,18 +8,21 @@ class SsfZxing:
""" """
t_holdoff = 0.1 #: hold-off period in sec (ignore zxings after initial rise) t_holdoff = 0.1 #: hold-off period in sec (ignore zxings after initial rise)
# these two depend on each other. # these two depend on each other.
t_range = 0.024 #: rise amplitude range in sec: +/- around transition, we check the rise amplitude. about 2*sw_sec but nb. 0.008 sec steps in fs/D rate t_range = 0.032 #: rise amplitude range in sec: +/- around transition, we check the rise amplitude. about 2*sw_sec but nb. 0.008 sec steps in fs/D rate
sw_sec = 0.04 #: upslope width in sec (for SSF function) sw_sec = 0.04 #: upslope width in sec (for SSF function)
ssf_rel_thres = 3 #: magic number from Zong 2003, to scale mean SSF amplitude ssf_rel_thres = 3 #: magic number from Zong 2003, threshold from mean SSF amplitude
ssf_rel_rise = 0.8 #: minimum rise of SSF edge (from foot to peak) relative to 'ssf_th' ssf_rel_rise = 0.8 #: minimum rise of SSF edge (from foot to peak) relative to 'ssf_th'
# TODO: C++ impl has diverged.
# - refractory period changes
# - ssf_th filter with 6-points
# - ?? others ??
def __init__(self): pass def __init__(self): pass
def _ssf_det_zxings(self, fs, ssf): def _ssf_det_zxings(self, fs, ssf, ssf_th):
"""detect threshold crossings in 'ssf' signal.""" """detect threshold crossings in 'ssf' signal."""
i_holdoff = int(self.t_holdoff * fs) i_holdoff = int(self.t_holdoff * fs)
# TODO: check if we need lowpass instead of mean for 'ssf_th'
ssf_th = self.ssf_rel_thres * np.mean(ssf)
# threshold crossing # threshold crossing
ssf_pk = np.pad((ssf > ssf_th).astype(int), (0,1)) ssf_pk = np.pad((ssf > ssf_th).astype(int), (0,1))
ssf_pks = np.pad(ssf_pk[:-1], (1,0)) ssf_pks = np.pad(ssf_pk[:-1], (1,0))
@@ -31,26 +35,38 @@ class SsfZxing:
i_range = int(self.t_range * fs) i_range = int(self.t_range * fs)
for i in np.arange(i_range, ssf_z.shape[0]-i_range-1): for i in np.arange(i_range, ssf_z.shape[0]-i_range-1):
if ssf_z[i]: if ssf_z[i]:
rise = ssf[i+i_range] - ssf[i-i_range] rise = np.max(ssf[i:i+i_range]) - np.min(ssf[i-i_range:i])
if rise < ssf_th * self.ssf_rel_rise: if rise < ssf_th * self.ssf_rel_rise:
ssf_z[i] = 0 ssf_z[i] = 0
ssf_z[-i_range:] = 0 # force-zero the bounds where we cannot check the amplitude rise ssf_z[-i_range:] = 0 # force-zero the bounds where we cannot check the amplitude rise
ssf_z[:i_range] = 0 # force-zero the bounds where we cannot check the amplitude rise ssf_z[:i_range] = 0 # force-zero the bounds where we cannot check the amplitude rise
ssf_zxings = np.where(ssf_z)[0] ssf_zxings = np.where(ssf_z)[0]
# only integer-index resolution (no interpolation) # only integer-index resolution (no interpolation)
self.ssf_zxings = ssf_zxings
return ssf_zxings return ssf_zxings
def _ssf_function(self, fs, y): def _ssf_function(self, fs, y):
"""sum-slope function.""" """sum-slope function."""
sw = int(self.sw_sec*fs) sw = int(self.sw_sec*fs)
duk = np.clip(np.diff(np.pad(y, (1,0))), a_min=0, a_max=np.inf) # left-looking window duk = np.clip(np.diff(np.pad(y, (1,0))), a_min=0, a_max=np.inf) # left-looking window
#ssf = np.convolve(duk, slope_filter, mode='same') # centered window (acausal!) #ssf = np.convolve(duk, slope_filter, mode='same') # centered window (acausal!)
duks = np.pad(np.cumsum(duk), (0, sw)) duks = np.pad(np.cumsum(duk), (0, sw))
duks_r = np.roll(duks, sw) duks_r = np.roll(duks, sw)
ssf = (duks - duks_r)[:-sw] ssf = (duks - duks_r)[:-sw]
return ssf # compute threshold
# TODO: check if we need lowpass instead of mean for 'ssf_th'
ssf_th = self.ssf_rel_thres * np.mean(ssf)
self.ssf, self.ssf_th = ssf, ssf_th
return ssf, ssf_th
def debug_plot(self, i1, i2):
ssf, ssf_th, ssf_zxings = self.ssf, self.ssf_th, self.ssf_zxings
ssf_slice = ssf[i1:i2]
ssf_th_slice = ssf_th[i1:i2] if isinstance(ssf_th, np.ndarray) else ssf_th
plt.figure(figsize=(8, 2))
plt.plot(ssf_slice)
plt.plot(np.arange(ssf_slice.shape[0]), np.ones(ssf_slice.shape[0]) * ssf_th_slice)
plt.scatter(ssf_zxings[i1:i2], np.ones(ssf_zxings[i1:i2].shape[0]) * ssf_th_slice, c='r')
def get_mae_dist(ibis): def get_mae_dist(ibis):
"""make triangular wave between beats, representing absolute beat placement error.""" """make triangular wave between beats, representing absolute beat placement error."""
@@ -128,6 +144,11 @@ def get_mae_err(fs, freq, phase, act_ibis, debug=False):
# (in direction 2, an optimal solution is "fully sparse", "freq = 1/L", because those are the only 'est_beats' which are aligned) # (in direction 2, an optimal solution is "fully sparse", "freq = 1/L", because those are the only 'est_beats' which are aligned)
mae += get_mae_err_1(fs, freq, phase, act_ibis, debug) mae += get_mae_err_1(fs, freq, phase, act_ibis, debug)
mae += get_mae_err_2(fs, freq, phase, act_ibis, debug) mae += get_mae_err_2(fs, freq, phase, act_ibis, debug)
# TODO: may need to weight these two differently
# TODO: see "2027-04-27 TestApi_0b" vs "2027-04-27 TestApi" plots [24]
# TODO: (check: is match always slightly to the left of the trough / smooth minimum?)
# TODO (if so, we may need to weight dir1 and dir2 differently -- or maybe norm by pts density??)
# (or even penalize differently instead of adding dir1 and dir2)
return mae return mae
class RegularBeatFinder: class RegularBeatFinder:

View File

@@ -13,6 +13,7 @@
import numpy as np import numpy as np
from numpy.fft import fft from numpy.fft import fft
import matplotlib.pyplot as plt # for debug only
from hsh_signal.signal import lowpass_fft from hsh_signal.signal import lowpass_fft
import time import time
@@ -81,7 +82,30 @@ def gabor_wavelet(omega, nu, fs, T, tt=None):
psi = 1.0 / np.sqrt(omega) * np.exp(-np.pi * (t / omega)**2) * np.exp(1j*2*np.pi * nu * t / omega) psi = 1.0 / np.sqrt(omega) * np.exp(-np.pi * (t / omega)**2) * np.exp(1j*2*np.pi * nu * t / omega)
return psi return psi
class BassAnalyzer: class Analyzer:
def __init__(self): pass
def debug_plot(self, i1, i2):
Scp2, path = self.Scp2, self.path
fs, Dp = self.fs, self.Dp
ss, omega, nu, fsp, Wp, I, J, freqs = self.pms
Scp2_slice = np.abs(Scp2[i1:i2])
plt.figure(figsize=(8,2))
plt.imshow(Scp2_slice.T, origin='lower')
x_positions = np.arange(Scp2_slice.shape[0]//250+1)*250
if x_positions[-1] == Scp2_slice.shape[0]:
x_positions[-1] -= 1 # so last tick is shown properly
t1 = i1 / (fs / Dp)
x_labels = ['{:.1f}'.format(t1+x*Dp/fs) for x in x_positions]
plt.xticks(x_positions, x_labels)
y_positions = np.arange(Scp2_slice.shape[1]//50)*50
y_labels = ['{:.1f}'.format((nu/(omega*ss[y]))) for y in y_positions] # Hz equivalents of wavelet scale
plt.yticks(y_positions, y_labels)
plt.plot(np.arange(Scp2_slice.shape[0]), path[i1:i2], c='r')
class BassAnalyzer(Analyzer):
""" """
Rhythm analysis from songs. Rhythm analysis from songs.
Provides a beat amplitude signal from the audio signal. Provides a beat amplitude signal from the audio signal.
@@ -112,6 +136,7 @@ class BassAnalyzer:
:param fs: sampling rate :param fs: sampling rate
:param sig: audio signal normalized to [-1,1] :param sig: audio signal normalized to [-1,1]
""" """
super(BassAnalyzer, self).__init__()
self.D = int(self.shift_sec * fs) #: spectrogram step self.D = int(self.shift_sec * fs) #: spectrogram step
if self.Wp_force: if self.Wp_force:
self.Wp = self.Wp_force self.Wp = self.Wp_force
@@ -151,6 +176,11 @@ class BassAnalyzer:
t8 = time.time() t8 = time.time()
ampl = self._viterbi_ampl(Scp2, path) ampl = self._viterbi_ampl(Scp2, path)
t9 = time.time() t9 = time.time()
self.Scp2 = Scp2
self.path = path
self.pms = pms
if not dbg_time: if not dbg_time:
return ampl return ampl
else: else:
@@ -192,6 +222,11 @@ class BassAnalyzer:
pt_re = (np.diff(pt) == 1).astype(int) # rising edge pt_re = (np.diff(pt) == 1).astype(int) # rising edge
self.B = max(np.sum(pt_re), 1) # total number of pulses in the 'pt' pulse train signal self.B = max(np.sum(pt_re), 1) # total number of pulses in the 'pt' pulse train signal
# clip B, to force **reasonable** frequency range for wavelets
# (noise will otherwise cause many transitions -> high B -> bass falls below freq range -> algo fail)
B_min, B_max = 0.5 * M / (fs / self.D), 5.0 * M / (fs / self.D)
self.B = np.clip(self.B, a_min=B_min, a_max=B_max)
# resample 'pt' (M) at these indices -> 'ptr' (L), like original 'f' (signal padded) # resample 'pt' (M) at these indices -> 'ptr' (L), like original 'f' (signal padded)
squashed_idxs = np.floor(np.linspace(0, L-1, L) * (M/L)).astype(int) squashed_idxs = np.floor(np.linspace(0, L-1, L) * (M/L)).astype(int)
ptr = pt[squashed_idxs] ptr = pt[squashed_idxs]

View File

@@ -9,6 +9,8 @@ def median_filter(a, w):
o[i] = np.median(sl) o[i] = np.median(sl)
return o return o
# nice-to: split longer segments (above 30 sec), merge very-short segments
class Segmenter: class Segmenter:
seg_win_size_sec = 4.0 #: window size for stat. measures for segmentation, in sec seg_win_size_sec = 4.0 #: window size for stat. measures for segmentation, in sec
seg_win_step_sec = 1.0 #: step for segmentation, in sec seg_win_step_sec = 1.0 #: step for segmentation, in sec

6
sqi.py
View File

@@ -28,14 +28,14 @@ class SigQuality:
def __init__(self): pass def __init__(self): pass
def get_snr(self, fs, ssf, ssf_threshold, ssf_zxings): def get_snr(self, fs, ssf, ssf_threshold, est_zxings):
"""Compute the Signal-to-Noise Ratio of beats, based on SSF function and detected beat locations.""" """Compute the Signal-to-Noise Ratio of beats, based on SSF function and detected beat locations."""
sigma = fs * self.gauss_beat_template_sigma_sec sigma = fs * self.gauss_beat_template_sigma_sec
W = int(fs * self.gauss_beat_template_win_sec) W = int(fs * self.gauss_beat_template_win_sec)
gb = gauss(W, W//2, sigma) gb = gauss(W, W//2, sigma)
# place gaussians on estimated beat locations # place gaussians on estimated beat locations
ssf_est = np.zeros(ssf.shape[0]) ssf_est = np.zeros(ssf.shape[0])
for i in ssf_zxings: for i in est_zxings:
ssf_est += shift(ssf.shape[0], i, gb) ssf_est += shift(ssf.shape[0], i, gb)
ssf_est /= gb[W//2] # normalize amplitude to 1.0 ssf_est /= gb[W//2] # normalize amplitude to 1.0
ssf_est = np.roll(ssf_est, int(sigma)) # shift to right (beat loc = gauss beginning, not center) ssf_est = np.roll(ssf_est, int(sigma)) # shift to right (beat loc = gauss beginning, not center)
@@ -49,7 +49,7 @@ class SigQuality:
sqi_noise = np.sum(sqi_pen * (ssf**2)) sqi_noise = np.sum(sqi_pen * (ssf**2))
# noise is everywhere, while signal is only around detected peaks - correct for this. # noise is everywhere, while signal is only around detected peaks - correct for this.
goal_density = np.mean(np.clip(2*sigma / np.diff(ssf_zxings), a_min=0, a_max=1)) goal_density = np.mean(np.clip(2*sigma / np.diff(est_zxings), a_min=0, a_max=1))
sqi_goal /= goal_density sqi_goal /= goal_density
sqi = 10 * (np.log10(sqi_goal) - np.log10(sqi_noise)) sqi = 10 * (np.log10(sqi_goal) - np.log10(sqi_noise))