2026-04-27 22:13:02 +02:00
|
|
|
import numpy as np
|
2026-04-28 10:10:52 +02:00
|
|
|
import matplotlib.pyplot as plt # for debug only
|
2026-04-27 22:13:02 +02:00
|
|
|
|
|
|
|
|
class SsfZxing:
|
|
|
|
|
"""
|
|
|
|
|
Find beats in a Sum Slope Function by detecting threshold crossings.
|
|
|
|
|
See Zong et al, from CINC 2003.
|
|
|
|
|
"""
|
|
|
|
|
t_holdoff = 0.1 #: hold-off period in sec (ignore zxings after initial rise)
|
|
|
|
|
# these two depend on each other.
|
2026-04-28 10:10:52 +02:00
|
|
|
t_range = 0.032 #: rise amplitude range in sec: +/- around transition, we check the rise amplitude. about 2*sw_sec but nb. 0.008 sec steps in fs/D rate
|
2026-04-27 22:13:02 +02:00
|
|
|
sw_sec = 0.04 #: upslope width in sec (for SSF function)
|
2026-04-28 10:10:52 +02:00
|
|
|
ssf_rel_thres = 3 #: magic number from Zong 2003, threshold from mean SSF amplitude
|
2026-04-27 22:13:02 +02:00
|
|
|
ssf_rel_rise = 0.8 #: minimum rise of SSF edge (from foot to peak) relative to 'ssf_th'
|
|
|
|
|
|
|
|
|
|
def __init__(self): pass
|
|
|
|
|
|
2026-04-28 10:10:52 +02:00
|
|
|
def _ssf_det_zxings(self, fs, ssf, ssf_th):
|
2026-04-27 22:13:02 +02:00
|
|
|
"""detect threshold crossings in 'ssf' signal."""
|
|
|
|
|
i_holdoff = int(self.t_holdoff * fs)
|
|
|
|
|
# threshold crossing
|
|
|
|
|
ssf_pk = np.pad((ssf > ssf_th).astype(int), (0,1))
|
|
|
|
|
ssf_pks = np.pad(ssf_pk[:-1], (1,0))
|
|
|
|
|
ssf_z = (ssf_pk - ssf_pks) == 1
|
|
|
|
|
# holdoff
|
|
|
|
|
for i in np.arange(ssf_z.shape[0] - i_holdoff):
|
|
|
|
|
if ssf_z[i]:
|
|
|
|
|
ssf_z[i+1:i+1+i_holdoff] = 0
|
|
|
|
|
# rise amplitude filter (check in 2-3 samples [0.024 sec or so] and compare vs. threshold)
|
|
|
|
|
i_range = int(self.t_range * fs)
|
|
|
|
|
for i in np.arange(i_range, ssf_z.shape[0]-i_range-1):
|
|
|
|
|
if ssf_z[i]:
|
2026-04-28 10:10:52 +02:00
|
|
|
rise = np.max(ssf[i:i+i_range]) - np.min(ssf[i-i_range:i])
|
2026-04-27 22:13:02 +02:00
|
|
|
if rise < ssf_th * self.ssf_rel_rise:
|
|
|
|
|
ssf_z[i] = 0
|
|
|
|
|
ssf_z[-i_range:] = 0 # force-zero the bounds where we cannot check the amplitude rise
|
|
|
|
|
ssf_z[:i_range] = 0 # force-zero the bounds where we cannot check the amplitude rise
|
|
|
|
|
ssf_zxings = np.where(ssf_z)[0]
|
|
|
|
|
# only integer-index resolution (no interpolation)
|
|
|
|
|
return ssf_zxings
|
|
|
|
|
|
|
|
|
|
def _ssf_function(self, fs, y):
|
|
|
|
|
"""sum-slope function."""
|
|
|
|
|
sw = int(self.sw_sec*fs)
|
|
|
|
|
duk = np.clip(np.diff(np.pad(y, (1,0))), a_min=0, a_max=np.inf) # left-looking window
|
|
|
|
|
#ssf = np.convolve(duk, slope_filter, mode='same') # centered window (acausal!)
|
|
|
|
|
duks = np.pad(np.cumsum(duk), (0, sw))
|
|
|
|
|
duks_r = np.roll(duks, sw)
|
|
|
|
|
ssf = (duks - duks_r)[:-sw]
|
2026-04-28 10:10:52 +02:00
|
|
|
# compute threshold
|
|
|
|
|
# TODO: check if we need lowpass instead of mean for 'ssf_th'
|
|
|
|
|
ssf_th = self.ssf_rel_thres * np.mean(ssf)
|
|
|
|
|
return ssf, ssf_th
|
2026-04-27 22:13:02 +02:00
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_mae_dist(ibis):
|
|
|
|
|
"""make triangular wave between beats, representing absolute beat placement error."""
|
|
|
|
|
dist = np.zeros(np.sum(ibis)+1)
|
|
|
|
|
# fill with distances
|
|
|
|
|
p_i, i = 0, 0
|
|
|
|
|
for ibi in ibis:
|
|
|
|
|
i += ibi
|
|
|
|
|
l_c = ibi // 2
|
|
|
|
|
dist[p_i:p_i+l_c+ibi%2] = np.arange(l_c+ibi%2)
|
|
|
|
|
dist[p_i+l_c+ibi%2:i] = 1 + np.arange(ibi-l_c-ibi%2)[::-1]
|
|
|
|
|
p_i = i
|
|
|
|
|
return dist
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_mae_err_1(fs, freq, phase, act_ibis, debug=False):
|
|
|
|
|
"""
|
|
|
|
|
compute beat placement error between zero crossings of given sine wave (estimate)
|
|
|
|
|
and actually detected beats. computes 'dist' from 'act_ibis' (direction 1).
|
|
|
|
|
"""
|
|
|
|
|
# sin(2 pi freq n / fs + phase) ... 0, pi, 2*pi, ...
|
|
|
|
|
# 2 pi freq n / fs + phase = k * pi
|
|
|
|
|
# 2 freq n / fs + phase/pi = k (= 0, 1, 2, ...)
|
|
|
|
|
# n = (k - phase/pi) * fs / (2 freq)
|
|
|
|
|
# k_max = 2 freq n_max / fs + phase/pi
|
|
|
|
|
N = np.sum(act_ibis)+1
|
|
|
|
|
phase %= 2*np.pi
|
|
|
|
|
k_max = 2 * freq * N / fs + phase/np.pi + 1
|
|
|
|
|
i_est_zxing = np.round((np.arange(k_max) - phase/np.pi) * fs / (2 * freq)).astype(int)
|
|
|
|
|
est_ibis = np.diff(i_est_zxing)
|
|
|
|
|
dist = get_mae_dist(est_ibis)
|
|
|
|
|
assert np.sum(est_ibis) > np.sum(act_ibis)
|
|
|
|
|
if debug:
|
|
|
|
|
plt.figure(figsize=(8,2))
|
|
|
|
|
plt.plot(np.sin(2 * np.pi * freq * np.arange(N) / fs + phase))
|
|
|
|
|
plt.stem(np.cumsum(act_ibis), np.ones(act_ibis.shape[0]), markerfmt='r')
|
|
|
|
|
plt.stem(i_est_zxing, np.ones(i_est_zxing.shape[0]), markerfmt='y')
|
|
|
|
|
mae_errs = dist[np.cumsum(act_ibis)]
|
|
|
|
|
return np.sum(mae_errs)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_mae_err_2(fs, freq, phase, act_ibis, debug=False):
|
|
|
|
|
"""
|
|
|
|
|
compute beat placement error between zero crossings of given sine wave (estimate)
|
|
|
|
|
and actually detected beats. computes 'dist' from est_zxings (direction 2)
|
|
|
|
|
"""
|
|
|
|
|
dist = np.pad(get_mae_dist(act_ibis), (0,1))
|
|
|
|
|
dist[-1] = 1 # sentinel for rounding errors in np.round()
|
|
|
|
|
# sin(2 pi freq n / fs + phase) ... 0, pi, 2*pi, ...
|
|
|
|
|
# 2 pi freq n / fs + phase = k * pi
|
|
|
|
|
# 2 freq n / fs + phase/pi = k (= 0, 1, 2, ...)
|
|
|
|
|
# n = (k - phase/pi) * fs / (2 freq)
|
|
|
|
|
# k_max = 2 freq n_max / fs + phase/pi
|
|
|
|
|
N = np.sum(act_ibis)+1
|
|
|
|
|
phase %= 2*np.pi
|
|
|
|
|
k_max = 2 * freq * N / fs + phase/np.pi
|
|
|
|
|
i_est_zxing = np.round((np.arange(k_max) - phase/np.pi) * fs / (2 * freq)).astype(int)
|
|
|
|
|
if debug:
|
|
|
|
|
plt.figure(figsize=(8,2))
|
|
|
|
|
plt.plot(np.sin(2 * np.pi * freq * np.arange(N) / fs + phase))
|
|
|
|
|
plt.stem(np.cumsum(act_ibis), np.ones(act_ibis.shape[0]), markerfmt='r')
|
|
|
|
|
plt.stem(i_est_zxing, np.ones(i_est_zxing.shape[0]), markerfmt='y')
|
|
|
|
|
mae_errs = dist[i_est_zxing]
|
|
|
|
|
return np.sum(mae_errs)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_mae_err(fs, freq, phase, act_ibis, debug=False):
|
|
|
|
|
"""
|
|
|
|
|
compute beat placement error between zero crossings of given sine wave
|
|
|
|
|
and actually detected beats.
|
|
|
|
|
"""
|
|
|
|
|
mae = 0
|
|
|
|
|
# we compute both directions, to properly handle "missing beats"
|
|
|
|
|
# (in direction 1, an optimal solution is "fully dense", "freq = fs/2", because each 'act_beat' will align to the next index)
|
|
|
|
|
# (in direction 2, an optimal solution is "fully sparse", "freq = 1/L", because those are the only 'est_beats' which are aligned)
|
|
|
|
|
mae += get_mae_err_1(fs, freq, phase, act_ibis, debug)
|
|
|
|
|
mae += get_mae_err_2(fs, freq, phase, act_ibis, debug)
|
2026-04-28 10:10:52 +02:00
|
|
|
# TODO: may need to weight these two differently
|
|
|
|
|
# TODO: see "2027-04-27 TestApi_0b" vs "2027-04-27 TestApi" plots [24]
|
|
|
|
|
# TODO: (check: is match always slightly to the left of the trough / smooth minimum?)
|
|
|
|
|
# TODO (if so, we may need to weight dir1 and dir2 differently -- or maybe norm by pts density??)
|
|
|
|
|
# (or even penalize differently instead of adding dir1 and dir2)
|
2026-04-27 22:13:02 +02:00
|
|
|
return mae
|
|
|
|
|
|
|
|
|
|
class RegularBeatFinder:
|
|
|
|
|
"""
|
|
|
|
|
Optimize a beat frequency by placing a regular beat over the detected beats.
|
|
|
|
|
Always finds a beat within f1..f2 range,
|
|
|
|
|
ignoring whether the detected beat is an upper harmonic.
|
|
|
|
|
"""
|
|
|
|
|
num_freqs = 200 #: number of freq steps to evaluate
|
|
|
|
|
range_f1 = 0.5 #: lowest detection frequency in Hz
|
|
|
|
|
range_f2 = 4.0 #: highest detection frequency in Hz
|
|
|
|
|
|
|
|
|
|
def __init__(self): pass
|
|
|
|
|
|
|
|
|
|
def find_beat(self, fs, ssf_zxings, debug_fe=False, debug_i=None):
|
|
|
|
|
"""Find the optimal beat frequency."""
|
|
|
|
|
act_ibis = np.diff(ssf_zxings)
|
|
|
|
|
# evaluate mean absolute errors for all frequencies
|
|
|
|
|
freqs, freq_errs = self._get_opt_ibi_freq_2(fs, act_ibis, debug_i)
|
|
|
|
|
if debug_fe:
|
|
|
|
|
plt.figure(figsize=(8,2))
|
|
|
|
|
plt.plot(freqs, freq_errs)
|
|
|
|
|
# get optimal frequency
|
|
|
|
|
i_freq = np.argmin(freq_errs)
|
|
|
|
|
# compute normalized error (mean beat placement error in samples)
|
|
|
|
|
N = np.sum(act_ibis)+1
|
|
|
|
|
k_max = 2 * freqs[i_freq] * N / fs # + phase/np.pi
|
|
|
|
|
norm_err = freq_errs[i_freq] / k_max
|
|
|
|
|
#
|
|
|
|
|
return freqs[i_freq], norm_err
|
|
|
|
|
|
|
|
|
|
def freq_to_est_ibis(self, fs, freq, N):
|
|
|
|
|
phase = 0
|
|
|
|
|
k_max = 2 * freq * N / fs + phase/np.pi
|
|
|
|
|
i_est_zxing = np.round((np.arange(k_max) - phase/np.pi) * fs / (2 * freq)).astype(int)
|
|
|
|
|
return np.diff(i_est_zxing)
|
|
|
|
|
|
|
|
|
|
def _get_opt_ibi_freq_2(self, fs, act_ibis, debug_i=None):
|
|
|
|
|
"""get mean absolute errors for a range of frequencies."""
|
|
|
|
|
assert self.range_f2 < fs/8, "it is at most useful to go until f = fs/8" # (and even then, get_mae_dist() triangle will not be very useful)
|
|
|
|
|
t_freqs = np.linspace(self.range_f1, self.range_f2, self.num_freqs)
|
|
|
|
|
fe = np.zeros(self.num_freqs)
|
|
|
|
|
for i, f in enumerate(t_freqs):
|
|
|
|
|
fe[i] = get_mae_err(fs, f, 0.0, act_ibis)
|
|
|
|
|
if debug_i is not None:
|
|
|
|
|
# plot colors:
|
|
|
|
|
# y: estimate
|
|
|
|
|
# r: actual
|
|
|
|
|
get_mae_err(fs, t_freqs[debug_i], 0.0, act_ibis, debug=True)
|
|
|
|
|
return t_freqs, fe
|