Compare commits

..

15 Commits

8 changed files with 784 additions and 56 deletions

11
TODO.md Normal file
View File

@@ -0,0 +1,11 @@
# TODO
- tests: beat and guitar synthesizer
- generate rhythmic sequence and test the algos
.
O> "2027-04-29 TestApi Bass song.ipynb": [21]
- why is ssf continually rising?
- because of using 'fs' not 'fsd'
.

428
api.py
View File

@@ -17,20 +17,24 @@
# $ while sleep 1; do diff -q api.py /tmp/api.py; if [ $? -ne 0 ]; then scp api.py lockstep@api.lockstep.at:/var/sites/api.lockstep.at/; cp api.py /tmp/api.py; fi; done
import os
import re
import sqlite3
from base64 import b64encode
from datetime import datetime, timedelta, timezone
from urllib.parse import urlencode
from urllib.request import Request, urlopen
import json
from functools import wraps
from urllib.parse import urlencode
from flask import Flask, request, session, jsonify, make_response
from flask import Flask, g, request, session, jsonify, make_response, redirect, url_for
from werkzeug.middleware.proxy_fix import ProxyFix
from authomatic import Authomatic
from authomatic.adapters import WerkzeugAdapter
from authomatic.providers import oauth2
from authomatic.providers import PROVIDER_ID_MAP as AUTHOMATIC_PROVIDER_ID_MAP
import random
import time
import requests
@@ -95,14 +99,24 @@ PROVIDER_ID_MAP = list(AUTHOMATIC_PROVIDER_ID_MAP) + [Spotify]
SPOTIFY_CLIENT_ID = os.environ["SPOTIFY_CLIENT_ID"]
SPOTIFY_CLIENT_SECRET = os.environ["SPOTIFY_CLIENT_SECRET"]
# Must exactly match a Redirect URI configured in your Spotify app settings.
# OAuth redirect registered in the Spotify Developer Dashboard (Authorization Code
# flow). Spotify sends the browser here with ?code=&state= — not to the Android app scheme.
REDIRECT_URI = os.environ.get(
"SPOTIFY_REDIRECT_URI",
#"https://api.lockstep.at/spotify/callback"
"https://api.lockstep.at/login/spotify/"
)
# After server-side token exchange, the browser may be redirected to this URL with
# tokens in the query string (mobile / Custom Tab). This is NOT registered with Spotify;
# only REDIRECT_URI above goes in the dashboard for this flow.
ALLOWED_SPOTIFY_APP_POST_LOGIN_REDIRECT = os.environ.get(
"SPOTIFY_APP_POST_LOGIN_REDIRECT_URI",
"at.lockstep.player://spotify/callback",
)
DB_PATH = os.environ.get("TOKEN_DB_PATH", "spotify_tokens.db")
METADATA_UPLOAD_DIR = os.environ.get("METADATA_UPLOAD_DIR", "uploaded_collections")
app = Flask(__name__)
app.secret_key = os.environ["FLASK_SECRET_KEY"]
@@ -145,6 +159,17 @@ def init_db():
updated_at TEXT NOT NULL
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS uploaded_metadata (
id INTEGER PRIMARY KEY AUTOINCREMENT,
spotify_user_id TEXT NOT NULL,
track_id TEXT NOT NULL,
type TEXT NOT NULL,
version INTEGER NOT NULL,
file_name TEXT NOT NULL,
created_at TEXT NOT NULL
)
""")
conn.commit()
conn.close()
@@ -214,25 +239,41 @@ def spotify_basic_auth_header() -> str:
def refresh_spotify_token(refresh_token: str) -> dict:
body = urlencode({
"grant_type": "refresh_token",
"refresh_token": refresh_token,
}).encode("utf-8")
req = Request(
r = requests.post(
"https://accounts.spotify.com/api/token",
data=body,
method="POST",
data={
"grant_type": "refresh_token",
"refresh_token": refresh_token,
},
headers={
"Authorization": spotify_basic_auth_header(),
"Content-Type": "application/x-www-form-urlencoded",
},
)
with urlopen(req) as resp:
payload = json.loads(resp.read().decode("utf-8"))
if not r.ok:
# The token endpoint returns errors as flat
# {"error": "<code>", "error_description": "<msg>"}, which differs from
# the Web API's {"error": {"status": ..., "message": ...}} shape. Rewrite
# the body so our error handler surfaces a consistent envelope to
# clients regardless of which Spotify endpoint failed.
try:
body = r.json()
except ValueError:
body = {}
message = (
body.get("error_description")
or body.get("error")
or r.reason
or "Token refresh failed"
)
r._content = json.dumps({
"error": {"status": r.status_code, "message": message}
}).encode("utf-8")
r.headers["Content-Type"] = "application/json"
return payload
r.raise_for_status()
return r.json()
def get_valid_access_token(spotify_user_id: str) -> str:
@@ -266,17 +307,121 @@ def get_valid_access_token(spotify_user_id: str) -> str:
# ----------------------------
# Simple Spotify API call
# Spotify Web API helpers
# ----------------------------
def spotify_get(url: str, access_token: str, params: dict | None = None) -> dict:
"""
GET a Spotify Web API endpoint and return the parsed JSON body.
Retries on HTTP 429 and 503 with exponential backoff and optional Retry-After,
so brief Spotify rate limits often clear before we surface an error to the app.
"""
headers = {"Authorization": f"Bearer {access_token}"}
backoff_sec = 1.0
max_attempts = 8
params_eff = params
for attempt in range(max_attempts):
r = requests.get(url, headers=headers, params=params_eff)
if r.status_code in (429, 503) and attempt < max_attempts - 1:
wait = backoff_sec
ra = r.headers.get("Retry-After")
if ra:
try:
wait = max(wait, float(ra))
except ValueError:
pass
wait = min(wait, 120.0)
time.sleep(wait + random.random() * 0.25 * wait)
backoff_sec = min(backoff_sec * 2.0, 60.0)
continue
r.raise_for_status()
return r.json()
def spotify_get_paginated(
url: str,
access_token: str,
limit: int = 50,
max_items: int | None = None,
) -> list:
"""
Fetch all items from a paginated Spotify Web API endpoint.
Spotify paging objects return up to `limit` items per page (50 max for most
endpoints) and a `next` URL. We follow `next` until it is null, collecting
items along the way.
"""
items: list = []
params: dict | None = {"limit": limit, "offset": 0}
next_url: str | None = url
while next_url is not None:
page = spotify_get(next_url, access_token, params=params)
items.extend(page.get("items", []))
if max_items is not None and len(items) >= max_items:
return items[:max_items]
# `next` is a fully-qualified URL with limit/offset already encoded,
# so we must not pass `params` again after the first request.
next_url = page.get("next")
params = None
return items
def spotify_get_me(access_token: str) -> dict:
req = Request(
"https://api.spotify.com/v1/me",
headers={"Authorization": f"Bearer {access_token}"},
method="GET",
)
with urlopen(req) as resp:
return json.loads(resp.read().decode("utf-8"))
return spotify_get("https://api.spotify.com/v1/me", access_token)
# ----------------------------
# Error handling
# ----------------------------
@app.errorhandler(requests.HTTPError)
def handle_spotify_http_error(e: requests.HTTPError):
"""
Translate a non-2xx upstream response from Spotify into a JSON envelope,
passing through the upstream HTTP status code.
Spotify error bodies look like {"error": {"status": ..., "message": ...}}
when JSON is returned; we surface that message when available.
"""
resp = e.response
status = resp.status_code if resp is not None else 502
spotify_error = None
error_message = str(e)
if resp is not None:
try:
spotify_error = resp.json()
except ValueError:
spotify_error = None
if (
isinstance(spotify_error, dict)
and isinstance(spotify_error.get("error"), dict)
):
error_message = (
spotify_error["error"].get("message") or error_message
)
return jsonify({
"ok": False,
"error": error_message,
"spotify": spotify_error,
}), status
@app.errorhandler(requests.RequestException)
def handle_spotify_request_error(e: requests.RequestException):
"""Network-level failures (DNS, connection, timeout) have no upstream status."""
return jsonify({
"ok": False,
"error": f"Upstream request failed: {e}",
}), 502
# ----------------------------
@@ -294,6 +439,28 @@ def index():
@app.route("/login/<provider_name>/", methods=["GET", "POST"])
def login(provider_name):
# Authomatic 1.3.0 (oauth2.login) only runs "phase 1" — redirect to Spotify —
# when there are no query parameters, or only ``user_state``:
# elif (not self.params or (len(self.params) == 1 and 'user_state' in self.params))
# A mobile client opening ``/login/spotify/?redirect_uri=...`` therefore matches
# no branch; login() returns without calling redirect() → empty body and HTTP 200
# (white page). Stash the app callback in the session and reload without query args.
if (
provider_name == "spotify"
and request.args.get("redirect_uri")
and "code" not in request.args
and "error" not in request.args
):
requested = request.args.get("redirect_uri", "")
if requested != ALLOWED_SPOTIFY_APP_POST_LOGIN_REDIRECT:
return jsonify({
"ok": False,
"error": "redirect_uri not allowed for this client",
}), 400
session["spotify_oauth_app_redirect_uri"] = requested
session.modified = True
return redirect(url_for("login", provider_name=provider_name), code=302)
response = make_response()
# Let Authomatic handle the OAuth2 handshake.
@@ -301,7 +468,7 @@ def login(provider_name):
WerkzeugAdapter(request, response),
provider_name,
session=session,
session_saver=lambda: session.modified
session_saver=lambda: setattr(session, "modified", True),
)
# If result is None, Authomatic is still redirecting/processing.
@@ -372,6 +539,27 @@ def login(provider_name):
# keep the Spotify user id in session
session["spotify_user_id"] = result.user.id
app_redirect = session.pop("spotify_oauth_app_redirect_uri", None)
if app_redirect:
if app_redirect != ALLOWED_SPOTIFY_APP_POST_LOGIN_REDIRECT:
return jsonify({
"ok": False,
"error": "Stored app redirect_uri does not match allowlist",
}), 400
sep = "&" if ("?" in app_redirect) else "?"
target = (
f"{app_redirect}{sep}"
+ urlencode(
{
"access_token": access_token,
"refresh_token": refresh_token,
"expires_in": str(expires_in),
"token_type": token_type or "",
},
)
)
return redirect(target, code=302)
return jsonify({
"ok": True,
"spotify_user_id": result.user.id,
@@ -455,44 +643,210 @@ old example 1:
}
"""
@app.route("/me")
def me():
def spotify_access_token_from_authorization_header():
auth = request.headers.get("Authorization", "") or ""
if not auth.startswith("Bearer "):
return None
token = auth[7:].strip()
return token or None
def get_request_spotify_access_token():
"""
Prefer ``Authorization: Bearer <access_token>`` (mobile / jukebox).
Fallback to Flask session + stored refresh flow (browser).
"""
bearer = spotify_access_token_from_authorization_header()
if bearer:
return bearer
spotify_user_id = session.get("spotify_user_id")
if not spotify_user_id:
return jsonify({"ok": False, "error": "Not logged in"}), 401
return None
return get_valid_access_token(spotify_user_id)
access_token = get_valid_access_token(spotify_user_id)
def require_auth(f):
@wraps(f)
def wrapped(*args, **kwargs):
token = get_request_spotify_access_token()
if not token:
return jsonify({"ok": False, "error": "Not logged in"}), 401
g.spotify_access_token = token
return f(*args, **kwargs)
return wrapped
@app.route("/me")
@require_auth
def me():
access_token = g.spotify_access_token
profile = spotify_get_me(access_token)
row = get_token_record(spotify_user_id)
row = get_token_record(profile["id"])
return jsonify({
"ok": True,
"profile": profile,
"stored_expires_at": row["expires_at"],
"stored_expires_at": row["expires_at"] if row else None,
})
@app.route("/playlists")
@require_auth
def playlists():
spotify_user_id = "cidermole"
access_token = get_valid_access_token(spotify_user_id)
access_token = g.spotify_access_token
"""
user_id = "Sara"
url = f"https://api.spotify.com/v1/users/{user_id}/playlists"
# -> 403 Forbidden
"""
url = f"https://api.spotify.com/v1/me/playlists"
headers={"Authorization": f"Bearer {access_token}"}
r = requests.get(url, headers=headers)
# TODO: pagination (limit,offset)
items = spotify_get_paginated(
"https://api.spotify.com/v1/me/playlists",
access_token,
)
return jsonify({
"ok": True,
"response": r.json()
"total": len(items),
"items": items,
})
@app.route("/playlists/<playlist_id>")
@require_auth
def playlist(playlist_id):
access_token = g.spotify_access_token
playlist_data = spotify_get(
f"https://api.spotify.com/v1/playlists/{playlist_id}",
access_token,
)
# Full playlist objects use a paging object at `items` (current Spotify shape)
# or legacy `tracks`. Follow `next` on whichever is present.
paging_key = (
"items"
if isinstance(playlist_data.get("items"), dict)
else "tracks"
)
paging = playlist_data.get(paging_key) or {}
if paging.get("next"):
all_items = spotify_get_paginated(
f"https://api.spotify.com/v1/playlists/{playlist_id}/tracks",
access_token,
limit=100,
)
playlist_data[paging_key] = {
**paging,
"items": all_items,
"offset": 0,
"limit": len(all_items),
"next": None,
"previous": None,
}
return jsonify({
"ok": True,
"playlist": playlist_data,
})
@app.route("/metadata", methods=["GET"])
@require_auth
def get_metadata():
track_id = request.args.get("trackId")
meta_type = request.args.get("type", "beats")
if not track_id:
return jsonify({"ok": False, "error": "Missing trackId"}), 400
access_token = g.spotify_access_token
profile = spotify_get_me(access_token)
spotify_user_id = profile["id"]
conn = db()
row = conn.execute("""
SELECT file_name FROM uploaded_metadata
WHERE spotify_user_id = ? AND track_id = ? AND type = ?
ORDER BY created_at DESC
LIMIT 1
""", (spotify_user_id, track_id, meta_type)).fetchone()
conn.close()
if not row:
return jsonify({"ok": False, "error": "Not found"}), 404
file_path = os.path.join(METADATA_UPLOAD_DIR, row["file_name"])
if not os.path.isfile(file_path):
return jsonify({"ok": False, "error": "Not found"}), 404
with open(file_path, "r", encoding="utf-8") as f:
collection = json.load(f)
if not isinstance(collection, dict):
return jsonify({"ok": False, "error": "Invalid metadata file"}), 500
return jsonify({"ok": True, "collection": collection})
@app.route("/metadata", methods=["POST"])
@require_auth
def upload_metadata():
access_token = g.spotify_access_token
if not request.is_json:
return jsonify({"ok": False, "error": "Expected application/json"}), 400
body = request.get_json(silent=True)
if not isinstance(body, dict):
return jsonify({"ok": False, "error": "Invalid JSON body"}), 400
track_id = body.get("trackId")
meta_type = body.get("type")
version = body.get("version")
collection = body.get("collection")
if not track_id or not meta_type or version is None or collection is None:
return jsonify({
"ok": False,
"error": "Missing trackId, type, version, or collection",
}), 400
try:
version = int(version)
except (TypeError, ValueError):
return jsonify({"ok": False, "error": "version must be an integer"}), 400
if not isinstance(collection, dict):
return jsonify({"ok": False, "error": "collection must be a JSON object"}), 400
profile = spotify_get_me(access_token)
spotify_user_id = profile["id"]
os.makedirs(METADATA_UPLOAD_DIR, exist_ok=True)
ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S%fZ")
safe_track = re.sub(r"[^\w\-]", "_", str(track_id))[:120]
file_name = f"{spotify_user_id}_{safe_track}_{meta_type}_{version}_{ts}.json"
file_path = os.path.join(METADATA_UPLOAD_DIR, file_name)
with open(file_path, "w", encoding="utf-8") as f:
json.dump(collection, f, ensure_ascii=False)
now = datetime.now(timezone.utc).isoformat()
conn = db()
conn.execute("""
INSERT INTO uploaded_metadata (
spotify_user_id, track_id, type, version, file_name, created_at
) VALUES (?, ?, ?, ?, ?, ?)
""", (spotify_user_id, track_id, meta_type, version, file_name, now))
conn.commit()
conn.close()
return jsonify({"ok": True, "file_name": file_name}), 201
if __name__ == "__main__":
init_db()
app.run(host="127.0.0.1", port=8000, debug=True)

