feat: Segmenter, RegularBeatFinder, SigQuality
This commit is contained in:
180
beat.py
Normal file
180
beat.py
Normal file
@@ -0,0 +1,180 @@
|
||||
import numpy as np
|
||||
|
||||
class SsfZxing:
|
||||
"""
|
||||
Find beats in a Sum Slope Function by detecting threshold crossings.
|
||||
See Zong et al, from CINC 2003.
|
||||
"""
|
||||
t_holdoff = 0.1 #: hold-off period in sec (ignore zxings after initial rise)
|
||||
# these two depend on each other.
|
||||
t_range = 0.024 #: rise amplitude range in sec: +/- around transition, we check the rise amplitude. about 2*sw_sec but nb. 0.008 sec steps in fs/D rate
|
||||
sw_sec = 0.04 #: upslope width in sec (for SSF function)
|
||||
ssf_rel_thres = 3 #: magic number from Zong 2003, to scale mean SSF amplitude
|
||||
ssf_rel_rise = 0.8 #: minimum rise of SSF edge (from foot to peak) relative to 'ssf_th'
|
||||
|
||||
def __init__(self): pass
|
||||
|
||||
def _ssf_det_zxings(self, fs, ssf):
|
||||
"""detect threshold crossings in 'ssf' signal."""
|
||||
i_holdoff = int(self.t_holdoff * fs)
|
||||
# TODO: check if we need lowpass instead of mean for 'ssf_th'
|
||||
ssf_th = self.ssf_rel_thres * np.mean(ssf)
|
||||
# threshold crossing
|
||||
ssf_pk = np.pad((ssf > ssf_th).astype(int), (0,1))
|
||||
ssf_pks = np.pad(ssf_pk[:-1], (1,0))
|
||||
ssf_z = (ssf_pk - ssf_pks) == 1
|
||||
# holdoff
|
||||
for i in np.arange(ssf_z.shape[0] - i_holdoff):
|
||||
if ssf_z[i]:
|
||||
ssf_z[i+1:i+1+i_holdoff] = 0
|
||||
# rise amplitude filter (check in 2-3 samples [0.024 sec or so] and compare vs. threshold)
|
||||
i_range = int(self.t_range * fs)
|
||||
for i in np.arange(i_range, ssf_z.shape[0]-i_range-1):
|
||||
if ssf_z[i]:
|
||||
rise = ssf[i+i_range] - ssf[i-i_range]
|
||||
if rise < ssf_th * self.ssf_rel_rise:
|
||||
ssf_z[i] = 0
|
||||
ssf_z[-i_range:] = 0 # force-zero the bounds where we cannot check the amplitude rise
|
||||
ssf_z[:i_range] = 0 # force-zero the bounds where we cannot check the amplitude rise
|
||||
ssf_zxings = np.where(ssf_z)[0]
|
||||
# only integer-index resolution (no interpolation)
|
||||
return ssf_zxings
|
||||
|
||||
def _ssf_function(self, fs, y):
|
||||
"""sum-slope function."""
|
||||
sw = int(self.sw_sec*fs)
|
||||
|
||||
duk = np.clip(np.diff(np.pad(y, (1,0))), a_min=0, a_max=np.inf) # left-looking window
|
||||
#ssf = np.convolve(duk, slope_filter, mode='same') # centered window (acausal!)
|
||||
duks = np.pad(np.cumsum(duk), (0, sw))
|
||||
duks_r = np.roll(duks, sw)
|
||||
ssf = (duks - duks_r)[:-sw]
|
||||
return ssf
|
||||
|
||||
|
||||
def get_mae_dist(ibis):
|
||||
"""make triangular wave between beats, representing absolute beat placement error."""
|
||||
dist = np.zeros(np.sum(ibis)+1)
|
||||
# fill with distances
|
||||
p_i, i = 0, 0
|
||||
for ibi in ibis:
|
||||
i += ibi
|
||||
l_c = ibi // 2
|
||||
dist[p_i:p_i+l_c+ibi%2] = np.arange(l_c+ibi%2)
|
||||
dist[p_i+l_c+ibi%2:i] = 1 + np.arange(ibi-l_c-ibi%2)[::-1]
|
||||
p_i = i
|
||||
return dist
|
||||
|
||||
|
||||
def get_mae_err_1(fs, freq, phase, act_ibis, debug=False):
|
||||
"""
|
||||
compute beat placement error between zero crossings of given sine wave (estimate)
|
||||
and actually detected beats. computes 'dist' from 'act_ibis' (direction 1).
|
||||
"""
|
||||
# sin(2 pi freq n / fs + phase) ... 0, pi, 2*pi, ...
|
||||
# 2 pi freq n / fs + phase = k * pi
|
||||
# 2 freq n / fs + phase/pi = k (= 0, 1, 2, ...)
|
||||
# n = (k - phase/pi) * fs / (2 freq)
|
||||
# k_max = 2 freq n_max / fs + phase/pi
|
||||
N = np.sum(act_ibis)+1
|
||||
phase %= 2*np.pi
|
||||
k_max = 2 * freq * N / fs + phase/np.pi + 1
|
||||
i_est_zxing = np.round((np.arange(k_max) - phase/np.pi) * fs / (2 * freq)).astype(int)
|
||||
est_ibis = np.diff(i_est_zxing)
|
||||
dist = get_mae_dist(est_ibis)
|
||||
assert np.sum(est_ibis) > np.sum(act_ibis)
|
||||
if debug:
|
||||
plt.figure(figsize=(8,2))
|
||||
plt.plot(np.sin(2 * np.pi * freq * np.arange(N) / fs + phase))
|
||||
plt.stem(np.cumsum(act_ibis), np.ones(act_ibis.shape[0]), markerfmt='r')
|
||||
plt.stem(i_est_zxing, np.ones(i_est_zxing.shape[0]), markerfmt='y')
|
||||
mae_errs = dist[np.cumsum(act_ibis)]
|
||||
return np.sum(mae_errs)
|
||||
|
||||
|
||||
def get_mae_err_2(fs, freq, phase, act_ibis, debug=False):
|
||||
"""
|
||||
compute beat placement error between zero crossings of given sine wave (estimate)
|
||||
and actually detected beats. computes 'dist' from est_zxings (direction 2)
|
||||
"""
|
||||
dist = np.pad(get_mae_dist(act_ibis), (0,1))
|
||||
dist[-1] = 1 # sentinel for rounding errors in np.round()
|
||||
# sin(2 pi freq n / fs + phase) ... 0, pi, 2*pi, ...
|
||||
# 2 pi freq n / fs + phase = k * pi
|
||||
# 2 freq n / fs + phase/pi = k (= 0, 1, 2, ...)
|
||||
# n = (k - phase/pi) * fs / (2 freq)
|
||||
# k_max = 2 freq n_max / fs + phase/pi
|
||||
N = np.sum(act_ibis)+1
|
||||
phase %= 2*np.pi
|
||||
k_max = 2 * freq * N / fs + phase/np.pi
|
||||
i_est_zxing = np.round((np.arange(k_max) - phase/np.pi) * fs / (2 * freq)).astype(int)
|
||||
if debug:
|
||||
plt.figure(figsize=(8,2))
|
||||
plt.plot(np.sin(2 * np.pi * freq * np.arange(N) / fs + phase))
|
||||
plt.stem(np.cumsum(act_ibis), np.ones(act_ibis.shape[0]), markerfmt='r')
|
||||
plt.stem(i_est_zxing, np.ones(i_est_zxing.shape[0]), markerfmt='y')
|
||||
mae_errs = dist[i_est_zxing]
|
||||
return np.sum(mae_errs)
|
||||
|
||||
|
||||
def get_mae_err(fs, freq, phase, act_ibis, debug=False):
|
||||
"""
|
||||
compute beat placement error between zero crossings of given sine wave
|
||||
and actually detected beats.
|
||||
"""
|
||||
mae = 0
|
||||
# we compute both directions, to properly handle "missing beats"
|
||||
# (in direction 1, an optimal solution is "fully dense", "freq = fs/2", because each 'act_beat' will align to the next index)
|
||||
# (in direction 2, an optimal solution is "fully sparse", "freq = 1/L", because those are the only 'est_beats' which are aligned)
|
||||
mae += get_mae_err_1(fs, freq, phase, act_ibis, debug)
|
||||
mae += get_mae_err_2(fs, freq, phase, act_ibis, debug)
|
||||
return mae
|
||||
|
||||
class RegularBeatFinder:
|
||||
"""
|
||||
Optimize a beat frequency by placing a regular beat over the detected beats.
|
||||
Always finds a beat within f1..f2 range,
|
||||
ignoring whether the detected beat is an upper harmonic.
|
||||
"""
|
||||
num_freqs = 200 #: number of freq steps to evaluate
|
||||
range_f1 = 0.5 #: lowest detection frequency in Hz
|
||||
range_f2 = 4.0 #: highest detection frequency in Hz
|
||||
|
||||
def __init__(self): pass
|
||||
|
||||
def find_beat(self, fs, ssf_zxings, debug_fe=False, debug_i=None):
|
||||
"""Find the optimal beat frequency."""
|
||||
act_ibis = np.diff(ssf_zxings)
|
||||
# evaluate mean absolute errors for all frequencies
|
||||
freqs, freq_errs = self._get_opt_ibi_freq_2(fs, act_ibis, debug_i)
|
||||
if debug_fe:
|
||||
plt.figure(figsize=(8,2))
|
||||
plt.plot(freqs, freq_errs)
|
||||
# get optimal frequency
|
||||
i_freq = np.argmin(freq_errs)
|
||||
# compute normalized error (mean beat placement error in samples)
|
||||
N = np.sum(act_ibis)+1
|
||||
k_max = 2 * freqs[i_freq] * N / fs # + phase/np.pi
|
||||
norm_err = freq_errs[i_freq] / k_max
|
||||
#
|
||||
return freqs[i_freq], norm_err
|
||||
|
||||
def freq_to_est_ibis(self, fs, freq, N):
|
||||
phase = 0
|
||||
k_max = 2 * freq * N / fs + phase/np.pi
|
||||
i_est_zxing = np.round((np.arange(k_max) - phase/np.pi) * fs / (2 * freq)).astype(int)
|
||||
return np.diff(i_est_zxing)
|
||||
|
||||
def _get_opt_ibi_freq_2(self, fs, act_ibis, debug_i=None):
|
||||
"""get mean absolute errors for a range of frequencies."""
|
||||
assert self.range_f2 < fs/8, "it is at most useful to go until f = fs/8" # (and even then, get_mae_dist() triangle will not be very useful)
|
||||
t_freqs = np.linspace(self.range_f1, self.range_f2, self.num_freqs)
|
||||
fe = np.zeros(self.num_freqs)
|
||||
for i, f in enumerate(t_freqs):
|
||||
fe[i] = get_mae_err(fs, f, 0.0, act_ibis)
|
||||
if debug_i is not None:
|
||||
# plot colors:
|
||||
# y: estimate
|
||||
# r: actual
|
||||
get_mae_err(fs, t_freqs[debug_i], 0.0, act_ibis, debug=True)
|
||||
return t_freqs, fe
|
||||
Reference in New Issue
Block a user