Compare commits

...

9 Commits

6 changed files with 408 additions and 53 deletions

11
TODO.md Normal file
View File

@@ -0,0 +1,11 @@
# TODO
- tests: beat and guitar synthesizer
- generate rhythmic sequence and test the algos
.
O> "2027-04-29 TestApi Bass song.ipynb": [21]
- why is ssf continually rising?
- because of using 'fs' not 'fsd'
.

231
api.py
View File

@@ -17,19 +17,24 @@
# $ while sleep 1; do diff -q api.py /tmp/api.py; if [ $? -ne 0 ]; then scp api.py lockstep@api.lockstep.at:/var/sites/api.lockstep.at/; cp api.py /tmp/api.py; fi; done
import os
import re
import sqlite3
from base64 import b64encode
from datetime import datetime, timedelta, timezone
import json
from functools import wraps
from urllib.parse import urlencode
from flask import Flask, request, session, jsonify, make_response, redirect, url_for
from flask import Flask, g, request, session, jsonify, make_response, redirect, url_for
from werkzeug.middleware.proxy_fix import ProxyFix
from authomatic import Authomatic
from authomatic.adapters import WerkzeugAdapter
from authomatic.providers import oauth2
from authomatic.providers import PROVIDER_ID_MAP as AUTHOMATIC_PROVIDER_ID_MAP
import random
import time
import requests
@@ -111,6 +116,7 @@ ALLOWED_SPOTIFY_APP_POST_LOGIN_REDIRECT = os.environ.get(
)
DB_PATH = os.environ.get("TOKEN_DB_PATH", "spotify_tokens.db")
METADATA_UPLOAD_DIR = os.environ.get("METADATA_UPLOAD_DIR", "uploaded_collections")
app = Flask(__name__)
app.secret_key = os.environ["FLASK_SECRET_KEY"]
@@ -153,6 +159,17 @@ def init_db():
updated_at TEXT NOT NULL
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS uploaded_metadata (
id INTEGER PRIMARY KEY AUTOINCREMENT,
spotify_user_id TEXT NOT NULL,
track_id TEXT NOT NULL,
type TEXT NOT NULL,
version INTEGER NOT NULL,
file_name TEXT NOT NULL,
created_at TEXT NOT NULL
)
""")
conn.commit()
conn.close()
@@ -294,9 +311,30 @@ def get_valid_access_token(spotify_user_id: str) -> str:
# ----------------------------
def spotify_get(url: str, access_token: str, params: dict | None = None) -> dict:
"""GET a Spotify Web API endpoint and return the parsed JSON body."""
"""
GET a Spotify Web API endpoint and return the parsed JSON body.
Retries on HTTP 429 and 503 with exponential backoff and optional Retry-After,
so brief Spotify rate limits often clear before we surface an error to the app.
"""
headers = {"Authorization": f"Bearer {access_token}"}
r = requests.get(url, headers=headers, params=params)
backoff_sec = 1.0
max_attempts = 8
params_eff = params
for attempt in range(max_attempts):
r = requests.get(url, headers=headers, params=params_eff)
if r.status_code in (429, 503) and attempt < max_attempts - 1:
wait = backoff_sec
ra = r.headers.get("Retry-After")
if ra:
try:
wait = max(wait, float(ra))
except ValueError:
pass
wait = min(wait, 120.0)
time.sleep(wait + random.random() * 0.25 * wait)
backoff_sec = min(backoff_sec * 2.0, 60.0)
continue
r.raise_for_status()
return r.json()
@@ -605,28 +643,59 @@ old example 1:
}
"""
@app.route("/me")
def me():
def spotify_access_token_from_authorization_header():
auth = request.headers.get("Authorization", "") or ""
if not auth.startswith("Bearer "):
return None
token = auth[7:].strip()
return token or None
def get_request_spotify_access_token():
"""
Prefer ``Authorization: Bearer <access_token>`` (mobile / jukebox).
Fallback to Flask session + stored refresh flow (browser).
"""
bearer = spotify_access_token_from_authorization_header()
if bearer:
return bearer
spotify_user_id = session.get("spotify_user_id")
if not spotify_user_id:
return jsonify({"ok": False, "error": "Not logged in"}), 401
return None
return get_valid_access_token(spotify_user_id)
access_token = get_valid_access_token(spotify_user_id)
def require_auth(f):
@wraps(f)
def wrapped(*args, **kwargs):
token = get_request_spotify_access_token()
if not token:
return jsonify({"ok": False, "error": "Not logged in"}), 401
g.spotify_access_token = token
return f(*args, **kwargs)
return wrapped
@app.route("/me")
@require_auth
def me():
access_token = g.spotify_access_token
profile = spotify_get_me(access_token)
row = get_token_record(spotify_user_id)
row = get_token_record(profile["id"])
return jsonify({
"ok": True,
"profile": profile,
"stored_expires_at": row["expires_at"],
"stored_expires_at": row["expires_at"] if row else None,
})
@app.route("/playlists")
@require_auth
def playlists():
access_token = get_request_spotify_access_token()
if not access_token:
return jsonify({"ok": False, "error": "Not logged in"}), 401
access_token = g.spotify_access_token
"""
user_id = "Sara"
@@ -646,32 +715,34 @@ def playlists():
@app.route("/playlists/<playlist_id>")
@require_auth
def playlist(playlist_id):
access_token = get_request_spotify_access_token()
if not access_token:
return jsonify({"ok": False, "error": "Not logged in"}), 401
access_token = g.spotify_access_token
playlist_data = spotify_get(
f"https://api.spotify.com/v1/playlists/{playlist_id}",
access_token,
)
# The playlist response embeds a `tracks` paging object whose first page
# contains up to 100 items. For playlists larger than that, follow the
# dedicated tracks endpoint until exhausted and splice the full list back
# into the response.
tracks_obj = playlist_data.get("tracks") or {}
if tracks_obj.get("next"):
all_tracks = spotify_get_paginated(
# Full playlist objects use a paging object at `items` (current Spotify shape)
# or legacy `tracks`. Follow `next` on whichever is present.
paging_key = (
"items"
if isinstance(playlist_data.get("items"), dict)
else "tracks"
)
paging = playlist_data.get(paging_key) or {}
if paging.get("next"):
all_items = spotify_get_paginated(
f"https://api.spotify.com/v1/playlists/{playlist_id}/tracks",
access_token,
limit=100,
)
playlist_data["tracks"] = {
**tracks_obj,
"items": all_tracks,
playlist_data[paging_key] = {
**paging,
"items": all_items,
"offset": 0,
"limit": len(all_tracks),
"limit": len(all_items),
"next": None,
"previous": None,
}
@@ -682,26 +753,98 @@ def playlist(playlist_id):
})
def get_request_spotify_access_token():
"""
Prefer ``Authorization: Bearer <access_token>`` (mobile / jukebox).
Fallback to Flask session + stored refresh flow (browser).
"""
bearer = spotify_access_token_from_authorization_header()
if bearer:
return bearer
spotify_user_id = session.get("spotify_user_id")
if not spotify_user_id:
return None
return get_valid_access_token(spotify_user_id)
@app.route("/metadata", methods=["GET"])
@require_auth
def get_metadata():
track_id = request.args.get("trackId")
meta_type = request.args.get("type", "beats")
if not track_id:
return jsonify({"ok": False, "error": "Missing trackId"}), 400
access_token = g.spotify_access_token
profile = spotify_get_me(access_token)
spotify_user_id = profile["id"]
conn = db()
row = conn.execute("""
SELECT file_name FROM uploaded_metadata
WHERE spotify_user_id = ? AND track_id = ? AND type = ?
ORDER BY created_at DESC
LIMIT 1
""", (spotify_user_id, track_id, meta_type)).fetchone()
conn.close()
if not row:
return jsonify({"ok": False, "error": "Not found"}), 404
file_path = os.path.join(METADATA_UPLOAD_DIR, row["file_name"])
if not os.path.isfile(file_path):
return jsonify({"ok": False, "error": "Not found"}), 404
with open(file_path, "r", encoding="utf-8") as f:
collection = json.load(f)
if not isinstance(collection, dict):
return jsonify({"ok": False, "error": "Invalid metadata file"}), 500
return jsonify({"ok": True, "collection": collection})
def spotify_access_token_from_authorization_header():
auth = request.headers.get("Authorization", "") or ""
if not auth.startswith("Bearer "):
return None
token = auth[7:].strip()
return token or None
@app.route("/metadata", methods=["POST"])
@require_auth
def upload_metadata():
access_token = g.spotify_access_token
if not request.is_json:
return jsonify({"ok": False, "error": "Expected application/json"}), 400
body = request.get_json(silent=True)
if not isinstance(body, dict):
return jsonify({"ok": False, "error": "Invalid JSON body"}), 400
track_id = body.get("trackId")
meta_type = body.get("type")
version = body.get("version")
collection = body.get("collection")
if not track_id or not meta_type or version is None or collection is None:
return jsonify({
"ok": False,
"error": "Missing trackId, type, version, or collection",
}), 400
try:
version = int(version)
except (TypeError, ValueError):
return jsonify({"ok": False, "error": "version must be an integer"}), 400
if not isinstance(collection, dict):
return jsonify({"ok": False, "error": "collection must be a JSON object"}), 400
profile = spotify_get_me(access_token)
spotify_user_id = profile["id"]
os.makedirs(METADATA_UPLOAD_DIR, exist_ok=True)
ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S%fZ")
safe_track = re.sub(r"[^\w\-]", "_", str(track_id))[:120]
file_name = f"{spotify_user_id}_{safe_track}_{meta_type}_{version}_{ts}.json"
file_path = os.path.join(METADATA_UPLOAD_DIR, file_name)
with open(file_path, "w", encoding="utf-8") as f:
json.dump(collection, f, ensure_ascii=False)
now = datetime.now(timezone.utc).isoformat()
conn = db()
conn.execute("""
INSERT INTO uploaded_metadata (
spotify_user_id, track_id, type, version, file_name, created_at
) VALUES (?, ?, ?, ?, ?, ?)
""", (spotify_user_id, track_id, meta_type, version, file_name, now))
conn.commit()
conn.close()
return jsonify({"ok": True, "file_name": file_name}), 201
if __name__ == "__main__":

17
beat.py
View File

@@ -1,6 +1,8 @@
import numpy as np
import matplotlib.pyplot as plt # for debug only
from sqi import gauss
# note: may be called ZxingDetector instead?
class SsfZxing:
"""
Find beats in a Sum Slope Function by detecting threshold crossings.
@@ -160,14 +162,27 @@ class RegularBeatFinder:
num_freqs = 200 #: number of freq steps to evaluate
range_f1 = 0.5 #: lowest detection frequency in Hz
range_f2 = 4.0 #: highest detection frequency in Hz
f_bias_width = 0.2 #: gaussian std relative to num_freqs within f1..f2
def __init__(self): pass
def find_beat(self, fs, ssf_zxings, debug_fe=False, debug_i=None):
def find_beat(self, fs, ssf_zxings, f_hint=None, debug_fe=False, debug_i=None):
"""Find the optimal beat frequency."""
act_ibis = np.diff(ssf_zxings)
# nice-to: may be interesting to also use as score: the ssf amplitude info at the beats to which we aligned
# evaluate mean absolute errors for all frequencies
freqs, freq_errs = self._get_opt_ibi_freq_2(fs, act_ibis, debug_i)
# bias with f_hint - once we know the beat freq, make it more likely for it to be found everywhere
if f_hint is not None:
nf, f1, f2 = RegularBeatFinder.num_freqs, RegularBeatFinder.range_f1, RegularBeatFinder.range_f2
bias = gauss(
nf,
(f_hint - f1) / (f2 - f1) * nf,
RegularBeatFinder.f_bias_width * nf
)
freqs_bias = 1.0 / (np.max(bias)+bias) # make 'f_hint' at most 2x more likely -- (1+bias) if normalized
freq_errs *= freqs_bias
#
if debug_fe:
plt.figure(figsize=(8,2))
plt.plot(freqs, freq_errs)