54
beat.py
View File

@@ -1,5 +1,8 @@
import numpy as np
import matplotlib.pyplot as plt # for debug only
from sqi import gauss
# note: may be called ZxingDetector instead?
class SsfZxing:
"""
Find beats in a Sum Slope Function by detecting threshold crossings.
@@ -7,18 +10,21 @@ class SsfZxing:
"""
t_holdoff = 0.1 #: hold-off period in sec (ignore zxings after initial rise)
# these two depend on each other.
t_range = 0.024 #: rise amplitude range in sec: +/- around transition, we check the rise amplitude. about 2*sw_sec but nb. 0.008 sec steps in fs/D rate
t_range = 0.032 #: rise amplitude range in sec: +/- around transition, we check the rise amplitude. about 2*sw_sec but nb. 0.008 sec steps in fs/D rate
sw_sec = 0.04 #: upslope width in sec (for SSF function)
ssf_rel_thres = 3 #: magic number from Zong 2003, to scale mean SSF amplitude
ssf_rel_thres = 3 #: magic number from Zong 2003, threshold from mean SSF amplitude
ssf_rel_rise = 0.8 #: minimum rise of SSF edge (from foot to peak) relative to 'ssf_th'
# TODO: C++ impl has diverged.
# - refractory period changes
# - ssf_th filter with 6-points
# - ?? others ??
def __init__(self): pass
def _ssf_det_zxings(self, fs, ssf):
def _ssf_det_zxings(self, fs, ssf, ssf_th):
"""detect threshold crossings in 'ssf' signal."""
i_holdoff = int(self.t_holdoff * fs)
# TODO: check if we need lowpass instead of mean for 'ssf_th'
ssf_th = self.ssf_rel_thres * np.mean(ssf)
# threshold crossing
ssf_pk = np.pad((ssf > ssf_th).astype(int), (0,1))
ssf_pks = np.pad(ssf_pk[:-1], (1,0))
@@ -31,26 +37,38 @@ class SsfZxing:
i_range = int(self.t_range * fs)
for i in np.arange(i_range, ssf_z.shape[0]-i_range-1):
if ssf_z[i]:
rise = ssf[i+i_range] - ssf[i-i_range]
rise = np.max(ssf[i:i+i_range]) - np.min(ssf[i-i_range:i])
if rise < ssf_th * self.ssf_rel_rise:
ssf_z[i] = 0
ssf_z[-i_range:] = 0 # force-zero the bounds where we cannot check the amplitude rise
ssf_z[:i_range] = 0 # force-zero the bounds where we cannot check the amplitude rise
ssf_zxings = np.where(ssf_z)[0]
# only integer-index resolution (no interpolation)
self.ssf_zxings = ssf_zxings
return ssf_zxings
def _ssf_function(self, fs, y):
"""sum-slope function."""
sw = int(self.sw_sec*fs)
duk = np.clip(np.diff(np.pad(y, (1,0))), a_min=0, a_max=np.inf) # left-looking window
#ssf = np.convolve(duk, slope_filter, mode='same') # centered window (acausal!)
duks = np.pad(np.cumsum(duk), (0, sw))
duks_r = np.roll(duks, sw)
ssf = (duks - duks_r)[:-sw]
return ssf
# compute threshold
# TODO: check if we need lowpass instead of mean for 'ssf_th'
ssf_th = self.ssf_rel_thres * np.mean(ssf)
self.ssf, self.ssf_th = ssf, ssf_th
return ssf, ssf_th
def debug_plot(self, i1, i2):
ssf, ssf_th, ssf_zxings = self.ssf, self.ssf_th, self.ssf_zxings
ssf_slice = ssf[i1:i2]
ssf_th_slice = ssf_th[i1:i2] if isinstance(ssf_th, np.ndarray) else ssf_th
plt.figure(figsize=(8, 2))
plt.plot(ssf_slice)
plt.plot(np.arange(ssf_slice.shape[0]), np.ones(ssf_slice.shape[0]) * ssf_th_slice)
plt.scatter(ssf_zxings[i1:i2], np.ones(ssf_zxings[i1:i2].shape[0]) * ssf_th_slice, c='r')
def get_mae_dist(ibis):
"""make triangular wave between beats, representing absolute beat placement error."""
@@ -128,6 +146,11 @@ def get_mae_err(fs, freq, phase, act_ibis, debug=False):
# (in direction 2, an optimal solution is "fully sparse", "freq = 1/L", because those are the only 'est_beats' which are aligned)
mae += get_mae_err_1(fs, freq, phase, act_ibis, debug)
mae += get_mae_err_2(fs, freq, phase, act_ibis, debug)
# TODO: may need to weight these two differently
# TODO: see "2027-04-27 TestApi_0b" vs "2027-04-27 TestApi" plots [24]
# TODO: (check: is match always slightly to the left of the trough / smooth minimum?)
# TODO (if so, we may need to weight dir1 and dir2 differently -- or maybe norm by pts density??)
# (or even penalize differently instead of adding dir1 and dir2)
return mae
class RegularBeatFinder:
@@ -139,14 +162,27 @@ class RegularBeatFinder:
num_freqs = 200 #: number of freq steps to evaluate
range_f1 = 0.5 #: lowest detection frequency in Hz
range_f2 = 4.0 #: highest detection frequency in Hz
f_bias_width = 0.2 #: gaussian std relative to num_freqs within f1..f2
def __init__(self): pass
def find_beat(self, fs, ssf_zxings, debug_fe=False, debug_i=None):
def find_beat(self, fs, ssf_zxings, f_hint=None, debug_fe=False, debug_i=None):
"""Find the optimal beat frequency."""
act_ibis = np.diff(ssf_zxings)
# nice-to: may be interesting to also use as score: the ssf amplitude info at the beats to which we aligned
# evaluate mean absolute errors for all frequencies
freqs, freq_errs = self._get_opt_ibi_freq_2(fs, act_ibis, debug_i)
# bias with f_hint - once we know the beat freq, make it more likely for it to be found everywhere
if f_hint is not None:
nf, f1, f2 = RegularBeatFinder.num_freqs, RegularBeatFinder.range_f1, RegularBeatFinder.range_f2
bias = gauss(
nf,
(f_hint - f1) / (f2 - f1) * nf,
RegularBeatFinder.f_bias_width * nf
)
freqs_bias = 1.0 / (np.max(bias)+bias) # make 'f_hint' at most 2x more likely -- (1+bias) if normalized
freq_errs *= freqs_bias
#
if debug_fe:
plt.figure(figsize=(8,2))
plt.plot(freqs, freq_errs)

