Compare commits
3 Commits
e506a3e580
...
4627786dc4
| Author | SHA1 | Date | |
|---|---|---|---|
| 4627786dc4 | |||
| 11934b1f61 | |||
| d10187878d |
37
beat.py
37
beat.py
@@ -1,4 +1,5 @@
|
|||||||
import numpy as np
|
import numpy as np
|
||||||
|
import matplotlib.pyplot as plt # for debug only
|
||||||
|
|
||||||
class SsfZxing:
|
class SsfZxing:
|
||||||
"""
|
"""
|
||||||
@@ -7,18 +8,21 @@ class SsfZxing:
|
|||||||
"""
|
"""
|
||||||
t_holdoff = 0.1 #: hold-off period in sec (ignore zxings after initial rise)
|
t_holdoff = 0.1 #: hold-off period in sec (ignore zxings after initial rise)
|
||||||
# these two depend on each other.
|
# these two depend on each other.
|
||||||
t_range = 0.024 #: rise amplitude range in sec: +/- around transition, we check the rise amplitude. about 2*sw_sec but nb. 0.008 sec steps in fs/D rate
|
t_range = 0.032 #: rise amplitude range in sec: +/- around transition, we check the rise amplitude. about 2*sw_sec but nb. 0.008 sec steps in fs/D rate
|
||||||
sw_sec = 0.04 #: upslope width in sec (for SSF function)
|
sw_sec = 0.04 #: upslope width in sec (for SSF function)
|
||||||
ssf_rel_thres = 3 #: magic number from Zong 2003, to scale mean SSF amplitude
|
ssf_rel_thres = 3 #: magic number from Zong 2003, threshold from mean SSF amplitude
|
||||||
ssf_rel_rise = 0.8 #: minimum rise of SSF edge (from foot to peak) relative to 'ssf_th'
|
ssf_rel_rise = 0.8 #: minimum rise of SSF edge (from foot to peak) relative to 'ssf_th'
|
||||||
|
|
||||||
|
# TODO: C++ impl has diverged.
|
||||||
|
# - refractory period changes
|
||||||
|
# - ssf_th filter with 6-points
|
||||||
|
# - ?? others ??
|
||||||
|
|
||||||
def __init__(self): pass
|
def __init__(self): pass
|
||||||
|
|
||||||
def _ssf_det_zxings(self, fs, ssf):
|
def _ssf_det_zxings(self, fs, ssf, ssf_th):
|
||||||
"""detect threshold crossings in 'ssf' signal."""
|
"""detect threshold crossings in 'ssf' signal."""
|
||||||
i_holdoff = int(self.t_holdoff * fs)
|
i_holdoff = int(self.t_holdoff * fs)
|
||||||
# TODO: check if we need lowpass instead of mean for 'ssf_th'
|
|
||||||
ssf_th = self.ssf_rel_thres * np.mean(ssf)
|
|
||||||
# threshold crossing
|
# threshold crossing
|
||||||
ssf_pk = np.pad((ssf > ssf_th).astype(int), (0,1))
|
ssf_pk = np.pad((ssf > ssf_th).astype(int), (0,1))
|
||||||
ssf_pks = np.pad(ssf_pk[:-1], (1,0))
|
ssf_pks = np.pad(ssf_pk[:-1], (1,0))
|
||||||
@@ -31,26 +35,38 @@ class SsfZxing:
|
|||||||
i_range = int(self.t_range * fs)
|
i_range = int(self.t_range * fs)
|
||||||
for i in np.arange(i_range, ssf_z.shape[0]-i_range-1):
|
for i in np.arange(i_range, ssf_z.shape[0]-i_range-1):
|
||||||
if ssf_z[i]:
|
if ssf_z[i]:
|
||||||
rise = ssf[i+i_range] - ssf[i-i_range]
|
rise = np.max(ssf[i:i+i_range]) - np.min(ssf[i-i_range:i])
|
||||||
if rise < ssf_th * self.ssf_rel_rise:
|
if rise < ssf_th * self.ssf_rel_rise:
|
||||||
ssf_z[i] = 0
|
ssf_z[i] = 0
|
||||||
ssf_z[-i_range:] = 0 # force-zero the bounds where we cannot check the amplitude rise
|
ssf_z[-i_range:] = 0 # force-zero the bounds where we cannot check the amplitude rise
|
||||||
ssf_z[:i_range] = 0 # force-zero the bounds where we cannot check the amplitude rise
|
ssf_z[:i_range] = 0 # force-zero the bounds where we cannot check the amplitude rise
|
||||||
ssf_zxings = np.where(ssf_z)[0]
|
ssf_zxings = np.where(ssf_z)[0]
|
||||||
# only integer-index resolution (no interpolation)
|
# only integer-index resolution (no interpolation)
|
||||||
|
self.ssf_zxings = ssf_zxings
|
||||||
return ssf_zxings
|
return ssf_zxings
|
||||||
|
|
||||||
def _ssf_function(self, fs, y):
|
def _ssf_function(self, fs, y):
|
||||||
"""sum-slope function."""
|
"""sum-slope function."""
|
||||||
sw = int(self.sw_sec*fs)
|
sw = int(self.sw_sec*fs)
|
||||||
|
|
||||||
duk = np.clip(np.diff(np.pad(y, (1,0))), a_min=0, a_max=np.inf) # left-looking window
|
duk = np.clip(np.diff(np.pad(y, (1,0))), a_min=0, a_max=np.inf) # left-looking window
|
||||||
#ssf = np.convolve(duk, slope_filter, mode='same') # centered window (acausal!)
|
#ssf = np.convolve(duk, slope_filter, mode='same') # centered window (acausal!)
|
||||||
duks = np.pad(np.cumsum(duk), (0, sw))
|
duks = np.pad(np.cumsum(duk), (0, sw))
|
||||||
duks_r = np.roll(duks, sw)
|
duks_r = np.roll(duks, sw)
|
||||||
ssf = (duks - duks_r)[:-sw]
|
ssf = (duks - duks_r)[:-sw]
|
||||||
return ssf
|
# compute threshold
|
||||||
|
# TODO: check if we need lowpass instead of mean for 'ssf_th'
|
||||||
|
ssf_th = self.ssf_rel_thres * np.mean(ssf)
|
||||||
|
self.ssf, self.ssf_th = ssf, ssf_th
|
||||||
|
return ssf, ssf_th
|
||||||
|
|
||||||
|
def debug_plot(self, i1, i2):
|
||||||
|
ssf, ssf_th, ssf_zxings = self.ssf, self.ssf_th, self.ssf_zxings
|
||||||
|
ssf_slice = ssf[i1:i2]
|
||||||
|
ssf_th_slice = ssf_th[i1:i2] if isinstance(ssf_th, np.ndarray) else ssf_th
|
||||||
|
plt.figure(figsize=(8, 2))
|
||||||
|
plt.plot(ssf_slice)
|
||||||
|
plt.plot(np.arange(ssf_slice.shape[0]), np.ones(ssf_slice.shape[0]) * ssf_th_slice)
|
||||||
|
plt.scatter(ssf_zxings[i1:i2], np.ones(ssf_zxings[i1:i2].shape[0]) * ssf_th_slice, c='r')
|
||||||
|
|
||||||
def get_mae_dist(ibis):
|
def get_mae_dist(ibis):
|
||||||
"""make triangular wave between beats, representing absolute beat placement error."""
|
"""make triangular wave between beats, representing absolute beat placement error."""
|
||||||
@@ -128,6 +144,11 @@ def get_mae_err(fs, freq, phase, act_ibis, debug=False):
|
|||||||
# (in direction 2, an optimal solution is "fully sparse", "freq = 1/L", because those are the only 'est_beats' which are aligned)
|
# (in direction 2, an optimal solution is "fully sparse", "freq = 1/L", because those are the only 'est_beats' which are aligned)
|
||||||
mae += get_mae_err_1(fs, freq, phase, act_ibis, debug)
|
mae += get_mae_err_1(fs, freq, phase, act_ibis, debug)
|
||||||
mae += get_mae_err_2(fs, freq, phase, act_ibis, debug)
|
mae += get_mae_err_2(fs, freq, phase, act_ibis, debug)
|
||||||
|
# TODO: may need to weight these two differently
|
||||||
|
# TODO: see "2027-04-27 TestApi_0b" vs "2027-04-27 TestApi" plots [24]
|
||||||
|
# TODO: (check: is match always slightly to the left of the trough / smooth minimum?)
|
||||||
|
# TODO (if so, we may need to weight dir1 and dir2 differently -- or maybe norm by pts density??)
|
||||||
|
# (or even penalize differently instead of adding dir1 and dir2)
|
||||||
return mae
|
return mae
|
||||||
|
|
||||||
class RegularBeatFinder:
|
class RegularBeatFinder:
|
||||||
|
|||||||
37
rhythm.py
37
rhythm.py
@@ -13,6 +13,7 @@
|
|||||||
|
|
||||||
import numpy as np
|
import numpy as np
|
||||||
from numpy.fft import fft
|
from numpy.fft import fft
|
||||||
|
import matplotlib.pyplot as plt # for debug only
|
||||||
|
|
||||||
from hsh_signal.signal import lowpass_fft
|
from hsh_signal.signal import lowpass_fft
|
||||||
import time
|
import time
|
||||||
@@ -81,7 +82,30 @@ def gabor_wavelet(omega, nu, fs, T, tt=None):
|
|||||||
psi = 1.0 / np.sqrt(omega) * np.exp(-np.pi * (t / omega)**2) * np.exp(1j*2*np.pi * nu * t / omega)
|
psi = 1.0 / np.sqrt(omega) * np.exp(-np.pi * (t / omega)**2) * np.exp(1j*2*np.pi * nu * t / omega)
|
||||||
return psi
|
return psi
|
||||||
|
|
||||||
class BassAnalyzer:
|
class Analyzer:
|
||||||
|
def __init__(self): pass
|
||||||
|
def debug_plot(self, i1, i2):
|
||||||
|
Scp2, path = self.Scp2, self.path
|
||||||
|
fs, Dp = self.fs, self.Dp
|
||||||
|
|
||||||
|
ss, omega, nu, fsp, Wp, I, J, freqs = self.pms
|
||||||
|
|
||||||
|
Scp2_slice = np.abs(Scp2[i1:i2])
|
||||||
|
|
||||||
|
plt.figure(figsize=(8,2))
|
||||||
|
plt.imshow(Scp2_slice.T, origin='lower')
|
||||||
|
x_positions = np.arange(Scp2_slice.shape[0]//250+1)*250
|
||||||
|
if x_positions[-1] == Scp2_slice.shape[0]:
|
||||||
|
x_positions[-1] -= 1 # so last tick is shown properly
|
||||||
|
t1 = i1 / (fs / Dp)
|
||||||
|
x_labels = ['{:.1f}'.format(t1+x*Dp/fs) for x in x_positions]
|
||||||
|
plt.xticks(x_positions, x_labels)
|
||||||
|
y_positions = np.arange(Scp2_slice.shape[1]//50)*50
|
||||||
|
y_labels = ['{:.1f}'.format((nu/(omega*ss[y]))) for y in y_positions] # Hz equivalents of wavelet scale
|
||||||
|
plt.yticks(y_positions, y_labels)
|
||||||
|
plt.plot(np.arange(Scp2_slice.shape[0]), path[i1:i2], c='r')
|
||||||
|
|
||||||
|
class BassAnalyzer(Analyzer):
|
||||||
"""
|
"""
|
||||||
Rhythm analysis from songs.
|
Rhythm analysis from songs.
|
||||||
Provides a beat amplitude signal from the audio signal.
|
Provides a beat amplitude signal from the audio signal.
|
||||||
@@ -112,6 +136,7 @@ class BassAnalyzer:
|
|||||||
:param fs: sampling rate
|
:param fs: sampling rate
|
||||||
:param sig: audio signal normalized to [-1,1]
|
:param sig: audio signal normalized to [-1,1]
|
||||||
"""
|
"""
|
||||||
|
super(BassAnalyzer, self).__init__()
|
||||||
self.D = int(self.shift_sec * fs) #: spectrogram step
|
self.D = int(self.shift_sec * fs) #: spectrogram step
|
||||||
if self.Wp_force:
|
if self.Wp_force:
|
||||||
self.Wp = self.Wp_force
|
self.Wp = self.Wp_force
|
||||||
@@ -151,6 +176,11 @@ class BassAnalyzer:
|
|||||||
t8 = time.time()
|
t8 = time.time()
|
||||||
ampl = self._viterbi_ampl(Scp2, path)
|
ampl = self._viterbi_ampl(Scp2, path)
|
||||||
t9 = time.time()
|
t9 = time.time()
|
||||||
|
|
||||||
|
self.Scp2 = Scp2
|
||||||
|
self.path = path
|
||||||
|
self.pms = pms
|
||||||
|
|
||||||
if not dbg_time:
|
if not dbg_time:
|
||||||
return ampl
|
return ampl
|
||||||
else:
|
else:
|
||||||
@@ -192,6 +222,11 @@ class BassAnalyzer:
|
|||||||
pt_re = (np.diff(pt) == 1).astype(int) # rising edge
|
pt_re = (np.diff(pt) == 1).astype(int) # rising edge
|
||||||
self.B = max(np.sum(pt_re), 1) # total number of pulses in the 'pt' pulse train signal
|
self.B = max(np.sum(pt_re), 1) # total number of pulses in the 'pt' pulse train signal
|
||||||
|
|
||||||
|
# clip B, to force **reasonable** frequency range for wavelets
|
||||||
|
# (noise will otherwise cause many transitions -> high B -> bass falls below freq range -> algo fail)
|
||||||
|
B_min, B_max = 0.5 * M / (fs / self.D), 5.0 * M / (fs / self.D)
|
||||||
|
self.B = np.clip(self.B, a_min=B_min, a_max=B_max)
|
||||||
|
|
||||||
# resample 'pt' (M) at these indices -> 'ptr' (L), like original 'f' (signal padded)
|
# resample 'pt' (M) at these indices -> 'ptr' (L), like original 'f' (signal padded)
|
||||||
squashed_idxs = np.floor(np.linspace(0, L-1, L) * (M/L)).astype(int)
|
squashed_idxs = np.floor(np.linspace(0, L-1, L) * (M/L)).astype(int)
|
||||||
ptr = pt[squashed_idxs]
|
ptr = pt[squashed_idxs]
|
||||||
|
|||||||
@@ -9,6 +9,8 @@ def median_filter(a, w):
|
|||||||
o[i] = np.median(sl)
|
o[i] = np.median(sl)
|
||||||
return o
|
return o
|
||||||
|
|
||||||
|
# nice-to: split longer segments (above 30 sec), merge very-short segments
|
||||||
|
|
||||||
class Segmenter:
|
class Segmenter:
|
||||||
seg_win_size_sec = 4.0 #: window size for stat. measures for segmentation, in sec
|
seg_win_size_sec = 4.0 #: window size for stat. measures for segmentation, in sec
|
||||||
seg_win_step_sec = 1.0 #: step for segmentation, in sec
|
seg_win_step_sec = 1.0 #: step for segmentation, in sec
|
||||||
|
|||||||
6
sqi.py
6
sqi.py
@@ -28,14 +28,14 @@ class SigQuality:
|
|||||||
|
|
||||||
def __init__(self): pass
|
def __init__(self): pass
|
||||||
|
|
||||||
def get_snr(self, fs, ssf, ssf_threshold, ssf_zxings):
|
def get_snr(self, fs, ssf, ssf_threshold, est_zxings):
|
||||||
"""Compute the Signal-to-Noise Ratio of beats, based on SSF function and detected beat locations."""
|
"""Compute the Signal-to-Noise Ratio of beats, based on SSF function and detected beat locations."""
|
||||||
sigma = fs * self.gauss_beat_template_sigma_sec
|
sigma = fs * self.gauss_beat_template_sigma_sec
|
||||||
W = int(fs * self.gauss_beat_template_win_sec)
|
W = int(fs * self.gauss_beat_template_win_sec)
|
||||||
gb = gauss(W, W//2, sigma)
|
gb = gauss(W, W//2, sigma)
|
||||||
# place gaussians on estimated beat locations
|
# place gaussians on estimated beat locations
|
||||||
ssf_est = np.zeros(ssf.shape[0])
|
ssf_est = np.zeros(ssf.shape[0])
|
||||||
for i in ssf_zxings:
|
for i in est_zxings:
|
||||||
ssf_est += shift(ssf.shape[0], i, gb)
|
ssf_est += shift(ssf.shape[0], i, gb)
|
||||||
ssf_est /= gb[W//2] # normalize amplitude to 1.0
|
ssf_est /= gb[W//2] # normalize amplitude to 1.0
|
||||||
ssf_est = np.roll(ssf_est, int(sigma)) # shift to right (beat loc = gauss beginning, not center)
|
ssf_est = np.roll(ssf_est, int(sigma)) # shift to right (beat loc = gauss beginning, not center)
|
||||||
@@ -49,7 +49,7 @@ class SigQuality:
|
|||||||
sqi_noise = np.sum(sqi_pen * (ssf**2))
|
sqi_noise = np.sum(sqi_pen * (ssf**2))
|
||||||
|
|
||||||
# noise is everywhere, while signal is only around detected peaks - correct for this.
|
# noise is everywhere, while signal is only around detected peaks - correct for this.
|
||||||
goal_density = np.mean(np.clip(2*sigma / np.diff(ssf_zxings), a_min=0, a_max=1))
|
goal_density = np.mean(np.clip(2*sigma / np.diff(est_zxings), a_min=0, a_max=1))
|
||||||
sqi_goal /= goal_density
|
sqi_goal /= goal_density
|
||||||
sqi = 10 * (np.log10(sqi_goal) - np.log10(sqi_noise))
|
sqi = 10 * (np.log10(sqi_goal) - np.log10(sqi_noise))
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user