View File

@@ -131,19 +131,21 @@ class BassAnalyzer(Analyzer):
Wp_force = None
I_force = None
def __init__(self, fs, sig, Wp_force=None):
def __init__(self, fs, sig, Wp_force=None, I_force=None):
"""
:param fs: sampling rate
:param sig: audio signal normalized to [-1,1]
"""
super(BassAnalyzer, self).__init__()
self.D = int(self.shift_sec * fs) #: spectrogram step
if self.Wp_force:
self.Wp = self.Wp_force
elif Wp_force:
if Wp_force:
self.Wp = Wp_force
elif self.Wp_force:
self.Wp = self.Wp_force
else:
self.Wp = int(np.round(self.wavelet_win_sec * fs / self.W) * self.W) # wavelet window - make it an integer multiple of FFT window
if I_force:
self.I_force = I_force
self.U = self.Wp // self.W # ratio
self.f = np.pad(sig, (self.W//2, self.W//2-1)) #: signal padded (W-FFT to determine scalogram parameters)

View File

@@ -19,14 +19,19 @@ class Segmenter:
def __init__(self): pass
def get_segments(self, fs, guitar):
i_stxs = self.get_segment_boundaries(fs, guitar)
i_stxs = np.pad(i_stxs, (1, 0))
return i_stxs
def get_segment_boundaries(self, fs, guitar):
"""split the spectral power signal 'guitar' into stochastically similar segments."""
segment_ids = self.get_segments(fs, guitar)
segment_ids = self._get_segments(fs, guitar)
stxs = np.diff(segment_ids) != 0
i_stxs = np.where(stxs)[0]
return i_stxs
def get_segments(self, fs, guitar):
def _get_segments(self, fs, guitar):
"""split the spectral power signal 'guitar' into stochastically similar segments."""
seg_filt_win = int(self.seg_filt_win_sec / self.seg_win_step_sec)
seg_guitar_data = self._sig_stochastics(fs, guitar)

179
song.py Normal file
View File

@@ -0,0 +1,179 @@
import numpy as np
from rhythm import BassAnalyzer, GuitarAnalyzer
from segmenter import Segmenter
from beat import SsfZxing, RegularBeatFinder
from sqi import gauss, shift
class SongBeatDetector:
SEGMENT_SLICE_LEN_SEC = 8.0 #: slice length for processing (long enough to contain bar structure; short enough for a constant freq. beat placement)
SSF_REL_THRES = 1.5 #: optimize for slope of error (mae) function over beat frequency
NE_THRES = 30.0 #: normalized error threshold for 'good' slices
def __init__(self): pass
def detect(self, fs, sig, use_f_hint=True, debug_fe_idx=None):
self.fs = fs
#self.sig = sig
self.ba = BassAnalyzer(fs, sig)
self.bass, times = self.ba.viterbi_wavelet_scalogram_amplitudes(dbg_time=True)
# times: durations of different stages
self.ga = GuitarAnalyzer(fs, sig)
self.guitar = self.ga.spectrogram_power_amplitudes()
fsd = fs / self.ga.D # <- guitar ('ga')
self.D = self.ga.D # <- guitar ('ga')
# self.bass, self.guitar: functions on windowed spectrum 0.008 sec apart (125 Hz)
self.sg = Segmenter()
self.i_seg = self.sg.get_segments(fsd, self.guitar) # <- guitar
self.t_seg = self.i_seg / fsd
self.fsd = fsd # reciprocal window step size
# we segment on 'guitar' info, but process 'bass' later
if use_f_hint:
# initial estimate (without 'f_hint')
zds_initial = self._estimate_segments(debug_fe_idx=None)
self.zds_initial = zds_initial
ifbs_good = np.array([zdd['ne'] < SongBeatDetector.NE_THRES for zdd in zds_initial])
fbs = np.array([zdd['fb'] for zdd in zds_initial])[np.where(ifbs_good)[0]]
bins, hfreq = np.histogram(fbs)
ih = np.argmax(bins)
self.f_hint = np.mean((hfreq[ih], hfreq[ih+1])) # center freq of bin
else:
self.f_hint = None
# actual estimate (using 'f_hint' to bias each segment)
self.zds = self._estimate_segments(f_hint=self.f_hint, debug_fe_idx=debug_fe_idx)
return self.zds
def _estimate_segments(self, f_hint=None, debug_fe_idx=None):
zds = []
fsd = self.fsd
seg_sl = int(SongBeatDetector.SEGMENT_SLICE_LEN_SEC * fsd) # segment slice length in 1/fsd units
# for each segment
for i in np.arange(self.i_seg.shape[0]-1):
i1, i2 = self.i_seg[i], self.i_seg[i+1]
t1, t2 = i1 / fsd, i2 / fsd
# split segment into slices
if i2-i1 < seg_sl: continue
num_sl = (i2-i1) // seg_sl
for m in np.arange(num_sl):
j1, j2 = i1+m*seg_sl, i1+(m+1)*seg_sl
sig_slice = self.bass[slice(j1, j2)] # <- bass
if debug_fe_idx is not None:
# there will be many (upto 50) different slices - do not debug-plot them all
debug_fe_sidx = debug_fe_idx / fs * fsd
debug_fe = i1 <= debug_fe_sidx < i2
else:
debug_fe = False
zdd = self._process_slice(j1, j2, m, sig_slice, f_hint=f_hint, debug_fe=debug_fe)
zds.append(zdd)
return zds
def _process_slice(self, j1, j2, m, sig_slice, f_hint=None, debug_fe=False):
"""
:param j1: lower index into 'sig_slice'
:param j2: upper index into 'sig_slice'
:param m: slice number (used to check if debugging)
:param debug_fe: show plots for SSF and raw/reg beat placement
"""
# TODO: C++ impl of SsfZxing._ssf_det_zxings() has diverged.
# - refractory period changes
# - ssf_th filter with 6-points
# - ?? others ??
# NOTE: SsfZxing here is always getting short 8-sec slices only (nb. for 'ssf_th' comput.)
fsd = self.fsd # reciprocal window step size
seg_sl = int(SongBeatDetector.SEGMENT_SLICE_LEN_SEC * fsd) # segment slice length in 1/fsd units
SsfZxing.ssf_rel_thres = SongBeatDetector.SSF_REL_THRES
zd = SsfZxing()
ssf, ssf_th = zd._ssf_function(fsd, sig_slice)
ssf_zxings = zd._ssf_det_zxings(fsd, ssf, ssf_th)
zdd = {
'i1': j1 * self.D, 'i2': j2 * self.D,
# ssf_zxings: raw beats (relative to slice)
'zd': zd, 'ssf': ssf, 'ssf_zxings': ssf_zxings,
'sig_slice': sig_slice, 'sig_source': 'bass',
'ssf_th': np.ones(ssf.shape[0]) * ssf_th
}
# (only plot first slice of a wider segment)
#if num_sl > 2 and m == 0:
if debug_fe:
#
# scalogram image, with viterbi path
self.ba.debug_plot(j1, j2) # TODO: adapt 'bass'
plt.title(f'scalogram & viterbi path, slice [{j1}:{j2}]')
# SSF function and detected raw beats
zd.debug_plot(0, seg_sl)
plt.title(f'raw beats, slice [{j1}:{j2}]')
# nice-to: optimize phase, (maybe iteratively, optimize phase and freq each)
bf = RegularBeatFinder()
fb, ne = bf.find_beat(fsd, ssf_zxings, f_hint=f_hint, debug_fe=debug_fe, debug_i=None)
if debug_fe: plt.title(f'regular-beat placement error (mae), slice [{j1}:{j2}]')
# mae is unnurmalized here, as returned from RegularBeatFinder._get_opt_ibi_freq_2()
zdd.update({
# bf: beat finder
# fb: beat frequency, in Hz
# ne: normalized mae error
'bf': bf, 'fb': fb, 'ne': ne
})
# TODO: ne > 30 is suspiciously bad - filter those "detections" out eventually
# TODO: # catch basic errors: ne == 0, or len(est_zxings) == 0, means slice is bad
# NOTE: since 2x the zero-crossings, we get twice the frequency here.
# NOTE: this means 0.5 lower freq bound of RegularBeatFinder will find at most 60 bpm in the song.
# TODO: RegularBeatFinder currently not using 'phase' info, but should be optimized
# TODO: (currently we start the pattern at the first detected beat, may or may not be good)
est_zxings = np.cumsum(np.pad(bf.freq_to_est_ibis(fsd, fb, j2-j1), (1,0))) # rel. to slice
if ssf_zxings.shape[0] > 0:
est_zxings += ssf_zxings[0] # add phase = currently we just start at first detected beat
# nice-to: median-filter the freq, etc.pp.
# nice-to: avoid adding len(est_zxings)=0 entries later
# trim back to max. index
est_zxings = est_zxings[np.where(est_zxings < ssf.shape[0])[0]]
zdd.update({
# est_zxings: regular beats (relative to slice)
'est_zxings': est_zxings
})
if debug_fe:
plt.figure(figsize=(8,2))
plt.plot(ssf)
plt.plot(np.arange(ssf.shape[0]), np.ones(ssf.shape[0]) * ssf_th); None
plt.scatter(ssf_zxings, np.ones(ssf_zxings.shape[0]) * ssf_th, c='r')
plt.scatter(est_zxings, np.ones(est_zxings.shape[0]) * ssf_th, c='g')
plt.title(f'ssf, ssf_th, raw beats (r), reg beats (g), slice [{j1}:{j2}]')
return zdd
# _debug_fmt_est_zxings
def _place_fmt_zxings(self, fsd, ssf, ssf_zxings):
gauss_beat_template_win_sec = 0.25542 #: gauss window width (as compared to beats in ssf function)
gauss_beat_template_sigma_sec = 0.027 #: gauss bump half-width parameter (as compared to beats in ssf function)
#gauss_amplitude = 2.0
#def get_snr(self, fsd, ssf, ssf_threshold, ssf_zxings):
# """Compute the Signal-to-Noise Ratio of beats, based on SSF function and detected beat locations."""
sigma = fsd * gauss_beat_template_sigma_sec
W = int(fsd * gauss_beat_template_win_sec)
gb = gauss(W, W//2, sigma)
# place gaussians on estimated beat locations
ssf_est = np.zeros(ssf.shape[0])
for i in ssf_zxings:
ssf_est += shift(ssf.shape[0], i, gb)
ssf_est /= gb[W//2] # normalize amplitude to 1.0
ssf_est = np.roll(ssf_est, int(sigma)) # shift to right (beat loc = gauss beginning, not center)
return ssf_est