104
docs/playlists.md Normal file
View File

@@ -0,0 +1,104 @@
# Playlist endpoints
All routes require an authenticated session (`spotify_user_id` after Spotify login). Responses are JSON (`application/json`).
---
## `GET /playlists`
Returns every playlist for the current user by following Spotifys paginated [`GET /v1/me/playlists`](https://developer.spotify.com/documentation/web-api/reference/get-a-list-of-current-users-playlists) until all pages are loaded.
### Success (200)
| Field | Type | Description |
| --- | --- | --- |
| `ok` | `boolean` | Always `true` on success. |
| `total` | `number` | Count of playlists in `items`. |
| `items` | `array` | Each element is a **simplified playlist object** from Spotify |
**Typical fields on each element of `items`** (Spotify `SimplifiedPlaylistObject`):
| Field | Type |
| --- | --- |
| `description` | `string` \| `null` |
| `id` | `string` |
| `images` | `array` of `{ "url": string, "height": number \| null, "width": number \| null }` |
| `name` | `string` |
| `primary_color` | `string` \| `null` |
| `snapshot_id` | `string` |
| `tracks` | `object` — e.g. `{ "href": string, "total": number }` (track list stub, not full tracks) |
### Errors
`{ "ok": false, "error": string, ... }`
---
## `GET /playlists/<playlist_id>`
`<playlist_id>` is the Spotify playlist ID (the same id as in playlist URLs / `items[].id`).
Fetches [`GET /v1/playlists/{playlist_id}`](https://developer.spotify.com/documentation/web-api/reference/get-playlist) following pagination.
### Success (200)
| Field | Type | Description |
| --- | --- | --- |
| `ok` | `boolean` | Always `true` on success. |
| `playlist` | `object` | **Full playlist object** from Spotify, with `tracks` possibly expanded to every track as described above. |
**Typical fields on `playlist`** (Spotify `PlaylistObject`):
| Field | Type |
| --- | --- |
| `description` | `string` \| `null` |
| `id` | `string` |
| `images` | `array` (image objects, as above) |
| `name` | `string` |
| `primary_color` | `string` \| `null` |
| `snapshot_id` | `string` |
| `tracks` | `{"items": [Track, ...], ...}` |
**Typical fields on each element of `playlist.tracks.items`** (Spotify playlist track wrapper):
| Field | Type |
| --- | --- |
| `track` | `object` \| `null` — full or linked track; `null` if removed |
Nested objects use Spotifys **Track**, **Artist** (simplified), and **Album** (simplified) shapes below (field availability can vary by market or API version; see Spotifys reference).
#### `track` — Spotify `TrackObject`
Returned as the non-`null` value of `playlist.tracks.items[].track` (playlist context usually includes a **full** track with **simplified** `album` and `artists` entries).
| Field | Type |
| --- | --- |
| `album` | `object`**SimplifiedAlbumObject** (see below) |
| `artists` | `array` of **SimplifiedArtistObject** (see below) |
| `duration_ms` | `number` |
| `id` | `string` |
| `name` | `string` |
#### `track.artists[]` — Spotify **SimplifiedArtistObject**
| Field | Type |
| --- | --- |
| `id` | `string` |
| `name` | `string` |
#### `track.album` — Spotify **SimplifiedAlbumObject**
| Field | Type |
| --- | --- |
| `artists` | `array` of **SimplifiedArtistObject** (album-level credits) |
| `id` | `string` |
| `images` | `array` of `{ "url": string, "height": number \| null, "width": number \| null }` |
| `name` | `string` |
### Errors
Same as `/playlists`: **401** when not logged in; otherwise Spotify errors and network errors per the global error handlers.
---
For authoritative field lists and edge cases, see [Spotify Web API reference](https://developer.spotify.com/documentation/web-api).

View File

@@ -13,6 +13,7 @@
import numpy as np
from numpy.fft import fft
import matplotlib.pyplot as plt # for debug only
from hsh_signal.signal import lowpass_fft
import time
@@ -81,7 +82,30 @@ def gabor_wavelet(omega, nu, fs, T, tt=None):
psi = 1.0 / np.sqrt(omega) * np.exp(-np.pi * (t / omega)**2) * np.exp(1j*2*np.pi * nu * t / omega)
return psi
class BassAnalyzer:
class Analyzer:
def __init__(self): pass
def debug_plot(self, i1, i2):
Scp2, path = self.Scp2, self.path
fs, Dp = self.fs, self.Dp
ss, omega, nu, fsp, Wp, I, J, freqs = self.pms
Scp2_slice = np.abs(Scp2[i1:i2])
plt.figure(figsize=(8,2))
plt.imshow(Scp2_slice.T, origin='lower')
x_positions = np.arange(Scp2_slice.shape[0]//250+1)*250
if x_positions[-1] == Scp2_slice.shape[0]:
x_positions[-1] -= 1 # so last tick is shown properly
t1 = i1 / (fs / Dp)
x_labels = ['{:.1f}'.format(t1+x*Dp/fs) for x in x_positions]
plt.xticks(x_positions, x_labels)
y_positions = np.arange(Scp2_slice.shape[1]//50)*50
y_labels = ['{:.1f}'.format((nu/(omega*ss[y]))) for y in y_positions] # Hz equivalents of wavelet scale
plt.yticks(y_positions, y_labels)
plt.plot(np.arange(Scp2_slice.shape[0]), path[i1:i2], c='r')
class BassAnalyzer(Analyzer):
"""
Rhythm analysis from songs.
Provides a beat amplitude signal from the audio signal.
@@ -107,18 +131,21 @@ class BassAnalyzer:
Wp_force = None
I_force = None
def __init__(self, fs, sig, Wp_force=None):
def __init__(self, fs, sig, Wp_force=None, I_force=None):
"""
:param fs: sampling rate
:param sig: audio signal normalized to [-1,1]
"""
super(BassAnalyzer, self).__init__()
self.D = int(self.shift_sec * fs) #: spectrogram step
if self.Wp_force:
self.Wp = self.Wp_force
elif Wp_force:
if Wp_force:
self.Wp = Wp_force
elif self.Wp_force:
self.Wp = self.Wp_force
else:
self.Wp = int(np.round(self.wavelet_win_sec * fs / self.W) * self.W) # wavelet window - make it an integer multiple of FFT window
if I_force:
self.I_force = I_force
self.U = self.Wp // self.W # ratio
self.f = np.pad(sig, (self.W//2, self.W//2-1)) #: signal padded (W-FFT to determine scalogram parameters)
@@ -151,6 +178,11 @@ class BassAnalyzer:
t8 = time.time()
ampl = self._viterbi_ampl(Scp2, path)
t9 = time.time()
self.Scp2 = Scp2
self.path = path
self.pms = pms
if not dbg_time:
return ampl
else:
@@ -192,6 +224,11 @@ class BassAnalyzer:
pt_re = (np.diff(pt) == 1).astype(int) # rising edge
self.B = max(np.sum(pt_re), 1) # total number of pulses in the 'pt' pulse train signal
# clip B, to force **reasonable** frequency range for wavelets
# (noise will otherwise cause many transitions -> high B -> bass falls below freq range -> algo fail)
B_min, B_max = 0.5 * M / (fs / self.D), 5.0 * M / (fs / self.D)
self.B = np.clip(self.B, a_min=B_min, a_max=B_max)
# resample 'pt' (M) at these indices -> 'ptr' (L), like original 'f' (signal padded)
squashed_idxs = np.floor(np.linspace(0, L-1, L) * (M/L)).astype(int)
ptr = pt[squashed_idxs]

View File

@@ -9,6 +9,8 @@ def median_filter(a, w):
o[i] = np.median(sl)
return o
# nice-to: split longer segments (above 30 sec), merge very-short segments
class Segmenter:
seg_win_size_sec = 4.0 #: window size for stat. measures for segmentation, in sec
seg_win_step_sec = 1.0 #: step for segmentation, in sec
@@ -17,14 +19,19 @@ class Segmenter:
def __init__(self): pass
def get_segments(self, fs, guitar):
i_stxs = self.get_segment_boundaries(fs, guitar)
i_stxs = np.pad(i_stxs, (1, 0))
return i_stxs
def get_segment_boundaries(self, fs, guitar):
"""split the spectral power signal 'guitar' into stochastically similar segments."""
segment_ids = self.get_segments(fs, guitar)
segment_ids = self._get_segments(fs, guitar)
stxs = np.diff(segment_ids) != 0
i_stxs = np.where(stxs)[0]
return i_stxs
def get_segments(self, fs, guitar):
def _get_segments(self, fs, guitar):
"""split the spectral power signal 'guitar' into stochastically similar segments."""
seg_filt_win = int(self.seg_filt_win_sec / self.seg_win_step_sec)
seg_guitar_data = self._sig_stochastics(fs, guitar)

179
song.py Normal file
View File

@@ -0,0 +1,179 @@
import numpy as np
from rhythm import BassAnalyzer, GuitarAnalyzer
from segmenter import Segmenter
from beat import SsfZxing, RegularBeatFinder
from sqi import gauss, shift
class SongBeatDetector:
SEGMENT_SLICE_LEN_SEC = 8.0 #: slice length for processing (long enough to contain bar structure; short enough for a constant freq. beat placement)
SSF_REL_THRES = 1.5 #: optimize for slope of error (mae) function over beat frequency
NE_THRES = 30.0 #: normalized error threshold for 'good' slices
def __init__(self): pass
def detect(self, fs, sig, use_f_hint=True, debug_fe_idx=None):
self.fs = fs
#self.sig = sig
self.ba = BassAnalyzer(fs, sig)
self.bass, times = self.ba.viterbi_wavelet_scalogram_amplitudes(dbg_time=True)
# times: durations of different stages
self.ga = GuitarAnalyzer(fs, sig)
self.guitar = self.ga.spectrogram_power_amplitudes()
fsd = fs / self.ga.D # <- guitar ('ga')
self.D = self.ga.D # <- guitar ('ga')
# self.bass, self.guitar: functions on windowed spectrum 0.008 sec apart (125 Hz)
self.sg = Segmenter()
self.i_seg = self.sg.get_segments(fsd, self.guitar) # <- guitar
self.t_seg = self.i_seg / fsd
self.fsd = fsd # reciprocal window step size
# we segment on 'guitar' info, but process 'bass' later
if use_f_hint:
# initial estimate (without 'f_hint')
zds_initial = self._estimate_segments(debug_fe_idx=None)
self.zds_initial = zds_initial
ifbs_good = np.array([zdd['ne'] < SongBeatDetector.NE_THRES for zdd in zds_initial])
fbs = np.array([zdd['fb'] for zdd in zds_initial])[np.where(ifbs_good)[0]]
bins, hfreq = np.histogram(fbs)
ih = np.argmax(bins)
self.f_hint = np.mean((hfreq[ih], hfreq[ih+1])) # center freq of bin
else:
self.f_hint = None
# actual estimate (using 'f_hint' to bias each segment)
self.zds = self._estimate_segments(f_hint=self.f_hint, debug_fe_idx=debug_fe_idx)
return self.zds
def _estimate_segments(self, f_hint=None, debug_fe_idx=None):
zds = []
fsd = self.fsd
seg_sl = int(SongBeatDetector.SEGMENT_SLICE_LEN_SEC * fsd) # segment slice length in 1/fsd units
# for each segment
for i in np.arange(self.i_seg.shape[0]-1):
i1, i2 = self.i_seg[i], self.i_seg[i+1]
t1, t2 = i1 / fsd, i2 / fsd
# split segment into slices
if i2-i1 < seg_sl: continue
num_sl = (i2-i1) // seg_sl
for m in np.arange(num_sl):
j1, j2 = i1+m*seg_sl, i1+(m+1)*seg_sl
sig_slice = self.bass[slice(j1, j2)] # <- bass
if debug_fe_idx is not None:
# there will be many (upto 50) different slices - do not debug-plot them all
debug_fe_sidx = debug_fe_idx / fs * fsd
debug_fe = i1 <= debug_fe_sidx < i2
else:
debug_fe = False
zdd = self._process_slice(j1, j2, m, sig_slice, f_hint=f_hint, debug_fe=debug_fe)
zds.append(zdd)
return zds
def _process_slice(self, j1, j2, m, sig_slice, f_hint=None, debug_fe=False):
"""
:param j1: lower index into 'sig_slice'
:param j2: upper index into 'sig_slice'
:param m: slice number (used to check if debugging)
:param debug_fe: show plots for SSF and raw/reg beat placement
"""
# TODO: C++ impl of SsfZxing._ssf_det_zxings() has diverged.
# - refractory period changes
# - ssf_th filter with 6-points
# - ?? others ??
# NOTE: SsfZxing here is always getting short 8-sec slices only (nb. for 'ssf_th' comput.)
fsd = self.fsd # reciprocal window step size
seg_sl = int(SongBeatDetector.SEGMENT_SLICE_LEN_SEC * fsd) # segment slice length in 1/fsd units
SsfZxing.ssf_rel_thres = SongBeatDetector.SSF_REL_THRES
zd = SsfZxing()
ssf, ssf_th = zd._ssf_function(fsd, sig_slice)
ssf_zxings = zd._ssf_det_zxings(fsd, ssf, ssf_th)
zdd = {
'i1': j1 * self.D, 'i2': j2 * self.D,
# ssf_zxings: raw beats (relative to slice)
'zd': zd, 'ssf': ssf, 'ssf_zxings': ssf_zxings,
'sig_slice': sig_slice, 'sig_source': 'bass',
'ssf_th': np.ones(ssf.shape[0]) * ssf_th
}
# (only plot first slice of a wider segment)
#if num_sl > 2 and m == 0:
if debug_fe:
#
# scalogram image, with viterbi path
self.ba.debug_plot(j1, j2) # TODO: adapt 'bass'
plt.title(f'scalogram & viterbi path, slice [{j1}:{j2}]')
# SSF function and detected raw beats
zd.debug_plot(0, seg_sl)
plt.title(f'raw beats, slice [{j1}:{j2}]')
# nice-to: optimize phase, (maybe iteratively, optimize phase and freq each)
bf = RegularBeatFinder()
fb, ne = bf.find_beat(fsd, ssf_zxings, f_hint=f_hint, debug_fe=debug_fe, debug_i=None)
if debug_fe: plt.title(f'regular-beat placement error (mae), slice [{j1}:{j2}]')
# mae is unnurmalized here, as returned from RegularBeatFinder._get_opt_ibi_freq_2()
zdd.update({
# bf: beat finder
# fb: beat frequency, in Hz
# ne: normalized mae error
'bf': bf, 'fb': fb, 'ne': ne
})
# TODO: ne > 30 is suspiciously bad - filter those "detections" out eventually
# TODO: # catch basic errors: ne == 0, or len(est_zxings) == 0, means slice is bad
# NOTE: since 2x the zero-crossings, we get twice the frequency here.
# NOTE: this means 0.5 lower freq bound of RegularBeatFinder will find at most 60 bpm in the song.
# TODO: RegularBeatFinder currently not using 'phase' info, but should be optimized
# TODO: (currently we start the pattern at the first detected beat, may or may not be good)
est_zxings = np.cumsum(np.pad(bf.freq_to_est_ibis(fsd, fb, j2-j1), (1,0))) # rel. to slice
if ssf_zxings.shape[0] > 0:
est_zxings += ssf_zxings[0] # add phase = currently we just start at first detected beat
# nice-to: median-filter the freq, etc.pp.
# nice-to: avoid adding len(est_zxings)=0 entries later
# trim back to max. index
est_zxings = est_zxings[np.where(est_zxings < ssf.shape[0])[0]]
zdd.update({
# est_zxings: regular beats (relative to slice)
'est_zxings': est_zxings
})
if debug_fe:
plt.figure(figsize=(8,2))
plt.plot(ssf)
plt.plot(np.arange(ssf.shape[0]), np.ones(ssf.shape[0]) * ssf_th); None
plt.scatter(ssf_zxings, np.ones(ssf_zxings.shape[0]) * ssf_th, c='r')
plt.scatter(est_zxings, np.ones(est_zxings.shape[0]) * ssf_th, c='g')
plt.title(f'ssf, ssf_th, raw beats (r), reg beats (g), slice [{j1}:{j2}]')
return zdd
# _debug_fmt_est_zxings
def _place_fmt_zxings(self, fsd, ssf, ssf_zxings):
gauss_beat_template_win_sec = 0.25542 #: gauss window width (as compared to beats in ssf function)
gauss_beat_template_sigma_sec = 0.027 #: gauss bump half-width parameter (as compared to beats in ssf function)
#gauss_amplitude = 2.0
#def get_snr(self, fsd, ssf, ssf_threshold, ssf_zxings):
# """Compute the Signal-to-Noise Ratio of beats, based on SSF function and detected beat locations."""
sigma = fsd * gauss_beat_template_sigma_sec
W = int(fsd * gauss_beat_template_win_sec)
gb = gauss(W, W//2, sigma)
# place gaussians on estimated beat locations
ssf_est = np.zeros(ssf.shape[0])
for i in ssf_zxings:
ssf_est += shift(ssf.shape[0], i, gb)
ssf_est /= gb[W//2] # normalize amplitude to 1.0
ssf_est = np.roll(ssf_est, int(sigma)) # shift to right (beat loc = gauss beginning, not center)
return ssf_est

6
sqi.py
View File

@@ -28,14 +28,14 @@ class SigQuality:
def __init__(self): pass
def get_snr(self, fs, ssf, ssf_threshold, ssf_zxings):
def get_snr(self, fs, ssf, ssf_threshold, est_zxings):
"""Compute the Signal-to-Noise Ratio of beats, based on SSF function and detected beat locations."""
sigma = fs * self.gauss_beat_template_sigma_sec
W = int(fs * self.gauss_beat_template_win_sec)
gb = gauss(W, W//2, sigma)
# place gaussians on estimated beat locations
ssf_est = np.zeros(ssf.shape[0])
for i in ssf_zxings:
for i in est_zxings:
ssf_est += shift(ssf.shape[0], i, gb)
ssf_est /= gb[W//2] # normalize amplitude to 1.0
ssf_est = np.roll(ssf_est, int(sigma)) # shift to right (beat loc = gauss beginning, not center)
@@ -49,7 +49,7 @@ class SigQuality:
sqi_noise = np.sum(sqi_pen * (ssf**2))
# noise is everywhere, while signal is only around detected peaks - correct for this.
goal_density = np.mean(np.clip(2*sigma / np.diff(ssf_zxings), a_min=0, a_max=1))
goal_density = np.mean(np.clip(2*sigma / np.diff(est_zxings), a_min=0, a_max=1))
sqi_goal /= goal_density
sqi = 10 * (np.log10(sqi_goal) - np.log10(sqi_noise))