diff --git a/beat.py b/beat.py index 42fd664..ca2fbc1 100644 --- a/beat.py +++ b/beat.py @@ -1,4 +1,5 @@ import numpy as np +import matplotlib.pyplot as plt # for debug only class SsfZxing: """ @@ -7,18 +8,16 @@ class SsfZxing: """ t_holdoff = 0.1 #: hold-off period in sec (ignore zxings after initial rise) # these two depend on each other. - t_range = 0.024 #: rise amplitude range in sec: +/- around transition, we check the rise amplitude. about 2*sw_sec but nb. 0.008 sec steps in fs/D rate + t_range = 0.032 #: rise amplitude range in sec: +/- around transition, we check the rise amplitude. about 2*sw_sec but nb. 0.008 sec steps in fs/D rate sw_sec = 0.04 #: upslope width in sec (for SSF function) - ssf_rel_thres = 3 #: magic number from Zong 2003, to scale mean SSF amplitude + ssf_rel_thres = 3 #: magic number from Zong 2003, threshold from mean SSF amplitude ssf_rel_rise = 0.8 #: minimum rise of SSF edge (from foot to peak) relative to 'ssf_th' def __init__(self): pass - def _ssf_det_zxings(self, fs, ssf): + def _ssf_det_zxings(self, fs, ssf, ssf_th): """detect threshold crossings in 'ssf' signal.""" i_holdoff = int(self.t_holdoff * fs) - # TODO: check if we need lowpass instead of mean for 'ssf_th' - ssf_th = self.ssf_rel_thres * np.mean(ssf) # threshold crossing ssf_pk = np.pad((ssf > ssf_th).astype(int), (0,1)) ssf_pks = np.pad(ssf_pk[:-1], (1,0)) @@ -31,7 +30,7 @@ class SsfZxing: i_range = int(self.t_range * fs) for i in np.arange(i_range, ssf_z.shape[0]-i_range-1): if ssf_z[i]: - rise = ssf[i+i_range] - ssf[i-i_range] + rise = np.max(ssf[i:i+i_range]) - np.min(ssf[i-i_range:i]) if rise < ssf_th * self.ssf_rel_rise: ssf_z[i] = 0 ssf_z[-i_range:] = 0 # force-zero the bounds where we cannot check the amplitude rise @@ -43,13 +42,15 @@ class SsfZxing: def _ssf_function(self, fs, y): """sum-slope function.""" sw = int(self.sw_sec*fs) - duk = np.clip(np.diff(np.pad(y, (1,0))), a_min=0, a_max=np.inf) # left-looking window #ssf = np.convolve(duk, slope_filter, mode='same') # centered window (acausal!) duks = np.pad(np.cumsum(duk), (0, sw)) duks_r = np.roll(duks, sw) ssf = (duks - duks_r)[:-sw] - return ssf + # compute threshold + # TODO: check if we need lowpass instead of mean for 'ssf_th' + ssf_th = self.ssf_rel_thres * np.mean(ssf) + return ssf, ssf_th def get_mae_dist(ibis): @@ -128,6 +129,11 @@ def get_mae_err(fs, freq, phase, act_ibis, debug=False): # (in direction 2, an optimal solution is "fully sparse", "freq = 1/L", because those are the only 'est_beats' which are aligned) mae += get_mae_err_1(fs, freq, phase, act_ibis, debug) mae += get_mae_err_2(fs, freq, phase, act_ibis, debug) + # TODO: may need to weight these two differently + # TODO: see "2027-04-27 TestApi_0b" vs "2027-04-27 TestApi" plots [24] + # TODO: (check: is match always slightly to the left of the trough / smooth minimum?) + # TODO (if so, we may need to weight dir1 and dir2 differently -- or maybe norm by pts density??) + # (or even penalize differently instead of adding dir1 and dir2) return mae class RegularBeatFinder: diff --git a/segmenter.py b/segmenter.py index cda868e..965d469 100644 --- a/segmenter.py +++ b/segmenter.py @@ -9,6 +9,8 @@ def median_filter(a, w): o[i] = np.median(sl) return o +# nice-to: split longer segments (above 30 sec), merge very-short segments + class Segmenter: seg_win_size_sec = 4.0 #: window size for stat. measures for segmentation, in sec seg_win_step_sec = 1.0 #: step for segmentation, in sec diff --git a/sqi.py b/sqi.py index 35d92de..d643e8b 100644 --- a/sqi.py +++ b/sqi.py @@ -28,14 +28,14 @@ class SigQuality: def __init__(self): pass - def get_snr(self, fs, ssf, ssf_threshold, ssf_zxings): + def get_snr(self, fs, ssf, ssf_threshold, est_zxings): """Compute the Signal-to-Noise Ratio of beats, based on SSF function and detected beat locations.""" sigma = fs * self.gauss_beat_template_sigma_sec W = int(fs * self.gauss_beat_template_win_sec) gb = gauss(W, W//2, sigma) # place gaussians on estimated beat locations ssf_est = np.zeros(ssf.shape[0]) - for i in ssf_zxings: + for i in est_zxings: ssf_est += shift(ssf.shape[0], i, gb) ssf_est /= gb[W//2] # normalize amplitude to 1.0 ssf_est = np.roll(ssf_est, int(sigma)) # shift to right (beat loc = gauss beginning, not center) @@ -49,7 +49,7 @@ class SigQuality: sqi_noise = np.sum(sqi_pen * (ssf**2)) # noise is everywhere, while signal is only around detected peaks - correct for this. - goal_density = np.mean(np.clip(2*sigma / np.diff(ssf_zxings), a_min=0, a_max=1)) + goal_density = np.mean(np.clip(2*sigma / np.diff(est_zxings), a_min=0, a_max=1)) sqi_goal /= goal_density sqi = 10 * (np.log10(sqi_goal) - np.log10(sqi_noise))