Compare commits

..

5 Commits

3 changed files with 190 additions and 42 deletions

178
api.py
View File

@@ -17,13 +17,15 @@
# $ while sleep 1; do diff -q api.py /tmp/api.py; if [ $? -ne 0 ]; then scp api.py lockstep@api.lockstep.at:/var/sites/api.lockstep.at/; cp api.py /tmp/api.py; fi; done
import os
import re
import sqlite3
from base64 import b64encode
from datetime import datetime, timedelta, timezone
import json
from functools import wraps
from urllib.parse import urlencode
from flask import Flask, request, session, jsonify, make_response, redirect, url_for
from flask import Flask, g, request, session, jsonify, make_response, redirect, url_for
from werkzeug.middleware.proxy_fix import ProxyFix
from authomatic import Authomatic
from authomatic.adapters import WerkzeugAdapter
@@ -114,6 +116,7 @@ ALLOWED_SPOTIFY_APP_POST_LOGIN_REDIRECT = os.environ.get(
)
DB_PATH = os.environ.get("TOKEN_DB_PATH", "spotify_tokens.db")
METADATA_UPLOAD_DIR = os.environ.get("METADATA_UPLOAD_DIR", "uploaded_collections")
app = Flask(__name__)
app.secret_key = os.environ["FLASK_SECRET_KEY"]
@@ -156,6 +159,17 @@ def init_db():
updated_at TEXT NOT NULL
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS uploaded_metadata (
id INTEGER PRIMARY KEY AUTOINCREMENT,
spotify_user_id TEXT NOT NULL,
track_id TEXT NOT NULL,
type TEXT NOT NULL,
version INTEGER NOT NULL,
file_name TEXT NOT NULL,
created_at TEXT NOT NULL
)
""")
conn.commit()
conn.close()
@@ -629,28 +643,59 @@ old example 1:
}
"""
@app.route("/me")
def me():
def spotify_access_token_from_authorization_header():
auth = request.headers.get("Authorization", "") or ""
if not auth.startswith("Bearer "):
return None
token = auth[7:].strip()
return token or None
def get_request_spotify_access_token():
"""
Prefer ``Authorization: Bearer <access_token>`` (mobile / jukebox).
Fallback to Flask session + stored refresh flow (browser).
"""
bearer = spotify_access_token_from_authorization_header()
if bearer:
return bearer
spotify_user_id = session.get("spotify_user_id")
if not spotify_user_id:
return jsonify({"ok": False, "error": "Not logged in"}), 401
return None
return get_valid_access_token(spotify_user_id)
access_token = get_valid_access_token(spotify_user_id)
def require_auth(f):
@wraps(f)
def wrapped(*args, **kwargs):
token = get_request_spotify_access_token()
if not token:
return jsonify({"ok": False, "error": "Not logged in"}), 401
g.spotify_access_token = token
return f(*args, **kwargs)
return wrapped
@app.route("/me")
@require_auth
def me():
access_token = g.spotify_access_token
profile = spotify_get_me(access_token)
row = get_token_record(spotify_user_id)
row = get_token_record(profile["id"])
return jsonify({
"ok": True,
"profile": profile,
"stored_expires_at": row["expires_at"],
"stored_expires_at": row["expires_at"] if row else None,
})
@app.route("/playlists")
@require_auth
def playlists():
access_token = get_request_spotify_access_token()
if not access_token:
return jsonify({"ok": False, "error": "Not logged in"}), 401
access_token = g.spotify_access_token
"""
user_id = "Sara"
@@ -670,10 +715,9 @@ def playlists():
@app.route("/playlists/<playlist_id>")
@require_auth
def playlist(playlist_id):
access_token = get_request_spotify_access_token()
if not access_token:
return jsonify({"ok": False, "error": "Not logged in"}), 401
access_token = g.spotify_access_token
playlist_data = spotify_get(
f"https://api.spotify.com/v1/playlists/{playlist_id}",
@@ -709,26 +753,98 @@ def playlist(playlist_id):
})
def get_request_spotify_access_token():
"""
Prefer ``Authorization: Bearer <access_token>`` (mobile / jukebox).
Fallback to Flask session + stored refresh flow (browser).
"""
bearer = spotify_access_token_from_authorization_header()
if bearer:
return bearer
spotify_user_id = session.get("spotify_user_id")
if not spotify_user_id:
return None
return get_valid_access_token(spotify_user_id)
@app.route("/metadata", methods=["GET"])
@require_auth
def get_metadata():
track_id = request.args.get("trackId")
meta_type = request.args.get("type", "beats")
if not track_id:
return jsonify({"ok": False, "error": "Missing trackId"}), 400
access_token = g.spotify_access_token
profile = spotify_get_me(access_token)
spotify_user_id = profile["id"]
conn = db()
row = conn.execute("""
SELECT file_name FROM uploaded_metadata
WHERE spotify_user_id = ? AND track_id = ? AND type = ?
ORDER BY created_at DESC
LIMIT 1
""", (spotify_user_id, track_id, meta_type)).fetchone()
conn.close()
if not row:
return jsonify({"ok": False, "error": "Not found"}), 404
file_path = os.path.join(METADATA_UPLOAD_DIR, row["file_name"])
if not os.path.isfile(file_path):
return jsonify({"ok": False, "error": "Not found"}), 404
with open(file_path, "r", encoding="utf-8") as f:
collection = json.load(f)
if not isinstance(collection, dict):
return jsonify({"ok": False, "error": "Invalid metadata file"}), 500
return jsonify({"ok": True, "collection": collection})
def spotify_access_token_from_authorization_header():
auth = request.headers.get("Authorization", "") or ""
if not auth.startswith("Bearer "):
return None
token = auth[7:].strip()
return token or None
@app.route("/metadata", methods=["POST"])
@require_auth
def upload_metadata():
access_token = g.spotify_access_token
if not request.is_json:
return jsonify({"ok": False, "error": "Expected application/json"}), 400
body = request.get_json(silent=True)
if not isinstance(body, dict):
return jsonify({"ok": False, "error": "Invalid JSON body"}), 400
track_id = body.get("trackId")
meta_type = body.get("type")
version = body.get("version")
collection = body.get("collection")
if not track_id or not meta_type or version is None or collection is None:
return jsonify({
"ok": False,
"error": "Missing trackId, type, version, or collection",
}), 400
try:
version = int(version)
except (TypeError, ValueError):
return jsonify({"ok": False, "error": "version must be an integer"}), 400
if not isinstance(collection, dict):
return jsonify({"ok": False, "error": "collection must be a JSON object"}), 400
profile = spotify_get_me(access_token)
spotify_user_id = profile["id"]
os.makedirs(METADATA_UPLOAD_DIR, exist_ok=True)
ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S%fZ")
safe_track = re.sub(r"[^\w\-]", "_", str(track_id))[:120]
file_name = f"{spotify_user_id}_{safe_track}_{meta_type}_{version}_{ts}.json"
file_path = os.path.join(METADATA_UPLOAD_DIR, file_name)
with open(file_path, "w", encoding="utf-8") as f:
json.dump(collection, f, ensure_ascii=False)
now = datetime.now(timezone.utc).isoformat()
conn = db()
conn.execute("""
INSERT INTO uploaded_metadata (
spotify_user_id, track_id, type, version, file_name, created_at
) VALUES (?, ?, ?, ?, ?, ?)
""", (spotify_user_id, track_id, meta_type, version, file_name, now))
conn.commit()
conn.close()
return jsonify({"ok": True, "file_name": file_name}), 201
if __name__ == "__main__":

16
beat.py
View File

@@ -1,5 +1,6 @@
import numpy as np
import matplotlib.pyplot as plt # for debug only
from sqi import gauss
# note: may be called ZxingDetector instead?
class SsfZxing:
@@ -161,14 +162,27 @@ class RegularBeatFinder:
num_freqs = 200 #: number of freq steps to evaluate
range_f1 = 0.5 #: lowest detection frequency in Hz
range_f2 = 4.0 #: highest detection frequency in Hz
f_bias_width = 0.2 #: gaussian std relative to num_freqs within f1..f2
def __init__(self): pass
def find_beat(self, fs, ssf_zxings, debug_fe=False, debug_i=None):
def find_beat(self, fs, ssf_zxings, f_hint=None, debug_fe=False, debug_i=None):
"""Find the optimal beat frequency."""
act_ibis = np.diff(ssf_zxings)
# nice-to: may be interesting to also use as score: the ssf amplitude info at the beats to which we aligned
# evaluate mean absolute errors for all frequencies
freqs, freq_errs = self._get_opt_ibi_freq_2(fs, act_ibis, debug_i)
# bias with f_hint - once we know the beat freq, make it more likely for it to be found everywhere
if f_hint is not None:
nf, f1, f2 = RegularBeatFinder.num_freqs, RegularBeatFinder.range_f1, RegularBeatFinder.range_f2
bias = gauss(
nf,
(f_hint - f1) / (f2 - f1) * nf,
RegularBeatFinder.f_bias_width * nf
)
freqs_bias = 1.0 / (np.max(bias)+bias) # make 'f_hint' at most 2x more likely -- (1+bias) if normalized
freq_errs *= freqs_bias
#
if debug_fe:
plt.figure(figsize=(8,2))
plt.plot(freqs, freq_errs)

38
song.py
View File

@@ -8,8 +8,9 @@ from sqi import gauss, shift
class SongBeatDetector:
SEGMENT_SLICE_LEN_SEC = 8.0 #: slice length for processing (long enough to contain bar structure; short enough for a constant freq. beat placement)
SSF_REL_THRES = 1.5 #: optimize for slope of error (mae) function over beat frequency
NE_THRES = 30.0 #: normalized error threshold for 'good' slices
def __init__(self): pass
def detect(self, fs, sig, debug_fe_idx=None):
def detect(self, fs, sig, use_f_hint=True, debug_fe_idx=None):
self.fs = fs
#self.sig = sig
@@ -31,10 +32,27 @@ class SongBeatDetector:
# we segment on 'guitar' info, but process 'bass' later
seg_sl = int(SongBeatDetector.SEGMENT_SLICE_LEN_SEC * fsd)
if use_f_hint:
# initial estimate (without 'f_hint')
zds_initial = self._estimate_segments(debug_fe_idx=None)
self.zds_initial = zds_initial
ifbs_good = np.array([zdd['ne'] < SongBeatDetector.NE_THRES for zdd in zds_initial])
fbs = np.array([zdd['fb'] for zdd in zds_initial])[np.where(ifbs_good)[0]]
bins, hfreq = np.histogram(fbs)
ih = np.argmax(bins)
self.f_hint = np.mean((hfreq[ih], hfreq[ih+1])) # center freq of bin
else:
self.f_hint = None
self.zds = []
# actual estimate (using 'f_hint' to bias each segment)
self.zds = self._estimate_segments(f_hint=self.f_hint, debug_fe_idx=debug_fe_idx)
return self.zds
def _estimate_segments(self, f_hint=None, debug_fe_idx=None):
zds = []
fsd = self.fsd
seg_sl = int(SongBeatDetector.SEGMENT_SLICE_LEN_SEC * fsd) # segment slice length in 1/fsd units
# for each segment
for i in np.arange(self.i_seg.shape[0]-1):
i1, i2 = self.i_seg[i], self.i_seg[i+1]
@@ -52,17 +70,16 @@ class SongBeatDetector:
debug_fe = i1 <= debug_fe_sidx < i2
else:
debug_fe = False
zdd = self._process_slice(j1, j2, m, seg_sl, sig_slice, debug_fe=debug_fe)
self.zds.append(zdd)
zdd = self._process_slice(j1, j2, m, sig_slice, f_hint=f_hint, debug_fe=debug_fe)
zds.append(zdd)
return self.zds
return zds
def _process_slice(self, j1, j2, m, seg_sl, sig_slice, debug_fe=False):
def _process_slice(self, j1, j2, m, sig_slice, f_hint=None, debug_fe=False):
"""
:param j1: lower index into 'sig_slice'
:param j2: upper index into 'sig_slice'
:param m: slice number (used to check if debugging)
:param seg_sl: segment slice length in 1/fsd units
:param debug_fe: show plots for SSF and raw/reg beat placement
"""
# TODO: C++ impl of SsfZxing._ssf_det_zxings() has diverged.
@@ -71,7 +88,8 @@ class SongBeatDetector:
# - ?? others ??
# NOTE: SsfZxing here is always getting short 8-sec slices only (nb. for 'ssf_th' comput.)
fsd = self.fsd #: reciprocal window step size
fsd = self.fsd # reciprocal window step size
seg_sl = int(SongBeatDetector.SEGMENT_SLICE_LEN_SEC * fsd) # segment slice length in 1/fsd units
SsfZxing.ssf_rel_thres = SongBeatDetector.SSF_REL_THRES
zd = SsfZxing()
@@ -100,7 +118,7 @@ class SongBeatDetector:
# nice-to: optimize phase, (maybe iteratively, optimize phase and freq each)
bf = RegularBeatFinder()
fb, ne = bf.find_beat(fsd, ssf_zxings, debug_fe=debug_fe, debug_i=None)
fb, ne = bf.find_beat(fsd, ssf_zxings, f_hint=f_hint, debug_fe=debug_fe, debug_i=None)
if debug_fe: plt.title(f'regular-beat placement error (mae), slice [{j1}:{j2}]')
# mae is unnurmalized here, as returned from RegularBeatFinder._get_opt_ibi_freq_2()
zdd.update({