import numpy as np import matplotlib.pyplot as plt # for debug only from sqi import gauss # note: may be called ZxingDetector instead? class SsfZxing: """ Find beats in a Sum Slope Function by detecting threshold crossings. See Zong et al, from CINC 2003. """ t_holdoff = 0.1 #: hold-off period in sec (ignore zxings after initial rise) # these two depend on each other. t_range = 0.032 #: rise amplitude range in sec: +/- around transition, we check the rise amplitude. about 2*sw_sec but nb. 0.008 sec steps in fs/D rate sw_sec = 0.04 #: upslope width in sec (for SSF function) ssf_rel_thres = 3 #: magic number from Zong 2003, threshold from mean SSF amplitude ssf_rel_rise = 0.8 #: minimum rise of SSF edge (from foot to peak) relative to 'ssf_th' # TODO: C++ impl has diverged. # - refractory period changes # - ssf_th filter with 6-points # - ?? others ?? def __init__(self): pass def _ssf_det_zxings(self, fs, ssf, ssf_th): """detect threshold crossings in 'ssf' signal.""" i_holdoff = int(self.t_holdoff * fs) # threshold crossing ssf_pk = np.pad((ssf > ssf_th).astype(int), (0,1)) ssf_pks = np.pad(ssf_pk[:-1], (1,0)) ssf_z = (ssf_pk - ssf_pks) == 1 # holdoff for i in np.arange(ssf_z.shape[0] - i_holdoff): if ssf_z[i]: ssf_z[i+1:i+1+i_holdoff] = 0 # rise amplitude filter (check in 2-3 samples [0.024 sec or so] and compare vs. threshold) i_range = int(self.t_range * fs) for i in np.arange(i_range, ssf_z.shape[0]-i_range-1): if ssf_z[i]: rise = np.max(ssf[i:i+i_range]) - np.min(ssf[i-i_range:i]) if rise < ssf_th * self.ssf_rel_rise: ssf_z[i] = 0 ssf_z[-i_range:] = 0 # force-zero the bounds where we cannot check the amplitude rise ssf_z[:i_range] = 0 # force-zero the bounds where we cannot check the amplitude rise ssf_zxings = np.where(ssf_z)[0] # only integer-index resolution (no interpolation) self.ssf_zxings = ssf_zxings return ssf_zxings def _ssf_function(self, fs, y): """sum-slope function.""" sw = int(self.sw_sec*fs) duk = np.clip(np.diff(np.pad(y, (1,0))), a_min=0, a_max=np.inf) # left-looking window #ssf = np.convolve(duk, slope_filter, mode='same') # centered window (acausal!) duks = np.pad(np.cumsum(duk), (0, sw)) duks_r = np.roll(duks, sw) ssf = (duks - duks_r)[:-sw] # compute threshold # TODO: check if we need lowpass instead of mean for 'ssf_th' ssf_th = self.ssf_rel_thres * np.mean(ssf) self.ssf, self.ssf_th = ssf, ssf_th return ssf, ssf_th def debug_plot(self, i1, i2): ssf, ssf_th, ssf_zxings = self.ssf, self.ssf_th, self.ssf_zxings ssf_slice = ssf[i1:i2] ssf_th_slice = ssf_th[i1:i2] if isinstance(ssf_th, np.ndarray) else ssf_th plt.figure(figsize=(8, 2)) plt.plot(ssf_slice) plt.plot(np.arange(ssf_slice.shape[0]), np.ones(ssf_slice.shape[0]) * ssf_th_slice) plt.scatter(ssf_zxings[i1:i2], np.ones(ssf_zxings[i1:i2].shape[0]) * ssf_th_slice, c='r') def get_mae_dist(ibis): """make triangular wave between beats, representing absolute beat placement error.""" dist = np.zeros(np.sum(ibis)+1) # fill with distances p_i, i = 0, 0 for ibi in ibis: i += ibi l_c = ibi // 2 dist[p_i:p_i+l_c+ibi%2] = np.arange(l_c+ibi%2) dist[p_i+l_c+ibi%2:i] = 1 + np.arange(ibi-l_c-ibi%2)[::-1] p_i = i return dist def get_mae_err_1(fs, freq, phase, act_ibis, debug=False): """ compute beat placement error between zero crossings of given sine wave (estimate) and actually detected beats. computes 'dist' from 'act_ibis' (direction 1). """ # sin(2 pi freq n / fs + phase) ... 0, pi, 2*pi, ... # 2 pi freq n / fs + phase = k * pi # 2 freq n / fs + phase/pi = k (= 0, 1, 2, ...) # n = (k - phase/pi) * fs / (2 freq) # k_max = 2 freq n_max / fs + phase/pi N = np.sum(act_ibis)+1 phase %= 2*np.pi k_max = 2 * freq * N / fs + phase/np.pi + 1 i_est_zxing = np.round((np.arange(k_max) - phase/np.pi) * fs / (2 * freq)).astype(int) est_ibis = np.diff(i_est_zxing) dist = get_mae_dist(est_ibis) assert np.sum(est_ibis) > np.sum(act_ibis) if debug: plt.figure(figsize=(8,2)) plt.plot(np.sin(2 * np.pi * freq * np.arange(N) / fs + phase)) plt.stem(np.cumsum(act_ibis), np.ones(act_ibis.shape[0]), markerfmt='r') plt.stem(i_est_zxing, np.ones(i_est_zxing.shape[0]), markerfmt='y') mae_errs = dist[np.cumsum(act_ibis)] return np.sum(mae_errs) def get_mae_err_2(fs, freq, phase, act_ibis, debug=False): """ compute beat placement error between zero crossings of given sine wave (estimate) and actually detected beats. computes 'dist' from est_zxings (direction 2) """ dist = np.pad(get_mae_dist(act_ibis), (0,1)) dist[-1] = 1 # sentinel for rounding errors in np.round() # sin(2 pi freq n / fs + phase) ... 0, pi, 2*pi, ... # 2 pi freq n / fs + phase = k * pi # 2 freq n / fs + phase/pi = k (= 0, 1, 2, ...) # n = (k - phase/pi) * fs / (2 freq) # k_max = 2 freq n_max / fs + phase/pi N = np.sum(act_ibis)+1 phase %= 2*np.pi k_max = 2 * freq * N / fs + phase/np.pi i_est_zxing = np.round((np.arange(k_max) - phase/np.pi) * fs / (2 * freq)).astype(int) if debug: plt.figure(figsize=(8,2)) plt.plot(np.sin(2 * np.pi * freq * np.arange(N) / fs + phase)) plt.stem(np.cumsum(act_ibis), np.ones(act_ibis.shape[0]), markerfmt='r') plt.stem(i_est_zxing, np.ones(i_est_zxing.shape[0]), markerfmt='y') mae_errs = dist[i_est_zxing] return np.sum(mae_errs) def get_mae_err(fs, freq, phase, act_ibis, debug=False): """ compute beat placement error between zero crossings of given sine wave and actually detected beats. """ mae = 0 # we compute both directions, to properly handle "missing beats" # (in direction 1, an optimal solution is "fully dense", "freq = fs/2", because each 'act_beat' will align to the next index) # (in direction 2, an optimal solution is "fully sparse", "freq = 1/L", because those are the only 'est_beats' which are aligned) mae += get_mae_err_1(fs, freq, phase, act_ibis, debug) mae += get_mae_err_2(fs, freq, phase, act_ibis, debug) # TODO: may need to weight these two differently # TODO: see "2027-04-27 TestApi_0b" vs "2027-04-27 TestApi" plots [24] # TODO: (check: is match always slightly to the left of the trough / smooth minimum?) # TODO (if so, we may need to weight dir1 and dir2 differently -- or maybe norm by pts density??) # (or even penalize differently instead of adding dir1 and dir2) return mae class RegularBeatFinder: """ Optimize a beat frequency by placing a regular beat over the detected beats. Always finds a beat within f1..f2 range, ignoring whether the detected beat is an upper harmonic. """ num_freqs = 200 #: number of freq steps to evaluate range_f1 = 0.5 #: lowest detection frequency in Hz range_f2 = 4.0 #: highest detection frequency in Hz f_bias_width = 0.2 #: gaussian std relative to num_freqs within f1..f2 def __init__(self): pass def find_beat(self, fs, ssf_zxings, f_hint=None, debug_fe=False, debug_i=None): """Find the optimal beat frequency.""" act_ibis = np.diff(ssf_zxings) # evaluate mean absolute errors for all frequencies freqs, freq_errs = self._get_opt_ibi_freq_2(fs, act_ibis, debug_i) # bias with f_hint - once we know the beat freq, make it more likely for it to be found everywhere nf, f1, f2 = RegularBeatFinder.num_freqs, RegularBeatFinder.range_f2, RegularBeatFinder.range_f1 bias = gauss( nf, (f_hint - f1) / (f2 - f1) * nf, RegularBeatFinder.f_bias_width * nf ) freqs_bias = 1.0 / (np.max(bias)+bias) # make 'f_hint' at most 2x more likely -- (1+bias) if normalized freq_errs *= freqs_bias # if debug_fe: plt.figure(figsize=(8,2)) plt.plot(freqs, freq_errs) # get optimal frequency i_freq = np.argmin(freq_errs) # compute normalized error (mean beat placement error in samples) N = np.sum(act_ibis)+1 k_max = 2 * freqs[i_freq] * N / fs # + phase/np.pi norm_err = freq_errs[i_freq] / k_max # return freqs[i_freq], norm_err def freq_to_est_ibis(self, fs, freq, N): phase = 0 k_max = 2 * freq * N / fs + phase/np.pi i_est_zxing = np.round((np.arange(k_max) - phase/np.pi) * fs / (2 * freq)).astype(int) return np.diff(i_est_zxing) def _get_opt_ibi_freq_2(self, fs, act_ibis, debug_i=None): """get mean absolute errors for a range of frequencies.""" assert self.range_f2 < fs/8, "it is at most useful to go until f = fs/8" # (and even then, get_mae_dist() triangle will not be very useful) t_freqs = np.linspace(self.range_f1, self.range_f2, self.num_freqs) fe = np.zeros(self.num_freqs) for i, f in enumerate(t_freqs): fe[i] = get_mae_err(fs, f, 0.0, act_ibis) if debug_i is not None: # plot colors: # y: estimate # r: actual get_mae_err(fs, t_freqs[debug_i], 0.0, act_ibis, debug=True) return t_freqs, fe