Compare commits

..

19 Commits

Author SHA1 Message Date
02549e5618 feat: fetch beat metadata from api 2026-05-31 11:41:46 +02:00
3b7bae0a38 feat: upload metadata, unify auth 2026-05-30 19:47:17 +02:00
041cba8224 fix: fix mixed up f1, f2 2026-05-17 17:51:24 +02:00
729555acc3 feat: find_beat(): bias with f_hint - once we know the beat freq 2026-05-17 17:01:48 +02:00
f3f580f923 feat: find_beat(): bias with f_hint - once we know the beat freq 2026-05-17 16:55:13 +02:00
e349278c06 docs: add TODO.md 2026-05-17 12:33:09 +02:00
71f1975a97 feat: song: bass reg beat detector in slices 2026-05-17 12:32:39 +02:00
ee5a1376ee fix: playlist field: tracks -> items 2026-05-14 12:23:33 +02:00
e42cddd645 feat: exponential backoff 2026-05-14 12:23:18 +02:00
b0a7202f32 feat: allow bearer token to be passed through from the app 2026-05-14 01:54:02 +02:00
71f55ab20d feat: allow redirect_uri param to spotify login, for Lockstep Demo app flow 2026-05-14 01:37:23 +02:00
378009f8b0 api: wrap Spotify playlists 2026-05-13 11:40:50 +02:00
4627786dc4 chore: debug for SsfZxing detector 2026-05-13 05:16:43 +02:00
11934b1f61 feat: restrict B in BassAnalyzer to sensible freq range 2026-05-13 05:16:17 +02:00
d10187878d fix: look in a win for ssf rise; cosmetics 2026-04-28 10:10:52 +02:00
e506a3e580 fix: fs/D in lowpass, add missing fs attr 2026-04-27 23:50:55 +02:00
975adcdee4 feat: Segmenter, RegularBeatFinder, SigQuality 2026-04-27 22:13:02 +02:00
5d9de7d8f1 feat: GuitarAnalyzer (spectrogram power in freq range) 2026-04-27 11:10:08 +02:00
132dea15fa fix: amplitude cutoff lowpass instead of mean 2026-04-27 11:06:39 +02:00
9 changed files with 1178 additions and 46 deletions

11
TODO.md Normal file
View File

@@ -0,0 +1,11 @@
# TODO
- tests: beat and guitar synthesizer
- generate rhythmic sequence and test the algos
.
O> "2027-04-29 TestApi Bass song.ipynb": [21]
- why is ssf continually rising?
- because of using 'fs' not 'fsd'
.

428
api.py
View File

@@ -17,20 +17,24 @@
# $ while sleep 1; do diff -q api.py /tmp/api.py; if [ $? -ne 0 ]; then scp api.py lockstep@api.lockstep.at:/var/sites/api.lockstep.at/; cp api.py /tmp/api.py; fi; done # $ while sleep 1; do diff -q api.py /tmp/api.py; if [ $? -ne 0 ]; then scp api.py lockstep@api.lockstep.at:/var/sites/api.lockstep.at/; cp api.py /tmp/api.py; fi; done
import os import os
import re
import sqlite3 import sqlite3
from base64 import b64encode from base64 import b64encode
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
from urllib.parse import urlencode
from urllib.request import Request, urlopen
import json import json
from functools import wraps
from urllib.parse import urlencode
from flask import Flask, request, session, jsonify, make_response from flask import Flask, g, request, session, jsonify, make_response, redirect, url_for
from werkzeug.middleware.proxy_fix import ProxyFix from werkzeug.middleware.proxy_fix import ProxyFix
from authomatic import Authomatic from authomatic import Authomatic
from authomatic.adapters import WerkzeugAdapter from authomatic.adapters import WerkzeugAdapter
from authomatic.providers import oauth2 from authomatic.providers import oauth2
from authomatic.providers import PROVIDER_ID_MAP as AUTHOMATIC_PROVIDER_ID_MAP from authomatic.providers import PROVIDER_ID_MAP as AUTHOMATIC_PROVIDER_ID_MAP
import random
import time
import requests import requests
@@ -95,14 +99,24 @@ PROVIDER_ID_MAP = list(AUTHOMATIC_PROVIDER_ID_MAP) + [Spotify]
SPOTIFY_CLIENT_ID = os.environ["SPOTIFY_CLIENT_ID"] SPOTIFY_CLIENT_ID = os.environ["SPOTIFY_CLIENT_ID"]
SPOTIFY_CLIENT_SECRET = os.environ["SPOTIFY_CLIENT_SECRET"] SPOTIFY_CLIENT_SECRET = os.environ["SPOTIFY_CLIENT_SECRET"]
# Must exactly match a Redirect URI configured in your Spotify app settings. # OAuth redirect registered in the Spotify Developer Dashboard (Authorization Code
# flow). Spotify sends the browser here with ?code=&state= — not to the Android app scheme.
REDIRECT_URI = os.environ.get( REDIRECT_URI = os.environ.get(
"SPOTIFY_REDIRECT_URI", "SPOTIFY_REDIRECT_URI",
#"https://api.lockstep.at/spotify/callback" #"https://api.lockstep.at/spotify/callback"
"https://api.lockstep.at/login/spotify/" "https://api.lockstep.at/login/spotify/"
) )
# After server-side token exchange, the browser may be redirected to this URL with
# tokens in the query string (mobile / Custom Tab). This is NOT registered with Spotify;
# only REDIRECT_URI above goes in the dashboard for this flow.
ALLOWED_SPOTIFY_APP_POST_LOGIN_REDIRECT = os.environ.get(
"SPOTIFY_APP_POST_LOGIN_REDIRECT_URI",
"at.lockstep.player://spotify/callback",
)
DB_PATH = os.environ.get("TOKEN_DB_PATH", "spotify_tokens.db") DB_PATH = os.environ.get("TOKEN_DB_PATH", "spotify_tokens.db")
METADATA_UPLOAD_DIR = os.environ.get("METADATA_UPLOAD_DIR", "uploaded_collections")
app = Flask(__name__) app = Flask(__name__)
app.secret_key = os.environ["FLASK_SECRET_KEY"] app.secret_key = os.environ["FLASK_SECRET_KEY"]
@@ -145,6 +159,17 @@ def init_db():
updated_at TEXT NOT NULL updated_at TEXT NOT NULL
) )
""") """)
conn.execute("""
CREATE TABLE IF NOT EXISTS uploaded_metadata (
id INTEGER PRIMARY KEY AUTOINCREMENT,
spotify_user_id TEXT NOT NULL,
track_id TEXT NOT NULL,
type TEXT NOT NULL,
version INTEGER NOT NULL,
file_name TEXT NOT NULL,
created_at TEXT NOT NULL
)
""")
conn.commit() conn.commit()
conn.close() conn.close()
@@ -214,25 +239,41 @@ def spotify_basic_auth_header() -> str:
def refresh_spotify_token(refresh_token: str) -> dict: def refresh_spotify_token(refresh_token: str) -> dict:
body = urlencode({ r = requests.post(
"grant_type": "refresh_token",
"refresh_token": refresh_token,
}).encode("utf-8")
req = Request(
"https://accounts.spotify.com/api/token", "https://accounts.spotify.com/api/token",
data=body, data={
method="POST", "grant_type": "refresh_token",
"refresh_token": refresh_token,
},
headers={ headers={
"Authorization": spotify_basic_auth_header(), "Authorization": spotify_basic_auth_header(),
"Content-Type": "application/x-www-form-urlencoded", "Content-Type": "application/x-www-form-urlencoded",
}, },
) )
with urlopen(req) as resp: if not r.ok:
payload = json.loads(resp.read().decode("utf-8")) # The token endpoint returns errors as flat
# {"error": "<code>", "error_description": "<msg>"}, which differs from
# the Web API's {"error": {"status": ..., "message": ...}} shape. Rewrite
# the body so our error handler surfaces a consistent envelope to
# clients regardless of which Spotify endpoint failed.
try:
body = r.json()
except ValueError:
body = {}
message = (
body.get("error_description")
or body.get("error")
or r.reason
or "Token refresh failed"
)
r._content = json.dumps({
"error": {"status": r.status_code, "message": message}
}).encode("utf-8")
r.headers["Content-Type"] = "application/json"
return payload r.raise_for_status()
return r.json()
def get_valid_access_token(spotify_user_id: str) -> str: def get_valid_access_token(spotify_user_id: str) -> str:
@@ -266,17 +307,121 @@ def get_valid_access_token(spotify_user_id: str) -> str:
# ---------------------------- # ----------------------------
# Simple Spotify API call # Spotify Web API helpers
# ---------------------------- # ----------------------------
def spotify_get(url: str, access_token: str, params: dict | None = None) -> dict:
    """
    GET a Spotify Web API endpoint and return the parsed JSON body.

    Retries on HTTP 429 and 503 with exponential backoff and optional
    Retry-After, so brief Spotify rate limits often clear before we surface
    an error to the app.

    Args:
        url: Absolute URL of the endpoint.
        access_token: Spotify access token, sent as a Bearer token.
        params: Optional query parameters.

    Returns:
        The decoded JSON body of the first response that is not retried.

    Raises:
        requests.HTTPError: The final response had a non-2xx status.
        requests.RequestException: Network-level failure, including the
            timeout below.
    """
    headers = {"Authorization": f"Bearer {access_token}"}
    # (connect, read) timeout in seconds. Without one, requests waits
    # forever on a stalled connection and blocks the worker.
    timeout_sec = (5.0, 30.0)
    backoff_sec = 1.0
    max_attempts = 8

    for attempt in range(max_attempts):
        r = requests.get(url, headers=headers, params=params, timeout=timeout_sec)

        is_last_attempt = attempt >= max_attempts - 1
        if r.status_code in (429, 503) and not is_last_attempt:
            wait = backoff_sec
            ra = r.headers.get("Retry-After")
            if ra:
                try:
                    # Retry-After may also be an HTTP date; that form is
                    # ignored and the plain backoff is used instead.
                    wait = max(wait, float(ra))
                except ValueError:
                    pass
            wait = min(wait, 120.0)
            # Up to 25% jitter so concurrent clients do not retry in step.
            time.sleep(wait + random.random() * 0.25 * wait)
            backoff_sec = min(backoff_sec * 2.0, 60.0)
            continue

        # Last attempt or a non-retryable status: raise or return.
        r.raise_for_status()
        return r.json()
def spotify_get_paginated(
    url: str,
    access_token: str,
    limit: int = 50,
    max_items: int | None = None,
) -> list:
    """
    Collect the `items` of every page of a paginated Spotify endpoint.

    The first request is sent to `url` with `limit` and `offset=0`; each
    following request goes to the `next` URL of the previous page, until
    `next` is null or `max_items` items have been gathered.

    Args:
        url: Endpoint returning a paging object.
        access_token: Spotify access token.
        limit: Page size requested on the first call.
        max_items: Optional cap; the result is truncated to this length.

    Returns:
        The concatenated `items` lists.
    """
    collected: list = []
    cursor: str | None = url
    # Only the first request carries explicit paging parameters: `next` is a
    # fully-qualified URL that already encodes limit/offset.
    query: dict | None = {"limit": limit, "offset": 0}

    while cursor is not None:
        page = spotify_get(cursor, access_token, params=query)
        collected.extend(page.get("items", []))

        capped = max_items is not None and len(collected) >= max_items
        if capped:
            return collected[:max_items]

        cursor, query = page.get("next"), None

    return collected
def spotify_get_me(access_token: str) -> dict: def spotify_get_me(access_token: str) -> dict:
req = Request( return spotify_get("https://api.spotify.com/v1/me", access_token)
"https://api.spotify.com/v1/me",
headers={"Authorization": f"Bearer {access_token}"},
method="GET", # ----------------------------
) # Error handling
with urlopen(req) as resp: # ----------------------------
return json.loads(resp.read().decode("utf-8"))
@app.errorhandler(requests.HTTPError)
def handle_spotify_http_error(e: requests.HTTPError):
    """
    Turn a failed upstream HTTP response into our JSON error envelope.

    The upstream status code is passed through (502 when the exception
    carries no response). When the body is JSON of the form
    {"error": {"status": ..., "message": ...}}, that message becomes the
    `error` text; otherwise the exception's string form is used.
    """
    upstream = e.response
    message = str(e)
    payload = None

    # Compare against None explicitly: a requests.Response is falsy for
    # 4xx/5xx statuses, which are exactly the responses handled here.
    if upstream is None:
        status = 502
    else:
        status = upstream.status_code
        try:
            payload = upstream.json()
        except ValueError:
            payload = None

        detail = payload.get("error") if isinstance(payload, dict) else None
        if isinstance(detail, dict):
            message = detail.get("message") or message

    envelope = {
        "ok": False,
        "error": message,
        "spotify": payload,
    }
    return jsonify(envelope), status
@app.errorhandler(requests.RequestException)
def handle_spotify_request_error(e: requests.RequestException):
    """Report failures that carry no upstream status (DNS, connection, timeout) as HTTP 502."""
    envelope = {
        "ok": False,
        "error": f"Upstream request failed: {e}",
    }
    return jsonify(envelope), 502
# ---------------------------- # ----------------------------
@@ -294,6 +439,28 @@ def index():
@app.route("/login/<provider_name>/", methods=["GET", "POST"]) @app.route("/login/<provider_name>/", methods=["GET", "POST"])
def login(provider_name): def login(provider_name):
# Authomatic 1.3.0 (oauth2.login) only runs "phase 1" — redirect to Spotify —
# when there are no query parameters, or only ``user_state``:
# elif (not self.params or (len(self.params) == 1 and 'user_state' in self.params))
# A mobile client opening ``/login/spotify/?redirect_uri=...`` therefore matches
# no branch; login() returns without calling redirect() → empty body and HTTP 200
# (white page). Stash the app callback in the session and reload without query args.
if (
provider_name == "spotify"
and request.args.get("redirect_uri")
and "code" not in request.args
and "error" not in request.args
):
requested = request.args.get("redirect_uri", "")
if requested != ALLOWED_SPOTIFY_APP_POST_LOGIN_REDIRECT:
return jsonify({
"ok": False,
"error": "redirect_uri not allowed for this client",
}), 400
session["spotify_oauth_app_redirect_uri"] = requested
session.modified = True
return redirect(url_for("login", provider_name=provider_name), code=302)
response = make_response() response = make_response()
# Let Authomatic handle the OAuth2 handshake. # Let Authomatic handle the OAuth2 handshake.
@@ -301,7 +468,7 @@ def login(provider_name):
WerkzeugAdapter(request, response), WerkzeugAdapter(request, response),
provider_name, provider_name,
session=session, session=session,
session_saver=lambda: session.modified session_saver=lambda: setattr(session, "modified", True),
) )
# If result is None, Authomatic is still redirecting/processing. # If result is None, Authomatic is still redirecting/processing.
@@ -372,6 +539,27 @@ def login(provider_name):
# keep the Spotify user id in session # keep the Spotify user id in session
session["spotify_user_id"] = result.user.id session["spotify_user_id"] = result.user.id
app_redirect = session.pop("spotify_oauth_app_redirect_uri", None)
if app_redirect:
if app_redirect != ALLOWED_SPOTIFY_APP_POST_LOGIN_REDIRECT:
return jsonify({
"ok": False,
"error": "Stored app redirect_uri does not match allowlist",
}), 400
sep = "&" if ("?" in app_redirect) else "?"
target = (
f"{app_redirect}{sep}"
+ urlencode(
{
"access_token": access_token,
"refresh_token": refresh_token,
"expires_in": str(expires_in),
"token_type": token_type or "",
},
)
)
return redirect(target, code=302)
return jsonify({ return jsonify({
"ok": True, "ok": True,
"spotify_user_id": result.user.id, "spotify_user_id": result.user.id,
@@ -455,44 +643,210 @@ old example 1:
} }
""" """
@app.route("/me")
def me(): def spotify_access_token_from_authorization_header():
auth = request.headers.get("Authorization", "") or ""
if not auth.startswith("Bearer "):
return None
token = auth[7:].strip()
return token or None
def get_request_spotify_access_token():
"""
Prefer ``Authorization: Bearer <access_token>`` (mobile / jukebox).
Fallback to Flask session + stored refresh flow (browser).
"""
bearer = spotify_access_token_from_authorization_header()
if bearer:
return bearer
spotify_user_id = session.get("spotify_user_id") spotify_user_id = session.get("spotify_user_id")
if not spotify_user_id: if not spotify_user_id:
return jsonify({"ok": False, "error": "Not logged in"}), 401 return None
return get_valid_access_token(spotify_user_id)
access_token = get_valid_access_token(spotify_user_id)
def require_auth(f):
    """
    Decorator for routes that need a Spotify access token.

    Resolves the token through get_request_spotify_access_token(). When one
    is found it is stored on `g.spotify_access_token` and the view runs;
    otherwise the request is answered with a 401 JSON error.
    """
    @wraps(f)
    def wrapped(*args, **kwargs):
        token = get_request_spotify_access_token()
        if token:
            g.spotify_access_token = token
            return f(*args, **kwargs)
        return jsonify({"ok": False, "error": "Not logged in"}), 401

    return wrapped
@app.route("/me")
@require_auth
def me():
access_token = g.spotify_access_token
profile = spotify_get_me(access_token) profile = spotify_get_me(access_token)
row = get_token_record(spotify_user_id) row = get_token_record(profile["id"])
return jsonify({ return jsonify({
"ok": True, "ok": True,
"profile": profile, "profile": profile,
"stored_expires_at": row["expires_at"], "stored_expires_at": row["expires_at"] if row else None,
}) })
@app.route("/playlists") @app.route("/playlists")
@require_auth
def playlists(): def playlists():
spotify_user_id = "cidermole" access_token = g.spotify_access_token
access_token = get_valid_access_token(spotify_user_id)
""" """
user_id = "Sara" user_id = "Sara"
url = f"https://api.spotify.com/v1/users/{user_id}/playlists" url = f"https://api.spotify.com/v1/users/{user_id}/playlists"
# -> 403 Forbidden # -> 403 Forbidden
""" """
url = f"https://api.spotify.com/v1/me/playlists" items = spotify_get_paginated(
headers={"Authorization": f"Bearer {access_token}"} "https://api.spotify.com/v1/me/playlists",
r = requests.get(url, headers=headers) access_token,
# TODO: pagination (limit,offset) )
return jsonify({ return jsonify({
"ok": True, "ok": True,
"response": r.json() "total": len(items),
"items": items,
}) })
@app.route("/playlists/<playlist_id>")
@require_auth
def playlist(playlist_id):
    """
    Return one playlist with its track paging object expanded to all pages.

    The playlist object is fetched once. If its paging object reports a
    `next` page, the complete track list is fetched separately and put in
    place of the first page.
    """
    token = g.spotify_access_token
    playlist_url = f"https://api.spotify.com/v1/playlists/{playlist_id}"
    playlist_data = spotify_get(playlist_url, token)

    # The paging object lives under `items` (current Spotify shape) or under
    # the legacy `tracks` key; use whichever is present.
    if isinstance(playlist_data.get("items"), dict):
        paging_key = "items"
    else:
        paging_key = "tracks"
    paging = playlist_data.get(paging_key) or {}

    if paging.get("next"):
        all_items = spotify_get_paginated(
            f"{playlist_url}/tracks",
            token,
            limit=100,
        )
        # Keep the other paging fields, but describe a single complete page.
        merged = dict(paging)
        merged.update({
            "items": all_items,
            "offset": 0,
            "limit": len(all_items),
            "next": None,
            "previous": None,
        })
        playlist_data[paging_key] = merged

    return jsonify({
        "ok": True,
        "playlist": playlist_data,
    })
@app.route("/metadata", methods=["GET"])
@require_auth
def get_metadata():
    """
    Return the most recently uploaded metadata collection for a track.

    Query parameters:
        trackId: Required track identifier.
        type: Metadata type; defaults to "beats".

    Responses:
        200 {"ok": true, "collection": {...}} on success.
        400 when trackId is missing.
        404 when nothing was uploaded by this user for the track/type, or
            the stored file is gone.
        500 when the stored file cannot be parsed or is not a JSON object.
    """
    track_id = request.args.get("trackId")
    meta_type = request.args.get("type", "beats")
    if not track_id:
        return jsonify({"ok": False, "error": "Missing trackId"}), 400

    access_token = g.spotify_access_token
    profile = spotify_get_me(access_token)
    spotify_user_id = profile["id"]

    conn = db()
    try:
        row = conn.execute("""
            SELECT file_name FROM uploaded_metadata
            WHERE spotify_user_id = ? AND track_id = ? AND type = ?
            ORDER BY created_at DESC
            LIMIT 1
        """, (spotify_user_id, track_id, meta_type)).fetchone()
    finally:
        # Release the connection even when the query raises.
        conn.close()

    if not row:
        return jsonify({"ok": False, "error": "Not found"}), 404

    file_path = os.path.join(METADATA_UPLOAD_DIR, row["file_name"])
    if not os.path.isfile(file_path):
        return jsonify({"ok": False, "error": "Not found"}), 404

    try:
        with open(file_path, "r", encoding="utf-8") as f:
            collection = json.load(f)
    except FileNotFoundError:
        # The file vanished between the isfile() check and open().
        return jsonify({"ok": False, "error": "Not found"}), 404
    except ValueError:
        # Covers json.JSONDecodeError and UnicodeDecodeError: answer with the
        # same envelope as the non-dict case instead of an unhandled error.
        return jsonify({"ok": False, "error": "Invalid metadata file"}), 500

    if not isinstance(collection, dict):
        return jsonify({"ok": False, "error": "Invalid metadata file"}), 500

    return jsonify({"ok": True, "collection": collection})
@app.route("/metadata", methods=["POST"])
@require_auth
def upload_metadata():
    """
    Store an uploaded metadata collection for a track.

    Expects a JSON object with `trackId`, `type`, `version` (integer) and
    `collection` (JSON object). The collection is written to a new file in
    METADATA_UPLOAD_DIR and a row describing it is inserted into
    `uploaded_metadata`.

    Responses:
        201 {"ok": true, "file_name": ...} on success.
        400 for a non-JSON request or missing/invalid fields.
    """
    access_token = g.spotify_access_token

    if not request.is_json:
        return jsonify({"ok": False, "error": "Expected application/json"}), 400
    body = request.get_json(silent=True)
    if not isinstance(body, dict):
        return jsonify({"ok": False, "error": "Invalid JSON body"}), 400

    track_id = body.get("trackId")
    meta_type = body.get("type")
    version = body.get("version")
    collection = body.get("collection")
    if not track_id or not meta_type or version is None or collection is None:
        return jsonify({
            "ok": False,
            "error": "Missing trackId, type, version, or collection",
        }), 400

    # Lists and objects cannot be bound as SQL parameters; reject them here
    # rather than failing after the file has already been written.
    if isinstance(track_id, (dict, list)) or isinstance(meta_type, (dict, list)):
        return jsonify({
            "ok": False,
            "error": "trackId and type must be scalar values",
        }), 400

    try:
        version = int(version)
    except (TypeError, ValueError, OverflowError):
        # OverflowError: int(float("inf")); json.loads accepts "Infinity".
        return jsonify({"ok": False, "error": "version must be an integer"}), 400
    if not isinstance(collection, dict):
        return jsonify({"ok": False, "error": "collection must be a JSON object"}), 400

    profile = spotify_get_me(access_token)
    spotify_user_id = profile["id"]

    os.makedirs(METADATA_UPLOAD_DIR, exist_ok=True)
    ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S%fZ")

    # Every externally supplied part of the file name is reduced to word
    # characters and '-', so no part can contain a path separator or "..".
    # The unmodified values are still what gets stored in the database.
    def _safe(part, max_len):
        return re.sub(r"[^\w\-]", "_", str(part))[:max_len]

    safe_user = _safe(spotify_user_id, 64)
    safe_track = _safe(track_id, 120)
    safe_type = _safe(meta_type, 40)
    file_name = f"{safe_user}_{safe_track}_{safe_type}_{version}_{ts}.json"
    file_path = os.path.join(METADATA_UPLOAD_DIR, file_name)
    with open(file_path, "w", encoding="utf-8") as f:
        json.dump(collection, f, ensure_ascii=False)

    now = datetime.now(timezone.utc).isoformat()
    conn = db()
    try:
        conn.execute("""
            INSERT INTO uploaded_metadata (
                spotify_user_id, track_id, type, version, file_name, created_at
            ) VALUES (?, ?, ?, ?, ?, ?)
        """, (spotify_user_id, track_id, meta_type, version, file_name, now))
        conn.commit()
    finally:
        # Release the connection even when the insert raises.
        conn.close()

    return jsonify({"ok": True, "file_name": file_name}), 201
if __name__ == "__main__": if __name__ == "__main__":
init_db() init_db()
app.run(host="127.0.0.1", port=8000, debug=True) app.run(host="127.0.0.1", port=8000, debug=True)

216
beat.py Normal file
View File

@@ -0,0 +1,216 @@
import numpy as np
import matplotlib.pyplot as plt # for debug only
from sqi import gauss
# note: may be called ZxingDetector instead?
class SsfZxing:
    """
    Find beats in a Sum Slope Function by detecting threshold crossings.

    See Zong et al, from CINC 2003.

    Typical use: call `_ssf_function()` to get the SSF and its threshold,
    then `_ssf_det_zxings()` to get the crossing indices. Both store their
    results on the instance for `debug_plot()`.
    """

    t_holdoff = 0.1  #: hold-off period in sec (ignore zxings after initial rise)

    # these two depend on each other.
    t_range = 0.032  #: rise amplitude range in sec: +/- around transition, we check the rise amplitude. about 2*sw_sec but nb. 0.008 sec steps in fs/D rate
    sw_sec = 0.04  #: upslope width in sec (for SSF function)

    ssf_rel_thres = 3  #: magic number from Zong 2003, threshold from mean SSF amplitude
    ssf_rel_rise = 0.8  #: minimum rise of SSF edge (from foot to peak) relative to 'ssf_th'

    # TODO: C++ impl has diverged.
    # - refractory period changes
    # - ssf_th filter with 6-points
    # - ?? others ??

    def __init__(self):
        pass

    def _ssf_det_zxings(self, fs, ssf, ssf_th):
        """
        Detect upward threshold crossings in the 'ssf' signal.

        Args:
            fs: sampling rate of 'ssf' in Hz.
            ssf: 1-D sum-slope signal.
            ssf_th: scalar threshold.

        Returns:
            Integer sample indices of the accepted crossings (also stored
            as `self.ssf_zxings`). Index resolution only, no interpolation.
        """
        i_holdoff = int(self.t_holdoff * fs)

        # threshold crossing: True where the signal goes from <= th to > th
        above = np.pad((ssf > ssf_th).astype(int), (0, 1))
        above_prev = np.pad(above[:-1], (1, 0))
        ssf_z = (above - above_prev) == 1

        # holdoff: after an accepted crossing, drop crossings in the next
        # i_holdoff samples. Iterating over a snapshot and re-checking the
        # flag gives the same sequential behaviour, and it also covers
        # crossings in the last i_holdoff samples.
        for i in np.flatnonzero(ssf_z):
            if ssf_z[i]:
                ssf_z[i + 1:i + 1 + i_holdoff] = False

        # rise amplitude filter (check in 2-3 samples [0.024 sec or so] and compare vs. threshold)
        i_range = int(self.t_range * fs)
        if i_range > 0:
            upper = ssf_z.shape[0] - i_range
            for i in np.flatnonzero(ssf_z):
                if i < i_range or i >= upper:
                    continue  # edge crossing, removed below
                rise = np.max(ssf[i:i + i_range]) - np.min(ssf[i - i_range:i])
                if rise < ssf_th * self.ssf_rel_rise:
                    ssf_z[i] = False
            # force-zero the bounds where we cannot check the amplitude rise
            ssf_z[-i_range:] = False
            ssf_z[:i_range] = False
        # With i_range == 0 there is no window to check; the filter is
        # skipped. (`ssf_z[-0:] = 0` would otherwise clear every crossing.)

        ssf_zxings = np.where(ssf_z)[0]
        self.ssf_zxings = ssf_zxings
        return ssf_zxings

    def _ssf_function(self, fs, y):
        """
        Sum-slope function: sum of the positive slopes of 'y' over the last
        `sw_sec` seconds (left-looking window).

        Args:
            fs: sampling rate of 'y' in Hz.
            y: 1-D input signal.

        Returns:
            (ssf, ssf_th): the SSF, same length as 'y', and the scalar
            threshold `ssf_rel_thres * mean(ssf)`. Both are also stored on
            the instance.
        """
        # At least one sample: with sw == 0 the `[:-sw]` slice below would
        # return an empty array.
        sw = max(1, int(self.sw_sec * fs))
        duk = np.clip(np.diff(np.pad(y, (1, 0))), a_min=0, a_max=np.inf)  # left-looking window
        # Windowed sum as the difference of two cumulative sums sw apart.
        duks = np.pad(np.cumsum(duk), (0, sw))
        duks_r = np.roll(duks, sw)
        ssf = (duks - duks_r)[:-sw]

        # compute threshold
        # TODO: check if we need lowpass instead of mean for 'ssf_th'
        ssf_th = self.ssf_rel_thres * np.mean(ssf)
        self.ssf, self.ssf_th = ssf, ssf_th
        return ssf, ssf_th

    def debug_plot(self, i1, i2):
        """
        Plot the stored SSF for samples i1..i2 with its threshold and the
        detected crossings that fall inside that window.

        Requires `_ssf_function()` and `_ssf_det_zxings()` to have run.
        """
        ssf, ssf_th, ssf_zxings = self.ssf, self.ssf_th, self.ssf_zxings
        ssf_slice = ssf[i1:i2]
        ssf_th_slice = ssf_th[i1:i2] if isinstance(ssf_th, np.ndarray) else ssf_th
        th_line = np.ones(ssf_slice.shape[0]) * ssf_th_slice

        # 'ssf_zxings' holds sample indices, so select by value (not by
        # position in the array) and shift onto the x axis of the slice.
        abs_idx = np.arange(ssf.shape[0])[i1:i2]
        zx = np.asarray(ssf_zxings)
        zx = zx[np.isin(zx, abs_idx)]
        rel = zx - abs_idx[0] if abs_idx.size else zx

        plt.figure(figsize=(8, 2))
        plt.plot(ssf_slice)
        plt.plot(np.arange(ssf_slice.shape[0]), th_line)
        plt.scatter(rel, th_line[rel], c='r')
def get_mae_dist(ibis):
    """
    Build a triangular wave between beats, representing the absolute beat
    placement error.

    Each entry of the result is the distance in samples to the nearest
    beat, where beats sit at 0 and at the cumulative sums of 'ibis'.
    The result has length sum(ibis) + 1.
    """
    dist = np.zeros(np.sum(ibis) + 1)

    start = 0
    for ibi in ibis:
        stop = start + ibi
        # rising flank 0, 1, ... up to the middle (inclusive for odd ibi)
        n_up = ibi // 2 + ibi % 2
        dist[start:start + n_up] = np.arange(n_up)
        # falling flank ..., 2, 1 towards the next beat
        dist[start + n_up:stop] = np.arange(ibi - n_up, 0, -1)
        start = stop
    return dist
def get_mae_err_1(fs, freq, phase, act_ibis, debug=False):
    """
    Compute the beat placement error between zero crossings of the given
    sine wave (estimate) and the actually detected beats. Computes 'dist'
    from the estimated crossings and samples it at the actual beats
    (direction 1).

    Args:
        fs: sampling rate in Hz.
        freq: frequency of the sine in Hz (two zero crossings per period).
        phase: phase of the sine in rad.
        act_ibis: integer array of actual inter-beat intervals in samples.
        debug: plot the sine, the actual and the estimated beats.

    Returns:
        Sum over the actual beats of the distance to the nearest estimated
        zero crossing, in samples.
    """
    # sin(2 pi freq n / fs + phase) ... 0, pi, 2*pi, ...
    # 2 pi freq n / fs + phase = k * pi
    # 2 freq n / fs + phase/pi = k (= 0, 1, 2, ...)
    # n = (k - phase/pi) * fs / (2 freq)
    # k_max = 2 freq n_max / fs + phase/pi
    N = np.sum(act_ibis) + 1
    phase %= 2 * np.pi
    k_max = 2 * freq * N / fs + phase / np.pi + 1
    i_est_zxing = np.round((np.arange(k_max) - phase / np.pi) * fs / (2 * freq)).astype(int)
    est_ibis = np.diff(i_est_zxing)
    dist = get_mae_dist(est_ibis)
    assert np.sum(est_ibis) > np.sum(act_ibis)

    if debug:
        plt.figure(figsize=(8, 2))
        plt.plot(np.sin(2 * np.pi * freq * np.arange(N) / fs + phase))
        plt.stem(np.cumsum(act_ibis), np.ones(act_ibis.shape[0]), markerfmt='r')
        plt.stem(i_est_zxing, np.ones(i_est_zxing.shape[0]), markerfmt='y')

    # dist[0] corresponds to the first estimated crossing, which lies at a
    # sample index <= 0 once phase > 0. Shift the actual beat positions by
    # that offset so both use the same origin (no change for phase == 0).
    act_beats = np.cumsum(act_ibis)
    mae_errs = dist[act_beats - i_est_zxing[0]]
    return np.sum(mae_errs)
def get_mae_err_2(fs, freq, phase, act_ibis, debug=False):
    """
    Compute the beat placement error between zero crossings of the given
    sine wave (estimate) and the actually detected beats. Computes 'dist'
    from the actual beats and samples it at the estimated crossings
    (direction 2).

    Args:
        fs: sampling rate in Hz.
        freq: frequency of the sine in Hz (two zero crossings per period).
        phase: phase of the sine in rad.
        act_ibis: integer array of actual inter-beat intervals in samples.
        debug: plot the sine, the actual and the estimated beats.

    Returns:
        Sum over the estimated zero crossings of the distance to the
        nearest actual beat, in samples.
    """
    dist = np.pad(get_mae_dist(act_ibis), (0, 1))
    dist[-1] = 1  # sentinel for rounding errors in np.round()

    # sin(2 pi freq n / fs + phase) ... 0, pi, 2*pi, ...
    # 2 pi freq n / fs + phase = k * pi
    # 2 freq n / fs + phase/pi = k (= 0, 1, 2, ...)
    # n = (k - phase/pi) * fs / (2 freq)
    # k_max = 2 freq n_max / fs + phase/pi
    N = np.sum(act_ibis) + 1
    phase %= 2 * np.pi
    k_max = 2 * freq * N / fs + phase / np.pi
    i_est_zxing = np.round((np.arange(k_max) - phase / np.pi) * fs / (2 * freq)).astype(int)

    if debug:
        plt.figure(figsize=(8, 2))
        plt.plot(np.sin(2 * np.pi * freq * np.arange(N) / fs + phase))
        plt.stem(np.cumsum(act_ibis), np.ones(act_ibis.shape[0]), markerfmt='r')
        plt.stem(i_est_zxing, np.ones(i_est_zxing.shape[0]), markerfmt='y')

    # With phase > 0 the first estimated crossings fall before sample 0.
    # A negative index would wrap around to the end of 'dist'; the distance
    # to the nearest actual beat (the one at sample 0) is simply -index.
    before_start = i_est_zxing < 0
    in_range_idx = np.where(before_start, 0, i_est_zxing)
    mae_errs = np.where(before_start, -i_est_zxing, dist[in_range_idx])
    return np.sum(mae_errs)
def get_mae_err(fs, freq, phase, act_ibis, debug=False):
    """
    Total beat placement error between the zero crossings of the given sine
    wave and the actually detected beats: direction 1 plus direction 2.
    """
    # Both directions are needed to properly handle "missing beats":
    # - direction 1 alone is minimised by a "fully dense" estimate
    #   ("freq = fs/2"), because each 'act_beat' aligns to the next index;
    # - direction 2 alone is minimised by a "fully sparse" estimate
    #   ("freq = 1/L"), because those are the only 'est_beats' which are aligned.
    err_dir1 = get_mae_err_1(fs, freq, phase, act_ibis, debug)
    err_dir2 = get_mae_err_2(fs, freq, phase, act_ibis, debug)

    # TODO: may need to weight these two differently
    # TODO: see "2027-04-27 TestApi_0b" vs "2027-04-27 TestApi" plots [24]
    # TODO: (check: is match always slightly to the left of the trough / smooth minimum?)
    # TODO (if so, we may need to weight dir1 and dir2 differently -- or maybe norm by pts density??)
    # (or even penalize differently instead of adding dir1 and dir2)
    return 0 + err_dir1 + err_dir2
class RegularBeatFinder:
    """
    Optimize a beat frequency by placing a regular beat over the detected beats.

    Always finds a beat within f1..f2 range,
    ignoring whether the detected beat is an upper harmonic.
    """

    num_freqs = 200  #: number of freq steps to evaluate
    range_f1 = 0.5  #: lowest detection frequency in Hz
    range_f2 = 4.0  #: highest detection frequency in Hz
    f_bias_width = 0.2  #: gaussian std relative to num_freqs within f1..f2

    def __init__(self):
        pass

    def find_beat(self, fs, ssf_zxings, f_hint=None, debug_fe=False, debug_i=None):
        """
        Find the optimal beat frequency.

        Args:
            fs: sampling rate of the crossing indices in Hz.
            ssf_zxings: sample indices of the detected beats.
            f_hint: optional frequency in Hz to bias the search towards.
            debug_fe: plot the (biased) error over frequency.
            debug_i: index of a test frequency to plot beat alignment for.

        Returns:
            (freq, norm_err): the frequency with the lowest error, and that
            error divided by the number of estimated beats.
        """
        act_ibis = np.diff(ssf_zxings)
        # nice-to: may be interesting to also use as score: the ssf amplitude info at the beats to which we aligned

        # evaluate mean absolute errors for all frequencies
        freqs, freq_errs = self._get_opt_ibi_freq_2(fs, act_ibis, debug_i)

        # bias with f_hint - once we know the beat freq, make it more likely for it to be found everywhere
        if f_hint is not None:
            # Read the settings through 'self', as _get_opt_ibi_freq_2()
            # does, so per-instance overrides keep the bias aligned with
            # the frequency grid.
            nf, f1, f2 = self.num_freqs, self.range_f1, self.range_f2
            # presumably gauss(n, centre, std) in units of grid steps -- confirm against sqi.gauss
            bias = gauss(
                nf,
                (f_hint - f1) / (f2 - f1) * nf,
                self.f_bias_width * nf
            )
            freqs_bias = 1.0 / (np.max(bias) + bias)  # make 'f_hint' at most 2x more likely -- (1+bias) if normalized
            freq_errs *= freqs_bias

        if debug_fe:
            plt.figure(figsize=(8, 2))
            plt.plot(freqs, freq_errs)

        # get optimal frequency
        i_freq = np.argmin(freq_errs)

        # compute normalized error (mean beat placement error in samples)
        N = np.sum(act_ibis) + 1
        k_max = 2 * freqs[i_freq] * N / fs  # + phase/np.pi
        norm_err = freq_errs[i_freq] / k_max

        return freqs[i_freq], norm_err

    def freq_to_est_ibis(self, fs, freq, N):
        """
        Return the inter-beat intervals (in samples) of a regular beat at
        the zero crossings of a zero-phase sine of 'freq' Hz over N samples.
        """
        phase = 0
        k_max = 2 * freq * N / fs + phase / np.pi
        i_est_zxing = np.round((np.arange(k_max) - phase / np.pi) * fs / (2 * freq)).astype(int)
        return np.diff(i_est_zxing)

    def _get_opt_ibi_freq_2(self, fs, act_ibis, debug_i=None):
        """
        Get the beat placement errors for a range of frequencies.

        Returns:
            (t_freqs, fe): `num_freqs` test frequencies spaced linearly over
            range_f1..range_f2, and get_mae_err() at zero phase for each.
        """
        assert self.range_f2 < fs / 8, "it is at most useful to go until f = fs/8"  # (and even then, get_mae_dist() triangle will not be very useful)
        t_freqs = np.linspace(self.range_f1, self.range_f2, self.num_freqs)
        fe = np.zeros(self.num_freqs)
        for i, f in enumerate(t_freqs):
            fe[i] = get_mae_err(fs, f, 0.0, act_ibis)
        if debug_i is not None:
            # plot colors:
            # y: estimate
            # r: actual
            get_mae_err(fs, t_freqs[debug_i], 0.0, act_ibis, debug=True)
        return t_freqs, fe

104
docs/playlists.md Normal file
View File

@@ -0,0 +1,104 @@
# Playlist endpoints
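All routes require authentication: either an `Authorization: Bearer <access_token>` header (preferred when present) or an authenticated browser session (`spotify_user_id` after Spotify login). Responses are JSON (`application/json`).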
---
## `GET /playlists`
Returns every playlist for the current user by following Spotify's paginated [`GET /v1/me/playlists`](https://developer.spotify.com/documentation/web-api/reference/get-a-list-of-current-users-playlists) until all pages are loaded.
### Success (200)
| Field | Type | Description |
| --- | --- | --- |
| `ok` | `boolean` | Always `true` on success. |
| `total` | `number` | Count of playlists in `items`. |
| `items` | `array` | Each element is a **simplified playlist object** from Spotify |
**Typical fields on each element of `items`** (Spotify `SimplifiedPlaylistObject`):
| Field | Type |
| --- | --- |
| `description` | `string` \| `null` |
| `id` | `string` |
| `images` | `array` of `{ "url": string, "height": number \| null, "width": number \| null }` |
| `name` | `string` |
| `primary_color` | `string` \| `null` |
| `snapshot_id` | `string` |
| `tracks` | `object` — e.g. `{ "href": string, "total": number }` (track list stub, not full tracks) |
### Errors
`{ "ok": false, "error": string, ... }`
---
## `GET /playlists/<playlist_id>`
`<playlist_id>` is the Spotify playlist ID (the same id as in playlist URLs / `items[].id`).
Fetches [`GET /v1/playlists/{playlist_id}`](https://developer.spotify.com/documentation/web-api/reference/get-playlist) following pagination.
### Success (200)
| Field | Type | Description |
| --- | --- | --- |
| `ok` | `boolean` | Always `true` on success. |
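| `playlist` | `object` | **Full playlist object** from Spotify. When the track list spans more than one page, its paging object (`tracks`, or `items` in the current Spotify shape) is expanded to contain every track, with `next` and `previous` set to `null`. |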
**Typical fields on `playlist`** (Spotify `PlaylistObject`):
| Field | Type |
| --- | --- |
| `description` | `string` \| `null` |
| `id` | `string` |
| `images` | `array` (image objects, as above) |
| `name` | `string` |
| `primary_color` | `string` \| `null` |
| `snapshot_id` | `string` |
| `tracks` | `{"items": [Track, ...], ...}` |
**Typical fields on each element of `playlist.tracks.items`** (Spotify playlist track wrapper):
| Field | Type |
| --- | --- |
| `track` | `object` \| `null` — full or linked track; `null` if removed |
Nested objects use Spotify's **Track**, **Artist** (simplified), and **Album** (simplified) shapes below (field availability can vary by market or API version; see Spotify's reference).
#### `track` — Spotify `TrackObject`
Returned as the non-`null` value of `playlist.tracks.items[].track` (playlist context usually includes a **full** track with **simplified** `album` and `artists` entries).
| Field | Type |
| --- | --- |
| `album` | `object` — **SimplifiedAlbumObject** (see below) |
| `artists` | `array` of **SimplifiedArtistObject** (see below) |
| `duration_ms` | `number` |
| `id` | `string` |
| `name` | `string` |
#### `track.artists[]` — Spotify **SimplifiedArtistObject**
| Field | Type |
| --- | --- |
| `id` | `string` |
| `name` | `string` |
#### `track.album` — Spotify **SimplifiedAlbumObject**
| Field | Type |
| --- | --- |
| `artists` | `array` of **SimplifiedArtistObject** (album-level credits) |
| `id` | `string` |
| `images` | `array` of `{ "url": string, "height": number \| null, "width": number \| null }` |
| `name` | `string` |
### Errors
Same as `/playlists`: **401** when not logged in; otherwise Spotify errors and network errors per the global error handlers.
---
For authoritative field lists and edge cases, see [Spotify Web API reference](https://developer.spotify.com/documentation/web-api).

View File

@@ -4,3 +4,5 @@ authomatic
requests requests
numpy numpy
scipy scipy
scikit-learn
hsh-signal

158
rhythm.py
View File

@@ -13,8 +13,10 @@
import numpy as np import numpy as np
from numpy.fft import fft from numpy.fft import fft
from scipy.signal import fftconvolve import matplotlib.pyplot as plt # for debug only
from scipy.io import wavfile
from hsh_signal.signal import lowpass_fft
import time
def viterbi_highest_frequency_path_vectorized(Scp2, jump_penalty=2.0, use_log_amplitude=True): def viterbi_highest_frequency_path_vectorized(Scp2, jump_penalty=2.0, use_log_amplitude=True):
Scp2 = np.asarray(Scp2, dtype=float) Scp2 = np.asarray(Scp2, dtype=float)
@@ -80,7 +82,30 @@ def gabor_wavelet(omega, nu, fs, T, tt=None):
psi = 1.0 / np.sqrt(omega) * np.exp(-np.pi * (t / omega)**2) * np.exp(1j*2*np.pi * nu * t / omega) psi = 1.0 / np.sqrt(omega) * np.exp(-np.pi * (t / omega)**2) * np.exp(1j*2*np.pi * nu * t / omega)
return psi return psi
class BassAnalyzer: class Analyzer:
def __init__(self): pass
def debug_plot(self, i1, i2):
    """
    Debug plot: scalogram magnitude image for time steps [i1:i2], with the stored path overlaid.

    Reads `self.Scp2`, `self.path`, `self.pms`, `self.fs` and `self.Dp`
    from the instance, so these must have been set before calling.

    :param i1: first time-step index (row of `Scp2`) to show
    :param i2: end time-step index (exclusive)
    """
    Scp2, path = self.Scp2, self.path
    fs, Dp = self.fs, self.Dp
    # only 'ss', 'omega' and 'nu' are used below; the rest is unpacked for completeness
    ss, omega, nu, fsp, Wp, I, J, freqs = self.pms
    Scp2_slice = np.abs(Scp2[i1:i2])
    plt.figure(figsize=(8,2))
    # transpose so that the time steps run along the x axis
    plt.imshow(Scp2_slice.T, origin='lower')
    # x ticks every 250 time steps
    x_positions = np.arange(Scp2_slice.shape[0]//250+1)*250
    if x_positions[-1] == Scp2_slice.shape[0]:
        x_positions[-1] -= 1 # so last tick is shown properly
    # tick labels: step index scaled by Dp/fs, offset by the slice start
    t1 = i1 / (fs / Dp)
    x_labels = ['{:.1f}'.format(t1+x*Dp/fs) for x in x_positions]
    plt.xticks(x_positions, x_labels)
    # y ticks every 50 scale indices
    y_positions = np.arange(Scp2_slice.shape[1]//50)*50
    y_labels = ['{:.1f}'.format((nu/(omega*ss[y]))) for y in y_positions] # Hz equivalents of wavelet scale
    plt.yticks(y_positions, y_labels)
    # overlay the stored path (scale index per time step) in red
    plt.plot(np.arange(Scp2_slice.shape[0]), path[i1:i2], c='r')
class BassAnalyzer(Analyzer):
""" """
Rhythm analysis from songs. Rhythm analysis from songs.
Provides a beat amplitude signal from the audio signal. Provides a beat amplitude signal from the audio signal.
@@ -103,14 +128,24 @@ class BassAnalyzer:
wavelet_win_sec = 0.175 wavelet_win_sec = 0.175
k_omega, k_nu = 0.12, 5.0 #: adapt scaling to get 'reasonable' frequency range (for pop bass, e.g. 18..1145 Hz, but that range strongly depends on the actual song's 'pt' shortest interval 'B') k_omega, k_nu = 0.12, 5.0 #: adapt scaling to get 'reasonable' frequency range (for pop bass, e.g. 18..1145 Hz, but that range strongly depends on the actual song's 'pt' shortest interval 'B')
viterbi_jump_penalty = 5000.0 viterbi_jump_penalty = 5000.0
Wp_force = None
I_force = None
def __init__(self, fs, sig): def __init__(self, fs, sig, Wp_force=None, I_force=None):
""" """
:param fs: sampling rate :param fs: sampling rate
:param sig: audio signal normalized to [-1,1] :param sig: audio signal normalized to [-1,1]
""" """
super(BassAnalyzer, self).__init__()
self.D = int(self.shift_sec * fs) #: spectrogram step self.D = int(self.shift_sec * fs) #: spectrogram step
self.Wp = int(np.round(self.wavelet_win_sec * fs / self.W) * self.W) # wavelet window - make it an integer multiple of FFT window if Wp_force:
self.Wp = Wp_force
elif self.Wp_force:
self.Wp = self.Wp_force
else:
self.Wp = int(np.round(self.wavelet_win_sec * fs / self.W) * self.W) # wavelet window - make it an integer multiple of FFT window
if I_force:
self.I_force = I_force
self.U = self.Wp // self.W # ratio self.U = self.Wp // self.W # ratio
self.f = np.pad(sig, (self.W//2, self.W//2-1)) #: signal padded (W-FFT to determine scalogram parameters) self.f = np.pad(sig, (self.W//2, self.W//2-1)) #: signal padded (W-FFT to determine scalogram parameters)
@@ -120,21 +155,38 @@ class BassAnalyzer:
self.M = (self.L-self.W) // self.D + 1 #: number of time steps self.M = (self.L-self.W) // self.D + 1 #: number of time steps
self.fs = fs self.fs = fs
def viterbi_wavelet_scalogram_amplitudes(self): def viterbi_wavelet_scalogram_amplitudes(self, dbg_time=False):
""" """
Compute scalogram amplitudes from Viterbi path of highest-power frequencies. Compute scalogram amplitudes from Viterbi path of highest-power frequencies.
NOTE: downsampled from the original 'fs'. NOTE: downsampled from the original 'fs'.
:returns: (fsd, sig): sampling rate, amplitude signal :returns: (fsd, sig): sampling rate, amplitude signal
""" """
t1 = time.time()
Spf = self._spectrogram() Spf = self._spectrogram()
t2 = time.time()
pto = self._pulse_train(Spf) pto = self._pulse_train(Spf)
t3 = time.time()
Spf2 = self._spectrogram_2() Spf2 = self._spectrogram_2()
t4 = time.time()
pms = self._scalogram_params(pto) pms = self._scalogram_params(pto)
t5 = time.time()
Spsi_ss = self._scalogram_wavelets(pms) Spsi_ss = self._scalogram_wavelets(pms)
t6 = time.time()
Scp2 = self._scalogram(Spf2, Spsi_ss) Scp2 = self._scalogram(Spf2, Spsi_ss)
t7 = time.time()
path = self._viterbi_path(Scp2) path = self._viterbi_path(Scp2)
t8 = time.time()
ampl = self._viterbi_ampl(Scp2, path) ampl = self._viterbi_ampl(Scp2, path)
return ampl t9 = time.time()
self.Scp2 = Scp2
self.path = path
self.pms = pms
if not dbg_time:
return ampl
else:
return ampl, np.diff([t1, t2, t3, t4, t5, t6, t7, t8, t9])
def _spectrogram(self): def _spectrogram(self):
"""W-FFT (STFTs) to determine scalogram parameters""" """W-FFT (STFTs) to determine scalogram parameters"""
@@ -162,13 +214,21 @@ class BassAnalyzer:
g = np.abs(Spf) # (M x W) g = np.abs(Spf) # (M x W)
g_bar = np.mean(g, axis=1) # (M) g_bar = np.mean(g, axis=1) # (M)
# TODO: check if 'A' needs to be a smooth signal slowly varying over time, not a const. # TODO: check if 'A' needs to be a smooth signal slowly varying over time, not a const.
A = np.mean(g_bar) # amplitude cutoff for pulse train #A = np.mean(g_bar) # amplitude cutoff for pulse train
ip = int(fs)
g_bar_l = lowpass_fft(np.pad(g_bar, (ip, ip), mode='edge'), fps=fs/self.D, cf=0.5, tw=0.05)[ip:-ip]
A = g_bar_l
# compute transitions (pulse train) # compute transitions (pulse train)
pt = (g_bar > A).astype(int) # pulse train pt = (g_bar > A).astype(int) # pulse train
pt_re = (np.diff(pt) == 1).astype(int) # rising edge pt_re = (np.diff(pt) == 1).astype(int) # rising edge
self.B = max(np.sum(pt_re), 1) # total number of pulses in the 'pt' pulse train signal self.B = max(np.sum(pt_re), 1) # total number of pulses in the 'pt' pulse train signal
# clip B, to force **reasonable** frequency range for wavelets
# (noise will otherwise cause many transitions -> high B -> bass falls below freq range -> algo fail)
B_min, B_max = 0.5 * M / (fs / self.D), 5.0 * M / (fs / self.D)
self.B = np.clip(self.B, a_min=B_min, a_max=B_max)
# resample 'pt' (M) at these indices -> 'ptr' (L), like original 'f' (signal padded) # resample 'pt' (M) at these indices -> 'ptr' (L), like original 'f' (signal padded)
squashed_idxs = np.floor(np.linspace(0, L-1, L) * (M/L)).astype(int) squashed_idxs = np.floor(np.linspace(0, L-1, L) * (M/L)).astype(int)
ptr = pt[squashed_idxs] ptr = pt[squashed_idxs]
@@ -192,6 +252,8 @@ class BassAnalyzer:
f2 = self.f2 f2 = self.f2
Wp, Mp, Dp = self.Wp, self.Mp, self.Dp Wp, Mp, Dp = self.Wp, self.Mp, self.Dp
# TODO(perf): 5.0 sec runtime
# #
# compute spectrogram: 'Spf2' (M x Wp) <- from 'f' # compute spectrogram: 'Spf2' (M x Wp) <- from 'f'
# #
@@ -220,7 +282,10 @@ class BassAnalyzer:
T = (Lp - Wp) / fsp # un-padded sample count -> time length T = (Lp - Wp) / fsp # un-padded sample count -> time length
#omega = p * T / B # width parameter of Gabor wavelet #omega = p * T / B # width parameter of Gabor wavelet
#nu = B / (p * T) # frequency parameter of Gabor wavelet #nu = B / (p * T) # frequency parameter of Gabor wavelet
I = int(np.log2(p**2 * T**2 / (delta * B**2)) - 3/2) # number of octaves if self.I_force:
I = self.I_force
else:
I = int(np.log2(p**2 * T**2 / (delta * B**2)) - 3/2) # number of octaves
J = int(256 / I) # number of voices per octave J = int(256 / I) # number of voices per octave
r = np.linspace(0, I*J-1, I*J) r = np.linspace(0, I*J-1, I*J)
@@ -253,12 +318,18 @@ class BassAnalyzer:
# T, Lp, Wp # T, Lp, Wp
T, Lp, Wp = self.T, self.Lp, self.Wp T, Lp, Wp = self.T, self.Lp, self.Wp
# TODO(perf): reduce num of wavelets, and/or parallelize into freq slices
# TODO(perf): 3.5 sec runtime
# compute convolution with wavelets, by multiplication in freq-domain # compute convolution with wavelets, by multiplication in freq-domain
# 'Scp2' (M x I*J) # 'Scp2' (M x I*J)
Scp2 = np.matmul(Spf2, Spsi_ss.T) * (T/(Lp-Wp)) Scp2 = np.matmul(Spf2, Spsi_ss.T) * (T/(Lp-Wp))
return Scp2 return Scp2
def _viterbi_path(self, Scp2): def _viterbi_path(self, Scp2):
# TODO(perf): parallelize into time slices
# TODO(perf): 4.5 sec runtime
# TODO: check if we should re-weight freq-jumps, because of log-scale frequencies # TODO: check if we should re-weight freq-jumps, because of log-scale frequencies
path, dp, backptr = viterbi_highest_frequency_path_vectorized( path, dp, backptr = viterbi_highest_frequency_path_vectorized(
(np.abs(Scp2)**2).T, (np.abs(Scp2)**2).T,
@@ -271,3 +342,72 @@ class BassAnalyzer:
def _viterbi_ampl(self, Scp2, path): def _viterbi_ampl(self, Scp2, path):
max_amplitudes = np.array([np.abs(Scp2[i, path[i]]) for i in range(Scp2.shape[0])]) max_amplitudes = np.array([np.abs(Scp2[i, path[i]]) for i in range(Scp2.shape[0])])
return max_amplitudes return max_amplitudes
class GuitarAnalyzer:
    """
    Rhythm analysis from songs.
    Provides a beat amplitude signal from the audio signal.

    Performs short-time Fourier Transform on the signal,
    then returns the f1..f2 band power for each window.

    For low-frequency instruments like percussion,
    use BassAnalyzer instead.
    """

    W = 1024  #: window size (must be even, so that right padding W/2-1 works)
    shift_sec = 0.008  #: window shift in sec ('delta_tau') between subsequent windows
    target_band_f1 = 800.0  #: lower bound of target freq band in Hz
    target_band_f2 = 4300.0  #: upper bound of target freq band in Hz

    def __init__(self, fs, sig):
        """
        :param fs: sampling rate
        :param sig: audio signal normalized to [-1,1]
        """
        self.fs = fs
        self.f = np.pad(sig, (self.W//2, self.W//2-1))  #: signal padded, so windows are centered on the samples
        self.D = int(self.shift_sec * fs)  #: spectrogram step
        self.L = self.f.shape[0]
        self.M = (self.L - self.W) // self.D + 1  #: number of time steps

    def spectrogram_power_amplitudes(self):
        """
        Compute spectrogram power from a target frequency range.

        NOTE: downsampled from the original 'fs' (one value per window step 'D',
        i.e. the output sampling rate is fs / D).

        :returns: amplitude signal (M), mean-removed
        """
        return self._spectrogram_power(self._spectrogram())

    def _spectrogram_power(self, Spf):
        """
        Sum the spectrum magnitudes in the f1..f2 band, per time step.

        :param Spf: spectrogram (M x W), as returned by `_spectrogram()`
        :returns: band amplitude signal (M), with its mean subtracted
        """
        fs, W = self.fs, self.W
        # freq band range, as bin indices into the W-point FFT
        k1, k2 = int(self.target_band_f1/fs*W), int(self.target_band_f2/fs*W)
        band = np.sum(np.abs(Spf[:, k1:k2]), axis=1)
        return band - np.mean(band)

    def _spectrogram(self):
        """
        W-FFT (STFT) of the padded signal.

        :returns: spectrogram 'Spf' (M x W), complex
        """
        f = self.f
        M, W, D = self.M, self.W, self.D
        if M <= 0:
            # signal too short for a single window
            return np.zeros((0, W), dtype=np.complex128)
        # 'D'-spaced windows of length W, as a strided view (no copy of the signal)
        windows = np.lib.stride_tricks.sliding_window_view(f, W)[::D][:M]
        # one batched FFT instead of M separate calls
        return np.asarray(fft(windows, axis=1), dtype=np.complex128)

70
segmenter.py Normal file
View File

@@ -0,0 +1,70 @@
import numpy as np
from sklearn.cluster import KMeans
def median_filter(a, w):
    """
    Running median of 'a' over windows of 'w' samples.

    The input is edge-padded by w//2 on both sides, so the output has
    the same length as 'a'.

    :param a: input signal
    :param w: window width in samples
    :returns: filtered signal (float array, same length as 'a')
    """
    padded = np.pad(a, (w//2, w//2), mode='edge')
    n = a.shape[0]
    medians = [np.median(padded[k:k+w]) for k in range(n)]
    return np.array(medians, dtype=float)
# nice-to: split longer segments (above 30 sec), merge very-short segments
class Segmenter:
    """
    Split a spectral power signal into segments with similar statistics.

    Windowed statistical moments of the signal are clustered with KMeans;
    the cluster labels are median-filtered, and positions where the label
    changes are reported as segment boundaries.
    """

    seg_win_size_sec = 4.0  #: window size for stat. measures for segmentation, in sec
    seg_win_step_sec = 1.0  #: step for segmentation, in sec
    n_clusters = 8  #: clusters for KMeans algorithm
    seg_filt_win_sec = 20.0  #: median filter width for smoothing segments

    def __init__(self):
        pass

    def get_segments(self, fs, guitar):
        """
        Segment boundaries, with index 0 prepended as the start of the first segment.

        :param fs: sampling rate of 'guitar'
        :param guitar: spectral power signal
        :returns: array of indices into 'guitar'
        """
        boundaries = self.get_segment_boundaries(fs, guitar)
        return np.pad(boundaries, (1, 0))

    def get_segment_boundaries(self, fs, guitar):
        """split the spectral power signal 'guitar' into stochastically similar segments."""
        labels = self._get_segments(fs, guitar)
        label_changes = np.diff(labels) != 0
        return np.where(label_changes)[0]

    def _get_segments(self, fs, guitar):
        """
        Assign a segment (cluster) id to every sample of 'guitar'.

        :returns: int array of segment ids, same length as 'guitar'
        """
        filt_win = int(self.seg_filt_win_sec / self.seg_win_step_sec)
        moments = self._sig_stochastics(fs, guitar)

        # 2-D feature space: weighted mean level vs. norm of the higher moments
        level = moments[:, 0] * 1.4
        spread = np.sqrt(np.sum(moments[:, 1:]**2, axis=1))
        X = np.vstack((level, spread)).T

        # cluster by stochastic characteristics
        kmeans = KMeans(n_clusters=self.n_clusters, random_state=0, n_init="auto").fit(X)
        smoothed_ids = np.floor(median_filter(kmeans.labels_, filt_win)).astype(int)

        # up-sample segment id assignment
        upsample_idx = np.linspace(0, smoothed_ids.shape[0], guitar.shape[0], endpoint=False).astype(int)
        return smoothed_ids[upsample_idx]

    def _sig_stochastics(self, fs, y):
        """
        compute the stochastic moments of the signal. normalized.

        One row per analysis step; columns are the windowed mean and the
        2nd, 3rd (absolute) and 4th central moments, each divided by the
        corresponding whole-signal quantity. Rows within wo//2 steps of
        either end are not computed and stay zero.

        :returns: array (number of steps x 4)
        """
        win = int(self.seg_win_size_sec * fs)
        step = int(self.seg_win_step_sec * fs)
        half = win // 2
        n_steps = y.shape[0] // step

        # whole-signal normalizers
        centered = y - np.mean(y)
        norm_0 = np.max(y)
        norm_1 = np.mean(centered**2)
        norm_2 = np.mean(np.abs(centered**3))
        norm_3 = np.mean(centered**4)

        y_pad = np.pad(y, (half, half))
        out = np.zeros((n_steps, 4))
        wo = int(self.seg_win_size_sec/self.seg_win_step_sec)
        for i in range(wo//2, n_steps - wo//2):
            i_c = int((i+0.5)*step)
            # NOTE(review): indices are into the *padded* signal - confirm the intended window alignment
            dev = y_pad[i_c-half:i_c+half]
            mean = np.mean(dev)
            dev = dev - mean
            out[i] = (
                mean / norm_0,
                np.mean(dev**2) / norm_1,
                np.mean(np.abs(dev**3)) / norm_2 / 2,
                np.mean(dev**4) / norm_3 / 4,
            )
        return out

179
song.py Normal file
View File

@@ -0,0 +1,179 @@
import numpy as np
from rhythm import BassAnalyzer, GuitarAnalyzer
from segmenter import Segmenter
from beat import SsfZxing, RegularBeatFinder
from sqi import gauss, shift
class SongBeatDetector:
    """
    Beat detection on a whole song.

    The song is segmented on the 'guitar' band-power signal; each segment is
    cut into fixed-length slices of the 'bass' amplitude signal, and a regular
    beat frequency is estimated per slice.
    """

    SEGMENT_SLICE_LEN_SEC = 8.0  #: slice length for processing (long enough to contain bar structure; short enough for a constant freq. beat placement)
    SSF_REL_THRES = 1.5  #: optimize for slope of error (mae) function over beat frequency
    NE_THRES = 30.0  #: normalized error threshold for 'good' slices

    def __init__(self):
        pass

    def detect(self, fs, sig, use_f_hint=True, debug_fe_idx=None):
        """
        Run the beat detection.

        :param fs: sampling rate
        :param sig: audio signal
        :param use_f_hint: do a first pass to find the most common beat frequency,
                           then bias the second pass with it
        :param debug_fe_idx: sample index (in 'fs' units) whose segment should be debug-plotted
        :returns: list of per-slice result dicts (see `_process_slice()`)
        """
        self.fs = fs
        self.ba = BassAnalyzer(fs, sig)
        # times: durations of different stages (unused here)
        self.bass, times = self.ba.viterbi_wavelet_scalogram_amplitudes(dbg_time=True)
        self.ga = GuitarAnalyzer(fs, sig)
        self.guitar = self.ga.spectrogram_power_amplitudes()
        fsd = fs / self.ga.D  # <- guitar ('ga')
        self.D = self.ga.D  # <- guitar ('ga')
        # self.bass, self.guitar: functions on windowed spectrum 0.008 sec apart (125 Hz)
        self.sg = Segmenter()
        self.i_seg = self.sg.get_segments(fsd, self.guitar)  # <- guitar
        self.t_seg = self.i_seg / fsd
        self.fsd = fsd  # reciprocal window step size
        # we segment on 'guitar' info, but process 'bass' later

        if use_f_hint:
            # initial estimate (without 'f_hint')
            zds_initial = self._estimate_segments(debug_fe_idx=None)
            self.zds_initial = zds_initial
            fbs = np.array([zdd['fb'] for zdd in zds_initial
                            if zdd['ne'] < SongBeatDetector.NE_THRES])
            if fbs.size > 0:
                bins, hfreq = np.histogram(fbs)
                ih = np.argmax(bins)
                self.f_hint = np.mean((hfreq[ih], hfreq[ih+1]))  # center freq of bin
            else:
                # no 'good' slice: a histogram of nothing would yield a bogus hint
                self.f_hint = None
        else:
            self.f_hint = None

        # actual estimate (using 'f_hint' to bias each segment)
        self.zds = self._estimate_segments(f_hint=self.f_hint, debug_fe_idx=debug_fe_idx)
        return self.zds

    def _estimate_segments(self, f_hint=None, debug_fe_idx=None):
        """
        Process all slices of all segments.

        Segments shorter than one slice are skipped; a trailing part of a
        segment that does not fill a whole slice is dropped.

        :param f_hint: beat frequency hint, passed through to `_process_slice()`
        :param debug_fe_idx: sample index (in 'fs' units) whose segment should be debug-plotted
        :returns: list of per-slice result dicts
        """
        zds = []
        fsd = self.fsd
        seg_sl = int(SongBeatDetector.SEGMENT_SLICE_LEN_SEC * fsd)  # segment slice length in 1/fsd units

        # for each segment
        for i in np.arange(self.i_seg.shape[0]-1):
            i1, i2 = self.i_seg[i], self.i_seg[i+1]

            # split segment into slices
            if i2-i1 < seg_sl:
                continue
            num_sl = (i2-i1) // seg_sl

            if debug_fe_idx is not None:
                # there will be many (upto 50) different slices - do not debug-plot them all
                # convert from 'fs' sample index to 'fsd' index
                debug_fe_sidx = debug_fe_idx / self.fs * fsd
                # NOTE(review): this enables debug for every slice of the segment - confirm that is intended
                debug_fe = i1 <= debug_fe_sidx < i2
            else:
                debug_fe = False

            for m in np.arange(num_sl):
                j1, j2 = i1+m*seg_sl, i1+(m+1)*seg_sl
                sig_slice = self.bass[slice(j1, j2)]  # <- bass
                zdd = self._process_slice(j1, j2, m, sig_slice, f_hint=f_hint, debug_fe=debug_fe)
                zds.append(zdd)
        return zds

    def _process_slice(self, j1, j2, m, sig_slice, f_hint=None, debug_fe=False):
        """
        Detect raw beats in one slice, and fit a regular beat to them.

        :param j1: lower index of the slice (in 1/fsd units)
        :param j2: upper index of the slice (in 1/fsd units)
        :param m: slice number within its segment (currently unused)
        :param sig_slice: the signal slice to process
        :param f_hint: beat frequency hint for RegularBeatFinder, or None
        :param debug_fe: show plots for SSF and raw/reg beat placement
        :returns: dict with the slice bounds 'i1', 'i2' (in 'fs' samples), the detector
                  objects and signals, 'fb' (beat frequency), 'ne' (error) and 'est_zxings'
        """
        # TODO: C++ impl of SsfZxing._ssf_det_zxings() has diverged.
        # - refractory period changes
        # - ssf_th filter with 6-points
        # - ?? others ??
        # NOTE: SsfZxing here is always getting short 8-sec slices only (nb. for 'ssf_th' comput.)
        if debug_fe:
            # this module has no top-level matplotlib import; load it only when plotting
            import matplotlib.pyplot as plt

        fsd = self.fsd  # reciprocal window step size
        seg_sl = int(SongBeatDetector.SEGMENT_SLICE_LEN_SEC * fsd)  # segment slice length in 1/fsd units

        SsfZxing.ssf_rel_thres = SongBeatDetector.SSF_REL_THRES
        zd = SsfZxing()
        ssf, ssf_th = zd._ssf_function(fsd, sig_slice)
        ssf_zxings = zd._ssf_det_zxings(fsd, ssf, ssf_th)

        zdd = {
            'i1': j1 * self.D, 'i2': j2 * self.D,
            # ssf_zxings: raw beats (relative to slice)
            'zd': zd, 'ssf': ssf, 'ssf_zxings': ssf_zxings,
            'sig_slice': sig_slice, 'sig_source': 'bass',
            'ssf_th': np.ones(ssf.shape[0]) * ssf_th
        }

        if debug_fe:
            # scalogram image, with viterbi path
            self.ba.debug_plot(j1, j2)  # TODO: adapt 'bass'
            plt.title(f'scalogram & viterbi path, slice [{j1}:{j2}]')
            # SSF function and detected raw beats
            zd.debug_plot(0, seg_sl)
            plt.title(f'raw beats, slice [{j1}:{j2}]')

        # nice-to: optimize phase, (maybe iteratively, optimize phase and freq each)
        bf = RegularBeatFinder()
        fb, ne = bf.find_beat(fsd, ssf_zxings, f_hint=f_hint, debug_fe=debug_fe, debug_i=None)
        if debug_fe:
            plt.title(f'regular-beat placement error (mae), slice [{j1}:{j2}]')
        # mae is unnormalized here, as returned from RegularBeatFinder._get_opt_ibi_freq_2()

        zdd.update({
            # bf: beat finder
            # fb: beat frequency, in Hz
            # ne: normalized mae error
            'bf': bf, 'fb': fb, 'ne': ne
        })

        # TODO: ne > 30 is suspiciously bad - filter those "detections" out eventually
        # TODO: # catch basic errors: ne == 0, or len(est_zxings) == 0, means slice is bad
        # NOTE: since 2x the zero-crossings, we get twice the frequency here.
        # NOTE: this means 0.5 lower freq bound of RegularBeatFinder will find at most 60 bpm in the song.
        # TODO: RegularBeatFinder currently not using 'phase' info, but should be optimized
        # TODO: (currently we start the pattern at the first detected beat, may or may not be good)
        est_zxings = np.cumsum(np.pad(bf.freq_to_est_ibis(fsd, fb, j2-j1), (1, 0)))  # rel. to slice
        if ssf_zxings.shape[0] > 0:
            est_zxings += ssf_zxings[0]  # add phase = currently we just start at first detected beat
        # nice-to: median-filter the freq, etc.pp.
        # nice-to: avoid adding len(est_zxings)=0 entries later

        # trim back to max. index
        est_zxings = est_zxings[np.where(est_zxings < ssf.shape[0])[0]]

        zdd.update({
            # est_zxings: regular beats (relative to slice)
            'est_zxings': est_zxings
        })

        if debug_fe:
            plt.figure(figsize=(8, 2))
            plt.plot(ssf)
            plt.plot(np.arange(ssf.shape[0]), np.ones(ssf.shape[0]) * ssf_th)
            plt.scatter(ssf_zxings, np.ones(ssf_zxings.shape[0]) * ssf_th, c='r')
            plt.scatter(est_zxings, np.ones(est_zxings.shape[0]) * ssf_th, c='g')
            plt.title(f'ssf, ssf_th, raw beats (r), reg beats (g), slice [{j1}:{j2}]')

        return zdd

    # _debug_fmt_est_zxings
    def _place_fmt_zxings(self, fsd, ssf, ssf_zxings):
        """
        Render beat locations as a signal: one Gaussian bump per beat.

        :param fsd: sampling rate of 'ssf'
        :param ssf: SSF function (only its length is used)
        :param ssf_zxings: beat locations, as indices into 'ssf'
        :returns: signal of the same length as 'ssf', with bump amplitude normalized to 1.0
        """
        gauss_beat_template_win_sec = 0.25542  #: gauss window width (as compared to beats in ssf function)
        gauss_beat_template_sigma_sec = 0.027  #: gauss bump half-width parameter (as compared to beats in ssf function)

        sigma = fsd * gauss_beat_template_sigma_sec
        W = int(fsd * gauss_beat_template_win_sec)
        gb = gauss(W, W//2, sigma)

        # place gaussians on estimated beat locations
        ssf_est = np.zeros(ssf.shape[0])
        for i in ssf_zxings:
            ssf_est += shift(ssf.shape[0], i, gb)
        ssf_est /= gb[W//2]  # normalize amplitude to 1.0
        ssf_est = np.roll(ssf_est, int(sigma))  # shift to right (beat loc = gauss beginning, not center)
        return ssf_est

56
sqi.py Normal file
View File

@@ -0,0 +1,56 @@
import numpy as np
def gauss(N, mu, sigma):
    """
    Sampled Gaussian (normal density) on the integer grid 0..N-1.

    :param N: number of samples
    :param mu: center of the bump, in samples
    :param sigma: standard deviation, in samples
    :returns: array (N) of density values
    """
    idx = np.arange(N)
    exponent = -1/2 * (idx - mu)**2 / sigma**2
    scale = sigma * np.sqrt(2*np.pi)
    return np.exp(exponent) / scale
def shift(N, n, x):
    """
    shift the center of the signal 'x' to time index 'n'.

    The result is a zero signal of length N with 'x' copied in, so that
    x[len(x)//2] lands on index 'n'. Parts of 'x' falling outside 0..N-1
    are cut off.

    :param N: length of the output signal
    :param n: target index for the center of 'x'
    :param x: signal to place
    :returns: array (N)
    """
    out = np.zeros(N)
    width = x.shape[0]
    half = width // 2

    # destination range in 'out', clamped to [0, N]
    dst_lo = min(max(n - half, 0), N)
    dst_hi = min(max(n + half + width % 2, 0), N)
    # matching source range in 'x', clamped to [0, width]
    src_lo = min(max(half - n, 0), width)
    src_hi = min(max(N + half - n, 0), width)

    out[dst_lo:dst_hi] = x[src_lo:src_hi]
    return out
class SigQuality:
    """
    Compute the Signal-to-Noise Ratio of beats
    """

    gauss_beat_template_win_sec = 0.25542  #: gauss window width (as compared to beats in ssf function)
    gauss_beat_template_sigma_sec = 0.027  #: gauss bump half-width parameter (as compared to beats in ssf function)

    def __init__(self):
        pass

    def get_snr(self, fs, ssf, ssf_threshold, est_zxings):
        """
        Compute the Signal-to-Noise Ratio of beats, based on SSF function and detected beat locations.

        :param fs: sampling rate of 'ssf'
        :param ssf: SSF function
        :param ssf_threshold: currently unused
        :param est_zxings: beat locations, as indices into 'ssf'
        :returns: 10 * log10 of the (density-corrected) ratio of SSF energy
                  near the beats to SSF energy elsewhere
        """
        n = ssf.shape[0]
        sigma = fs * self.gauss_beat_template_sigma_sec
        W = int(fs * self.gauss_beat_template_win_sec)
        bump = gauss(W, W//2, sigma)

        # template: one gaussian per estimated beat location
        template = np.zeros(n)
        for beat in est_zxings:
            template += shift(n, beat, bump)
        template /= bump[W//2]  # normalize amplitude to 1.0
        template = np.roll(template, int(sigma))  # shift to right (beat loc = gauss beginning, not center)

        # energy where beats are expected vs. where there should be none
        energy = ssf**2
        sqi_goal = np.sum(template * energy)
        sqi_noise = np.sum((1.0 - template) * energy)

        # noise is everywhere, while signal is only around detected peaks - correct for this.
        goal_density = np.mean(np.clip(2*sigma / np.diff(est_zxings), a_min=0, a_max=1))
        sqi_goal /= goal_density

        return 10 * (np.log10(sqi_goal) - np.log10(sqi_noise))