Compare commits
9 Commits
b0a7202f32
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 02549e5618 | |||
| 3b7bae0a38 | |||
| 041cba8224 | |||
| 729555acc3 | |||
| f3f580f923 | |||
| e349278c06 | |||
| 71f1975a97 | |||
| ee5a1376ee | |||
| e42cddd645 |
11
TODO.md
Normal file
11
TODO.md
Normal file
@@ -0,0 +1,11 @@
|
||||
# TODO
|
||||
|
||||
- tests: beat and guitar synthesizer
|
||||
- generate rhythmic sequence and test the algos
|
||||
.
|
||||
|
||||
O> "2027-04-29 TestApi Bass song.ipynb": [21]
|
||||
- why is ssf continually rising?
|
||||
- because of using 'fs' not 'fsd'
|
||||
.
|
||||
|
||||
231
api.py
231
api.py
@@ -17,19 +17,24 @@
|
||||
# $ while sleep 1; do diff -q api.py /tmp/api.py; if [ $? -ne 0 ]; then scp api.py lockstep@api.lockstep.at:/var/sites/api.lockstep.at/; cp api.py /tmp/api.py; fi; done
|
||||
|
||||
import os
|
||||
import re
|
||||
import sqlite3
|
||||
from base64 import b64encode
|
||||
from datetime import datetime, timedelta, timezone
|
||||
import json
|
||||
from functools import wraps
|
||||
from urllib.parse import urlencode
|
||||
|
||||
from flask import Flask, request, session, jsonify, make_response, redirect, url_for
|
||||
from flask import Flask, g, request, session, jsonify, make_response, redirect, url_for
|
||||
from werkzeug.middleware.proxy_fix import ProxyFix
|
||||
from authomatic import Authomatic
|
||||
from authomatic.adapters import WerkzeugAdapter
|
||||
from authomatic.providers import oauth2
|
||||
from authomatic.providers import PROVIDER_ID_MAP as AUTHOMATIC_PROVIDER_ID_MAP
|
||||
|
||||
import random
|
||||
import time
|
||||
|
||||
import requests
|
||||
|
||||
|
||||
@@ -111,6 +116,7 @@ ALLOWED_SPOTIFY_APP_POST_LOGIN_REDIRECT = os.environ.get(
|
||||
)
|
||||
|
||||
DB_PATH = os.environ.get("TOKEN_DB_PATH", "spotify_tokens.db")
|
||||
METADATA_UPLOAD_DIR = os.environ.get("METADATA_UPLOAD_DIR", "uploaded_collections")
|
||||
|
||||
app = Flask(__name__)
|
||||
app.secret_key = os.environ["FLASK_SECRET_KEY"]
|
||||
@@ -153,6 +159,17 @@ def init_db():
|
||||
updated_at TEXT NOT NULL
|
||||
)
|
||||
""")
|
||||
conn.execute("""
|
||||
CREATE TABLE IF NOT EXISTS uploaded_metadata (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
spotify_user_id TEXT NOT NULL,
|
||||
track_id TEXT NOT NULL,
|
||||
type TEXT NOT NULL,
|
||||
version INTEGER NOT NULL,
|
||||
file_name TEXT NOT NULL,
|
||||
created_at TEXT NOT NULL
|
||||
)
|
||||
""")
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
@@ -294,9 +311,30 @@ def get_valid_access_token(spotify_user_id: str) -> str:
|
||||
# ----------------------------
|
||||
|
||||
def spotify_get(url: str, access_token: str, params: dict | None = None) -> dict:
|
||||
"""GET a Spotify Web API endpoint and return the parsed JSON body."""
|
||||
"""
|
||||
GET a Spotify Web API endpoint and return the parsed JSON body.
|
||||
|
||||
Retries on HTTP 429 and 503 with exponential backoff and optional Retry-After,
|
||||
so brief Spotify rate limits often clear before we surface an error to the app.
|
||||
"""
|
||||
headers = {"Authorization": f"Bearer {access_token}"}
|
||||
r = requests.get(url, headers=headers, params=params)
|
||||
backoff_sec = 1.0
|
||||
max_attempts = 8
|
||||
params_eff = params
|
||||
for attempt in range(max_attempts):
|
||||
r = requests.get(url, headers=headers, params=params_eff)
|
||||
if r.status_code in (429, 503) and attempt < max_attempts - 1:
|
||||
wait = backoff_sec
|
||||
ra = r.headers.get("Retry-After")
|
||||
if ra:
|
||||
try:
|
||||
wait = max(wait, float(ra))
|
||||
except ValueError:
|
||||
pass
|
||||
wait = min(wait, 120.0)
|
||||
time.sleep(wait + random.random() * 0.25 * wait)
|
||||
backoff_sec = min(backoff_sec * 2.0, 60.0)
|
||||
continue
|
||||
r.raise_for_status()
|
||||
return r.json()
|
||||
|
||||
@@ -605,28 +643,59 @@ old example 1:
|
||||
}
|
||||
"""
|
||||
|
||||
@app.route("/me")
|
||||
def me():
|
||||
|
||||
def spotify_access_token_from_authorization_header():
|
||||
auth = request.headers.get("Authorization", "") or ""
|
||||
if not auth.startswith("Bearer "):
|
||||
return None
|
||||
token = auth[7:].strip()
|
||||
return token or None
|
||||
|
||||
|
||||
def get_request_spotify_access_token():
|
||||
"""
|
||||
Prefer ``Authorization: Bearer <access_token>`` (mobile / jukebox).
|
||||
Fallback to Flask session + stored refresh flow (browser).
|
||||
"""
|
||||
bearer = spotify_access_token_from_authorization_header()
|
||||
if bearer:
|
||||
return bearer
|
||||
spotify_user_id = session.get("spotify_user_id")
|
||||
if not spotify_user_id:
|
||||
return jsonify({"ok": False, "error": "Not logged in"}), 401
|
||||
return None
|
||||
return get_valid_access_token(spotify_user_id)
|
||||
|
||||
access_token = get_valid_access_token(spotify_user_id)
|
||||
|
||||
def require_auth(f):
|
||||
@wraps(f)
|
||||
def wrapped(*args, **kwargs):
|
||||
token = get_request_spotify_access_token()
|
||||
if not token:
|
||||
return jsonify({"ok": False, "error": "Not logged in"}), 401
|
||||
g.spotify_access_token = token
|
||||
return f(*args, **kwargs)
|
||||
return wrapped
|
||||
|
||||
|
||||
@app.route("/me")
|
||||
@require_auth
|
||||
def me():
|
||||
access_token = g.spotify_access_token
|
||||
profile = spotify_get_me(access_token)
|
||||
|
||||
row = get_token_record(spotify_user_id)
|
||||
row = get_token_record(profile["id"])
|
||||
|
||||
return jsonify({
|
||||
"ok": True,
|
||||
"profile": profile,
|
||||
"stored_expires_at": row["expires_at"],
|
||||
"stored_expires_at": row["expires_at"] if row else None,
|
||||
})
|
||||
|
||||
|
||||
@app.route("/playlists")
|
||||
@require_auth
|
||||
def playlists():
|
||||
access_token = get_request_spotify_access_token()
|
||||
if not access_token:
|
||||
return jsonify({"ok": False, "error": "Not logged in"}), 401
|
||||
access_token = g.spotify_access_token
|
||||
|
||||
"""
|
||||
user_id = "Sara"
|
||||
@@ -646,32 +715,34 @@ def playlists():
|
||||
|
||||
|
||||
@app.route("/playlists/<playlist_id>")
|
||||
@require_auth
|
||||
def playlist(playlist_id):
|
||||
access_token = get_request_spotify_access_token()
|
||||
if not access_token:
|
||||
return jsonify({"ok": False, "error": "Not logged in"}), 401
|
||||
access_token = g.spotify_access_token
|
||||
|
||||
playlist_data = spotify_get(
|
||||
f"https://api.spotify.com/v1/playlists/{playlist_id}",
|
||||
access_token,
|
||||
)
|
||||
|
||||
# The playlist response embeds a `tracks` paging object whose first page
|
||||
# contains up to 100 items. For playlists larger than that, follow the
|
||||
# dedicated tracks endpoint until exhausted and splice the full list back
|
||||
# into the response.
|
||||
tracks_obj = playlist_data.get("tracks") or {}
|
||||
if tracks_obj.get("next"):
|
||||
all_tracks = spotify_get_paginated(
|
||||
# Full playlist objects use a paging object at `items` (current Spotify shape)
|
||||
# or legacy `tracks`. Follow `next` on whichever is present.
|
||||
paging_key = (
|
||||
"items"
|
||||
if isinstance(playlist_data.get("items"), dict)
|
||||
else "tracks"
|
||||
)
|
||||
paging = playlist_data.get(paging_key) or {}
|
||||
if paging.get("next"):
|
||||
all_items = spotify_get_paginated(
|
||||
f"https://api.spotify.com/v1/playlists/{playlist_id}/tracks",
|
||||
access_token,
|
||||
limit=100,
|
||||
)
|
||||
playlist_data["tracks"] = {
|
||||
**tracks_obj,
|
||||
"items": all_tracks,
|
||||
playlist_data[paging_key] = {
|
||||
**paging,
|
||||
"items": all_items,
|
||||
"offset": 0,
|
||||
"limit": len(all_tracks),
|
||||
"limit": len(all_items),
|
||||
"next": None,
|
||||
"previous": None,
|
||||
}
|
||||
@@ -682,26 +753,98 @@ def playlist(playlist_id):
|
||||
})
|
||||
|
||||
|
||||
def get_request_spotify_access_token():
|
||||
"""
|
||||
Prefer ``Authorization: Bearer <access_token>`` (mobile / jukebox).
|
||||
Fallback to Flask session + stored refresh flow (browser).
|
||||
"""
|
||||
bearer = spotify_access_token_from_authorization_header()
|
||||
if bearer:
|
||||
return bearer
|
||||
spotify_user_id = session.get("spotify_user_id")
|
||||
if not spotify_user_id:
|
||||
return None
|
||||
return get_valid_access_token(spotify_user_id)
|
||||
@app.route("/metadata", methods=["GET"])
|
||||
@require_auth
|
||||
def get_metadata():
|
||||
track_id = request.args.get("trackId")
|
||||
meta_type = request.args.get("type", "beats")
|
||||
|
||||
if not track_id:
|
||||
return jsonify({"ok": False, "error": "Missing trackId"}), 400
|
||||
|
||||
access_token = g.spotify_access_token
|
||||
profile = spotify_get_me(access_token)
|
||||
spotify_user_id = profile["id"]
|
||||
|
||||
conn = db()
|
||||
row = conn.execute("""
|
||||
SELECT file_name FROM uploaded_metadata
|
||||
WHERE spotify_user_id = ? AND track_id = ? AND type = ?
|
||||
ORDER BY created_at DESC
|
||||
LIMIT 1
|
||||
""", (spotify_user_id, track_id, meta_type)).fetchone()
|
||||
conn.close()
|
||||
|
||||
if not row:
|
||||
return jsonify({"ok": False, "error": "Not found"}), 404
|
||||
|
||||
file_path = os.path.join(METADATA_UPLOAD_DIR, row["file_name"])
|
||||
if not os.path.isfile(file_path):
|
||||
return jsonify({"ok": False, "error": "Not found"}), 404
|
||||
|
||||
with open(file_path, "r", encoding="utf-8") as f:
|
||||
collection = json.load(f)
|
||||
|
||||
if not isinstance(collection, dict):
|
||||
return jsonify({"ok": False, "error": "Invalid metadata file"}), 500
|
||||
|
||||
return jsonify({"ok": True, "collection": collection})
|
||||
|
||||
|
||||
def spotify_access_token_from_authorization_header():
|
||||
auth = request.headers.get("Authorization", "") or ""
|
||||
if not auth.startswith("Bearer "):
|
||||
return None
|
||||
token = auth[7:].strip()
|
||||
return token or None
|
||||
@app.route("/metadata", methods=["POST"])
|
||||
@require_auth
|
||||
def upload_metadata():
|
||||
access_token = g.spotify_access_token
|
||||
|
||||
if not request.is_json:
|
||||
return jsonify({"ok": False, "error": "Expected application/json"}), 400
|
||||
|
||||
body = request.get_json(silent=True)
|
||||
if not isinstance(body, dict):
|
||||
return jsonify({"ok": False, "error": "Invalid JSON body"}), 400
|
||||
|
||||
track_id = body.get("trackId")
|
||||
meta_type = body.get("type")
|
||||
version = body.get("version")
|
||||
collection = body.get("collection")
|
||||
|
||||
if not track_id or not meta_type or version is None or collection is None:
|
||||
return jsonify({
|
||||
"ok": False,
|
||||
"error": "Missing trackId, type, version, or collection",
|
||||
}), 400
|
||||
|
||||
try:
|
||||
version = int(version)
|
||||
except (TypeError, ValueError):
|
||||
return jsonify({"ok": False, "error": "version must be an integer"}), 400
|
||||
|
||||
if not isinstance(collection, dict):
|
||||
return jsonify({"ok": False, "error": "collection must be a JSON object"}), 400
|
||||
|
||||
profile = spotify_get_me(access_token)
|
||||
spotify_user_id = profile["id"]
|
||||
|
||||
os.makedirs(METADATA_UPLOAD_DIR, exist_ok=True)
|
||||
ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S%fZ")
|
||||
safe_track = re.sub(r"[^\w\-]", "_", str(track_id))[:120]
|
||||
file_name = f"{spotify_user_id}_{safe_track}_{meta_type}_{version}_{ts}.json"
|
||||
file_path = os.path.join(METADATA_UPLOAD_DIR, file_name)
|
||||
|
||||
with open(file_path, "w", encoding="utf-8") as f:
|
||||
json.dump(collection, f, ensure_ascii=False)
|
||||
|
||||
now = datetime.now(timezone.utc).isoformat()
|
||||
conn = db()
|
||||
conn.execute("""
|
||||
INSERT INTO uploaded_metadata (
|
||||
spotify_user_id, track_id, type, version, file_name, created_at
|
||||
) VALUES (?, ?, ?, ?, ?, ?)
|
||||
""", (spotify_user_id, track_id, meta_type, version, file_name, now))
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
return jsonify({"ok": True, "file_name": file_name}), 201
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
17
beat.py
17
beat.py
@@ -1,6 +1,8 @@
|
||||
import numpy as np
|
||||
import matplotlib.pyplot as plt # for debug only
|
||||
from sqi import gauss
|
||||
|
||||
# note: may be called ZxingDetector instead?
|
||||
class SsfZxing:
|
||||
"""
|
||||
Find beats in a Sum Slope Function by detecting threshold crossings.
|
||||
@@ -160,14 +162,27 @@ class RegularBeatFinder:
|
||||
num_freqs = 200 #: number of freq steps to evaluate
|
||||
range_f1 = 0.5 #: lowest detection frequency in Hz
|
||||
range_f2 = 4.0 #: highest detection frequency in Hz
|
||||
f_bias_width = 0.2 #: gaussian std relative to num_freqs within f1..f2
|
||||
|
||||
def __init__(self): pass
|
||||
|
||||
def find_beat(self, fs, ssf_zxings, debug_fe=False, debug_i=None):
|
||||
def find_beat(self, fs, ssf_zxings, f_hint=None, debug_fe=False, debug_i=None):
|
||||
"""Find the optimal beat frequency."""
|
||||
act_ibis = np.diff(ssf_zxings)
|
||||
# nice-to: may be interesting to also use as score: the ssf amplitude info at the beats to which we aligned
|
||||
# evaluate mean absolute errors for all frequencies
|
||||
freqs, freq_errs = self._get_opt_ibi_freq_2(fs, act_ibis, debug_i)
|
||||
# bias with f_hint - once we know the beat freq, make it more likely for it to be found everywhere
|
||||
if f_hint is not None:
|
||||
nf, f1, f2 = RegularBeatFinder.num_freqs, RegularBeatFinder.range_f1, RegularBeatFinder.range_f2
|
||||
bias = gauss(
|
||||
nf,
|
||||
(f_hint - f1) / (f2 - f1) * nf,
|
||||
RegularBeatFinder.f_bias_width * nf
|
||||
)
|
||||
freqs_bias = 1.0 / (np.max(bias)+bias) # make 'f_hint' at most 2x more likely -- (1+bias) if normalized
|
||||
freq_errs *= freqs_bias
|
||||
#
|
||||
if debug_fe:
|
||||
plt.figure(figsize=(8,2))
|
||||
plt.plot(freqs, freq_errs)
|
||||
|
||||
10
rhythm.py
10
rhythm.py
@@ -131,19 +131,21 @@ class BassAnalyzer(Analyzer):
|
||||
Wp_force = None
|
||||
I_force = None
|
||||
|
||||
def __init__(self, fs, sig, Wp_force=None):
|
||||
def __init__(self, fs, sig, Wp_force=None, I_force=None):
|
||||
"""
|
||||
:param fs: sampling rate
|
||||
:param sig: audio signal normalized to [-1,1]
|
||||
"""
|
||||
super(BassAnalyzer, self).__init__()
|
||||
self.D = int(self.shift_sec * fs) #: spectrogram step
|
||||
if self.Wp_force:
|
||||
self.Wp = self.Wp_force
|
||||
elif Wp_force:
|
||||
if Wp_force:
|
||||
self.Wp = Wp_force
|
||||
elif self.Wp_force:
|
||||
self.Wp = self.Wp_force
|
||||
else:
|
||||
self.Wp = int(np.round(self.wavelet_win_sec * fs / self.W) * self.W) # wavelet window - make it an integer multiple of FFT window
|
||||
if I_force:
|
||||
self.I_force = I_force
|
||||
self.U = self.Wp // self.W # ratio
|
||||
|
||||
self.f = np.pad(sig, (self.W//2, self.W//2-1)) #: signal padded (W-FFT to determine scalogram parameters)
|
||||
|
||||
@@ -19,14 +19,19 @@ class Segmenter:
|
||||
|
||||
def __init__(self): pass
|
||||
|
||||
def get_segments(self, fs, guitar):
|
||||
i_stxs = self.get_segment_boundaries(fs, guitar)
|
||||
i_stxs = np.pad(i_stxs, (1, 0))
|
||||
return i_stxs
|
||||
|
||||
def get_segment_boundaries(self, fs, guitar):
|
||||
"""split the spectral power signal 'guitar' into stochastically similar segments."""
|
||||
segment_ids = self.get_segments(fs, guitar)
|
||||
segment_ids = self._get_segments(fs, guitar)
|
||||
stxs = np.diff(segment_ids) != 0
|
||||
i_stxs = np.where(stxs)[0]
|
||||
return i_stxs
|
||||
|
||||
def get_segments(self, fs, guitar):
|
||||
def _get_segments(self, fs, guitar):
|
||||
"""split the spectral power signal 'guitar' into stochastically similar segments."""
|
||||
seg_filt_win = int(self.seg_filt_win_sec / self.seg_win_step_sec)
|
||||
seg_guitar_data = self._sig_stochastics(fs, guitar)
|
||||
|
||||
179
song.py
Normal file
179
song.py
Normal file
@@ -0,0 +1,179 @@
|
||||
import numpy as np
|
||||
|
||||
from rhythm import BassAnalyzer, GuitarAnalyzer
|
||||
from segmenter import Segmenter
|
||||
from beat import SsfZxing, RegularBeatFinder
|
||||
from sqi import gauss, shift
|
||||
|
||||
class SongBeatDetector:
|
||||
SEGMENT_SLICE_LEN_SEC = 8.0 #: slice length for processing (long enough to contain bar structure; short enough for a constant freq. beat placement)
|
||||
SSF_REL_THRES = 1.5 #: optimize for slope of error (mae) function over beat frequency
|
||||
NE_THRES = 30.0 #: normalized error threshold for 'good' slices
|
||||
def __init__(self): pass
|
||||
def detect(self, fs, sig, use_f_hint=True, debug_fe_idx=None):
|
||||
self.fs = fs
|
||||
#self.sig = sig
|
||||
|
||||
self.ba = BassAnalyzer(fs, sig)
|
||||
self.bass, times = self.ba.viterbi_wavelet_scalogram_amplitudes(dbg_time=True)
|
||||
# times: durations of different stages
|
||||
|
||||
self.ga = GuitarAnalyzer(fs, sig)
|
||||
self.guitar = self.ga.spectrogram_power_amplitudes()
|
||||
|
||||
fsd = fs / self.ga.D # <- guitar ('ga')
|
||||
self.D = self.ga.D # <- guitar ('ga')
|
||||
|
||||
# self.bass, self.guitar: functions on windowed spectrum 0.008 sec apart (125 Hz)
|
||||
self.sg = Segmenter()
|
||||
self.i_seg = self.sg.get_segments(fsd, self.guitar) # <- guitar
|
||||
self.t_seg = self.i_seg / fsd
|
||||
self.fsd = fsd # reciprocal window step size
|
||||
|
||||
# we segment on 'guitar' info, but process 'bass' later
|
||||
|
||||
if use_f_hint:
|
||||
# initial estimate (without 'f_hint')
|
||||
zds_initial = self._estimate_segments(debug_fe_idx=None)
|
||||
self.zds_initial = zds_initial
|
||||
ifbs_good = np.array([zdd['ne'] < SongBeatDetector.NE_THRES for zdd in zds_initial])
|
||||
fbs = np.array([zdd['fb'] for zdd in zds_initial])[np.where(ifbs_good)[0]]
|
||||
bins, hfreq = np.histogram(fbs)
|
||||
ih = np.argmax(bins)
|
||||
self.f_hint = np.mean((hfreq[ih], hfreq[ih+1])) # center freq of bin
|
||||
else:
|
||||
self.f_hint = None
|
||||
|
||||
# actual estimate (using 'f_hint' to bias each segment)
|
||||
self.zds = self._estimate_segments(f_hint=self.f_hint, debug_fe_idx=debug_fe_idx)
|
||||
|
||||
return self.zds
|
||||
|
||||
def _estimate_segments(self, f_hint=None, debug_fe_idx=None):
|
||||
zds = []
|
||||
fsd = self.fsd
|
||||
seg_sl = int(SongBeatDetector.SEGMENT_SLICE_LEN_SEC * fsd) # segment slice length in 1/fsd units
|
||||
# for each segment
|
||||
for i in np.arange(self.i_seg.shape[0]-1):
|
||||
i1, i2 = self.i_seg[i], self.i_seg[i+1]
|
||||
t1, t2 = i1 / fsd, i2 / fsd
|
||||
# split segment into slices
|
||||
if i2-i1 < seg_sl: continue
|
||||
num_sl = (i2-i1) // seg_sl
|
||||
for m in np.arange(num_sl):
|
||||
j1, j2 = i1+m*seg_sl, i1+(m+1)*seg_sl
|
||||
sig_slice = self.bass[slice(j1, j2)] # <- bass
|
||||
|
||||
if debug_fe_idx is not None:
|
||||
# there will be many (upto 50) different slices - do not debug-plot them all
|
||||
debug_fe_sidx = debug_fe_idx / fs * fsd
|
||||
debug_fe = i1 <= debug_fe_sidx < i2
|
||||
else:
|
||||
debug_fe = False
|
||||
zdd = self._process_slice(j1, j2, m, sig_slice, f_hint=f_hint, debug_fe=debug_fe)
|
||||
zds.append(zdd)
|
||||
|
||||
return zds
|
||||
|
||||
def _process_slice(self, j1, j2, m, sig_slice, f_hint=None, debug_fe=False):
|
||||
"""
|
||||
:param j1: lower index into 'sig_slice'
|
||||
:param j2: upper index into 'sig_slice'
|
||||
:param m: slice number (used to check if debugging)
|
||||
:param debug_fe: show plots for SSF and raw/reg beat placement
|
||||
"""
|
||||
# TODO: C++ impl of SsfZxing._ssf_det_zxings() has diverged.
|
||||
# - refractory period changes
|
||||
# - ssf_th filter with 6-points
|
||||
# - ?? others ??
|
||||
# NOTE: SsfZxing here is always getting short 8-sec slices only (nb. for 'ssf_th' comput.)
|
||||
|
||||
fsd = self.fsd # reciprocal window step size
|
||||
seg_sl = int(SongBeatDetector.SEGMENT_SLICE_LEN_SEC * fsd) # segment slice length in 1/fsd units
|
||||
|
||||
SsfZxing.ssf_rel_thres = SongBeatDetector.SSF_REL_THRES
|
||||
zd = SsfZxing()
|
||||
ssf, ssf_th = zd._ssf_function(fsd, sig_slice)
|
||||
ssf_zxings = zd._ssf_det_zxings(fsd, ssf, ssf_th)
|
||||
|
||||
zdd = {
|
||||
'i1': j1 * self.D, 'i2': j2 * self.D,
|
||||
# ssf_zxings: raw beats (relative to slice)
|
||||
'zd': zd, 'ssf': ssf, 'ssf_zxings': ssf_zxings,
|
||||
'sig_slice': sig_slice, 'sig_source': 'bass',
|
||||
'ssf_th': np.ones(ssf.shape[0]) * ssf_th
|
||||
}
|
||||
|
||||
# (only plot first slice of a wider segment)
|
||||
#if num_sl > 2 and m == 0:
|
||||
if debug_fe:
|
||||
#
|
||||
# scalogram image, with viterbi path
|
||||
self.ba.debug_plot(j1, j2) # TODO: adapt 'bass'
|
||||
plt.title(f'scalogram & viterbi path, slice [{j1}:{j2}]')
|
||||
|
||||
# SSF function and detected raw beats
|
||||
zd.debug_plot(0, seg_sl)
|
||||
plt.title(f'raw beats, slice [{j1}:{j2}]')
|
||||
|
||||
# nice-to: optimize phase, (maybe iteratively, optimize phase and freq each)
|
||||
bf = RegularBeatFinder()
|
||||
fb, ne = bf.find_beat(fsd, ssf_zxings, f_hint=f_hint, debug_fe=debug_fe, debug_i=None)
|
||||
if debug_fe: plt.title(f'regular-beat placement error (mae), slice [{j1}:{j2}]')
|
||||
# mae is unnurmalized here, as returned from RegularBeatFinder._get_opt_ibi_freq_2()
|
||||
zdd.update({
|
||||
# bf: beat finder
|
||||
# fb: beat frequency, in Hz
|
||||
# ne: normalized mae error
|
||||
'bf': bf, 'fb': fb, 'ne': ne
|
||||
})
|
||||
# TODO: ne > 30 is suspiciously bad - filter those "detections" out eventually
|
||||
# TODO: # catch basic errors: ne == 0, or len(est_zxings) == 0, means slice is bad
|
||||
# NOTE: since 2x the zero-crossings, we get twice the frequency here.
|
||||
# NOTE: this means 0.5 lower freq bound of RegularBeatFinder will find at most 60 bpm in the song.
|
||||
|
||||
# TODO: RegularBeatFinder currently not using 'phase' info, but should be optimized
|
||||
# TODO: (currently we start the pattern at the first detected beat, may or may not be good)
|
||||
est_zxings = np.cumsum(np.pad(bf.freq_to_est_ibis(fsd, fb, j2-j1), (1,0))) # rel. to slice
|
||||
if ssf_zxings.shape[0] > 0:
|
||||
est_zxings += ssf_zxings[0] # add phase = currently we just start at first detected beat
|
||||
# nice-to: median-filter the freq, etc.pp.
|
||||
# nice-to: avoid adding len(est_zxings)=0 entries later
|
||||
|
||||
# trim back to max. index
|
||||
est_zxings = est_zxings[np.where(est_zxings < ssf.shape[0])[0]]
|
||||
|
||||
zdd.update({
|
||||
# est_zxings: regular beats (relative to slice)
|
||||
'est_zxings': est_zxings
|
||||
})
|
||||
|
||||
if debug_fe:
|
||||
plt.figure(figsize=(8,2))
|
||||
plt.plot(ssf)
|
||||
plt.plot(np.arange(ssf.shape[0]), np.ones(ssf.shape[0]) * ssf_th); None
|
||||
plt.scatter(ssf_zxings, np.ones(ssf_zxings.shape[0]) * ssf_th, c='r')
|
||||
plt.scatter(est_zxings, np.ones(est_zxings.shape[0]) * ssf_th, c='g')
|
||||
plt.title(f'ssf, ssf_th, raw beats (r), reg beats (g), slice [{j1}:{j2}]')
|
||||
|
||||
return zdd
|
||||
|
||||
# _debug_fmt_est_zxings
|
||||
def _place_fmt_zxings(self, fsd, ssf, ssf_zxings):
|
||||
gauss_beat_template_win_sec = 0.25542 #: gauss window width (as compared to beats in ssf function)
|
||||
gauss_beat_template_sigma_sec = 0.027 #: gauss bump half-width parameter (as compared to beats in ssf function)
|
||||
#gauss_amplitude = 2.0
|
||||
|
||||
#def get_snr(self, fsd, ssf, ssf_threshold, ssf_zxings):
|
||||
# """Compute the Signal-to-Noise Ratio of beats, based on SSF function and detected beat locations."""
|
||||
sigma = fsd * gauss_beat_template_sigma_sec
|
||||
W = int(fsd * gauss_beat_template_win_sec)
|
||||
gb = gauss(W, W//2, sigma)
|
||||
# place gaussians on estimated beat locations
|
||||
ssf_est = np.zeros(ssf.shape[0])
|
||||
for i in ssf_zxings:
|
||||
ssf_est += shift(ssf.shape[0], i, gb)
|
||||
ssf_est /= gb[W//2] # normalize amplitude to 1.0
|
||||
ssf_est = np.roll(ssf_est, int(sigma)) # shift to right (beat loc = gauss beginning, not center)
|
||||
return ssf_est
|
||||
|
||||
Reference in New Issue
Block a user