// // Created by david on 04.03.2026. // #include #include "npy.hpp" //#include #include #include "pd_signal.h" #include "ssf_filter.h" #include "test_helpers.h" using namespace pd_signal; TEST(SignalTest, interp_t1) { //EXPECT_EQ(); std::vector xp { 1.0, 2.0, 4.0, 5.0, 5.01, 6.0 }; std::vector fp { 1.0, 2.0, 4.0, 5.0, 5.01, 7.0 }; std::vector x { 0.9, 1.0, 1.5, 2.0, 3.9, 4.1, 5.0, 5.5, 6.0, 6.1 }; std::vectory_e{ 1.0, 1.0, 1.5, 2.0, 3.9, 4.1, 5.0, 5.99494949495, 7.0, 7.0 }; size_t N = x.size(); // 5.99494949495 = (5.5-5.01)/0.99*(7-5.01)+5.01 std::vector y; interp(y, x, xp, fp); // assert y == y_e, nb. upto 5 digits double abs_err = 1e-5; for (size_t i = 0; i < N; i++) { ASSERT_NEAR(y_e[i], y[i], abs_err + 1e-9 * i); } } TEST(SignalTest, ranges) { const double abs_error = 1e-5; std::vector i; size_t N = 3; linspace(i, 0, (int) (N-1), (int) N, false); ASSERT_NEAR(0.0, i[0], abs_error); ASSERT_NEAR(1.0, i[1], abs_error); ASSERT_NEAR(2.0, i[2], abs_error); } class DebugRunningQuality : public RunningQuality { protected: virtual void dispatchLocked() { locked = true; } virtual void dispatchBeat(int idx, bool good, double posCorr) { corrs.push_back(posCorr); } bool locked; std::vector corrs; public: DebugRunningQuality(): locked(false) {} explicit DebugRunningQuality(bool disableSsf): RunningQuality(disableSsf), locked(false) {} virtual ~DebugRunningQuality() {} bool isLocked() { return locked; } std::vector getCorrs() { return corrs; } std::vector getBeatTemplate() { return this->beatTemplate; } }; /* TEST(SignalTest, resample_same_len) { std::vector rawBeat {0.0, 0.3, 0.9, 1.0, 0.7, 0.5, 0.1}; std::vector beat; resample(beat, rawBeat, 7); // TODO ASSERT_NEAR(0.3, beat[1], 1e-6); } */ /* TEST(SignalTest, resample_same_len) { std::vector rawBeat {0.0, 0.3, 0.9, 1.0, 0.7, 0.5, 0.1}; std::vector beat; resample(beat, rawBeat, 7); // TODO //ASSERT_NEAR(0.3, beat[1], 1e-6); for (int i = 0; i < 7; i++) std::cout << "b[" << i << "]=" << beat[i] << std::endl; } */ TEST(SignalTest, RunningQuality_t1) { DebugRunningQuality sqi(true); std::vector a {0.0, 0.3, 0.9, 1.0, 0.7, 0.5, 0.1}; std::vector b {0.0, 0.3, 0.9, 1.0, 0.5, 0.5, 0.1}; std::vector c {0.0, 0.3, 0.9, 1.0, 0.9, 0.5, 0.1}; std::vector d {0.0, 0.3, 0.9, 1.0, 0.7, 0.4, 0.1}; sqi.append(a, a); sqi.append(b, b); sqi.append(c, c); EXPECT_FALSE(sqi.isLocked()); sqi.append(d, d); EXPECT_TRUE(sqi.isLocked()); ASSERT_EQ(1, sqi.getCorrs().size()); double norm = sqrt((0.3*0.3 + 0.9*0.9 + 1.0 + 0.7*0.7 + 0.5*0.5 + 0.1*0.1) // \sum x_i^2 * (0.3*0.3 + 0.9*0.9 + 1.0 + 0.7*0.7 + 0.4*0.4 + 0.1*0.1)); // \sum y_i^2 double num = (0.3*0.3 + 0.9*0.9 + 1.0 + 0.7*0.7 + 0.5*0.4 + 0.1*0.1); // \sum x_i * y_i //ASSERT_NEAR(0.3, sqi.getBeatTemplate()[1], 1e-6); //ASSERT_NEAR(0.7, sqi.getBeatTemplate()[4], 1e-6); // nb. resampled! ASSERT_NEAR(num/norm, sqi.getCorrs()[0], 1e-3); } TEST(SignalTest, RunningQuality_t2) { npy::npy_data acc = npy::read_npy("test3/ssf_t3_acc.npy"); std::vector signal = fetch_y_axis(acc); #if (FPS != 60) #error "FPS must currently be 60, as highpass taps are pre-computed for that value" #endif // TODO: SQI: cehck input file // TODO: SQI: print debug values corr,idx, checkedSsf // Butterworth filter: order=5, fc=0.5, fs=60, btype='highpass' std::vector b {0.91875845, -4.59379227, 9.18758454, -9.18758454, 4.59379227, -0.91875845}; std::vector a {1. , -4.83056552, 9.33652742, -9.02545247, 4.36360803, -0.8441171}; IirFilter filter(b, a); //std::cerr << "before stage 1" << std::endl; // Stage 1: high-pass auto y = apply_filter(filter, signal); Filt f_neg(1, 0, 0, std::vector {-1.0}); auto y_neg = apply_filter(f_neg, y); //std::cerr << "before stage 2" << std::endl; // Stage 2: sum slope function const size_t upslope_width = 4; SsfFilter f_ssf(upslope_width); auto ssf = apply_filter(f_ssf, y_neg); //std::cerr << "before stage 3" << std::endl; // Stage 3: threshold detection const size_t len_refr = (size_t) (FPS / (MAX_BPM / 60)); DebugSsfStepDetectorThreshold f_ssd_thr(len_refr); auto ssf_threshold = apply_filter(f_ssd_thr, ssf); //std::cerr << "before writing results 1 and doing step detection" << std::endl; npy_save("test2/ssf_t3_ssf_threshold.npy", ssf_threshold); SsfStepDetector f_ssd(len_refr); auto steps = apply_filter(f_ssd, ssf); //std::cerr << "before writing results 2" << std::endl; npy_save("test2/ssf_t3_steps.npy", steps); // Debug SQI DebugRunningQuality sqi; std::vector beat_buf; std::vector ssf_buf; // y, ssf size_t N = y.size(); for (size_t i = 0; i < N; i++) { if (steps[i] == 1.0) { sqi.append(beat_buf, ssf_buf); beat_buf.clear(); ssf_buf.clear(); } beat_buf.push_back(y[i]); ssf_buf.push_back(ssf[i]); } EXPECT_TRUE(sqi.isLocked()); EXPECT_TRUE(sqi.getCorrs().size() > 50); std::vector corrs(sqi.getCorrs()); npy_save("test3/ssf_t3_sqi_corrs.npy", corrs); }