Files
lockstep-api/beat.py

203 lines
8.8 KiB
Python
Raw Normal View History

import numpy as np
import matplotlib.pyplot as plt # for debug only
# note: may be called ZxingDetector instead?
class SsfZxing:
"""
Find beats in a Sum Slope Function by detecting threshold crossings.
See Zong et al, from CINC 2003.
"""
t_holdoff = 0.1 #: hold-off period in sec (ignore zxings after initial rise)
# these two depend on each other.
t_range = 0.032 #: rise amplitude range in sec: +/- around transition, we check the rise amplitude. about 2*sw_sec but nb. 0.008 sec steps in fs/D rate
sw_sec = 0.04 #: upslope width in sec (for SSF function)
ssf_rel_thres = 3 #: magic number from Zong 2003, threshold from mean SSF amplitude
ssf_rel_rise = 0.8 #: minimum rise of SSF edge (from foot to peak) relative to 'ssf_th'
2026-05-13 05:16:43 +02:00
# TODO: C++ impl has diverged.
# - refractory period changes
# - ssf_th filter with 6-points
# - ?? others ??
def __init__(self): pass
def _ssf_det_zxings(self, fs, ssf, ssf_th):
"""detect threshold crossings in 'ssf' signal."""
i_holdoff = int(self.t_holdoff * fs)
# threshold crossing
ssf_pk = np.pad((ssf > ssf_th).astype(int), (0,1))
ssf_pks = np.pad(ssf_pk[:-1], (1,0))
ssf_z = (ssf_pk - ssf_pks) == 1
# holdoff
for i in np.arange(ssf_z.shape[0] - i_holdoff):
if ssf_z[i]:
ssf_z[i+1:i+1+i_holdoff] = 0
# rise amplitude filter (check in 2-3 samples [0.024 sec or so] and compare vs. threshold)
i_range = int(self.t_range * fs)
for i in np.arange(i_range, ssf_z.shape[0]-i_range-1):
if ssf_z[i]:
rise = np.max(ssf[i:i+i_range]) - np.min(ssf[i-i_range:i])
if rise < ssf_th * self.ssf_rel_rise:
ssf_z[i] = 0
ssf_z[-i_range:] = 0 # force-zero the bounds where we cannot check the amplitude rise
ssf_z[:i_range] = 0 # force-zero the bounds where we cannot check the amplitude rise
ssf_zxings = np.where(ssf_z)[0]
# only integer-index resolution (no interpolation)
2026-05-13 05:16:43 +02:00
self.ssf_zxings = ssf_zxings
return ssf_zxings
def _ssf_function(self, fs, y):
"""sum-slope function."""
sw = int(self.sw_sec*fs)
duk = np.clip(np.diff(np.pad(y, (1,0))), a_min=0, a_max=np.inf) # left-looking window
#ssf = np.convolve(duk, slope_filter, mode='same') # centered window (acausal!)
duks = np.pad(np.cumsum(duk), (0, sw))
duks_r = np.roll(duks, sw)
ssf = (duks - duks_r)[:-sw]
# compute threshold
# TODO: check if we need lowpass instead of mean for 'ssf_th'
ssf_th = self.ssf_rel_thres * np.mean(ssf)
2026-05-13 05:16:43 +02:00
self.ssf, self.ssf_th = ssf, ssf_th
return ssf, ssf_th
2026-05-13 05:16:43 +02:00
def debug_plot(self, i1, i2):
ssf, ssf_th, ssf_zxings = self.ssf, self.ssf_th, self.ssf_zxings
ssf_slice = ssf[i1:i2]
ssf_th_slice = ssf_th[i1:i2] if isinstance(ssf_th, np.ndarray) else ssf_th
plt.figure(figsize=(8, 2))
plt.plot(ssf_slice)
plt.plot(np.arange(ssf_slice.shape[0]), np.ones(ssf_slice.shape[0]) * ssf_th_slice)
plt.scatter(ssf_zxings[i1:i2], np.ones(ssf_zxings[i1:i2].shape[0]) * ssf_th_slice, c='r')
def get_mae_dist(ibis):
"""make triangular wave between beats, representing absolute beat placement error."""
dist = np.zeros(np.sum(ibis)+1)
# fill with distances
p_i, i = 0, 0
for ibi in ibis:
i += ibi
l_c = ibi // 2
dist[p_i:p_i+l_c+ibi%2] = np.arange(l_c+ibi%2)
dist[p_i+l_c+ibi%2:i] = 1 + np.arange(ibi-l_c-ibi%2)[::-1]
p_i = i
return dist
def get_mae_err_1(fs, freq, phase, act_ibis, debug=False):
"""
compute beat placement error between zero crossings of given sine wave (estimate)
and actually detected beats. computes 'dist' from 'act_ibis' (direction 1).
"""
# sin(2 pi freq n / fs + phase) ... 0, pi, 2*pi, ...
# 2 pi freq n / fs + phase = k * pi
# 2 freq n / fs + phase/pi = k (= 0, 1, 2, ...)
# n = (k - phase/pi) * fs / (2 freq)
# k_max = 2 freq n_max / fs + phase/pi
N = np.sum(act_ibis)+1
phase %= 2*np.pi
k_max = 2 * freq * N / fs + phase/np.pi + 1
i_est_zxing = np.round((np.arange(k_max) - phase/np.pi) * fs / (2 * freq)).astype(int)
est_ibis = np.diff(i_est_zxing)
dist = get_mae_dist(est_ibis)
assert np.sum(est_ibis) > np.sum(act_ibis)
if debug:
plt.figure(figsize=(8,2))
plt.plot(np.sin(2 * np.pi * freq * np.arange(N) / fs + phase))
plt.stem(np.cumsum(act_ibis), np.ones(act_ibis.shape[0]), markerfmt='r')
plt.stem(i_est_zxing, np.ones(i_est_zxing.shape[0]), markerfmt='y')
mae_errs = dist[np.cumsum(act_ibis)]
return np.sum(mae_errs)
def get_mae_err_2(fs, freq, phase, act_ibis, debug=False):
"""
compute beat placement error between zero crossings of given sine wave (estimate)
and actually detected beats. computes 'dist' from est_zxings (direction 2)
"""
dist = np.pad(get_mae_dist(act_ibis), (0,1))
dist[-1] = 1 # sentinel for rounding errors in np.round()
# sin(2 pi freq n / fs + phase) ... 0, pi, 2*pi, ...
# 2 pi freq n / fs + phase = k * pi
# 2 freq n / fs + phase/pi = k (= 0, 1, 2, ...)
# n = (k - phase/pi) * fs / (2 freq)
# k_max = 2 freq n_max / fs + phase/pi
N = np.sum(act_ibis)+1
phase %= 2*np.pi
k_max = 2 * freq * N / fs + phase/np.pi
i_est_zxing = np.round((np.arange(k_max) - phase/np.pi) * fs / (2 * freq)).astype(int)
if debug:
plt.figure(figsize=(8,2))
plt.plot(np.sin(2 * np.pi * freq * np.arange(N) / fs + phase))
plt.stem(np.cumsum(act_ibis), np.ones(act_ibis.shape[0]), markerfmt='r')
plt.stem(i_est_zxing, np.ones(i_est_zxing.shape[0]), markerfmt='y')
mae_errs = dist[i_est_zxing]
return np.sum(mae_errs)
def get_mae_err(fs, freq, phase, act_ibis, debug=False):
"""
compute beat placement error between zero crossings of given sine wave
and actually detected beats.
"""
mae = 0
# we compute both directions, to properly handle "missing beats"
# (in direction 1, an optimal solution is "fully dense", "freq = fs/2", because each 'act_beat' will align to the next index)
# (in direction 2, an optimal solution is "fully sparse", "freq = 1/L", because those are the only 'est_beats' which are aligned)
mae += get_mae_err_1(fs, freq, phase, act_ibis, debug)
mae += get_mae_err_2(fs, freq, phase, act_ibis, debug)
# TODO: may need to weight these two differently
# TODO: see "2027-04-27 TestApi_0b" vs "2027-04-27 TestApi" plots [24]
# TODO: (check: is match always slightly to the left of the trough / smooth minimum?)
# TODO (if so, we may need to weight dir1 and dir2 differently -- or maybe norm by pts density??)
# (or even penalize differently instead of adding dir1 and dir2)
return mae
class RegularBeatFinder:
"""
Optimize a beat frequency by placing a regular beat over the detected beats.
Always finds a beat within f1..f2 range,
ignoring whether the detected beat is an upper harmonic.
"""
num_freqs = 200 #: number of freq steps to evaluate
range_f1 = 0.5 #: lowest detection frequency in Hz
range_f2 = 4.0 #: highest detection frequency in Hz
def __init__(self): pass
def find_beat(self, fs, ssf_zxings, debug_fe=False, debug_i=None):
"""Find the optimal beat frequency."""
act_ibis = np.diff(ssf_zxings)
# evaluate mean absolute errors for all frequencies
freqs, freq_errs = self._get_opt_ibi_freq_2(fs, act_ibis, debug_i)
if debug_fe:
plt.figure(figsize=(8,2))
plt.plot(freqs, freq_errs)
# get optimal frequency
i_freq = np.argmin(freq_errs)
# compute normalized error (mean beat placement error in samples)
N = np.sum(act_ibis)+1
k_max = 2 * freqs[i_freq] * N / fs # + phase/np.pi
norm_err = freq_errs[i_freq] / k_max
#
return freqs[i_freq], norm_err
def freq_to_est_ibis(self, fs, freq, N):
phase = 0
k_max = 2 * freq * N / fs + phase/np.pi
i_est_zxing = np.round((np.arange(k_max) - phase/np.pi) * fs / (2 * freq)).astype(int)
return np.diff(i_est_zxing)
def _get_opt_ibi_freq_2(self, fs, act_ibis, debug_i=None):
"""get mean absolute errors for a range of frequencies."""
assert self.range_f2 < fs/8, "it is at most useful to go until f = fs/8" # (and even then, get_mae_dist() triangle will not be very useful)
t_freqs = np.linspace(self.range_f1, self.range_f2, self.num_freqs)
fe = np.zeros(self.num_freqs)
for i, f in enumerate(t_freqs):
fe[i] = get_mae_err(fs, f, 0.0, act_ibis)
if debug_i is not None:
# plot colors:
# y: estimate
# r: actual
get_mae_err(fs, t_freqs[debug_i], 0.0, act_ibis, debug=True)
return t_freqs, fe