diff --git a/beat.py b/beat.py new file mode 100644 index 0000000..42fd664 --- /dev/null +++ b/beat.py @@ -0,0 +1,180 @@ +import numpy as np + +class SsfZxing: + """ + Find beats in a Sum Slope Function by detecting threshold crossings. + See Zong et al, from CINC 2003. + """ + t_holdoff = 0.1 #: hold-off period in sec (ignore zxings after initial rise) + # these two depend on each other. + t_range = 0.024 #: rise amplitude range in sec: +/- around transition, we check the rise amplitude. about 2*sw_sec but nb. 0.008 sec steps in fs/D rate + sw_sec = 0.04 #: upslope width in sec (for SSF function) + ssf_rel_thres = 3 #: magic number from Zong 2003, to scale mean SSF amplitude + ssf_rel_rise = 0.8 #: minimum rise of SSF edge (from foot to peak) relative to 'ssf_th' + + def __init__(self): pass + + def _ssf_det_zxings(self, fs, ssf): + """detect threshold crossings in 'ssf' signal.""" + i_holdoff = int(self.t_holdoff * fs) + # TODO: check if we need lowpass instead of mean for 'ssf_th' + ssf_th = self.ssf_rel_thres * np.mean(ssf) + # threshold crossing + ssf_pk = np.pad((ssf > ssf_th).astype(int), (0,1)) + ssf_pks = np.pad(ssf_pk[:-1], (1,0)) + ssf_z = (ssf_pk - ssf_pks) == 1 + # holdoff + for i in np.arange(ssf_z.shape[0] - i_holdoff): + if ssf_z[i]: + ssf_z[i+1:i+1+i_holdoff] = 0 + # rise amplitude filter (check in 2-3 samples [0.024 sec or so] and compare vs. threshold) + i_range = int(self.t_range * fs) + for i in np.arange(i_range, ssf_z.shape[0]-i_range-1): + if ssf_z[i]: + rise = ssf[i+i_range] - ssf[i-i_range] + if rise < ssf_th * self.ssf_rel_rise: + ssf_z[i] = 0 + ssf_z[-i_range:] = 0 # force-zero the bounds where we cannot check the amplitude rise + ssf_z[:i_range] = 0 # force-zero the bounds where we cannot check the amplitude rise + ssf_zxings = np.where(ssf_z)[0] + # only integer-index resolution (no interpolation) + return ssf_zxings + + def _ssf_function(self, fs, y): + """sum-slope function.""" + sw = int(self.sw_sec*fs) + + duk = np.clip(np.diff(np.pad(y, (1,0))), a_min=0, a_max=np.inf) # left-looking window + #ssf = np.convolve(duk, slope_filter, mode='same') # centered window (acausal!) + duks = np.pad(np.cumsum(duk), (0, sw)) + duks_r = np.roll(duks, sw) + ssf = (duks - duks_r)[:-sw] + return ssf + + +def get_mae_dist(ibis): + """make triangular wave between beats, representing absolute beat placement error.""" + dist = np.zeros(np.sum(ibis)+1) + # fill with distances + p_i, i = 0, 0 + for ibi in ibis: + i += ibi + l_c = ibi // 2 + dist[p_i:p_i+l_c+ibi%2] = np.arange(l_c+ibi%2) + dist[p_i+l_c+ibi%2:i] = 1 + np.arange(ibi-l_c-ibi%2)[::-1] + p_i = i + return dist + + +def get_mae_err_1(fs, freq, phase, act_ibis, debug=False): + """ + compute beat placement error between zero crossings of given sine wave (estimate) + and actually detected beats. computes 'dist' from 'act_ibis' (direction 1). + """ + # sin(2 pi freq n / fs + phase) ... 0, pi, 2*pi, ... + # 2 pi freq n / fs + phase = k * pi + # 2 freq n / fs + phase/pi = k (= 0, 1, 2, ...) + # n = (k - phase/pi) * fs / (2 freq) + # k_max = 2 freq n_max / fs + phase/pi + N = np.sum(act_ibis)+1 + phase %= 2*np.pi + k_max = 2 * freq * N / fs + phase/np.pi + 1 + i_est_zxing = np.round((np.arange(k_max) - phase/np.pi) * fs / (2 * freq)).astype(int) + est_ibis = np.diff(i_est_zxing) + dist = get_mae_dist(est_ibis) + assert np.sum(est_ibis) > np.sum(act_ibis) + if debug: + plt.figure(figsize=(8,2)) + plt.plot(np.sin(2 * np.pi * freq * np.arange(N) / fs + phase)) + plt.stem(np.cumsum(act_ibis), np.ones(act_ibis.shape[0]), markerfmt='r') + plt.stem(i_est_zxing, np.ones(i_est_zxing.shape[0]), markerfmt='y') + mae_errs = dist[np.cumsum(act_ibis)] + return np.sum(mae_errs) + + +def get_mae_err_2(fs, freq, phase, act_ibis, debug=False): + """ + compute beat placement error between zero crossings of given sine wave (estimate) + and actually detected beats. computes 'dist' from est_zxings (direction 2) + """ + dist = np.pad(get_mae_dist(act_ibis), (0,1)) + dist[-1] = 1 # sentinel for rounding errors in np.round() + # sin(2 pi freq n / fs + phase) ... 0, pi, 2*pi, ... + # 2 pi freq n / fs + phase = k * pi + # 2 freq n / fs + phase/pi = k (= 0, 1, 2, ...) + # n = (k - phase/pi) * fs / (2 freq) + # k_max = 2 freq n_max / fs + phase/pi + N = np.sum(act_ibis)+1 + phase %= 2*np.pi + k_max = 2 * freq * N / fs + phase/np.pi + i_est_zxing = np.round((np.arange(k_max) - phase/np.pi) * fs / (2 * freq)).astype(int) + if debug: + plt.figure(figsize=(8,2)) + plt.plot(np.sin(2 * np.pi * freq * np.arange(N) / fs + phase)) + plt.stem(np.cumsum(act_ibis), np.ones(act_ibis.shape[0]), markerfmt='r') + plt.stem(i_est_zxing, np.ones(i_est_zxing.shape[0]), markerfmt='y') + mae_errs = dist[i_est_zxing] + return np.sum(mae_errs) + + +def get_mae_err(fs, freq, phase, act_ibis, debug=False): + """ + compute beat placement error between zero crossings of given sine wave + and actually detected beats. + """ + mae = 0 + # we compute both directions, to properly handle "missing beats" + # (in direction 1, an optimal solution is "fully dense", "freq = fs/2", because each 'act_beat' will align to the next index) + # (in direction 2, an optimal solution is "fully sparse", "freq = 1/L", because those are the only 'est_beats' which are aligned) + mae += get_mae_err_1(fs, freq, phase, act_ibis, debug) + mae += get_mae_err_2(fs, freq, phase, act_ibis, debug) + return mae + +class RegularBeatFinder: + """ + Optimize a beat frequency by placing a regular beat over the detected beats. + Always finds a beat within f1..f2 range, + ignoring whether the detected beat is an upper harmonic. + """ + num_freqs = 200 #: number of freq steps to evaluate + range_f1 = 0.5 #: lowest detection frequency in Hz + range_f2 = 4.0 #: highest detection frequency in Hz + + def __init__(self): pass + + def find_beat(self, fs, ssf_zxings, debug_fe=False, debug_i=None): + """Find the optimal beat frequency.""" + act_ibis = np.diff(ssf_zxings) + # evaluate mean absolute errors for all frequencies + freqs, freq_errs = self._get_opt_ibi_freq_2(fs, act_ibis, debug_i) + if debug_fe: + plt.figure(figsize=(8,2)) + plt.plot(freqs, freq_errs) + # get optimal frequency + i_freq = np.argmin(freq_errs) + # compute normalized error (mean beat placement error in samples) + N = np.sum(act_ibis)+1 + k_max = 2 * freqs[i_freq] * N / fs # + phase/np.pi + norm_err = freq_errs[i_freq] / k_max + # + return freqs[i_freq], norm_err + + def freq_to_est_ibis(self, fs, freq, N): + phase = 0 + k_max = 2 * freq * N / fs + phase/np.pi + i_est_zxing = np.round((np.arange(k_max) - phase/np.pi) * fs / (2 * freq)).astype(int) + return np.diff(i_est_zxing) + + def _get_opt_ibi_freq_2(self, fs, act_ibis, debug_i=None): + """get mean absolute errors for a range of frequencies.""" + assert self.range_f2 < fs/8, "it is at most useful to go until f = fs/8" # (and even then, get_mae_dist() triangle will not be very useful) + t_freqs = np.linspace(self.range_f1, self.range_f2, self.num_freqs) + fe = np.zeros(self.num_freqs) + for i, f in enumerate(t_freqs): + fe[i] = get_mae_err(fs, f, 0.0, act_ibis) + if debug_i is not None: + # plot colors: + # y: estimate + # r: actual + get_mae_err(fs, t_freqs[debug_i], 0.0, act_ibis, debug=True) + return t_freqs, fe diff --git a/requirements.txt b/requirements.txt index 03df617..478a087 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,3 +4,5 @@ authomatic requests numpy scipy +scikit-learn +hsh-signal \ No newline at end of file diff --git a/segmenter.py b/segmenter.py new file mode 100644 index 0000000..cda868e --- /dev/null +++ b/segmenter.py @@ -0,0 +1,63 @@ +import numpy as np +from sklearn.cluster import KMeans + +def median_filter(a, w): + ap = np.pad(a, (w//2, w//2), mode='edge') + o = np.zeros(a.shape[0]) + for i in np.arange(a.shape[0]): + sl = ap[i:i+w] + o[i] = np.median(sl) + return o + +class Segmenter: + seg_win_size_sec = 4.0 #: window size for stat. measures for segmentation, in sec + seg_win_step_sec = 1.0 #: step for segmentation, in sec + n_clusters = 8 #: clusters for KMeans algorithm + seg_filt_win_sec = 20.0 #: median filter width for smoothing segments + + def __init__(self): pass + + def get_segment_boundaries(self, fs, guitar): + """split the spectral power signal 'guitar' into stochastically similar segments.""" + segment_ids = self.get_segments(fs, guitar) + stxs = np.diff(segment_ids) != 0 + i_stxs = np.where(stxs)[0] + return i_stxs + + def get_segments(self, fs, guitar): + """split the spectral power signal 'guitar' into stochastically similar segments.""" + seg_filt_win = int(self.seg_filt_win_sec / self.seg_win_step_sec) + seg_guitar_data = self._sig_stochastics(fs, guitar) + X = np.vstack(( + seg_guitar_data[:,0]*1.4, + np.sqrt(np.sum(seg_guitar_data[:,1:]**2, axis=1)) + )).T + # cluster by stochastic characteristics + kmeans = KMeans(n_clusters=self.n_clusters, random_state=0, n_init="auto").fit(X) + segment_ids = np.floor(median_filter(kmeans.labels_, seg_filt_win)).astype(int) + # up-sample segment id assignment + iidx = np.linspace(0, segment_ids.shape[0], guitar.shape[0], endpoint=False).astype(int) + return segment_ids[iidx] + + def _sig_stochastics(self, fs, y): + """compute the stochastic moments of the signal. normalized.""" + seg_win_size = int(self.seg_win_size_sec * fs) + seg_win_step = int(self.seg_win_step_sec * fs) + # + seg_y_data = np.zeros((y.shape[0] // seg_win_step, 4)) + y_pad = np.pad(y, (seg_win_size // 2, seg_win_size // 2)) + y_0_max, y_0_mean = np.max(y), np.mean(y) + y_1_max = np.max(np.mean((y - y_0_mean)**2)) + y_2_max = np.max(np.mean(np.abs((y - y_0_mean)**3))) + y_3_max = np.max(np.mean((y - y_0_mean)**4)) + wo = int(self.seg_win_size_sec/self.seg_win_step_sec) + for i in np.arange(wo//2, y.shape[0] // seg_win_step - wo//2): + i_c = int((i+0.5)*seg_win_step) + y_slice = y_pad[i_c-seg_win_size//2:i_c+seg_win_size//2] + mean = np.mean(y_slice) + seg_y_data[i,0] = mean / y_0_max + seg_y_data[i,1] = np.mean((y_slice - mean)**2) / y_1_max + seg_y_data[i,2] = np.mean(np.abs((y_slice - mean)**3)) / y_2_max / 2 + seg_y_data[i,3] = np.mean((y_slice - mean)**4) / y_3_max / 4 + # + return seg_y_data diff --git a/sqi.py b/sqi.py new file mode 100644 index 0000000..35d92de --- /dev/null +++ b/sqi.py @@ -0,0 +1,56 @@ +import numpy as np + + +def gauss(N, mu, sigma): + x = np.arange(N) + norm = sigma * np.sqrt(2*np.pi) + return np.exp(-1/2 * (x - mu)**2 / sigma**2) / norm + + +def shift(N, n, x): + """shift the center of the signal 'x' to time index 'n'.""" + y = np.zeros(N) + xl = x.shape[0] + s = n - xl // 2 + x_bi, x_ei = np.clip(xl // 2 - n, a_min=0, a_max=xl), np.clip(N + xl - xl // 2 - n - xl % 2, a_min=0, a_max=xl) + y_bi, y_ei = np.clip(n - xl // 2, a_min=0, a_max=N), np.clip(n + xl // 2 + xl % 2, a_min=0, a_max=N) + y[y_bi:y_ei] = x[x_bi:x_ei] + return y + + +class SigQuality: + """ + Compute the Signal-to-Noise Ratio of beats + """ + gauss_beat_template_win_sec = 0.25542 #: gauss window width (as compared to beats in ssf function) + gauss_beat_template_sigma_sec = 0.027 #: gauss bump half-width parameter (as compared to beats in ssf function) + #gauss_amplitude = 2.0 + + def __init__(self): pass + + def get_snr(self, fs, ssf, ssf_threshold, ssf_zxings): + """Compute the Signal-to-Noise Ratio of beats, based on SSF function and detected beat locations.""" + sigma = fs * self.gauss_beat_template_sigma_sec + W = int(fs * self.gauss_beat_template_win_sec) + gb = gauss(W, W//2, sigma) + # place gaussians on estimated beat locations + ssf_est = np.zeros(ssf.shape[0]) + for i in ssf_zxings: + ssf_est += shift(ssf.shape[0], i, gb) + ssf_est /= gb[W//2] # normalize amplitude to 1.0 + ssf_est = np.roll(ssf_est, int(sigma)) # shift to right (beat loc = gauss beginning, not center) + #sqi_ref = ssf_est * (self.gauss_amplitude * ssf_threshold) # set amplitude + + # penalty term = (where there should be no amplitude, according to gaussians) + sqi_pen = 1.0 - ssf_est + + # sqi = ratio of signal energy in goal vs. in noise + sqi_goal = np.sum(ssf_est * (ssf**2)) + sqi_noise = np.sum(sqi_pen * (ssf**2)) + + # noise is everywhere, while signal is only around detected peaks - correct for this. + goal_density = np.mean(np.clip(2*sigma / np.diff(ssf_zxings), a_min=0, a_max=1)) + sqi_goal /= goal_density + sqi = 10 * (np.log10(sqi_goal) - np.log10(sqi_noise)) + + return sqi