Compare commits

...

12 Commits

7 changed files with 714 additions and 44 deletions

11
TODO.md Normal file
View File

@@ -0,0 +1,11 @@
# TODO
- tests: beat and guitar synthesizer
- generate rhythmic sequence and test the algos
.
O> "2027-04-29 TestApi Bass song.ipynb": [21]
- why is ssf continually rising?
- because of using 'fs' not 'fsd'
.

424
api.py
View File

@@ -17,20 +17,24 @@
# $ while sleep 1; do diff -q api.py /tmp/api.py; if [ $? -ne 0 ]; then scp api.py lockstep@api.lockstep.at:/var/sites/api.lockstep.at/; cp api.py /tmp/api.py; fi; done # $ while sleep 1; do diff -q api.py /tmp/api.py; if [ $? -ne 0 ]; then scp api.py lockstep@api.lockstep.at:/var/sites/api.lockstep.at/; cp api.py /tmp/api.py; fi; done
import os import os
import re
import sqlite3 import sqlite3
from base64 import b64encode from base64 import b64encode
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
from urllib.parse import urlencode
from urllib.request import Request, urlopen
import json import json
from functools import wraps
from urllib.parse import urlencode
from flask import Flask, request, session, jsonify, make_response from flask import Flask, g, request, session, jsonify, make_response, redirect, url_for
from werkzeug.middleware.proxy_fix import ProxyFix from werkzeug.middleware.proxy_fix import ProxyFix
from authomatic import Authomatic from authomatic import Authomatic
from authomatic.adapters import WerkzeugAdapter from authomatic.adapters import WerkzeugAdapter
from authomatic.providers import oauth2 from authomatic.providers import oauth2
from authomatic.providers import PROVIDER_ID_MAP as AUTHOMATIC_PROVIDER_ID_MAP from authomatic.providers import PROVIDER_ID_MAP as AUTHOMATIC_PROVIDER_ID_MAP
import random
import time
import requests import requests
@@ -95,14 +99,24 @@ PROVIDER_ID_MAP = list(AUTHOMATIC_PROVIDER_ID_MAP) + [Spotify]
SPOTIFY_CLIENT_ID = os.environ["SPOTIFY_CLIENT_ID"] SPOTIFY_CLIENT_ID = os.environ["SPOTIFY_CLIENT_ID"]
SPOTIFY_CLIENT_SECRET = os.environ["SPOTIFY_CLIENT_SECRET"] SPOTIFY_CLIENT_SECRET = os.environ["SPOTIFY_CLIENT_SECRET"]
# Must exactly match a Redirect URI configured in your Spotify app settings. # OAuth redirect registered in the Spotify Developer Dashboard (Authorization Code
# flow). Spotify sends the browser here with ?code=&state= — not to the Android app scheme.
REDIRECT_URI = os.environ.get( REDIRECT_URI = os.environ.get(
"SPOTIFY_REDIRECT_URI", "SPOTIFY_REDIRECT_URI",
#"https://api.lockstep.at/spotify/callback" #"https://api.lockstep.at/spotify/callback"
"https://api.lockstep.at/login/spotify/" "https://api.lockstep.at/login/spotify/"
) )
# After server-side token exchange, the browser may be redirected to this URL with
# tokens in the query string (mobile / Custom Tab). This is NOT registered with Spotify;
# only REDIRECT_URI above goes in the dashboard for this flow.
ALLOWED_SPOTIFY_APP_POST_LOGIN_REDIRECT = os.environ.get(
"SPOTIFY_APP_POST_LOGIN_REDIRECT_URI",
"at.lockstep.player://spotify/callback",
)
DB_PATH = os.environ.get("TOKEN_DB_PATH", "spotify_tokens.db") DB_PATH = os.environ.get("TOKEN_DB_PATH", "spotify_tokens.db")
METADATA_UPLOAD_DIR = os.environ.get("METADATA_UPLOAD_DIR", "uploaded_collections")
app = Flask(__name__) app = Flask(__name__)
app.secret_key = os.environ["FLASK_SECRET_KEY"] app.secret_key = os.environ["FLASK_SECRET_KEY"]
@@ -145,6 +159,17 @@ def init_db():
updated_at TEXT NOT NULL updated_at TEXT NOT NULL
) )
""") """)
conn.execute("""
CREATE TABLE IF NOT EXISTS uploaded_metadata (
id INTEGER PRIMARY KEY AUTOINCREMENT,
spotify_user_id TEXT NOT NULL,
track_id TEXT NOT NULL,
type TEXT NOT NULL,
version INTEGER NOT NULL,
file_name TEXT NOT NULL,
created_at TEXT NOT NULL
)
""")
conn.commit() conn.commit()
conn.close() conn.close()
@@ -214,25 +239,41 @@ def spotify_basic_auth_header() -> str:
def refresh_spotify_token(refresh_token: str) -> dict: def refresh_spotify_token(refresh_token: str) -> dict:
body = urlencode({ r = requests.post(
"https://accounts.spotify.com/api/token",
data={
"grant_type": "refresh_token", "grant_type": "refresh_token",
"refresh_token": refresh_token, "refresh_token": refresh_token,
}).encode("utf-8") },
req = Request(
"https://accounts.spotify.com/api/token",
data=body,
method="POST",
headers={ headers={
"Authorization": spotify_basic_auth_header(), "Authorization": spotify_basic_auth_header(),
"Content-Type": "application/x-www-form-urlencoded", "Content-Type": "application/x-www-form-urlencoded",
}, },
) )
with urlopen(req) as resp: if not r.ok:
payload = json.loads(resp.read().decode("utf-8")) # The token endpoint returns errors as flat
# {"error": "<code>", "error_description": "<msg>"}, which differs from
# the Web API's {"error": {"status": ..., "message": ...}} shape. Rewrite
# the body so our error handler surfaces a consistent envelope to
# clients regardless of which Spotify endpoint failed.
try:
body = r.json()
except ValueError:
body = {}
message = (
body.get("error_description")
or body.get("error")
or r.reason
or "Token refresh failed"
)
r._content = json.dumps({
"error": {"status": r.status_code, "message": message}
}).encode("utf-8")
r.headers["Content-Type"] = "application/json"
return payload r.raise_for_status()
return r.json()
def get_valid_access_token(spotify_user_id: str) -> str: def get_valid_access_token(spotify_user_id: str) -> str:
@@ -266,17 +307,121 @@ def get_valid_access_token(spotify_user_id: str) -> str:
# ---------------------------- # ----------------------------
# Simple Spotify API call # Spotify Web API helpers
# ---------------------------- # ----------------------------
def spotify_get(url: str, access_token: str, params: dict | None = None) -> dict:
"""
GET a Spotify Web API endpoint and return the parsed JSON body.
Retries on HTTP 429 and 503 with exponential backoff and optional Retry-After,
so brief Spotify rate limits often clear before we surface an error to the app.
"""
headers = {"Authorization": f"Bearer {access_token}"}
backoff_sec = 1.0
max_attempts = 8
params_eff = params
for attempt in range(max_attempts):
r = requests.get(url, headers=headers, params=params_eff)
if r.status_code in (429, 503) and attempt < max_attempts - 1:
wait = backoff_sec
ra = r.headers.get("Retry-After")
if ra:
try:
wait = max(wait, float(ra))
except ValueError:
pass
wait = min(wait, 120.0)
time.sleep(wait + random.random() * 0.25 * wait)
backoff_sec = min(backoff_sec * 2.0, 60.0)
continue
r.raise_for_status()
return r.json()
def spotify_get_paginated(
url: str,
access_token: str,
limit: int = 50,
max_items: int | None = None,
) -> list:
"""
Fetch all items from a paginated Spotify Web API endpoint.
Spotify paging objects return up to `limit` items per page (50 max for most
endpoints) and a `next` URL. We follow `next` until it is null, collecting
items along the way.
"""
items: list = []
params: dict | None = {"limit": limit, "offset": 0}
next_url: str | None = url
while next_url is not None:
page = spotify_get(next_url, access_token, params=params)
items.extend(page.get("items", []))
if max_items is not None and len(items) >= max_items:
return items[:max_items]
# `next` is a fully-qualified URL with limit/offset already encoded,
# so we must not pass `params` again after the first request.
next_url = page.get("next")
params = None
return items
def spotify_get_me(access_token: str) -> dict: def spotify_get_me(access_token: str) -> dict:
req = Request( return spotify_get("https://api.spotify.com/v1/me", access_token)
"https://api.spotify.com/v1/me",
headers={"Authorization": f"Bearer {access_token}"},
method="GET", # ----------------------------
# Error handling
# ----------------------------
@app.errorhandler(requests.HTTPError)
def handle_spotify_http_error(e: requests.HTTPError):
"""
Translate a non-2xx upstream response from Spotify into a JSON envelope,
passing through the upstream HTTP status code.
Spotify error bodies look like {"error": {"status": ..., "message": ...}}
when JSON is returned; we surface that message when available.
"""
resp = e.response
status = resp.status_code if resp is not None else 502
spotify_error = None
error_message = str(e)
if resp is not None:
try:
spotify_error = resp.json()
except ValueError:
spotify_error = None
if (
isinstance(spotify_error, dict)
and isinstance(spotify_error.get("error"), dict)
):
error_message = (
spotify_error["error"].get("message") or error_message
) )
with urlopen(req) as resp:
return json.loads(resp.read().decode("utf-8")) return jsonify({
"ok": False,
"error": error_message,
"spotify": spotify_error,
}), status
@app.errorhandler(requests.RequestException)
def handle_spotify_request_error(e: requests.RequestException):
"""Network-level failures (DNS, connection, timeout) have no upstream status."""
return jsonify({
"ok": False,
"error": f"Upstream request failed: {e}",
}), 502
# ---------------------------- # ----------------------------
@@ -294,6 +439,28 @@ def index():
@app.route("/login/<provider_name>/", methods=["GET", "POST"]) @app.route("/login/<provider_name>/", methods=["GET", "POST"])
def login(provider_name): def login(provider_name):
# Authomatic 1.3.0 (oauth2.login) only runs "phase 1" — redirect to Spotify —
# when there are no query parameters, or only ``user_state``:
# elif (not self.params or (len(self.params) == 1 and 'user_state' in self.params))
# A mobile client opening ``/login/spotify/?redirect_uri=...`` therefore matches
# no branch; login() returns without calling redirect() → empty body and HTTP 200
# (white page). Stash the app callback in the session and reload without query args.
if (
provider_name == "spotify"
and request.args.get("redirect_uri")
and "code" not in request.args
and "error" not in request.args
):
requested = request.args.get("redirect_uri", "")
if requested != ALLOWED_SPOTIFY_APP_POST_LOGIN_REDIRECT:
return jsonify({
"ok": False,
"error": "redirect_uri not allowed for this client",
}), 400
session["spotify_oauth_app_redirect_uri"] = requested
session.modified = True
return redirect(url_for("login", provider_name=provider_name), code=302)
response = make_response() response = make_response()
# Let Authomatic handle the OAuth2 handshake. # Let Authomatic handle the OAuth2 handshake.
@@ -301,7 +468,7 @@ def login(provider_name):
WerkzeugAdapter(request, response), WerkzeugAdapter(request, response),
provider_name, provider_name,
session=session, session=session,
session_saver=lambda: session.modified session_saver=lambda: setattr(session, "modified", True),
) )
# If result is None, Authomatic is still redirecting/processing. # If result is None, Authomatic is still redirecting/processing.
@@ -372,6 +539,27 @@ def login(provider_name):
# keep the Spotify user id in session # keep the Spotify user id in session
session["spotify_user_id"] = result.user.id session["spotify_user_id"] = result.user.id
app_redirect = session.pop("spotify_oauth_app_redirect_uri", None)
if app_redirect:
if app_redirect != ALLOWED_SPOTIFY_APP_POST_LOGIN_REDIRECT:
return jsonify({
"ok": False,
"error": "Stored app redirect_uri does not match allowlist",
}), 400
sep = "&" if ("?" in app_redirect) else "?"
target = (
f"{app_redirect}{sep}"
+ urlencode(
{
"access_token": access_token,
"refresh_token": refresh_token,
"expires_in": str(expires_in),
"token_type": token_type or "",
},
)
)
return redirect(target, code=302)
return jsonify({ return jsonify({
"ok": True, "ok": True,
"spotify_user_id": result.user.id, "spotify_user_id": result.user.id,
@@ -455,44 +643,210 @@ old example 1:
} }
""" """
@app.route("/me")
def me(): def spotify_access_token_from_authorization_header():
auth = request.headers.get("Authorization", "") or ""
if not auth.startswith("Bearer "):
return None
token = auth[7:].strip()
return token or None
def get_request_spotify_access_token():
"""
Prefer ``Authorization: Bearer <access_token>`` (mobile / jukebox).
Fallback to Flask session + stored refresh flow (browser).
"""
bearer = spotify_access_token_from_authorization_header()
if bearer:
return bearer
spotify_user_id = session.get("spotify_user_id") spotify_user_id = session.get("spotify_user_id")
if not spotify_user_id: if not spotify_user_id:
return jsonify({"ok": False, "error": "Not logged in"}), 401 return None
return get_valid_access_token(spotify_user_id)
access_token = get_valid_access_token(spotify_user_id)
def require_auth(f):
@wraps(f)
def wrapped(*args, **kwargs):
token = get_request_spotify_access_token()
if not token:
return jsonify({"ok": False, "error": "Not logged in"}), 401
g.spotify_access_token = token
return f(*args, **kwargs)
return wrapped
@app.route("/me")
@require_auth
def me():
access_token = g.spotify_access_token
profile = spotify_get_me(access_token) profile = spotify_get_me(access_token)
row = get_token_record(spotify_user_id) row = get_token_record(profile["id"])
return jsonify({ return jsonify({
"ok": True, "ok": True,
"profile": profile, "profile": profile,
"stored_expires_at": row["expires_at"], "stored_expires_at": row["expires_at"] if row else None,
}) })
@app.route("/playlists") @app.route("/playlists")
@require_auth
def playlists(): def playlists():
spotify_user_id = "cidermole" access_token = g.spotify_access_token
access_token = get_valid_access_token(spotify_user_id)
""" """
user_id = "Sara" user_id = "Sara"
url = f"https://api.spotify.com/v1/users/{user_id}/playlists" url = f"https://api.spotify.com/v1/users/{user_id}/playlists"
# -> 403 Forbidden # -> 403 Forbidden
""" """
url = f"https://api.spotify.com/v1/me/playlists" items = spotify_get_paginated(
headers={"Authorization": f"Bearer {access_token}"} "https://api.spotify.com/v1/me/playlists",
r = requests.get(url, headers=headers) access_token,
# TODO: pagination (limit,offset) )
return jsonify({ return jsonify({
"ok": True, "ok": True,
"response": r.json() "total": len(items),
"items": items,
}) })
@app.route("/playlists/<playlist_id>")
@require_auth
def playlist(playlist_id):
access_token = g.spotify_access_token
playlist_data = spotify_get(
f"https://api.spotify.com/v1/playlists/{playlist_id}",
access_token,
)
# Full playlist objects use a paging object at `items` (current Spotify shape)
# or legacy `tracks`. Follow `next` on whichever is present.
paging_key = (
"items"
if isinstance(playlist_data.get("items"), dict)
else "tracks"
)
paging = playlist_data.get(paging_key) or {}
if paging.get("next"):
all_items = spotify_get_paginated(
f"https://api.spotify.com/v1/playlists/{playlist_id}/tracks",
access_token,
limit=100,
)
playlist_data[paging_key] = {
**paging,
"items": all_items,
"offset": 0,
"limit": len(all_items),
"next": None,
"previous": None,
}
return jsonify({
"ok": True,
"playlist": playlist_data,
})
@app.route("/metadata", methods=["GET"])
@require_auth
def get_metadata():
track_id = request.args.get("trackId")
meta_type = request.args.get("type", "beats")
if not track_id:
return jsonify({"ok": False, "error": "Missing trackId"}), 400
access_token = g.spotify_access_token
profile = spotify_get_me(access_token)
spotify_user_id = profile["id"]
conn = db()
row = conn.execute("""
SELECT file_name FROM uploaded_metadata
WHERE spotify_user_id = ? AND track_id = ? AND type = ?
ORDER BY created_at DESC
LIMIT 1
""", (spotify_user_id, track_id, meta_type)).fetchone()
conn.close()
if not row:
return jsonify({"ok": False, "error": "Not found"}), 404
file_path = os.path.join(METADATA_UPLOAD_DIR, row["file_name"])
if not os.path.isfile(file_path):
return jsonify({"ok": False, "error": "Not found"}), 404
with open(file_path, "r", encoding="utf-8") as f:
collection = json.load(f)
if not isinstance(collection, dict):
return jsonify({"ok": False, "error": "Invalid metadata file"}), 500
return jsonify({"ok": True, "collection": collection})
@app.route("/metadata", methods=["POST"])
@require_auth
def upload_metadata():
access_token = g.spotify_access_token
if not request.is_json:
return jsonify({"ok": False, "error": "Expected application/json"}), 400
body = request.get_json(silent=True)
if not isinstance(body, dict):
return jsonify({"ok": False, "error": "Invalid JSON body"}), 400
track_id = body.get("trackId")
meta_type = body.get("type")
version = body.get("version")
collection = body.get("collection")
if not track_id or not meta_type or version is None or collection is None:
return jsonify({
"ok": False,
"error": "Missing trackId, type, version, or collection",
}), 400
try:
version = int(version)
except (TypeError, ValueError):
return jsonify({"ok": False, "error": "version must be an integer"}), 400
if not isinstance(collection, dict):
return jsonify({"ok": False, "error": "collection must be a JSON object"}), 400
profile = spotify_get_me(access_token)
spotify_user_id = profile["id"]
os.makedirs(METADATA_UPLOAD_DIR, exist_ok=True)
ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S%fZ")
safe_track = re.sub(r"[^\w\-]", "_", str(track_id))[:120]
file_name = f"{spotify_user_id}_{safe_track}_{meta_type}_{version}_{ts}.json"
file_path = os.path.join(METADATA_UPLOAD_DIR, file_name)
with open(file_path, "w", encoding="utf-8") as f:
json.dump(collection, f, ensure_ascii=False)
now = datetime.now(timezone.utc).isoformat()
conn = db()
conn.execute("""
INSERT INTO uploaded_metadata (
spotify_user_id, track_id, type, version, file_name, created_at
) VALUES (?, ?, ?, ?, ?, ?)
""", (spotify_user_id, track_id, meta_type, version, file_name, now))
conn.commit()
conn.close()
return jsonify({"ok": True, "file_name": file_name}), 201
if __name__ == "__main__": if __name__ == "__main__":
init_db() init_db()
app.run(host="127.0.0.1", port=8000, debug=True) app.run(host="127.0.0.1", port=8000, debug=True)

17
beat.py
View File

@@ -1,6 +1,8 @@
import numpy as np import numpy as np
import matplotlib.pyplot as plt # for debug only import matplotlib.pyplot as plt # for debug only
from sqi import gauss
# note: may be called ZxingDetector instead?
class SsfZxing: class SsfZxing:
""" """
Find beats in a Sum Slope Function by detecting threshold crossings. Find beats in a Sum Slope Function by detecting threshold crossings.
@@ -160,14 +162,27 @@ class RegularBeatFinder:
num_freqs = 200 #: number of freq steps to evaluate num_freqs = 200 #: number of freq steps to evaluate
range_f1 = 0.5 #: lowest detection frequency in Hz range_f1 = 0.5 #: lowest detection frequency in Hz
range_f2 = 4.0 #: highest detection frequency in Hz range_f2 = 4.0 #: highest detection frequency in Hz
f_bias_width = 0.2 #: gaussian std relative to num_freqs within f1..f2
def __init__(self): pass def __init__(self): pass
def find_beat(self, fs, ssf_zxings, debug_fe=False, debug_i=None): def find_beat(self, fs, ssf_zxings, f_hint=None, debug_fe=False, debug_i=None):
"""Find the optimal beat frequency.""" """Find the optimal beat frequency."""
act_ibis = np.diff(ssf_zxings) act_ibis = np.diff(ssf_zxings)
# nice-to: may be interesting to also use as score: the ssf amplitude info at the beats to which we aligned
# evaluate mean absolute errors for all frequencies # evaluate mean absolute errors for all frequencies
freqs, freq_errs = self._get_opt_ibi_freq_2(fs, act_ibis, debug_i) freqs, freq_errs = self._get_opt_ibi_freq_2(fs, act_ibis, debug_i)
# bias with f_hint - once we know the beat freq, make it more likely for it to be found everywhere
if f_hint is not None:
nf, f1, f2 = RegularBeatFinder.num_freqs, RegularBeatFinder.range_f1, RegularBeatFinder.range_f2
bias = gauss(
nf,
(f_hint - f1) / (f2 - f1) * nf,
RegularBeatFinder.f_bias_width * nf
)
freqs_bias = 1.0 / (np.max(bias)+bias) # make 'f_hint' at most 2x more likely -- (1+bias) if normalized
freq_errs *= freqs_bias
#
if debug_fe: if debug_fe:
plt.figure(figsize=(8,2)) plt.figure(figsize=(8,2))
plt.plot(freqs, freq_errs) plt.plot(freqs, freq_errs)

104
docs/playlists.md Normal file
View File

@@ -0,0 +1,104 @@
# Playlist endpoints
All routes require an authenticated session (`spotify_user_id` after Spotify login). Responses are JSON (`application/json`).
---
## `GET /playlists`
Returns every playlist for the current user by following Spotifys paginated [`GET /v1/me/playlists`](https://developer.spotify.com/documentation/web-api/reference/get-a-list-of-current-users-playlists) until all pages are loaded.
### Success (200)
| Field | Type | Description |
| --- | --- | --- |
| `ok` | `boolean` | Always `true` on success. |
| `total` | `number` | Count of playlists in `items`. |
| `items` | `array` | Each element is a **simplified playlist object** from Spotify |
**Typical fields on each element of `items`** (Spotify `SimplifiedPlaylistObject`):
| Field | Type |
| --- | --- |
| `description` | `string` \| `null` |
| `id` | `string` |
| `images` | `array` of `{ "url": string, "height": number \| null, "width": number \| null }` |
| `name` | `string` |
| `primary_color` | `string` \| `null` |
| `snapshot_id` | `string` |
| `tracks` | `object` — e.g. `{ "href": string, "total": number }` (track list stub, not full tracks) |
### Errors
`{ "ok": false, "error": string, ... }`
---
## `GET /playlists/<playlist_id>`
`<playlist_id>` is the Spotify playlist ID (the same id as in playlist URLs / `items[].id`).
Fetches [`GET /v1/playlists/{playlist_id}`](https://developer.spotify.com/documentation/web-api/reference/get-playlist) following pagination.
### Success (200)
| Field | Type | Description |
| --- | --- | --- |
| `ok` | `boolean` | Always `true` on success. |
| `playlist` | `object` | **Full playlist object** from Spotify, with `tracks` possibly expanded to every track as described above. |
**Typical fields on `playlist`** (Spotify `PlaylistObject`):
| Field | Type |
| --- | --- |
| `description` | `string` \| `null` |
| `id` | `string` |
| `images` | `array` (image objects, as above) |
| `name` | `string` |
| `primary_color` | `string` \| `null` |
| `snapshot_id` | `string` |
| `tracks` | `{"items": [Track, ...], ...}` |
**Typical fields on each element of `playlist.tracks.items`** (Spotify playlist track wrapper):
| Field | Type |
| --- | --- |
| `track` | `object` \| `null` — full or linked track; `null` if removed |
Nested objects use Spotifys **Track**, **Artist** (simplified), and **Album** (simplified) shapes below (field availability can vary by market or API version; see Spotifys reference).
#### `track` — Spotify `TrackObject`
Returned as the non-`null` value of `playlist.tracks.items[].track` (playlist context usually includes a **full** track with **simplified** `album` and `artists` entries).
| Field | Type |
| --- | --- |
| `album` | `object`**SimplifiedAlbumObject** (see below) |
| `artists` | `array` of **SimplifiedArtistObject** (see below) |
| `duration_ms` | `number` |
| `id` | `string` |
| `name` | `string` |
#### `track.artists[]` — Spotify **SimplifiedArtistObject**
| Field | Type |
| --- | --- |
| `id` | `string` |
| `name` | `string` |
#### `track.album` — Spotify **SimplifiedAlbumObject**
| Field | Type |
| --- | --- |
| `artists` | `array` of **SimplifiedArtistObject** (album-level credits) |
| `id` | `string` |
| `images` | `array` of `{ "url": string, "height": number \| null, "width": number \| null }` |
| `name` | `string` |
### Errors
Same as `/playlists`: **401** when not logged in; otherwise Spotify errors and network errors per the global error handlers.
---
For authoritative field lists and edge cases, see [Spotify Web API reference](https://developer.spotify.com/documentation/web-api).

View File

@@ -131,19 +131,21 @@ class BassAnalyzer(Analyzer):
Wp_force = None Wp_force = None
I_force = None I_force = None
def __init__(self, fs, sig, Wp_force=None): def __init__(self, fs, sig, Wp_force=None, I_force=None):
""" """
:param fs: sampling rate :param fs: sampling rate
:param sig: audio signal normalized to [-1,1] :param sig: audio signal normalized to [-1,1]
""" """
super(BassAnalyzer, self).__init__() super(BassAnalyzer, self).__init__()
self.D = int(self.shift_sec * fs) #: spectrogram step self.D = int(self.shift_sec * fs) #: spectrogram step
if self.Wp_force: if Wp_force:
self.Wp = self.Wp_force
elif Wp_force:
self.Wp = Wp_force self.Wp = Wp_force
elif self.Wp_force:
self.Wp = self.Wp_force
else: else:
self.Wp = int(np.round(self.wavelet_win_sec * fs / self.W) * self.W) # wavelet window - make it an integer multiple of FFT window self.Wp = int(np.round(self.wavelet_win_sec * fs / self.W) * self.W) # wavelet window - make it an integer multiple of FFT window
if I_force:
self.I_force = I_force
self.U = self.Wp // self.W # ratio self.U = self.Wp // self.W # ratio
self.f = np.pad(sig, (self.W//2, self.W//2-1)) #: signal padded (W-FFT to determine scalogram parameters) self.f = np.pad(sig, (self.W//2, self.W//2-1)) #: signal padded (W-FFT to determine scalogram parameters)

View File

@@ -19,14 +19,19 @@ class Segmenter:
def __init__(self): pass def __init__(self): pass
def get_segments(self, fs, guitar):
i_stxs = self.get_segment_boundaries(fs, guitar)
i_stxs = np.pad(i_stxs, (1, 0))
return i_stxs
def get_segment_boundaries(self, fs, guitar): def get_segment_boundaries(self, fs, guitar):
"""split the spectral power signal 'guitar' into stochastically similar segments.""" """split the spectral power signal 'guitar' into stochastically similar segments."""
segment_ids = self.get_segments(fs, guitar) segment_ids = self._get_segments(fs, guitar)
stxs = np.diff(segment_ids) != 0 stxs = np.diff(segment_ids) != 0
i_stxs = np.where(stxs)[0] i_stxs = np.where(stxs)[0]
return i_stxs return i_stxs
def get_segments(self, fs, guitar): def _get_segments(self, fs, guitar):
"""split the spectral power signal 'guitar' into stochastically similar segments.""" """split the spectral power signal 'guitar' into stochastically similar segments."""
seg_filt_win = int(self.seg_filt_win_sec / self.seg_win_step_sec) seg_filt_win = int(self.seg_filt_win_sec / self.seg_win_step_sec)
seg_guitar_data = self._sig_stochastics(fs, guitar) seg_guitar_data = self._sig_stochastics(fs, guitar)

179
song.py Normal file
View File

@@ -0,0 +1,179 @@
import numpy as np
from rhythm import BassAnalyzer, GuitarAnalyzer
from segmenter import Segmenter
from beat import SsfZxing, RegularBeatFinder
from sqi import gauss, shift
class SongBeatDetector:
SEGMENT_SLICE_LEN_SEC = 8.0 #: slice length for processing (long enough to contain bar structure; short enough for a constant freq. beat placement)
SSF_REL_THRES = 1.5 #: optimize for slope of error (mae) function over beat frequency
NE_THRES = 30.0 #: normalized error threshold for 'good' slices
def __init__(self): pass
def detect(self, fs, sig, use_f_hint=True, debug_fe_idx=None):
self.fs = fs
#self.sig = sig
self.ba = BassAnalyzer(fs, sig)
self.bass, times = self.ba.viterbi_wavelet_scalogram_amplitudes(dbg_time=True)
# times: durations of different stages
self.ga = GuitarAnalyzer(fs, sig)
self.guitar = self.ga.spectrogram_power_amplitudes()
fsd = fs / self.ga.D # <- guitar ('ga')
self.D = self.ga.D # <- guitar ('ga')
# self.bass, self.guitar: functions on windowed spectrum 0.008 sec apart (125 Hz)
self.sg = Segmenter()
self.i_seg = self.sg.get_segments(fsd, self.guitar) # <- guitar
self.t_seg = self.i_seg / fsd
self.fsd = fsd # reciprocal window step size
# we segment on 'guitar' info, but process 'bass' later
if use_f_hint:
# initial estimate (without 'f_hint')
zds_initial = self._estimate_segments(debug_fe_idx=None)
self.zds_initial = zds_initial
ifbs_good = np.array([zdd['ne'] < SongBeatDetector.NE_THRES for zdd in zds_initial])
fbs = np.array([zdd['fb'] for zdd in zds_initial])[np.where(ifbs_good)[0]]
bins, hfreq = np.histogram(fbs)
ih = np.argmax(bins)
self.f_hint = np.mean((hfreq[ih], hfreq[ih+1])) # center freq of bin
else:
self.f_hint = None
# actual estimate (using 'f_hint' to bias each segment)
self.zds = self._estimate_segments(f_hint=self.f_hint, debug_fe_idx=debug_fe_idx)
return self.zds
def _estimate_segments(self, f_hint=None, debug_fe_idx=None):
zds = []
fsd = self.fsd
seg_sl = int(SongBeatDetector.SEGMENT_SLICE_LEN_SEC * fsd) # segment slice length in 1/fsd units
# for each segment
for i in np.arange(self.i_seg.shape[0]-1):
i1, i2 = self.i_seg[i], self.i_seg[i+1]
t1, t2 = i1 / fsd, i2 / fsd
# split segment into slices
if i2-i1 < seg_sl: continue
num_sl = (i2-i1) // seg_sl
for m in np.arange(num_sl):
j1, j2 = i1+m*seg_sl, i1+(m+1)*seg_sl
sig_slice = self.bass[slice(j1, j2)] # <- bass
if debug_fe_idx is not None:
# there will be many (upto 50) different slices - do not debug-plot them all
debug_fe_sidx = debug_fe_idx / fs * fsd
debug_fe = i1 <= debug_fe_sidx < i2
else:
debug_fe = False
zdd = self._process_slice(j1, j2, m, sig_slice, f_hint=f_hint, debug_fe=debug_fe)
zds.append(zdd)
return zds
def _process_slice(self, j1, j2, m, sig_slice, f_hint=None, debug_fe=False):
"""
:param j1: lower index into 'sig_slice'
:param j2: upper index into 'sig_slice'
:param m: slice number (used to check if debugging)
:param debug_fe: show plots for SSF and raw/reg beat placement
"""
# TODO: C++ impl of SsfZxing._ssf_det_zxings() has diverged.
# - refractory period changes
# - ssf_th filter with 6-points
# - ?? others ??
# NOTE: SsfZxing here is always getting short 8-sec slices only (nb. for 'ssf_th' comput.)
fsd = self.fsd # reciprocal window step size
seg_sl = int(SongBeatDetector.SEGMENT_SLICE_LEN_SEC * fsd) # segment slice length in 1/fsd units
SsfZxing.ssf_rel_thres = SongBeatDetector.SSF_REL_THRES
zd = SsfZxing()
ssf, ssf_th = zd._ssf_function(fsd, sig_slice)
ssf_zxings = zd._ssf_det_zxings(fsd, ssf, ssf_th)
zdd = {
'i1': j1 * self.D, 'i2': j2 * self.D,
# ssf_zxings: raw beats (relative to slice)
'zd': zd, 'ssf': ssf, 'ssf_zxings': ssf_zxings,
'sig_slice': sig_slice, 'sig_source': 'bass',
'ssf_th': np.ones(ssf.shape[0]) * ssf_th
}
# (only plot first slice of a wider segment)
#if num_sl > 2 and m == 0:
if debug_fe:
#
# scalogram image, with viterbi path
self.ba.debug_plot(j1, j2) # TODO: adapt 'bass'
plt.title(f'scalogram & viterbi path, slice [{j1}:{j2}]')
# SSF function and detected raw beats
zd.debug_plot(0, seg_sl)
plt.title(f'raw beats, slice [{j1}:{j2}]')
# nice-to: optimize phase, (maybe iteratively, optimize phase and freq each)
bf = RegularBeatFinder()
fb, ne = bf.find_beat(fsd, ssf_zxings, f_hint=f_hint, debug_fe=debug_fe, debug_i=None)
if debug_fe: plt.title(f'regular-beat placement error (mae), slice [{j1}:{j2}]')
# mae is unnurmalized here, as returned from RegularBeatFinder._get_opt_ibi_freq_2()
zdd.update({
# bf: beat finder
# fb: beat frequency, in Hz
# ne: normalized mae error
'bf': bf, 'fb': fb, 'ne': ne
})
# TODO: ne > 30 is suspiciously bad - filter those "detections" out eventually
# TODO: # catch basic errors: ne == 0, or len(est_zxings) == 0, means slice is bad
# NOTE: since 2x the zero-crossings, we get twice the frequency here.
# NOTE: this means 0.5 lower freq bound of RegularBeatFinder will find at most 60 bpm in the song.
# TODO: RegularBeatFinder currently not using 'phase' info, but should be optimized
# TODO: (currently we start the pattern at the first detected beat, may or may not be good)
est_zxings = np.cumsum(np.pad(bf.freq_to_est_ibis(fsd, fb, j2-j1), (1,0))) # rel. to slice
if ssf_zxings.shape[0] > 0:
est_zxings += ssf_zxings[0] # add phase = currently we just start at first detected beat
# nice-to: median-filter the freq, etc.pp.
# nice-to: avoid adding len(est_zxings)=0 entries later
# trim back to max. index
est_zxings = est_zxings[np.where(est_zxings < ssf.shape[0])[0]]
zdd.update({
# est_zxings: regular beats (relative to slice)
'est_zxings': est_zxings
})
if debug_fe:
plt.figure(figsize=(8,2))
plt.plot(ssf)
plt.plot(np.arange(ssf.shape[0]), np.ones(ssf.shape[0]) * ssf_th); None
plt.scatter(ssf_zxings, np.ones(ssf_zxings.shape[0]) * ssf_th, c='r')
plt.scatter(est_zxings, np.ones(est_zxings.shape[0]) * ssf_th, c='g')
plt.title(f'ssf, ssf_th, raw beats (r), reg beats (g), slice [{j1}:{j2}]')
return zdd
# _debug_fmt_est_zxings
def _place_fmt_zxings(self, fsd, ssf, ssf_zxings):
gauss_beat_template_win_sec = 0.25542 #: gauss window width (as compared to beats in ssf function)
gauss_beat_template_sigma_sec = 0.027 #: gauss bump half-width parameter (as compared to beats in ssf function)
#gauss_amplitude = 2.0
#def get_snr(self, fsd, ssf, ssf_threshold, ssf_zxings):
# """Compute the Signal-to-Noise Ratio of beats, based on SSF function and detected beat locations."""
sigma = fsd * gauss_beat_template_sigma_sec
W = int(fsd * gauss_beat_template_win_sec)
gb = gauss(W, W//2, sigma)
# place gaussians on estimated beat locations
ssf_est = np.zeros(ssf.shape[0])
for i in ssf_zxings:
ssf_est += shift(ssf.shape[0], i, gb)
ssf_est /= gb[W//2] # normalize amplitude to 1.0
ssf_est = np.roll(ssf_est, int(sigma)) # shift to right (beat loc = gauss beginning, not center)
return ssf_est