Compare commits
19 Commits
3dc2e4d3d5
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 02549e5618 | |||
| 3b7bae0a38 | |||
| 041cba8224 | |||
| 729555acc3 | |||
| f3f580f923 | |||
| e349278c06 | |||
| 71f1975a97 | |||
| ee5a1376ee | |||
| e42cddd645 | |||
| b0a7202f32 | |||
| 71f55ab20d | |||
| 378009f8b0 | |||
| 4627786dc4 | |||
| 11934b1f61 | |||
| d10187878d | |||
| e506a3e580 | |||
| 975adcdee4 | |||
| 5d9de7d8f1 | |||
| 132dea15fa |
11
TODO.md
Normal file
11
TODO.md
Normal file
@@ -0,0 +1,11 @@
|
|||||||
|
# TODO
|
||||||
|
|
||||||
|
- tests: beat and guitar synthesizer
|
||||||
|
- generate rhythmic sequence and test the algos
|
||||||
|
.
|
||||||
|
|
||||||
|
O> "2027-04-29 TestApi Bass song.ipynb": [21]
|
||||||
|
- why is ssf continually rising?
|
||||||
|
- because of using 'fs' not 'fsd'
|
||||||
|
.
|
||||||
|
|
||||||
424
api.py
424
api.py
@@ -17,20 +17,24 @@
|
|||||||
# $ while sleep 1; do diff -q api.py /tmp/api.py; if [ $? -ne 0 ]; then scp api.py lockstep@api.lockstep.at:/var/sites/api.lockstep.at/; cp api.py /tmp/api.py; fi; done
|
# $ while sleep 1; do diff -q api.py /tmp/api.py; if [ $? -ne 0 ]; then scp api.py lockstep@api.lockstep.at:/var/sites/api.lockstep.at/; cp api.py /tmp/api.py; fi; done
|
||||||
|
|
||||||
import os
|
import os
|
||||||
|
import re
|
||||||
import sqlite3
|
import sqlite3
|
||||||
from base64 import b64encode
|
from base64 import b64encode
|
||||||
from datetime import datetime, timedelta, timezone
|
from datetime import datetime, timedelta, timezone
|
||||||
from urllib.parse import urlencode
|
|
||||||
from urllib.request import Request, urlopen
|
|
||||||
import json
|
import json
|
||||||
|
from functools import wraps
|
||||||
|
from urllib.parse import urlencode
|
||||||
|
|
||||||
from flask import Flask, request, session, jsonify, make_response
|
from flask import Flask, g, request, session, jsonify, make_response, redirect, url_for
|
||||||
from werkzeug.middleware.proxy_fix import ProxyFix
|
from werkzeug.middleware.proxy_fix import ProxyFix
|
||||||
from authomatic import Authomatic
|
from authomatic import Authomatic
|
||||||
from authomatic.adapters import WerkzeugAdapter
|
from authomatic.adapters import WerkzeugAdapter
|
||||||
from authomatic.providers import oauth2
|
from authomatic.providers import oauth2
|
||||||
from authomatic.providers import PROVIDER_ID_MAP as AUTHOMATIC_PROVIDER_ID_MAP
|
from authomatic.providers import PROVIDER_ID_MAP as AUTHOMATIC_PROVIDER_ID_MAP
|
||||||
|
|
||||||
|
import random
|
||||||
|
import time
|
||||||
|
|
||||||
import requests
|
import requests
|
||||||
|
|
||||||
|
|
||||||
@@ -95,14 +99,24 @@ PROVIDER_ID_MAP = list(AUTHOMATIC_PROVIDER_ID_MAP) + [Spotify]
|
|||||||
SPOTIFY_CLIENT_ID = os.environ["SPOTIFY_CLIENT_ID"]
|
SPOTIFY_CLIENT_ID = os.environ["SPOTIFY_CLIENT_ID"]
|
||||||
SPOTIFY_CLIENT_SECRET = os.environ["SPOTIFY_CLIENT_SECRET"]
|
SPOTIFY_CLIENT_SECRET = os.environ["SPOTIFY_CLIENT_SECRET"]
|
||||||
|
|
||||||
# Must exactly match a Redirect URI configured in your Spotify app settings.
|
# OAuth redirect registered in the Spotify Developer Dashboard (Authorization Code
|
||||||
|
# flow). Spotify sends the browser here with ?code=&state= — not to the Android app scheme.
|
||||||
REDIRECT_URI = os.environ.get(
|
REDIRECT_URI = os.environ.get(
|
||||||
"SPOTIFY_REDIRECT_URI",
|
"SPOTIFY_REDIRECT_URI",
|
||||||
#"https://api.lockstep.at/spotify/callback"
|
#"https://api.lockstep.at/spotify/callback"
|
||||||
"https://api.lockstep.at/login/spotify/"
|
"https://api.lockstep.at/login/spotify/"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# After server-side token exchange, the browser may be redirected to this URL with
|
||||||
|
# tokens in the query string (mobile / Custom Tab). This is NOT registered with Spotify;
|
||||||
|
# only REDIRECT_URI above goes in the dashboard for this flow.
|
||||||
|
ALLOWED_SPOTIFY_APP_POST_LOGIN_REDIRECT = os.environ.get(
|
||||||
|
"SPOTIFY_APP_POST_LOGIN_REDIRECT_URI",
|
||||||
|
"at.lockstep.player://spotify/callback",
|
||||||
|
)
|
||||||
|
|
||||||
DB_PATH = os.environ.get("TOKEN_DB_PATH", "spotify_tokens.db")
|
DB_PATH = os.environ.get("TOKEN_DB_PATH", "spotify_tokens.db")
|
||||||
|
METADATA_UPLOAD_DIR = os.environ.get("METADATA_UPLOAD_DIR", "uploaded_collections")
|
||||||
|
|
||||||
app = Flask(__name__)
|
app = Flask(__name__)
|
||||||
app.secret_key = os.environ["FLASK_SECRET_KEY"]
|
app.secret_key = os.environ["FLASK_SECRET_KEY"]
|
||||||
@@ -145,6 +159,17 @@ def init_db():
|
|||||||
updated_at TEXT NOT NULL
|
updated_at TEXT NOT NULL
|
||||||
)
|
)
|
||||||
""")
|
""")
|
||||||
|
conn.execute("""
|
||||||
|
CREATE TABLE IF NOT EXISTS uploaded_metadata (
|
||||||
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||||
|
spotify_user_id TEXT NOT NULL,
|
||||||
|
track_id TEXT NOT NULL,
|
||||||
|
type TEXT NOT NULL,
|
||||||
|
version INTEGER NOT NULL,
|
||||||
|
file_name TEXT NOT NULL,
|
||||||
|
created_at TEXT NOT NULL
|
||||||
|
)
|
||||||
|
""")
|
||||||
conn.commit()
|
conn.commit()
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
@@ -214,25 +239,41 @@ def spotify_basic_auth_header() -> str:
|
|||||||
|
|
||||||
|
|
||||||
def refresh_spotify_token(refresh_token: str) -> dict:
|
def refresh_spotify_token(refresh_token: str) -> dict:
|
||||||
body = urlencode({
|
r = requests.post(
|
||||||
|
"https://accounts.spotify.com/api/token",
|
||||||
|
data={
|
||||||
"grant_type": "refresh_token",
|
"grant_type": "refresh_token",
|
||||||
"refresh_token": refresh_token,
|
"refresh_token": refresh_token,
|
||||||
}).encode("utf-8")
|
},
|
||||||
|
|
||||||
req = Request(
|
|
||||||
"https://accounts.spotify.com/api/token",
|
|
||||||
data=body,
|
|
||||||
method="POST",
|
|
||||||
headers={
|
headers={
|
||||||
"Authorization": spotify_basic_auth_header(),
|
"Authorization": spotify_basic_auth_header(),
|
||||||
"Content-Type": "application/x-www-form-urlencoded",
|
"Content-Type": "application/x-www-form-urlencoded",
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
with urlopen(req) as resp:
|
if not r.ok:
|
||||||
payload = json.loads(resp.read().decode("utf-8"))
|
# The token endpoint returns errors as flat
|
||||||
|
# {"error": "<code>", "error_description": "<msg>"}, which differs from
|
||||||
|
# the Web API's {"error": {"status": ..., "message": ...}} shape. Rewrite
|
||||||
|
# the body so our error handler surfaces a consistent envelope to
|
||||||
|
# clients regardless of which Spotify endpoint failed.
|
||||||
|
try:
|
||||||
|
body = r.json()
|
||||||
|
except ValueError:
|
||||||
|
body = {}
|
||||||
|
message = (
|
||||||
|
body.get("error_description")
|
||||||
|
or body.get("error")
|
||||||
|
or r.reason
|
||||||
|
or "Token refresh failed"
|
||||||
|
)
|
||||||
|
r._content = json.dumps({
|
||||||
|
"error": {"status": r.status_code, "message": message}
|
||||||
|
}).encode("utf-8")
|
||||||
|
r.headers["Content-Type"] = "application/json"
|
||||||
|
|
||||||
return payload
|
r.raise_for_status()
|
||||||
|
return r.json()
|
||||||
|
|
||||||
|
|
||||||
def get_valid_access_token(spotify_user_id: str) -> str:
|
def get_valid_access_token(spotify_user_id: str) -> str:
|
||||||
@@ -266,17 +307,121 @@ def get_valid_access_token(spotify_user_id: str) -> str:
|
|||||||
|
|
||||||
|
|
||||||
# ----------------------------
|
# ----------------------------
|
||||||
# Simple Spotify API call
|
# Spotify Web API helpers
|
||||||
# ----------------------------
|
# ----------------------------
|
||||||
|
|
||||||
|
def spotify_get(url: str, access_token: str, params: dict | None = None) -> dict:
|
||||||
|
"""
|
||||||
|
GET a Spotify Web API endpoint and return the parsed JSON body.
|
||||||
|
|
||||||
|
Retries on HTTP 429 and 503 with exponential backoff and optional Retry-After,
|
||||||
|
so brief Spotify rate limits often clear before we surface an error to the app.
|
||||||
|
"""
|
||||||
|
headers = {"Authorization": f"Bearer {access_token}"}
|
||||||
|
backoff_sec = 1.0
|
||||||
|
max_attempts = 8
|
||||||
|
params_eff = params
|
||||||
|
for attempt in range(max_attempts):
|
||||||
|
r = requests.get(url, headers=headers, params=params_eff)
|
||||||
|
if r.status_code in (429, 503) and attempt < max_attempts - 1:
|
||||||
|
wait = backoff_sec
|
||||||
|
ra = r.headers.get("Retry-After")
|
||||||
|
if ra:
|
||||||
|
try:
|
||||||
|
wait = max(wait, float(ra))
|
||||||
|
except ValueError:
|
||||||
|
pass
|
||||||
|
wait = min(wait, 120.0)
|
||||||
|
time.sleep(wait + random.random() * 0.25 * wait)
|
||||||
|
backoff_sec = min(backoff_sec * 2.0, 60.0)
|
||||||
|
continue
|
||||||
|
r.raise_for_status()
|
||||||
|
return r.json()
|
||||||
|
|
||||||
|
|
||||||
|
def spotify_get_paginated(
|
||||||
|
url: str,
|
||||||
|
access_token: str,
|
||||||
|
limit: int = 50,
|
||||||
|
max_items: int | None = None,
|
||||||
|
) -> list:
|
||||||
|
"""
|
||||||
|
Fetch all items from a paginated Spotify Web API endpoint.
|
||||||
|
|
||||||
|
Spotify paging objects return up to `limit` items per page (50 max for most
|
||||||
|
endpoints) and a `next` URL. We follow `next` until it is null, collecting
|
||||||
|
items along the way.
|
||||||
|
"""
|
||||||
|
items: list = []
|
||||||
|
params: dict | None = {"limit": limit, "offset": 0}
|
||||||
|
next_url: str | None = url
|
||||||
|
|
||||||
|
while next_url is not None:
|
||||||
|
page = spotify_get(next_url, access_token, params=params)
|
||||||
|
items.extend(page.get("items", []))
|
||||||
|
|
||||||
|
if max_items is not None and len(items) >= max_items:
|
||||||
|
return items[:max_items]
|
||||||
|
|
||||||
|
# `next` is a fully-qualified URL with limit/offset already encoded,
|
||||||
|
# so we must not pass `params` again after the first request.
|
||||||
|
next_url = page.get("next")
|
||||||
|
params = None
|
||||||
|
|
||||||
|
return items
|
||||||
|
|
||||||
|
|
||||||
def spotify_get_me(access_token: str) -> dict:
|
def spotify_get_me(access_token: str) -> dict:
|
||||||
req = Request(
|
return spotify_get("https://api.spotify.com/v1/me", access_token)
|
||||||
"https://api.spotify.com/v1/me",
|
|
||||||
headers={"Authorization": f"Bearer {access_token}"},
|
|
||||||
method="GET",
|
# ----------------------------
|
||||||
|
# Error handling
|
||||||
|
# ----------------------------
|
||||||
|
|
||||||
|
@app.errorhandler(requests.HTTPError)
|
||||||
|
def handle_spotify_http_error(e: requests.HTTPError):
|
||||||
|
"""
|
||||||
|
Translate a non-2xx upstream response from Spotify into a JSON envelope,
|
||||||
|
passing through the upstream HTTP status code.
|
||||||
|
|
||||||
|
Spotify error bodies look like {"error": {"status": ..., "message": ...}}
|
||||||
|
when JSON is returned; we surface that message when available.
|
||||||
|
"""
|
||||||
|
resp = e.response
|
||||||
|
status = resp.status_code if resp is not None else 502
|
||||||
|
|
||||||
|
spotify_error = None
|
||||||
|
error_message = str(e)
|
||||||
|
|
||||||
|
if resp is not None:
|
||||||
|
try:
|
||||||
|
spotify_error = resp.json()
|
||||||
|
except ValueError:
|
||||||
|
spotify_error = None
|
||||||
|
|
||||||
|
if (
|
||||||
|
isinstance(spotify_error, dict)
|
||||||
|
and isinstance(spotify_error.get("error"), dict)
|
||||||
|
):
|
||||||
|
error_message = (
|
||||||
|
spotify_error["error"].get("message") or error_message
|
||||||
)
|
)
|
||||||
with urlopen(req) as resp:
|
|
||||||
return json.loads(resp.read().decode("utf-8"))
|
return jsonify({
|
||||||
|
"ok": False,
|
||||||
|
"error": error_message,
|
||||||
|
"spotify": spotify_error,
|
||||||
|
}), status
|
||||||
|
|
||||||
|
|
||||||
|
@app.errorhandler(requests.RequestException)
|
||||||
|
def handle_spotify_request_error(e: requests.RequestException):
|
||||||
|
"""Network-level failures (DNS, connection, timeout) have no upstream status."""
|
||||||
|
return jsonify({
|
||||||
|
"ok": False,
|
||||||
|
"error": f"Upstream request failed: {e}",
|
||||||
|
}), 502
|
||||||
|
|
||||||
|
|
||||||
# ----------------------------
|
# ----------------------------
|
||||||
@@ -294,6 +439,28 @@ def index():
|
|||||||
|
|
||||||
@app.route("/login/<provider_name>/", methods=["GET", "POST"])
|
@app.route("/login/<provider_name>/", methods=["GET", "POST"])
|
||||||
def login(provider_name):
|
def login(provider_name):
|
||||||
|
# Authomatic 1.3.0 (oauth2.login) only runs "phase 1" — redirect to Spotify —
|
||||||
|
# when there are no query parameters, or only ``user_state``:
|
||||||
|
# elif (not self.params or (len(self.params) == 1 and 'user_state' in self.params))
|
||||||
|
# A mobile client opening ``/login/spotify/?redirect_uri=...`` therefore matches
|
||||||
|
# no branch; login() returns without calling redirect() → empty body and HTTP 200
|
||||||
|
# (white page). Stash the app callback in the session and reload without query args.
|
||||||
|
if (
|
||||||
|
provider_name == "spotify"
|
||||||
|
and request.args.get("redirect_uri")
|
||||||
|
and "code" not in request.args
|
||||||
|
and "error" not in request.args
|
||||||
|
):
|
||||||
|
requested = request.args.get("redirect_uri", "")
|
||||||
|
if requested != ALLOWED_SPOTIFY_APP_POST_LOGIN_REDIRECT:
|
||||||
|
return jsonify({
|
||||||
|
"ok": False,
|
||||||
|
"error": "redirect_uri not allowed for this client",
|
||||||
|
}), 400
|
||||||
|
session["spotify_oauth_app_redirect_uri"] = requested
|
||||||
|
session.modified = True
|
||||||
|
return redirect(url_for("login", provider_name=provider_name), code=302)
|
||||||
|
|
||||||
response = make_response()
|
response = make_response()
|
||||||
|
|
||||||
# Let Authomatic handle the OAuth2 handshake.
|
# Let Authomatic handle the OAuth2 handshake.
|
||||||
@@ -301,7 +468,7 @@ def login(provider_name):
|
|||||||
WerkzeugAdapter(request, response),
|
WerkzeugAdapter(request, response),
|
||||||
provider_name,
|
provider_name,
|
||||||
session=session,
|
session=session,
|
||||||
session_saver=lambda: session.modified
|
session_saver=lambda: setattr(session, "modified", True),
|
||||||
)
|
)
|
||||||
|
|
||||||
# If result is None, Authomatic is still redirecting/processing.
|
# If result is None, Authomatic is still redirecting/processing.
|
||||||
@@ -372,6 +539,27 @@ def login(provider_name):
|
|||||||
# keep the Spotify user id in session
|
# keep the Spotify user id in session
|
||||||
session["spotify_user_id"] = result.user.id
|
session["spotify_user_id"] = result.user.id
|
||||||
|
|
||||||
|
app_redirect = session.pop("spotify_oauth_app_redirect_uri", None)
|
||||||
|
if app_redirect:
|
||||||
|
if app_redirect != ALLOWED_SPOTIFY_APP_POST_LOGIN_REDIRECT:
|
||||||
|
return jsonify({
|
||||||
|
"ok": False,
|
||||||
|
"error": "Stored app redirect_uri does not match allowlist",
|
||||||
|
}), 400
|
||||||
|
sep = "&" if ("?" in app_redirect) else "?"
|
||||||
|
target = (
|
||||||
|
f"{app_redirect}{sep}"
|
||||||
|
+ urlencode(
|
||||||
|
{
|
||||||
|
"access_token": access_token,
|
||||||
|
"refresh_token": refresh_token,
|
||||||
|
"expires_in": str(expires_in),
|
||||||
|
"token_type": token_type or "",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
)
|
||||||
|
return redirect(target, code=302)
|
||||||
|
|
||||||
return jsonify({
|
return jsonify({
|
||||||
"ok": True,
|
"ok": True,
|
||||||
"spotify_user_id": result.user.id,
|
"spotify_user_id": result.user.id,
|
||||||
@@ -455,44 +643,210 @@ old example 1:
|
|||||||
}
|
}
|
||||||
"""
|
"""
|
||||||
|
|
||||||
@app.route("/me")
|
|
||||||
def me():
|
def spotify_access_token_from_authorization_header():
|
||||||
|
auth = request.headers.get("Authorization", "") or ""
|
||||||
|
if not auth.startswith("Bearer "):
|
||||||
|
return None
|
||||||
|
token = auth[7:].strip()
|
||||||
|
return token or None
|
||||||
|
|
||||||
|
|
||||||
|
def get_request_spotify_access_token():
|
||||||
|
"""
|
||||||
|
Prefer ``Authorization: Bearer <access_token>`` (mobile / jukebox).
|
||||||
|
Fallback to Flask session + stored refresh flow (browser).
|
||||||
|
"""
|
||||||
|
bearer = spotify_access_token_from_authorization_header()
|
||||||
|
if bearer:
|
||||||
|
return bearer
|
||||||
spotify_user_id = session.get("spotify_user_id")
|
spotify_user_id = session.get("spotify_user_id")
|
||||||
if not spotify_user_id:
|
if not spotify_user_id:
|
||||||
return jsonify({"ok": False, "error": "Not logged in"}), 401
|
return None
|
||||||
|
return get_valid_access_token(spotify_user_id)
|
||||||
|
|
||||||
access_token = get_valid_access_token(spotify_user_id)
|
|
||||||
|
def require_auth(f):
|
||||||
|
@wraps(f)
|
||||||
|
def wrapped(*args, **kwargs):
|
||||||
|
token = get_request_spotify_access_token()
|
||||||
|
if not token:
|
||||||
|
return jsonify({"ok": False, "error": "Not logged in"}), 401
|
||||||
|
g.spotify_access_token = token
|
||||||
|
return f(*args, **kwargs)
|
||||||
|
return wrapped
|
||||||
|
|
||||||
|
|
||||||
|
@app.route("/me")
|
||||||
|
@require_auth
|
||||||
|
def me():
|
||||||
|
access_token = g.spotify_access_token
|
||||||
profile = spotify_get_me(access_token)
|
profile = spotify_get_me(access_token)
|
||||||
|
|
||||||
row = get_token_record(spotify_user_id)
|
row = get_token_record(profile["id"])
|
||||||
|
|
||||||
return jsonify({
|
return jsonify({
|
||||||
"ok": True,
|
"ok": True,
|
||||||
"profile": profile,
|
"profile": profile,
|
||||||
"stored_expires_at": row["expires_at"],
|
"stored_expires_at": row["expires_at"] if row else None,
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
||||||
@app.route("/playlists")
|
@app.route("/playlists")
|
||||||
|
@require_auth
|
||||||
def playlists():
|
def playlists():
|
||||||
spotify_user_id = "cidermole"
|
access_token = g.spotify_access_token
|
||||||
access_token = get_valid_access_token(spotify_user_id)
|
|
||||||
|
|
||||||
"""
|
"""
|
||||||
user_id = "Sara"
|
user_id = "Sara"
|
||||||
url = f"https://api.spotify.com/v1/users/{user_id}/playlists"
|
url = f"https://api.spotify.com/v1/users/{user_id}/playlists"
|
||||||
# -> 403 Forbidden
|
# -> 403 Forbidden
|
||||||
"""
|
"""
|
||||||
url = f"https://api.spotify.com/v1/me/playlists"
|
items = spotify_get_paginated(
|
||||||
headers={"Authorization": f"Bearer {access_token}"}
|
"https://api.spotify.com/v1/me/playlists",
|
||||||
r = requests.get(url, headers=headers)
|
access_token,
|
||||||
# TODO: pagination (limit,offset)
|
)
|
||||||
|
|
||||||
return jsonify({
|
return jsonify({
|
||||||
"ok": True,
|
"ok": True,
|
||||||
"response": r.json()
|
"total": len(items),
|
||||||
|
"items": items,
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
||||||
|
@app.route("/playlists/<playlist_id>")
|
||||||
|
@require_auth
|
||||||
|
def playlist(playlist_id):
|
||||||
|
access_token = g.spotify_access_token
|
||||||
|
|
||||||
|
playlist_data = spotify_get(
|
||||||
|
f"https://api.spotify.com/v1/playlists/{playlist_id}",
|
||||||
|
access_token,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Full playlist objects use a paging object at `items` (current Spotify shape)
|
||||||
|
# or legacy `tracks`. Follow `next` on whichever is present.
|
||||||
|
paging_key = (
|
||||||
|
"items"
|
||||||
|
if isinstance(playlist_data.get("items"), dict)
|
||||||
|
else "tracks"
|
||||||
|
)
|
||||||
|
paging = playlist_data.get(paging_key) or {}
|
||||||
|
if paging.get("next"):
|
||||||
|
all_items = spotify_get_paginated(
|
||||||
|
f"https://api.spotify.com/v1/playlists/{playlist_id}/tracks",
|
||||||
|
access_token,
|
||||||
|
limit=100,
|
||||||
|
)
|
||||||
|
playlist_data[paging_key] = {
|
||||||
|
**paging,
|
||||||
|
"items": all_items,
|
||||||
|
"offset": 0,
|
||||||
|
"limit": len(all_items),
|
||||||
|
"next": None,
|
||||||
|
"previous": None,
|
||||||
|
}
|
||||||
|
|
||||||
|
return jsonify({
|
||||||
|
"ok": True,
|
||||||
|
"playlist": playlist_data,
|
||||||
|
})
|
||||||
|
|
||||||
|
|
||||||
|
@app.route("/metadata", methods=["GET"])
|
||||||
|
@require_auth
|
||||||
|
def get_metadata():
|
||||||
|
track_id = request.args.get("trackId")
|
||||||
|
meta_type = request.args.get("type", "beats")
|
||||||
|
|
||||||
|
if not track_id:
|
||||||
|
return jsonify({"ok": False, "error": "Missing trackId"}), 400
|
||||||
|
|
||||||
|
access_token = g.spotify_access_token
|
||||||
|
profile = spotify_get_me(access_token)
|
||||||
|
spotify_user_id = profile["id"]
|
||||||
|
|
||||||
|
conn = db()
|
||||||
|
row = conn.execute("""
|
||||||
|
SELECT file_name FROM uploaded_metadata
|
||||||
|
WHERE spotify_user_id = ? AND track_id = ? AND type = ?
|
||||||
|
ORDER BY created_at DESC
|
||||||
|
LIMIT 1
|
||||||
|
""", (spotify_user_id, track_id, meta_type)).fetchone()
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
if not row:
|
||||||
|
return jsonify({"ok": False, "error": "Not found"}), 404
|
||||||
|
|
||||||
|
file_path = os.path.join(METADATA_UPLOAD_DIR, row["file_name"])
|
||||||
|
if not os.path.isfile(file_path):
|
||||||
|
return jsonify({"ok": False, "error": "Not found"}), 404
|
||||||
|
|
||||||
|
with open(file_path, "r", encoding="utf-8") as f:
|
||||||
|
collection = json.load(f)
|
||||||
|
|
||||||
|
if not isinstance(collection, dict):
|
||||||
|
return jsonify({"ok": False, "error": "Invalid metadata file"}), 500
|
||||||
|
|
||||||
|
return jsonify({"ok": True, "collection": collection})
|
||||||
|
|
||||||
|
|
||||||
|
@app.route("/metadata", methods=["POST"])
|
||||||
|
@require_auth
|
||||||
|
def upload_metadata():
|
||||||
|
access_token = g.spotify_access_token
|
||||||
|
|
||||||
|
if not request.is_json:
|
||||||
|
return jsonify({"ok": False, "error": "Expected application/json"}), 400
|
||||||
|
|
||||||
|
body = request.get_json(silent=True)
|
||||||
|
if not isinstance(body, dict):
|
||||||
|
return jsonify({"ok": False, "error": "Invalid JSON body"}), 400
|
||||||
|
|
||||||
|
track_id = body.get("trackId")
|
||||||
|
meta_type = body.get("type")
|
||||||
|
version = body.get("version")
|
||||||
|
collection = body.get("collection")
|
||||||
|
|
||||||
|
if not track_id or not meta_type or version is None or collection is None:
|
||||||
|
return jsonify({
|
||||||
|
"ok": False,
|
||||||
|
"error": "Missing trackId, type, version, or collection",
|
||||||
|
}), 400
|
||||||
|
|
||||||
|
try:
|
||||||
|
version = int(version)
|
||||||
|
except (TypeError, ValueError):
|
||||||
|
return jsonify({"ok": False, "error": "version must be an integer"}), 400
|
||||||
|
|
||||||
|
if not isinstance(collection, dict):
|
||||||
|
return jsonify({"ok": False, "error": "collection must be a JSON object"}), 400
|
||||||
|
|
||||||
|
profile = spotify_get_me(access_token)
|
||||||
|
spotify_user_id = profile["id"]
|
||||||
|
|
||||||
|
os.makedirs(METADATA_UPLOAD_DIR, exist_ok=True)
|
||||||
|
ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S%fZ")
|
||||||
|
safe_track = re.sub(r"[^\w\-]", "_", str(track_id))[:120]
|
||||||
|
file_name = f"{spotify_user_id}_{safe_track}_{meta_type}_{version}_{ts}.json"
|
||||||
|
file_path = os.path.join(METADATA_UPLOAD_DIR, file_name)
|
||||||
|
|
||||||
|
with open(file_path, "w", encoding="utf-8") as f:
|
||||||
|
json.dump(collection, f, ensure_ascii=False)
|
||||||
|
|
||||||
|
now = datetime.now(timezone.utc).isoformat()
|
||||||
|
conn = db()
|
||||||
|
conn.execute("""
|
||||||
|
INSERT INTO uploaded_metadata (
|
||||||
|
spotify_user_id, track_id, type, version, file_name, created_at
|
||||||
|
) VALUES (?, ?, ?, ?, ?, ?)
|
||||||
|
""", (spotify_user_id, track_id, meta_type, version, file_name, now))
|
||||||
|
conn.commit()
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
return jsonify({"ok": True, "file_name": file_name}), 201
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
init_db()
|
init_db()
|
||||||
app.run(host="127.0.0.1", port=8000, debug=True)
|
app.run(host="127.0.0.1", port=8000, debug=True)
|
||||||
|
|||||||
216
beat.py
Normal file
216
beat.py
Normal file
@@ -0,0 +1,216 @@
|
|||||||
|
import numpy as np
|
||||||
|
import matplotlib.pyplot as plt # for debug only
|
||||||
|
from sqi import gauss
|
||||||
|
|
||||||
|
# note: may be called ZxingDetector instead?
|
||||||
|
class SsfZxing:
|
||||||
|
"""
|
||||||
|
Find beats in a Sum Slope Function by detecting threshold crossings.
|
||||||
|
See Zong et al, from CINC 2003.
|
||||||
|
"""
|
||||||
|
t_holdoff = 0.1 #: hold-off period in sec (ignore zxings after initial rise)
|
||||||
|
# these two depend on each other.
|
||||||
|
t_range = 0.032 #: rise amplitude range in sec: +/- around transition, we check the rise amplitude. about 2*sw_sec but nb. 0.008 sec steps in fs/D rate
|
||||||
|
sw_sec = 0.04 #: upslope width in sec (for SSF function)
|
||||||
|
ssf_rel_thres = 3 #: magic number from Zong 2003, threshold from mean SSF amplitude
|
||||||
|
ssf_rel_rise = 0.8 #: minimum rise of SSF edge (from foot to peak) relative to 'ssf_th'
|
||||||
|
|
||||||
|
# TODO: C++ impl has diverged.
|
||||||
|
# - refractory period changes
|
||||||
|
# - ssf_th filter with 6-points
|
||||||
|
# - ?? others ??
|
||||||
|
|
||||||
|
def __init__(self): pass
|
||||||
|
|
||||||
|
def _ssf_det_zxings(self, fs, ssf, ssf_th):
|
||||||
|
"""detect threshold crossings in 'ssf' signal."""
|
||||||
|
i_holdoff = int(self.t_holdoff * fs)
|
||||||
|
# threshold crossing
|
||||||
|
ssf_pk = np.pad((ssf > ssf_th).astype(int), (0,1))
|
||||||
|
ssf_pks = np.pad(ssf_pk[:-1], (1,0))
|
||||||
|
ssf_z = (ssf_pk - ssf_pks) == 1
|
||||||
|
# holdoff
|
||||||
|
for i in np.arange(ssf_z.shape[0] - i_holdoff):
|
||||||
|
if ssf_z[i]:
|
||||||
|
ssf_z[i+1:i+1+i_holdoff] = 0
|
||||||
|
# rise amplitude filter (check in 2-3 samples [0.024 sec or so] and compare vs. threshold)
|
||||||
|
i_range = int(self.t_range * fs)
|
||||||
|
for i in np.arange(i_range, ssf_z.shape[0]-i_range-1):
|
||||||
|
if ssf_z[i]:
|
||||||
|
rise = np.max(ssf[i:i+i_range]) - np.min(ssf[i-i_range:i])
|
||||||
|
if rise < ssf_th * self.ssf_rel_rise:
|
||||||
|
ssf_z[i] = 0
|
||||||
|
ssf_z[-i_range:] = 0 # force-zero the bounds where we cannot check the amplitude rise
|
||||||
|
ssf_z[:i_range] = 0 # force-zero the bounds where we cannot check the amplitude rise
|
||||||
|
ssf_zxings = np.where(ssf_z)[0]
|
||||||
|
# only integer-index resolution (no interpolation)
|
||||||
|
self.ssf_zxings = ssf_zxings
|
||||||
|
return ssf_zxings
|
||||||
|
|
||||||
|
def _ssf_function(self, fs, y):
|
||||||
|
"""sum-slope function."""
|
||||||
|
sw = int(self.sw_sec*fs)
|
||||||
|
duk = np.clip(np.diff(np.pad(y, (1,0))), a_min=0, a_max=np.inf) # left-looking window
|
||||||
|
#ssf = np.convolve(duk, slope_filter, mode='same') # centered window (acausal!)
|
||||||
|
duks = np.pad(np.cumsum(duk), (0, sw))
|
||||||
|
duks_r = np.roll(duks, sw)
|
||||||
|
ssf = (duks - duks_r)[:-sw]
|
||||||
|
# compute threshold
|
||||||
|
# TODO: check if we need lowpass instead of mean for 'ssf_th'
|
||||||
|
ssf_th = self.ssf_rel_thres * np.mean(ssf)
|
||||||
|
self.ssf, self.ssf_th = ssf, ssf_th
|
||||||
|
return ssf, ssf_th
|
||||||
|
|
||||||
|
def debug_plot(self, i1, i2):
|
||||||
|
ssf, ssf_th, ssf_zxings = self.ssf, self.ssf_th, self.ssf_zxings
|
||||||
|
ssf_slice = ssf[i1:i2]
|
||||||
|
ssf_th_slice = ssf_th[i1:i2] if isinstance(ssf_th, np.ndarray) else ssf_th
|
||||||
|
plt.figure(figsize=(8, 2))
|
||||||
|
plt.plot(ssf_slice)
|
||||||
|
plt.plot(np.arange(ssf_slice.shape[0]), np.ones(ssf_slice.shape[0]) * ssf_th_slice)
|
||||||
|
plt.scatter(ssf_zxings[i1:i2], np.ones(ssf_zxings[i1:i2].shape[0]) * ssf_th_slice, c='r')
|
||||||
|
|
||||||
|
def get_mae_dist(ibis):
|
||||||
|
"""make triangular wave between beats, representing absolute beat placement error."""
|
||||||
|
dist = np.zeros(np.sum(ibis)+1)
|
||||||
|
# fill with distances
|
||||||
|
p_i, i = 0, 0
|
||||||
|
for ibi in ibis:
|
||||||
|
i += ibi
|
||||||
|
l_c = ibi // 2
|
||||||
|
dist[p_i:p_i+l_c+ibi%2] = np.arange(l_c+ibi%2)
|
||||||
|
dist[p_i+l_c+ibi%2:i] = 1 + np.arange(ibi-l_c-ibi%2)[::-1]
|
||||||
|
p_i = i
|
||||||
|
return dist
|
||||||
|
|
||||||
|
|
||||||
|
def get_mae_err_1(fs, freq, phase, act_ibis, debug=False):
|
||||||
|
"""
|
||||||
|
compute beat placement error between zero crossings of given sine wave (estimate)
|
||||||
|
and actually detected beats. computes 'dist' from 'act_ibis' (direction 1).
|
||||||
|
"""
|
||||||
|
# sin(2 pi freq n / fs + phase) ... 0, pi, 2*pi, ...
|
||||||
|
# 2 pi freq n / fs + phase = k * pi
|
||||||
|
# 2 freq n / fs + phase/pi = k (= 0, 1, 2, ...)
|
||||||
|
# n = (k - phase/pi) * fs / (2 freq)
|
||||||
|
# k_max = 2 freq n_max / fs + phase/pi
|
||||||
|
N = np.sum(act_ibis)+1
|
||||||
|
phase %= 2*np.pi
|
||||||
|
k_max = 2 * freq * N / fs + phase/np.pi + 1
|
||||||
|
i_est_zxing = np.round((np.arange(k_max) - phase/np.pi) * fs / (2 * freq)).astype(int)
|
||||||
|
est_ibis = np.diff(i_est_zxing)
|
||||||
|
dist = get_mae_dist(est_ibis)
|
||||||
|
assert np.sum(est_ibis) > np.sum(act_ibis)
|
||||||
|
if debug:
|
||||||
|
plt.figure(figsize=(8,2))
|
||||||
|
plt.plot(np.sin(2 * np.pi * freq * np.arange(N) / fs + phase))
|
||||||
|
plt.stem(np.cumsum(act_ibis), np.ones(act_ibis.shape[0]), markerfmt='r')
|
||||||
|
plt.stem(i_est_zxing, np.ones(i_est_zxing.shape[0]), markerfmt='y')
|
||||||
|
mae_errs = dist[np.cumsum(act_ibis)]
|
||||||
|
return np.sum(mae_errs)
|
||||||
|
|
||||||
|
|
||||||
|
def get_mae_err_2(fs, freq, phase, act_ibis, debug=False):
|
||||||
|
"""
|
||||||
|
compute beat placement error between zero crossings of given sine wave (estimate)
|
||||||
|
and actually detected beats. computes 'dist' from est_zxings (direction 2)
|
||||||
|
"""
|
||||||
|
dist = np.pad(get_mae_dist(act_ibis), (0,1))
|
||||||
|
dist[-1] = 1 # sentinel for rounding errors in np.round()
|
||||||
|
# sin(2 pi freq n / fs + phase) ... 0, pi, 2*pi, ...
|
||||||
|
# 2 pi freq n / fs + phase = k * pi
|
||||||
|
# 2 freq n / fs + phase/pi = k (= 0, 1, 2, ...)
|
||||||
|
# n = (k - phase/pi) * fs / (2 freq)
|
||||||
|
# k_max = 2 freq n_max / fs + phase/pi
|
||||||
|
N = np.sum(act_ibis)+1
|
||||||
|
phase %= 2*np.pi
|
||||||
|
k_max = 2 * freq * N / fs + phase/np.pi
|
||||||
|
i_est_zxing = np.round((np.arange(k_max) - phase/np.pi) * fs / (2 * freq)).astype(int)
|
||||||
|
if debug:
|
||||||
|
plt.figure(figsize=(8,2))
|
||||||
|
plt.plot(np.sin(2 * np.pi * freq * np.arange(N) / fs + phase))
|
||||||
|
plt.stem(np.cumsum(act_ibis), np.ones(act_ibis.shape[0]), markerfmt='r')
|
||||||
|
plt.stem(i_est_zxing, np.ones(i_est_zxing.shape[0]), markerfmt='y')
|
||||||
|
mae_errs = dist[i_est_zxing]
|
||||||
|
return np.sum(mae_errs)
|
||||||
|
|
||||||
|
|
||||||
|
def get_mae_err(fs, freq, phase, act_ibis, debug=False):
|
||||||
|
"""
|
||||||
|
compute beat placement error between zero crossings of given sine wave
|
||||||
|
and actually detected beats.
|
||||||
|
"""
|
||||||
|
mae = 0
|
||||||
|
# we compute both directions, to properly handle "missing beats"
|
||||||
|
# (in direction 1, an optimal solution is "fully dense", "freq = fs/2", because each 'act_beat' will align to the next index)
|
||||||
|
# (in direction 2, an optimal solution is "fully sparse", "freq = 1/L", because those are the only 'est_beats' which are aligned)
|
||||||
|
mae += get_mae_err_1(fs, freq, phase, act_ibis, debug)
|
||||||
|
mae += get_mae_err_2(fs, freq, phase, act_ibis, debug)
|
||||||
|
# TODO: may need to weight these two differently
|
||||||
|
# TODO: see "2027-04-27 TestApi_0b" vs "2027-04-27 TestApi" plots [24]
|
||||||
|
# TODO: (check: is match always slightly to the left of the trough / smooth minimum?)
|
||||||
|
# TODO (if so, we may need to weight dir1 and dir2 differently -- or maybe norm by pts density??)
|
||||||
|
# (or even penalize differently instead of adding dir1 and dir2)
|
||||||
|
return mae
|
||||||
|
|
||||||
|
class RegularBeatFinder:
|
||||||
|
"""
|
||||||
|
Optimize a beat frequency by placing a regular beat over the detected beats.
|
||||||
|
Always finds a beat within f1..f2 range,
|
||||||
|
ignoring whether the detected beat is an upper harmonic.
|
||||||
|
"""
|
||||||
|
num_freqs = 200 #: number of freq steps to evaluate
|
||||||
|
range_f1 = 0.5 #: lowest detection frequency in Hz
|
||||||
|
range_f2 = 4.0 #: highest detection frequency in Hz
|
||||||
|
f_bias_width = 0.2 #: gaussian std relative to num_freqs within f1..f2
|
||||||
|
|
||||||
|
def __init__(self): pass
|
||||||
|
|
||||||
|
def find_beat(self, fs, ssf_zxings, f_hint=None, debug_fe=False, debug_i=None):
|
||||||
|
"""Find the optimal beat frequency."""
|
||||||
|
act_ibis = np.diff(ssf_zxings)
|
||||||
|
# nice-to: may be interesting to also use as score: the ssf amplitude info at the beats to which we aligned
|
||||||
|
# evaluate mean absolute errors for all frequencies
|
||||||
|
freqs, freq_errs = self._get_opt_ibi_freq_2(fs, act_ibis, debug_i)
|
||||||
|
# bias with f_hint - once we know the beat freq, make it more likely for it to be found everywhere
|
||||||
|
if f_hint is not None:
|
||||||
|
nf, f1, f2 = RegularBeatFinder.num_freqs, RegularBeatFinder.range_f1, RegularBeatFinder.range_f2
|
||||||
|
bias = gauss(
|
||||||
|
nf,
|
||||||
|
(f_hint - f1) / (f2 - f1) * nf,
|
||||||
|
RegularBeatFinder.f_bias_width * nf
|
||||||
|
)
|
||||||
|
freqs_bias = 1.0 / (np.max(bias)+bias) # make 'f_hint' at most 2x more likely -- (1+bias) if normalized
|
||||||
|
freq_errs *= freqs_bias
|
||||||
|
#
|
||||||
|
if debug_fe:
|
||||||
|
plt.figure(figsize=(8,2))
|
||||||
|
plt.plot(freqs, freq_errs)
|
||||||
|
# get optimal frequency
|
||||||
|
i_freq = np.argmin(freq_errs)
|
||||||
|
# compute normalized error (mean beat placement error in samples)
|
||||||
|
N = np.sum(act_ibis)+1
|
||||||
|
k_max = 2 * freqs[i_freq] * N / fs # + phase/np.pi
|
||||||
|
norm_err = freq_errs[i_freq] / k_max
|
||||||
|
#
|
||||||
|
return freqs[i_freq], norm_err
|
||||||
|
|
||||||
|
def freq_to_est_ibis(self, fs, freq, N):
|
||||||
|
phase = 0
|
||||||
|
k_max = 2 * freq * N / fs + phase/np.pi
|
||||||
|
i_est_zxing = np.round((np.arange(k_max) - phase/np.pi) * fs / (2 * freq)).astype(int)
|
||||||
|
return np.diff(i_est_zxing)
|
||||||
|
|
||||||
|
def _get_opt_ibi_freq_2(self, fs, act_ibis, debug_i=None):
|
||||||
|
"""get mean absolute errors for a range of frequencies."""
|
||||||
|
assert self.range_f2 < fs/8, "it is at most useful to go until f = fs/8" # (and even then, get_mae_dist() triangle will not be very useful)
|
||||||
|
t_freqs = np.linspace(self.range_f1, self.range_f2, self.num_freqs)
|
||||||
|
fe = np.zeros(self.num_freqs)
|
||||||
|
for i, f in enumerate(t_freqs):
|
||||||
|
fe[i] = get_mae_err(fs, f, 0.0, act_ibis)
|
||||||
|
if debug_i is not None:
|
||||||
|
# plot colors:
|
||||||
|
# y: estimate
|
||||||
|
# r: actual
|
||||||
|
get_mae_err(fs, t_freqs[debug_i], 0.0, act_ibis, debug=True)
|
||||||
|
return t_freqs, fe
|
||||||
104
docs/playlists.md
Normal file
104
docs/playlists.md
Normal file
@@ -0,0 +1,104 @@
|
|||||||
|
# Playlist endpoints
|
||||||
|
|
||||||
|
All routes require an authenticated session (`spotify_user_id` after Spotify login). Responses are JSON (`application/json`).
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## `GET /playlists`
|
||||||
|
|
||||||
|
Returns every playlist for the current user by following Spotify’s paginated [`GET /v1/me/playlists`](https://developer.spotify.com/documentation/web-api/reference/get-a-list-of-current-users-playlists) until all pages are loaded.
|
||||||
|
|
||||||
|
### Success (200)
|
||||||
|
|
||||||
|
| Field | Type | Description |
|
||||||
|
| --- | --- | --- |
|
||||||
|
| `ok` | `boolean` | Always `true` on success. |
|
||||||
|
| `total` | `number` | Count of playlists in `items`. |
|
||||||
|
| `items` | `array` | Each element is a **simplified playlist object** from Spotify |
|
||||||
|
|
||||||
|
**Typical fields on each element of `items`** (Spotify `SimplifiedPlaylistObject`):
|
||||||
|
|
||||||
|
| Field | Type |
|
||||||
|
| --- | --- |
|
||||||
|
| `description` | `string` \| `null` |
|
||||||
|
| `id` | `string` |
|
||||||
|
| `images` | `array` of `{ "url": string, "height": number \| null, "width": number \| null }` |
|
||||||
|
| `name` | `string` |
|
||||||
|
| `primary_color` | `string` \| `null` |
|
||||||
|
| `snapshot_id` | `string` |
|
||||||
|
| `tracks` | `object` — e.g. `{ "href": string, "total": number }` (track list stub, not full tracks) |
|
||||||
|
|
||||||
|
### Errors
|
||||||
|
|
||||||
|
`{ "ok": false, "error": string, ... }`
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## `GET /playlists/<playlist_id>`
|
||||||
|
|
||||||
|
`<playlist_id>` is the Spotify playlist ID (the same id as in playlist URLs / `items[].id`).
|
||||||
|
|
||||||
|
Fetches [`GET /v1/playlists/{playlist_id}`](https://developer.spotify.com/documentation/web-api/reference/get-playlist) following pagination.
|
||||||
|
|
||||||
|
### Success (200)
|
||||||
|
|
||||||
|
| Field | Type | Description |
|
||||||
|
| --- | --- | --- |
|
||||||
|
| `ok` | `boolean` | Always `true` on success. |
|
||||||
|
| `playlist` | `object` | **Full playlist object** from Spotify, with `tracks` possibly expanded to every track as described above. |
|
||||||
|
|
||||||
|
**Typical fields on `playlist`** (Spotify `PlaylistObject`):
|
||||||
|
|
||||||
|
| Field | Type |
|
||||||
|
| --- | --- |
|
||||||
|
| `description` | `string` \| `null` |
|
||||||
|
| `id` | `string` |
|
||||||
|
| `images` | `array` (image objects, as above) |
|
||||||
|
| `name` | `string` |
|
||||||
|
| `primary_color` | `string` \| `null` |
|
||||||
|
| `snapshot_id` | `string` |
|
||||||
|
| `tracks` | `{"items": [Track, ...], ...}` |
|
||||||
|
|
||||||
|
**Typical fields on each element of `playlist.tracks.items`** (Spotify playlist track wrapper):
|
||||||
|
|
||||||
|
| Field | Type |
|
||||||
|
| --- | --- |
|
||||||
|
| `track` | `object` \| `null` — full or linked track; `null` if removed |
|
||||||
|
|
||||||
|
Nested objects use Spotify’s **Track**, **Artist** (simplified), and **Album** (simplified) shapes below (field availability can vary by market or API version; see Spotify’s reference).
|
||||||
|
|
||||||
|
#### `track` — Spotify `TrackObject`
|
||||||
|
|
||||||
|
Returned as the non-`null` value of `playlist.tracks.items[].track` (playlist context usually includes a **full** track with **simplified** `album` and `artists` entries).
|
||||||
|
|
||||||
|
| Field | Type |
|
||||||
|
| --- | --- |
|
||||||
|
| `album` | `object` — **SimplifiedAlbumObject** (see below) |
|
||||||
|
| `artists` | `array` of **SimplifiedArtistObject** (see below) |
|
||||||
|
| `duration_ms` | `number` |
|
||||||
|
| `id` | `string` |
|
||||||
|
| `name` | `string` |
|
||||||
|
|
||||||
|
#### `track.artists[]` — Spotify **SimplifiedArtistObject**
|
||||||
|
|
||||||
|
| Field | Type |
|
||||||
|
| --- | --- |
|
||||||
|
| `id` | `string` |
|
||||||
|
| `name` | `string` |
|
||||||
|
|
||||||
|
#### `track.album` — Spotify **SimplifiedAlbumObject**
|
||||||
|
|
||||||
|
| Field | Type |
|
||||||
|
| --- | --- |
|
||||||
|
| `artists` | `array` of **SimplifiedArtistObject** (album-level credits) |
|
||||||
|
| `id` | `string` |
|
||||||
|
| `images` | `array` of `{ "url": string, "height": number \| null, "width": number \| null }` |
|
||||||
|
| `name` | `string` |
|
||||||
|
|
||||||
|
### Errors
|
||||||
|
|
||||||
|
Same as `/playlists`: **401** when not logged in; otherwise Spotify errors and network errors per the global error handlers.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
For authoritative field lists and edge cases, see [Spotify Web API reference](https://developer.spotify.com/documentation/web-api).
|
||||||
@@ -4,3 +4,5 @@ authomatic
|
|||||||
requests
|
requests
|
||||||
numpy
|
numpy
|
||||||
scipy
|
scipy
|
||||||
|
scikit-learn
|
||||||
|
hsh-signal
|
||||||
152
rhythm.py
152
rhythm.py
@@ -13,8 +13,10 @@
|
|||||||
|
|
||||||
import numpy as np
|
import numpy as np
|
||||||
from numpy.fft import fft
|
from numpy.fft import fft
|
||||||
from scipy.signal import fftconvolve
|
import matplotlib.pyplot as plt # for debug only
|
||||||
from scipy.io import wavfile
|
|
||||||
|
from hsh_signal.signal import lowpass_fft
|
||||||
|
import time
|
||||||
|
|
||||||
def viterbi_highest_frequency_path_vectorized(Scp2, jump_penalty=2.0, use_log_amplitude=True):
|
def viterbi_highest_frequency_path_vectorized(Scp2, jump_penalty=2.0, use_log_amplitude=True):
|
||||||
Scp2 = np.asarray(Scp2, dtype=float)
|
Scp2 = np.asarray(Scp2, dtype=float)
|
||||||
@@ -80,7 +82,30 @@ def gabor_wavelet(omega, nu, fs, T, tt=None):
|
|||||||
psi = 1.0 / np.sqrt(omega) * np.exp(-np.pi * (t / omega)**2) * np.exp(1j*2*np.pi * nu * t / omega)
|
psi = 1.0 / np.sqrt(omega) * np.exp(-np.pi * (t / omega)**2) * np.exp(1j*2*np.pi * nu * t / omega)
|
||||||
return psi
|
return psi
|
||||||
|
|
||||||
class BassAnalyzer:
|
class Analyzer:
|
||||||
|
def __init__(self): pass
|
||||||
|
def debug_plot(self, i1, i2):
|
||||||
|
Scp2, path = self.Scp2, self.path
|
||||||
|
fs, Dp = self.fs, self.Dp
|
||||||
|
|
||||||
|
ss, omega, nu, fsp, Wp, I, J, freqs = self.pms
|
||||||
|
|
||||||
|
Scp2_slice = np.abs(Scp2[i1:i2])
|
||||||
|
|
||||||
|
plt.figure(figsize=(8,2))
|
||||||
|
plt.imshow(Scp2_slice.T, origin='lower')
|
||||||
|
x_positions = np.arange(Scp2_slice.shape[0]//250+1)*250
|
||||||
|
if x_positions[-1] == Scp2_slice.shape[0]:
|
||||||
|
x_positions[-1] -= 1 # so last tick is shown properly
|
||||||
|
t1 = i1 / (fs / Dp)
|
||||||
|
x_labels = ['{:.1f}'.format(t1+x*Dp/fs) for x in x_positions]
|
||||||
|
plt.xticks(x_positions, x_labels)
|
||||||
|
y_positions = np.arange(Scp2_slice.shape[1]//50)*50
|
||||||
|
y_labels = ['{:.1f}'.format((nu/(omega*ss[y]))) for y in y_positions] # Hz equivalents of wavelet scale
|
||||||
|
plt.yticks(y_positions, y_labels)
|
||||||
|
plt.plot(np.arange(Scp2_slice.shape[0]), path[i1:i2], c='r')
|
||||||
|
|
||||||
|
class BassAnalyzer(Analyzer):
|
||||||
"""
|
"""
|
||||||
Rhythm analysis from songs.
|
Rhythm analysis from songs.
|
||||||
Provides a beat amplitude signal from the audio signal.
|
Provides a beat amplitude signal from the audio signal.
|
||||||
@@ -103,14 +128,24 @@ class BassAnalyzer:
|
|||||||
wavelet_win_sec = 0.175
|
wavelet_win_sec = 0.175
|
||||||
k_omega, k_nu = 0.12, 5.0 #: adapt scaling to get 'reasonable' frequency range (for pop bass, e.g. 18..1145 Hz, but that range strongly depends on the actual song's 'pt' shortest interval 'B')
|
k_omega, k_nu = 0.12, 5.0 #: adapt scaling to get 'reasonable' frequency range (for pop bass, e.g. 18..1145 Hz, but that range strongly depends on the actual song's 'pt' shortest interval 'B')
|
||||||
viterbi_jump_penalty = 5000.0
|
viterbi_jump_penalty = 5000.0
|
||||||
|
Wp_force = None
|
||||||
|
I_force = None
|
||||||
|
|
||||||
def __init__(self, fs, sig):
|
def __init__(self, fs, sig, Wp_force=None, I_force=None):
|
||||||
"""
|
"""
|
||||||
:param fs: sampling rate
|
:param fs: sampling rate
|
||||||
:param sig: audio signal normalized to [-1,1]
|
:param sig: audio signal normalized to [-1,1]
|
||||||
"""
|
"""
|
||||||
|
super(BassAnalyzer, self).__init__()
|
||||||
self.D = int(self.shift_sec * fs) #: spectrogram step
|
self.D = int(self.shift_sec * fs) #: spectrogram step
|
||||||
|
if Wp_force:
|
||||||
|
self.Wp = Wp_force
|
||||||
|
elif self.Wp_force:
|
||||||
|
self.Wp = self.Wp_force
|
||||||
|
else:
|
||||||
self.Wp = int(np.round(self.wavelet_win_sec * fs / self.W) * self.W) # wavelet window - make it an integer multiple of FFT window
|
self.Wp = int(np.round(self.wavelet_win_sec * fs / self.W) * self.W) # wavelet window - make it an integer multiple of FFT window
|
||||||
|
if I_force:
|
||||||
|
self.I_force = I_force
|
||||||
self.U = self.Wp // self.W # ratio
|
self.U = self.Wp // self.W # ratio
|
||||||
|
|
||||||
self.f = np.pad(sig, (self.W//2, self.W//2-1)) #: signal padded (W-FFT to determine scalogram parameters)
|
self.f = np.pad(sig, (self.W//2, self.W//2-1)) #: signal padded (W-FFT to determine scalogram parameters)
|
||||||
@@ -120,21 +155,38 @@ class BassAnalyzer:
|
|||||||
self.M = (self.L-self.W) // self.D + 1 #: number of time steps
|
self.M = (self.L-self.W) // self.D + 1 #: number of time steps
|
||||||
self.fs = fs
|
self.fs = fs
|
||||||
|
|
||||||
def viterbi_wavelet_scalogram_amplitudes(self):
|
def viterbi_wavelet_scalogram_amplitudes(self, dbg_time=False):
|
||||||
"""
|
"""
|
||||||
Compute scalogram amplitudes from Viterbi path of highest-power frequencies.
|
Compute scalogram amplitudes from Viterbi path of highest-power frequencies.
|
||||||
NOTE: downsampled from the original 'fs'.
|
NOTE: downsampled from the original 'fs'.
|
||||||
:returns: (fsd, sig): sampling rate, amplitude signal
|
:returns: (fsd, sig): sampling rate, amplitude signal
|
||||||
"""
|
"""
|
||||||
|
t1 = time.time()
|
||||||
Spf = self._spectrogram()
|
Spf = self._spectrogram()
|
||||||
|
t2 = time.time()
|
||||||
pto = self._pulse_train(Spf)
|
pto = self._pulse_train(Spf)
|
||||||
|
t3 = time.time()
|
||||||
Spf2 = self._spectrogram_2()
|
Spf2 = self._spectrogram_2()
|
||||||
|
t4 = time.time()
|
||||||
pms = self._scalogram_params(pto)
|
pms = self._scalogram_params(pto)
|
||||||
|
t5 = time.time()
|
||||||
Spsi_ss = self._scalogram_wavelets(pms)
|
Spsi_ss = self._scalogram_wavelets(pms)
|
||||||
|
t6 = time.time()
|
||||||
Scp2 = self._scalogram(Spf2, Spsi_ss)
|
Scp2 = self._scalogram(Spf2, Spsi_ss)
|
||||||
|
t7 = time.time()
|
||||||
path = self._viterbi_path(Scp2)
|
path = self._viterbi_path(Scp2)
|
||||||
|
t8 = time.time()
|
||||||
ampl = self._viterbi_ampl(Scp2, path)
|
ampl = self._viterbi_ampl(Scp2, path)
|
||||||
|
t9 = time.time()
|
||||||
|
|
||||||
|
self.Scp2 = Scp2
|
||||||
|
self.path = path
|
||||||
|
self.pms = pms
|
||||||
|
|
||||||
|
if not dbg_time:
|
||||||
return ampl
|
return ampl
|
||||||
|
else:
|
||||||
|
return ampl, np.diff([t1, t2, t3, t4, t5, t6, t7, t8, t9])
|
||||||
|
|
||||||
def _spectrogram(self):
|
def _spectrogram(self):
|
||||||
"""W-FFT (STFTs) to determine scalogram parameters"""
|
"""W-FFT (STFTs) to determine scalogram parameters"""
|
||||||
@@ -162,13 +214,21 @@ class BassAnalyzer:
|
|||||||
g = np.abs(Spf) # (M x W)
|
g = np.abs(Spf) # (M x W)
|
||||||
g_bar = np.mean(g, axis=1) # (M)
|
g_bar = np.mean(g, axis=1) # (M)
|
||||||
# TODO: check if 'A' needs to be a smooth signal slowly varying over time, not a const.
|
# TODO: check if 'A' needs to be a smooth signal slowly varying over time, not a const.
|
||||||
A = np.mean(g_bar) # amplitude cutoff for pulse train
|
#A = np.mean(g_bar) # amplitude cutoff for pulse train
|
||||||
|
ip = int(fs)
|
||||||
|
g_bar_l = lowpass_fft(np.pad(g_bar, (ip, ip), mode='edge'), fps=fs/self.D, cf=0.5, tw=0.05)[ip:-ip]
|
||||||
|
A = g_bar_l
|
||||||
|
|
||||||
# compute transitions (pulse train)
|
# compute transitions (pulse train)
|
||||||
pt = (g_bar > A).astype(int) # pulse train
|
pt = (g_bar > A).astype(int) # pulse train
|
||||||
pt_re = (np.diff(pt) == 1).astype(int) # rising edge
|
pt_re = (np.diff(pt) == 1).astype(int) # rising edge
|
||||||
self.B = max(np.sum(pt_re), 1) # total number of pulses in the 'pt' pulse train signal
|
self.B = max(np.sum(pt_re), 1) # total number of pulses in the 'pt' pulse train signal
|
||||||
|
|
||||||
|
# clip B, to force **reasonable** frequency range for wavelets
|
||||||
|
# (noise will otherwise cause many transitions -> high B -> bass falls below freq range -> algo fail)
|
||||||
|
B_min, B_max = 0.5 * M / (fs / self.D), 5.0 * M / (fs / self.D)
|
||||||
|
self.B = np.clip(self.B, a_min=B_min, a_max=B_max)
|
||||||
|
|
||||||
# resample 'pt' (M) at these indices -> 'ptr' (L), like original 'f' (signal padded)
|
# resample 'pt' (M) at these indices -> 'ptr' (L), like original 'f' (signal padded)
|
||||||
squashed_idxs = np.floor(np.linspace(0, L-1, L) * (M/L)).astype(int)
|
squashed_idxs = np.floor(np.linspace(0, L-1, L) * (M/L)).astype(int)
|
||||||
ptr = pt[squashed_idxs]
|
ptr = pt[squashed_idxs]
|
||||||
@@ -192,6 +252,8 @@ class BassAnalyzer:
|
|||||||
f2 = self.f2
|
f2 = self.f2
|
||||||
Wp, Mp, Dp = self.Wp, self.Mp, self.Dp
|
Wp, Mp, Dp = self.Wp, self.Mp, self.Dp
|
||||||
|
|
||||||
|
# TODO(perf): 5.0 sec runtime
|
||||||
|
|
||||||
#
|
#
|
||||||
# compute spectrogram: 'Spf2' (M x Wp) <- from 'f'
|
# compute spectrogram: 'Spf2' (M x Wp) <- from 'f'
|
||||||
#
|
#
|
||||||
@@ -220,6 +282,9 @@ class BassAnalyzer:
|
|||||||
T = (Lp - Wp) / fsp # un-padded sample count -> time length
|
T = (Lp - Wp) / fsp # un-padded sample count -> time length
|
||||||
#omega = p * T / B # width parameter of Gabor wavelet
|
#omega = p * T / B # width parameter of Gabor wavelet
|
||||||
#nu = B / (p * T) # frequency parameter of Gabor wavelet
|
#nu = B / (p * T) # frequency parameter of Gabor wavelet
|
||||||
|
if self.I_force:
|
||||||
|
I = self.I_force
|
||||||
|
else:
|
||||||
I = int(np.log2(p**2 * T**2 / (delta * B**2)) - 3/2) # number of octaves
|
I = int(np.log2(p**2 * T**2 / (delta * B**2)) - 3/2) # number of octaves
|
||||||
J = int(256 / I) # number of voices per octave
|
J = int(256 / I) # number of voices per octave
|
||||||
|
|
||||||
@@ -253,12 +318,18 @@ class BassAnalyzer:
|
|||||||
# T, Lp, Wp
|
# T, Lp, Wp
|
||||||
T, Lp, Wp = self.T, self.Lp, self.Wp
|
T, Lp, Wp = self.T, self.Lp, self.Wp
|
||||||
|
|
||||||
|
# TODO(perf): reduce num of wavelets, and/or parallelize into freq slices
|
||||||
|
# TODO(perf): 3.5 sec runtime
|
||||||
|
|
||||||
# compute convolution with wavelets, by multiplication in freq-domain
|
# compute convolution with wavelets, by multiplication in freq-domain
|
||||||
# 'Scp2' (M x I*J)
|
# 'Scp2' (M x I*J)
|
||||||
Scp2 = np.matmul(Spf2, Spsi_ss.T) * (T/(Lp-Wp))
|
Scp2 = np.matmul(Spf2, Spsi_ss.T) * (T/(Lp-Wp))
|
||||||
return Scp2
|
return Scp2
|
||||||
|
|
||||||
def _viterbi_path(self, Scp2):
|
def _viterbi_path(self, Scp2):
|
||||||
|
# TODO(perf): parallelize into time slices
|
||||||
|
# TODO(perf): 4.5 sec runtime
|
||||||
|
|
||||||
# TODO: check if we should re-weight freq-jumps, because of log-scale frequencies
|
# TODO: check if we should re-weight freq-jumps, because of log-scale frequencies
|
||||||
path, dp, backptr = viterbi_highest_frequency_path_vectorized(
|
path, dp, backptr = viterbi_highest_frequency_path_vectorized(
|
||||||
(np.abs(Scp2)**2).T,
|
(np.abs(Scp2)**2).T,
|
||||||
@@ -271,3 +342,72 @@ class BassAnalyzer:
|
|||||||
def _viterbi_ampl(self, Scp2, path):
|
def _viterbi_ampl(self, Scp2, path):
|
||||||
max_amplitudes = np.array([np.abs(Scp2[i, path[i]]) for i in range(Scp2.shape[0])])
|
max_amplitudes = np.array([np.abs(Scp2[i, path[i]]) for i in range(Scp2.shape[0])])
|
||||||
return max_amplitudes
|
return max_amplitudes
|
||||||
|
|
||||||
|
|
||||||
|
class GuitarAnalyzer:
|
||||||
|
"""
|
||||||
|
Rhythm analysis from songs.
|
||||||
|
Provides a beat amplitude signal from the audio signal.
|
||||||
|
|
||||||
|
Performs short-time Fourier Transform on the signal,
|
||||||
|
then returns the f1..f2 band power for each window.
|
||||||
|
|
||||||
|
For low-frequency instruments like percussion,
|
||||||
|
use BassAnalyzer instead.
|
||||||
|
"""
|
||||||
|
W = 1024 #: window size (must be even, so that right padding W/2-1 works)
|
||||||
|
shift_sec = 0.008 #: window shift in sec ('delta_tau') between subsequent windows
|
||||||
|
target_band_f1 = 800.0 #: lower bound of target freq band in Hz
|
||||||
|
target_band_f2 = 4300.0 #: upper bound of target freq band in Hz
|
||||||
|
|
||||||
|
def __init__(self, fs, sig):
|
||||||
|
"""
|
||||||
|
:param fs: sampling rate
|
||||||
|
:param sig: audio signal normalized to [-1,1]
|
||||||
|
"""
|
||||||
|
self.f = np.pad(sig, (self.W//2, self.W//2-1)) #: signal padded (W-FFT to determine scalogram parameters)
|
||||||
|
self.fs = fs
|
||||||
|
|
||||||
|
self.D = int(self.shift_sec * fs) #: spectrogram step
|
||||||
|
self.L = self.f.shape[0]
|
||||||
|
self.M = (self.L-self.W) // self.D + 1 #: number of time steps
|
||||||
|
self.fs = fs
|
||||||
|
|
||||||
|
def spectrogram_power_amplitudes(self):
|
||||||
|
"""
|
||||||
|
Compute spectrogram power from a target frequency range.
|
||||||
|
NOTE: downsampled from the original 'fs'.
|
||||||
|
:returns: (fsd, sig): sampling rate, amplitude signal
|
||||||
|
"""
|
||||||
|
Spf = self._spectrogram()
|
||||||
|
ampl = self._spectrogram_power(Spf)
|
||||||
|
return ampl
|
||||||
|
|
||||||
|
def _spectrogram_power(self, Spf):
|
||||||
|
# Spf
|
||||||
|
# fs, W
|
||||||
|
fs, W = self.fs, self.W
|
||||||
|
k1, k2 = int(self.target_band_f1/fs*W), int(self.target_band_f2/fs*W) # freq band range in W-FFT
|
||||||
|
#
|
||||||
|
# spectrum power in f1..f2 bands
|
||||||
|
#
|
||||||
|
#hp_slice = highpass(np.sum(np.abs(Spf_slice[:, k1:k2]), axis=1), fps=fs/Dp, cf=2.0, tw=0.2)
|
||||||
|
hp_slice = np.sum(np.abs(Spf[:, k1:k2]), axis=1)-np.mean(np.sum(np.abs(Spf[:, k1:k2]), axis=1))
|
||||||
|
return hp_slice
|
||||||
|
|
||||||
|
def _spectrogram(self):
|
||||||
|
"""W-FFT (STFTs) to determine scalogram parameters"""
|
||||||
|
# *f
|
||||||
|
# M, W, D
|
||||||
|
f = self.f
|
||||||
|
M, W, D = self.M, self.W, self.D
|
||||||
|
|
||||||
|
#
|
||||||
|
# compute spectrogram: 'Spf' (M x W) <- from 'f'
|
||||||
|
#
|
||||||
|
iwss = np.linspace(W//2, W//2 + (M-1)*D, M, dtype=int) # 'D'-spaced start time indices of windows on 's'
|
||||||
|
Spf = np.zeros((M, W), dtype=np.complex128)
|
||||||
|
for i, iw in zip(range(M), iwss):
|
||||||
|
iws, iwe = iw-W//2, iw+W//2
|
||||||
|
Spf[i,:] = fft(f[iws:iwe])
|
||||||
|
return Spf
|
||||||
|
|||||||
70
segmenter.py
Normal file
70
segmenter.py
Normal file
@@ -0,0 +1,70 @@
|
|||||||
|
import numpy as np
|
||||||
|
from sklearn.cluster import KMeans
|
||||||
|
|
||||||
|
def median_filter(a, w):
|
||||||
|
ap = np.pad(a, (w//2, w//2), mode='edge')
|
||||||
|
o = np.zeros(a.shape[0])
|
||||||
|
for i in np.arange(a.shape[0]):
|
||||||
|
sl = ap[i:i+w]
|
||||||
|
o[i] = np.median(sl)
|
||||||
|
return o
|
||||||
|
|
||||||
|
# nice-to: split longer segments (above 30 sec), merge very-short segments
|
||||||
|
|
||||||
|
class Segmenter:
|
||||||
|
seg_win_size_sec = 4.0 #: window size for stat. measures for segmentation, in sec
|
||||||
|
seg_win_step_sec = 1.0 #: step for segmentation, in sec
|
||||||
|
n_clusters = 8 #: clusters for KMeans algorithm
|
||||||
|
seg_filt_win_sec = 20.0 #: median filter width for smoothing segments
|
||||||
|
|
||||||
|
def __init__(self): pass
|
||||||
|
|
||||||
|
def get_segments(self, fs, guitar):
|
||||||
|
i_stxs = self.get_segment_boundaries(fs, guitar)
|
||||||
|
i_stxs = np.pad(i_stxs, (1, 0))
|
||||||
|
return i_stxs
|
||||||
|
|
||||||
|
def get_segment_boundaries(self, fs, guitar):
|
||||||
|
"""split the spectral power signal 'guitar' into stochastically similar segments."""
|
||||||
|
segment_ids = self._get_segments(fs, guitar)
|
||||||
|
stxs = np.diff(segment_ids) != 0
|
||||||
|
i_stxs = np.where(stxs)[0]
|
||||||
|
return i_stxs
|
||||||
|
|
||||||
|
def _get_segments(self, fs, guitar):
|
||||||
|
"""split the spectral power signal 'guitar' into stochastically similar segments."""
|
||||||
|
seg_filt_win = int(self.seg_filt_win_sec / self.seg_win_step_sec)
|
||||||
|
seg_guitar_data = self._sig_stochastics(fs, guitar)
|
||||||
|
X = np.vstack((
|
||||||
|
seg_guitar_data[:,0]*1.4,
|
||||||
|
np.sqrt(np.sum(seg_guitar_data[:,1:]**2, axis=1))
|
||||||
|
)).T
|
||||||
|
# cluster by stochastic characteristics
|
||||||
|
kmeans = KMeans(n_clusters=self.n_clusters, random_state=0, n_init="auto").fit(X)
|
||||||
|
segment_ids = np.floor(median_filter(kmeans.labels_, seg_filt_win)).astype(int)
|
||||||
|
# up-sample segment id assignment
|
||||||
|
iidx = np.linspace(0, segment_ids.shape[0], guitar.shape[0], endpoint=False).astype(int)
|
||||||
|
return segment_ids[iidx]
|
||||||
|
|
||||||
|
def _sig_stochastics(self, fs, y):
|
||||||
|
"""compute the stochastic moments of the signal. normalized."""
|
||||||
|
seg_win_size = int(self.seg_win_size_sec * fs)
|
||||||
|
seg_win_step = int(self.seg_win_step_sec * fs)
|
||||||
|
#
|
||||||
|
seg_y_data = np.zeros((y.shape[0] // seg_win_step, 4))
|
||||||
|
y_pad = np.pad(y, (seg_win_size // 2, seg_win_size // 2))
|
||||||
|
y_0_max, y_0_mean = np.max(y), np.mean(y)
|
||||||
|
y_1_max = np.max(np.mean((y - y_0_mean)**2))
|
||||||
|
y_2_max = np.max(np.mean(np.abs((y - y_0_mean)**3)))
|
||||||
|
y_3_max = np.max(np.mean((y - y_0_mean)**4))
|
||||||
|
wo = int(self.seg_win_size_sec/self.seg_win_step_sec)
|
||||||
|
for i in np.arange(wo//2, y.shape[0] // seg_win_step - wo//2):
|
||||||
|
i_c = int((i+0.5)*seg_win_step)
|
||||||
|
y_slice = y_pad[i_c-seg_win_size//2:i_c+seg_win_size//2]
|
||||||
|
mean = np.mean(y_slice)
|
||||||
|
seg_y_data[i,0] = mean / y_0_max
|
||||||
|
seg_y_data[i,1] = np.mean((y_slice - mean)**2) / y_1_max
|
||||||
|
seg_y_data[i,2] = np.mean(np.abs((y_slice - mean)**3)) / y_2_max / 2
|
||||||
|
seg_y_data[i,3] = np.mean((y_slice - mean)**4) / y_3_max / 4
|
||||||
|
#
|
||||||
|
return seg_y_data
|
||||||
179
song.py
Normal file
179
song.py
Normal file
@@ -0,0 +1,179 @@
|
|||||||
|
import numpy as np
|
||||||
|
|
||||||
|
from rhythm import BassAnalyzer, GuitarAnalyzer
|
||||||
|
from segmenter import Segmenter
|
||||||
|
from beat import SsfZxing, RegularBeatFinder
|
||||||
|
from sqi import gauss, shift
|
||||||
|
|
||||||
|
class SongBeatDetector:
|
||||||
|
SEGMENT_SLICE_LEN_SEC = 8.0 #: slice length for processing (long enough to contain bar structure; short enough for a constant freq. beat placement)
|
||||||
|
SSF_REL_THRES = 1.5 #: optimize for slope of error (mae) function over beat frequency
|
||||||
|
NE_THRES = 30.0 #: normalized error threshold for 'good' slices
|
||||||
|
def __init__(self): pass
|
||||||
|
def detect(self, fs, sig, use_f_hint=True, debug_fe_idx=None):
|
||||||
|
self.fs = fs
|
||||||
|
#self.sig = sig
|
||||||
|
|
||||||
|
self.ba = BassAnalyzer(fs, sig)
|
||||||
|
self.bass, times = self.ba.viterbi_wavelet_scalogram_amplitudes(dbg_time=True)
|
||||||
|
# times: durations of different stages
|
||||||
|
|
||||||
|
self.ga = GuitarAnalyzer(fs, sig)
|
||||||
|
self.guitar = self.ga.spectrogram_power_amplitudes()
|
||||||
|
|
||||||
|
fsd = fs / self.ga.D # <- guitar ('ga')
|
||||||
|
self.D = self.ga.D # <- guitar ('ga')
|
||||||
|
|
||||||
|
# self.bass, self.guitar: functions on windowed spectrum 0.008 sec apart (125 Hz)
|
||||||
|
self.sg = Segmenter()
|
||||||
|
self.i_seg = self.sg.get_segments(fsd, self.guitar) # <- guitar
|
||||||
|
self.t_seg = self.i_seg / fsd
|
||||||
|
self.fsd = fsd # reciprocal window step size
|
||||||
|
|
||||||
|
# we segment on 'guitar' info, but process 'bass' later
|
||||||
|
|
||||||
|
if use_f_hint:
|
||||||
|
# initial estimate (without 'f_hint')
|
||||||
|
zds_initial = self._estimate_segments(debug_fe_idx=None)
|
||||||
|
self.zds_initial = zds_initial
|
||||||
|
ifbs_good = np.array([zdd['ne'] < SongBeatDetector.NE_THRES for zdd in zds_initial])
|
||||||
|
fbs = np.array([zdd['fb'] for zdd in zds_initial])[np.where(ifbs_good)[0]]
|
||||||
|
bins, hfreq = np.histogram(fbs)
|
||||||
|
ih = np.argmax(bins)
|
||||||
|
self.f_hint = np.mean((hfreq[ih], hfreq[ih+1])) # center freq of bin
|
||||||
|
else:
|
||||||
|
self.f_hint = None
|
||||||
|
|
||||||
|
# actual estimate (using 'f_hint' to bias each segment)
|
||||||
|
self.zds = self._estimate_segments(f_hint=self.f_hint, debug_fe_idx=debug_fe_idx)
|
||||||
|
|
||||||
|
return self.zds
|
||||||
|
|
||||||
|
def _estimate_segments(self, f_hint=None, debug_fe_idx=None):
|
||||||
|
zds = []
|
||||||
|
fsd = self.fsd
|
||||||
|
seg_sl = int(SongBeatDetector.SEGMENT_SLICE_LEN_SEC * fsd) # segment slice length in 1/fsd units
|
||||||
|
# for each segment
|
||||||
|
for i in np.arange(self.i_seg.shape[0]-1):
|
||||||
|
i1, i2 = self.i_seg[i], self.i_seg[i+1]
|
||||||
|
t1, t2 = i1 / fsd, i2 / fsd
|
||||||
|
# split segment into slices
|
||||||
|
if i2-i1 < seg_sl: continue
|
||||||
|
num_sl = (i2-i1) // seg_sl
|
||||||
|
for m in np.arange(num_sl):
|
||||||
|
j1, j2 = i1+m*seg_sl, i1+(m+1)*seg_sl
|
||||||
|
sig_slice = self.bass[slice(j1, j2)] # <- bass
|
||||||
|
|
||||||
|
if debug_fe_idx is not None:
|
||||||
|
# there will be many (upto 50) different slices - do not debug-plot them all
|
||||||
|
debug_fe_sidx = debug_fe_idx / fs * fsd
|
||||||
|
debug_fe = i1 <= debug_fe_sidx < i2
|
||||||
|
else:
|
||||||
|
debug_fe = False
|
||||||
|
zdd = self._process_slice(j1, j2, m, sig_slice, f_hint=f_hint, debug_fe=debug_fe)
|
||||||
|
zds.append(zdd)
|
||||||
|
|
||||||
|
return zds
|
||||||
|
|
||||||
|
def _process_slice(self, j1, j2, m, sig_slice, f_hint=None, debug_fe=False):
|
||||||
|
"""
|
||||||
|
:param j1: lower index into 'sig_slice'
|
||||||
|
:param j2: upper index into 'sig_slice'
|
||||||
|
:param m: slice number (used to check if debugging)
|
||||||
|
:param debug_fe: show plots for SSF and raw/reg beat placement
|
||||||
|
"""
|
||||||
|
# TODO: C++ impl of SsfZxing._ssf_det_zxings() has diverged.
|
||||||
|
# - refractory period changes
|
||||||
|
# - ssf_th filter with 6-points
|
||||||
|
# - ?? others ??
|
||||||
|
# NOTE: SsfZxing here is always getting short 8-sec slices only (nb. for 'ssf_th' comput.)
|
||||||
|
|
||||||
|
fsd = self.fsd # reciprocal window step size
|
||||||
|
seg_sl = int(SongBeatDetector.SEGMENT_SLICE_LEN_SEC * fsd) # segment slice length in 1/fsd units
|
||||||
|
|
||||||
|
SsfZxing.ssf_rel_thres = SongBeatDetector.SSF_REL_THRES
|
||||||
|
zd = SsfZxing()
|
||||||
|
ssf, ssf_th = zd._ssf_function(fsd, sig_slice)
|
||||||
|
ssf_zxings = zd._ssf_det_zxings(fsd, ssf, ssf_th)
|
||||||
|
|
||||||
|
zdd = {
|
||||||
|
'i1': j1 * self.D, 'i2': j2 * self.D,
|
||||||
|
# ssf_zxings: raw beats (relative to slice)
|
||||||
|
'zd': zd, 'ssf': ssf, 'ssf_zxings': ssf_zxings,
|
||||||
|
'sig_slice': sig_slice, 'sig_source': 'bass',
|
||||||
|
'ssf_th': np.ones(ssf.shape[0]) * ssf_th
|
||||||
|
}
|
||||||
|
|
||||||
|
# (only plot first slice of a wider segment)
|
||||||
|
#if num_sl > 2 and m == 0:
|
||||||
|
if debug_fe:
|
||||||
|
#
|
||||||
|
# scalogram image, with viterbi path
|
||||||
|
self.ba.debug_plot(j1, j2) # TODO: adapt 'bass'
|
||||||
|
plt.title(f'scalogram & viterbi path, slice [{j1}:{j2}]')
|
||||||
|
|
||||||
|
# SSF function and detected raw beats
|
||||||
|
zd.debug_plot(0, seg_sl)
|
||||||
|
plt.title(f'raw beats, slice [{j1}:{j2}]')
|
||||||
|
|
||||||
|
# nice-to: optimize phase, (maybe iteratively, optimize phase and freq each)
|
||||||
|
bf = RegularBeatFinder()
|
||||||
|
fb, ne = bf.find_beat(fsd, ssf_zxings, f_hint=f_hint, debug_fe=debug_fe, debug_i=None)
|
||||||
|
if debug_fe: plt.title(f'regular-beat placement error (mae), slice [{j1}:{j2}]')
|
||||||
|
# mae is unnurmalized here, as returned from RegularBeatFinder._get_opt_ibi_freq_2()
|
||||||
|
zdd.update({
|
||||||
|
# bf: beat finder
|
||||||
|
# fb: beat frequency, in Hz
|
||||||
|
# ne: normalized mae error
|
||||||
|
'bf': bf, 'fb': fb, 'ne': ne
|
||||||
|
})
|
||||||
|
# TODO: ne > 30 is suspiciously bad - filter those "detections" out eventually
|
||||||
|
# TODO: # catch basic errors: ne == 0, or len(est_zxings) == 0, means slice is bad
|
||||||
|
# NOTE: since 2x the zero-crossings, we get twice the frequency here.
|
||||||
|
# NOTE: this means 0.5 lower freq bound of RegularBeatFinder will find at most 60 bpm in the song.
|
||||||
|
|
||||||
|
# TODO: RegularBeatFinder currently not using 'phase' info, but should be optimized
|
||||||
|
# TODO: (currently we start the pattern at the first detected beat, may or may not be good)
|
||||||
|
est_zxings = np.cumsum(np.pad(bf.freq_to_est_ibis(fsd, fb, j2-j1), (1,0))) # rel. to slice
|
||||||
|
if ssf_zxings.shape[0] > 0:
|
||||||
|
est_zxings += ssf_zxings[0] # add phase = currently we just start at first detected beat
|
||||||
|
# nice-to: median-filter the freq, etc.pp.
|
||||||
|
# nice-to: avoid adding len(est_zxings)=0 entries later
|
||||||
|
|
||||||
|
# trim back to max. index
|
||||||
|
est_zxings = est_zxings[np.where(est_zxings < ssf.shape[0])[0]]
|
||||||
|
|
||||||
|
zdd.update({
|
||||||
|
# est_zxings: regular beats (relative to slice)
|
||||||
|
'est_zxings': est_zxings
|
||||||
|
})
|
||||||
|
|
||||||
|
if debug_fe:
|
||||||
|
plt.figure(figsize=(8,2))
|
||||||
|
plt.plot(ssf)
|
||||||
|
plt.plot(np.arange(ssf.shape[0]), np.ones(ssf.shape[0]) * ssf_th); None
|
||||||
|
plt.scatter(ssf_zxings, np.ones(ssf_zxings.shape[0]) * ssf_th, c='r')
|
||||||
|
plt.scatter(est_zxings, np.ones(est_zxings.shape[0]) * ssf_th, c='g')
|
||||||
|
plt.title(f'ssf, ssf_th, raw beats (r), reg beats (g), slice [{j1}:{j2}]')
|
||||||
|
|
||||||
|
return zdd
|
||||||
|
|
||||||
|
# _debug_fmt_est_zxings
|
||||||
|
def _place_fmt_zxings(self, fsd, ssf, ssf_zxings):
|
||||||
|
gauss_beat_template_win_sec = 0.25542 #: gauss window width (as compared to beats in ssf function)
|
||||||
|
gauss_beat_template_sigma_sec = 0.027 #: gauss bump half-width parameter (as compared to beats in ssf function)
|
||||||
|
#gauss_amplitude = 2.0
|
||||||
|
|
||||||
|
#def get_snr(self, fsd, ssf, ssf_threshold, ssf_zxings):
|
||||||
|
# """Compute the Signal-to-Noise Ratio of beats, based on SSF function and detected beat locations."""
|
||||||
|
sigma = fsd * gauss_beat_template_sigma_sec
|
||||||
|
W = int(fsd * gauss_beat_template_win_sec)
|
||||||
|
gb = gauss(W, W//2, sigma)
|
||||||
|
# place gaussians on estimated beat locations
|
||||||
|
ssf_est = np.zeros(ssf.shape[0])
|
||||||
|
for i in ssf_zxings:
|
||||||
|
ssf_est += shift(ssf.shape[0], i, gb)
|
||||||
|
ssf_est /= gb[W//2] # normalize amplitude to 1.0
|
||||||
|
ssf_est = np.roll(ssf_est, int(sigma)) # shift to right (beat loc = gauss beginning, not center)
|
||||||
|
return ssf_est
|
||||||
|
|
||||||
56
sqi.py
Normal file
56
sqi.py
Normal file
@@ -0,0 +1,56 @@
|
|||||||
|
import numpy as np
|
||||||
|
|
||||||
|
|
||||||
|
def gauss(N, mu, sigma):
|
||||||
|
x = np.arange(N)
|
||||||
|
norm = sigma * np.sqrt(2*np.pi)
|
||||||
|
return np.exp(-1/2 * (x - mu)**2 / sigma**2) / norm
|
||||||
|
|
||||||
|
|
||||||
|
def shift(N, n, x):
|
||||||
|
"""shift the center of the signal 'x' to time index 'n'."""
|
||||||
|
y = np.zeros(N)
|
||||||
|
xl = x.shape[0]
|
||||||
|
s = n - xl // 2
|
||||||
|
x_bi, x_ei = np.clip(xl // 2 - n, a_min=0, a_max=xl), np.clip(N + xl - xl // 2 - n - xl % 2, a_min=0, a_max=xl)
|
||||||
|
y_bi, y_ei = np.clip(n - xl // 2, a_min=0, a_max=N), np.clip(n + xl // 2 + xl % 2, a_min=0, a_max=N)
|
||||||
|
y[y_bi:y_ei] = x[x_bi:x_ei]
|
||||||
|
return y
|
||||||
|
|
||||||
|
|
||||||
|
class SigQuality:
|
||||||
|
"""
|
||||||
|
Compute the Signal-to-Noise Ratio of beats
|
||||||
|
"""
|
||||||
|
gauss_beat_template_win_sec = 0.25542 #: gauss window width (as compared to beats in ssf function)
|
||||||
|
gauss_beat_template_sigma_sec = 0.027 #: gauss bump half-width parameter (as compared to beats in ssf function)
|
||||||
|
#gauss_amplitude = 2.0
|
||||||
|
|
||||||
|
def __init__(self): pass
|
||||||
|
|
||||||
|
def get_snr(self, fs, ssf, ssf_threshold, est_zxings):
|
||||||
|
"""Compute the Signal-to-Noise Ratio of beats, based on SSF function and detected beat locations."""
|
||||||
|
sigma = fs * self.gauss_beat_template_sigma_sec
|
||||||
|
W = int(fs * self.gauss_beat_template_win_sec)
|
||||||
|
gb = gauss(W, W//2, sigma)
|
||||||
|
# place gaussians on estimated beat locations
|
||||||
|
ssf_est = np.zeros(ssf.shape[0])
|
||||||
|
for i in est_zxings:
|
||||||
|
ssf_est += shift(ssf.shape[0], i, gb)
|
||||||
|
ssf_est /= gb[W//2] # normalize amplitude to 1.0
|
||||||
|
ssf_est = np.roll(ssf_est, int(sigma)) # shift to right (beat loc = gauss beginning, not center)
|
||||||
|
#sqi_ref = ssf_est * (self.gauss_amplitude * ssf_threshold) # set amplitude
|
||||||
|
|
||||||
|
# penalty term = (where there should be no amplitude, according to gaussians)
|
||||||
|
sqi_pen = 1.0 - ssf_est
|
||||||
|
|
||||||
|
# sqi = ratio of signal energy in goal vs. in noise
|
||||||
|
sqi_goal = np.sum(ssf_est * (ssf**2))
|
||||||
|
sqi_noise = np.sum(sqi_pen * (ssf**2))
|
||||||
|
|
||||||
|
# noise is everywhere, while signal is only around detected peaks - correct for this.
|
||||||
|
goal_density = np.mean(np.clip(2*sigma / np.diff(est_zxings), a_min=0, a_max=1))
|
||||||
|
sqi_goal /= goal_density
|
||||||
|
sqi = 10 * (np.log10(sqi_goal) - np.log10(sqi_noise))
|
||||||
|
|
||||||
|
return sqi
|
||||||
Reference in New Issue
Block a user