Files
lockstep-api/api.py

815 lines
25 KiB
Python

# app.py
# running:
# $ cd /var/sites/api.lockstep.at
# $ source .venv/bin/activate
# $ source .env
# $ # NO. flask --app api run --port 8000
# $ python3 api.py # runs __main__ and allows code reloading.
# Authomatic 1.3.0 is not compatible with Python 3.12+
# patch: /var/sites/api.lockstep.at/.venv/lib/python3.13/site-packages/authomatic/providers/__init__.py
# def _fetch()
# HTTPSConnection(
# # cert_file=cert_file, # <-- comment this
# sync continuously:
# $ while sleep 1; do diff -q api.py /tmp/api.py; if [ $? -ne 0 ]; then scp api.py lockstep@api.lockstep.at:/var/sites/api.lockstep.at/; cp api.py /tmp/api.py; fi; done
import os
import re
import sqlite3
from base64 import b64encode
from datetime import datetime, timedelta, timezone
import json
from functools import wraps
from urllib.parse import urlencode
from flask import Flask, g, request, session, jsonify, make_response, redirect, url_for
from werkzeug.middleware.proxy_fix import ProxyFix
from authomatic import Authomatic
from authomatic.adapters import WerkzeugAdapter
from authomatic.providers import oauth2
from authomatic.providers import PROVIDER_ID_MAP as AUTHOMATIC_PROVIDER_ID_MAP
import random
import time
import requests
# ----------------------------
# Custom Spotify provider
# ----------------------------
class Spotify(oauth2.OAuth2):
"""
Custom Authomatic provider for Spotify Web API Authorization Code Flow.
Spotify docs:
- Authorization endpoint: https://accounts.spotify.com/authorize
- Token endpoint: https://accounts.spotify.com/api/token
- Current user profile: https://api.spotify.com/v1/me
"""
name = "spotify"
# Provider endpoints
user_authorization_url = "https://accounts.spotify.com/authorize"
access_token_url = "https://accounts.spotify.com/api/token"
user_info_url = "https://api.spotify.com/v1/me"
# Helpful preset scopes
spotify_scopes = ["playlist-read-private"]
def update_user(self):
"""
Fetch Spotify profile and map it onto Authomatic's generic user object.
"""
response = self.access(self.user_info_url)
if response.status != 200:
return response
data = response.data or {}
# Common Authomatic user fields
self.user.id = data.get("id")
self.user.name = data.get("display_name") or data.get("id")
#self.user.email = data.get("email")
# Spotify has images as a list; use the first one if present.
images = data.get("images") or []
if images:
self.user.picture = images[0].get("url")
# Optional extra fields if your Authomatic version exposes them
self.user.username = data.get("id")
self.user.link = (data.get("external_urls") or {}).get("spotify")
return response
# "must exist in the same module as Spotify (Authomatic provider class)"
PROVIDER_ID_MAP = list(AUTHOMATIC_PROVIDER_ID_MAP) + [Spotify]
# ----------------------------
# App config
# ----------------------------
SPOTIFY_CLIENT_ID = os.environ["SPOTIFY_CLIENT_ID"]
SPOTIFY_CLIENT_SECRET = os.environ["SPOTIFY_CLIENT_SECRET"]
# OAuth redirect registered in the Spotify Developer Dashboard (Authorization Code
# flow). Spotify sends the browser here with ?code=&state= — not to the Android app scheme.
REDIRECT_URI = os.environ.get(
"SPOTIFY_REDIRECT_URI",
#"https://api.lockstep.at/spotify/callback"
"https://api.lockstep.at/login/spotify/"
)
# After server-side token exchange, the browser may be redirected to this URL with
# tokens in the query string (mobile / Custom Tab). This is NOT registered with Spotify;
# only REDIRECT_URI above goes in the dashboard for this flow.
ALLOWED_SPOTIFY_APP_POST_LOGIN_REDIRECT = os.environ.get(
"SPOTIFY_APP_POST_LOGIN_REDIRECT_URI",
"at.lockstep.player://spotify/callback",
)
DB_PATH = os.environ.get("TOKEN_DB_PATH", "spotify_tokens.db")
METADATA_UPLOAD_DIR = os.environ.get("METADATA_UPLOAD_DIR", "uploaded_collections")
app = Flask(__name__)
app.secret_key = os.environ["FLASK_SECRET_KEY"]
app.wsgi_app = ProxyFix(
app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_prefix=1
)
CONFIG = {
"spotify": {
"class_": Spotify,
"consumer_key": SPOTIFY_CLIENT_ID,
"consumer_secret": SPOTIFY_CLIENT_SECRET,
"scope": Spotify.spotify_scopes
}
}
authomatic = Authomatic(CONFIG, app.secret_key)
# ----------------------------
# SQLite helpers
# ----------------------------
def db():
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
return conn
def init_db():
conn = db()
conn.execute("""
CREATE TABLE IF NOT EXISTS spotify_tokens (
spotify_user_id TEXT PRIMARY KEY,
access_token TEXT NOT NULL,
refresh_token TEXT NOT NULL,
token_type TEXT,
scope TEXT,
expires_at TEXT NOT NULL,
updated_at TEXT NOT NULL
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS uploaded_metadata (
id INTEGER PRIMARY KEY AUTOINCREMENT,
spotify_user_id TEXT NOT NULL,
track_id TEXT NOT NULL,
type TEXT NOT NULL,
version INTEGER NOT NULL,
file_name TEXT NOT NULL,
created_at TEXT NOT NULL
)
""")
conn.commit()
conn.close()
def save_token_record(
spotify_user_id: str,
access_token: str,
refresh_token: str,
expires_in: int,
token_type: str | None = None,
scope: str | None = None,
):
now = datetime.now(timezone.utc)
expires_at = now + timedelta(seconds=int(expires_in))
conn = db()
conn.execute("""
INSERT INTO spotify_tokens (
spotify_user_id, access_token, refresh_token,
token_type, scope, expires_at, updated_at
)
VALUES (?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(spotify_user_id) DO UPDATE SET
access_token = excluded.access_token,
refresh_token = excluded.refresh_token,
token_type = excluded.token_type,
scope = excluded.scope,
expires_at = excluded.expires_at,
updated_at = excluded.updated_at
""", (
spotify_user_id,
access_token,
refresh_token,
token_type,
scope,
expires_at.isoformat(),
now.isoformat(),
))
conn.commit()
conn.close()
def get_token_record(spotify_user_id: str):
conn = db()
row = conn.execute("""
SELECT * FROM spotify_tokens
WHERE spotify_user_id = ?
""", (spotify_user_id,)).fetchone()
conn.close()
return row
# skew_seconds: refresh every 10 min
def token_is_expired_or_soon(expires_at_iso: str, skew_seconds: int = 3600-600) -> bool:
expires_at = datetime.fromisoformat(expires_at_iso)
now = datetime.now(timezone.utc)
return now + timedelta(seconds=skew_seconds) >= expires_at
# ----------------------------
# Spotify token refresh
# ----------------------------
def spotify_basic_auth_header() -> str:
raw = f"{SPOTIFY_CLIENT_ID}:{SPOTIFY_CLIENT_SECRET}".encode("utf-8")
return "Basic " + b64encode(raw).decode("ascii")
def refresh_spotify_token(refresh_token: str) -> dict:
r = requests.post(
"https://accounts.spotify.com/api/token",
data={
"grant_type": "refresh_token",
"refresh_token": refresh_token,
},
headers={
"Authorization": spotify_basic_auth_header(),
"Content-Type": "application/x-www-form-urlencoded",
},
)
if not r.ok:
# The token endpoint returns errors as flat
# {"error": "<code>", "error_description": "<msg>"}, which differs from
# the Web API's {"error": {"status": ..., "message": ...}} shape. Rewrite
# the body so our error handler surfaces a consistent envelope to
# clients regardless of which Spotify endpoint failed.
try:
body = r.json()
except ValueError:
body = {}
message = (
body.get("error_description")
or body.get("error")
or r.reason
or "Token refresh failed"
)
r._content = json.dumps({
"error": {"status": r.status_code, "message": message}
}).encode("utf-8")
r.headers["Content-Type"] = "application/json"
r.raise_for_status()
return r.json()
def get_valid_access_token(spotify_user_id: str) -> str:
row = get_token_record(spotify_user_id)
if not row:
raise RuntimeError(f"No stored token for spotify_user_id={spotify_user_id}")
if not token_is_expired_or_soon(row["expires_at"]):
return row["access_token"]
refreshed = refresh_spotify_token(row["refresh_token"])
new_access_token = refreshed["access_token"]
#print("refreshed token. new_access_token={}".format(new_access_token), flush=True)
new_refresh_token = refreshed.get("refresh_token") or row["refresh_token"]
new_expires_in = int(refreshed.get("expires_in", 3600))
new_token_type = refreshed.get("token_type") or row["token_type"]
new_scope = refreshed.get("scope") or row["scope"]
save_token_record(
spotify_user_id=spotify_user_id,
access_token=new_access_token,
refresh_token=new_refresh_token,
expires_in=new_expires_in,
token_type=new_token_type,
scope=new_scope,
)
return new_access_token
# ----------------------------
# Spotify Web API helpers
# ----------------------------
def spotify_get(url: str, access_token: str, params: dict | None = None) -> dict:
"""
GET a Spotify Web API endpoint and return the parsed JSON body.
Retries on HTTP 429 and 503 with exponential backoff and optional Retry-After,
so brief Spotify rate limits often clear before we surface an error to the app.
"""
headers = {"Authorization": f"Bearer {access_token}"}
backoff_sec = 1.0
max_attempts = 8
params_eff = params
for attempt in range(max_attempts):
r = requests.get(url, headers=headers, params=params_eff)
if r.status_code in (429, 503) and attempt < max_attempts - 1:
wait = backoff_sec
ra = r.headers.get("Retry-After")
if ra:
try:
wait = max(wait, float(ra))
except ValueError:
pass
wait = min(wait, 120.0)
time.sleep(wait + random.random() * 0.25 * wait)
backoff_sec = min(backoff_sec * 2.0, 60.0)
continue
r.raise_for_status()
return r.json()
def spotify_get_paginated(
url: str,
access_token: str,
limit: int = 50,
max_items: int | None = None,
) -> list:
"""
Fetch all items from a paginated Spotify Web API endpoint.
Spotify paging objects return up to `limit` items per page (50 max for most
endpoints) and a `next` URL. We follow `next` until it is null, collecting
items along the way.
"""
items: list = []
params: dict | None = {"limit": limit, "offset": 0}
next_url: str | None = url
while next_url is not None:
page = spotify_get(next_url, access_token, params=params)
items.extend(page.get("items", []))
if max_items is not None and len(items) >= max_items:
return items[:max_items]
# `next` is a fully-qualified URL with limit/offset already encoded,
# so we must not pass `params` again after the first request.
next_url = page.get("next")
params = None
return items
def spotify_get_me(access_token: str) -> dict:
return spotify_get("https://api.spotify.com/v1/me", access_token)
# ----------------------------
# Error handling
# ----------------------------
@app.errorhandler(requests.HTTPError)
def handle_spotify_http_error(e: requests.HTTPError):
"""
Translate a non-2xx upstream response from Spotify into a JSON envelope,
passing through the upstream HTTP status code.
Spotify error bodies look like {"error": {"status": ..., "message": ...}}
when JSON is returned; we surface that message when available.
"""
resp = e.response
status = resp.status_code if resp is not None else 502
spotify_error = None
error_message = str(e)
if resp is not None:
try:
spotify_error = resp.json()
except ValueError:
spotify_error = None
if (
isinstance(spotify_error, dict)
and isinstance(spotify_error.get("error"), dict)
):
error_message = (
spotify_error["error"].get("message") or error_message
)
return jsonify({
"ok": False,
"error": error_message,
"spotify": spotify_error,
}), status
@app.errorhandler(requests.RequestException)
def handle_spotify_request_error(e: requests.RequestException):
"""Network-level failures (DNS, connection, timeout) have no upstream status."""
return jsonify({
"ok": False,
"error": f"Upstream request failed: {e}",
}), 502
# ----------------------------
# Routes
# ----------------------------
@app.route("/")
def index():
return """
<h1>Spotify + Authomatic example</h1>
<p><a href="/login/spotify/">Log in with Spotify</a></p>
<p>After login, this app fetches your Spotify profile and one extra API call.</p>
"""
@app.route("/login/<provider_name>/", methods=["GET", "POST"])
def login(provider_name):
# Authomatic 1.3.0 (oauth2.login) only runs "phase 1" — redirect to Spotify —
# when there are no query parameters, or only ``user_state``:
# elif (not self.params or (len(self.params) == 1 and 'user_state' in self.params))
# A mobile client opening ``/login/spotify/?redirect_uri=...`` therefore matches
# no branch; login() returns without calling redirect() → empty body and HTTP 200
# (white page). Stash the app callback in the session and reload without query args.
if (
provider_name == "spotify"
and request.args.get("redirect_uri")
and "code" not in request.args
and "error" not in request.args
):
requested = request.args.get("redirect_uri", "")
if requested != ALLOWED_SPOTIFY_APP_POST_LOGIN_REDIRECT:
return jsonify({
"ok": False,
"error": "redirect_uri not allowed for this client",
}), 400
session["spotify_oauth_app_redirect_uri"] = requested
session.modified = True
return redirect(url_for("login", provider_name=provider_name), code=302)
response = make_response()
# Let Authomatic handle the OAuth2 handshake.
result = authomatic.login(
WerkzeugAdapter(request, response),
provider_name,
session=session,
session_saver=lambda: setattr(session, "modified", True),
)
# If result is None, Authomatic is still redirecting/processing.
if result is None:
return response
if result.error:
return jsonify({
"ok": False,
"stage": "auth",
"error": getattr(result.error, "message", str(result.error))
}), 400
# Raw parsed JSON returned by Spotify token endpoint
raw_token_data = None
raw_token_body = None
token_http_status = None
if getattr(result.provider, "access_token_response", None):
token_http_status = result.provider.access_token_response.status
raw_token_data = result.provider.access_token_response.data
raw_token_body = result.provider.access_token_response.content
if not result.user:
return jsonify({
"ok": False,
"error": "No user returned from Spotify"
}), 400
# Fetch Spotify user profile from /v1/me
result.user.update()
if result.user.credentials is None:
return jsonify({
"ok": False,
"error": "Login succeeded, but no credentials were returned."
}), 400
# DAVID: EXAMPLE 2
# DAVID ----------
token_response = None
if getattr(result.provider, "access_token_response", None):
token_response = result.provider.access_token_response.data or {}
access_token = token_response.get("access_token")
refresh_token = token_response.get("refresh_token")
expires_in = int(token_response.get("expires_in", 3600))
token_type = token_response.get("token_type")
scope = token_response.get("scope")
if not access_token or not refresh_token:
return jsonify({
"ok": False,
"error": "Spotify token response missing access_token or refresh_token",
"token_response": token_response,
}), 400
save_token_record(
spotify_user_id=result.user.id,
access_token=access_token,
refresh_token=refresh_token,
expires_in=expires_in,
token_type=token_type,
scope=scope,
)
# keep the Spotify user id in session
session["spotify_user_id"] = result.user.id
app_redirect = session.pop("spotify_oauth_app_redirect_uri", None)
if app_redirect:
if app_redirect != ALLOWED_SPOTIFY_APP_POST_LOGIN_REDIRECT:
return jsonify({
"ok": False,
"error": "Stored app redirect_uri does not match allowlist",
}), 400
sep = "&" if ("?" in app_redirect) else "?"
target = (
f"{app_redirect}{sep}"
+ urlencode(
{
"access_token": access_token,
"refresh_token": refresh_token,
"expires_in": str(expires_in),
"token_type": token_type or "",
},
)
)
return redirect(target, code=302)
return jsonify({
"ok": True,
"spotify_user_id": result.user.id,
"display_name": result.user.name,
"token_response": token_response,
})
"""
old example 1:
# Example protected-resource call using the OAuth access token:
# Spotify current user's top tracks (requires user-top-read scope if enabled)
# For a scope-free example, use /v1/me only, which we already used in update().
me_response = result.provider.access("https://api.spotify.com/v1/me")
payload = {
"ok": True,
"provider": provider_name,
"user": {
"id": result.user.id,
"name": result.user.name,
"email": result.user.email,
"picture": getattr(result.user, "picture", None),
"profile_url": getattr(result.user, "link", None),
},
"credentials": {
# Keep this server-side in real apps. Returned here only as a demo.
"access_token": result.user.credentials.token,
"refresh_token": getattr(result.user.credentials, "refresh_token", None),
"expires": str(getattr(result.user.credentials, "expires", None)),
"token_http_status": token_http_status,
"raw_token_data": raw_token_data,
},
"spotify_me_status": me_response.status,
"spotify_me_data": me_response.data,
}
return jsonify(payload)
"""
"""
old example 1:
{
"credentials": {
"access_token": "__TOKEN__",
"expires": "None",
"raw_token_data": {
"access_token": "__TOKEN__",
"expires_in": 3600,
"refresh_token": "__TOKEN2__",
"scope": "playlist-read-private",
"token_type": "Bearer"
},
"refresh_token": "__TOKEN2__",
"token_http_status": 200
},
"ok": true,
"provider": "spotify",
"spotify_me_data": {
"display_name": "cidermole",
"external_urls": {
"spotify": "https://open.spotify.com/user/cidermole"
},
"followers": {
"href": null,
"total": 2
},
"href": "https://api.spotify.com/v1/users/cidermole",
"id": "cidermole",
"images": [],
"type": "user",
"uri": "spotify:user:cidermole"
},
"spotify_me_status": 200,
"user": {
"email": null,
"id": "cidermole",
"name": "cidermole",
"picture": null,
"profile_url": "https://open.spotify.com/user/cidermole"
}
}
"""
def spotify_access_token_from_authorization_header():
auth = request.headers.get("Authorization", "") or ""
if not auth.startswith("Bearer "):
return None
token = auth[7:].strip()
return token or None
def get_request_spotify_access_token():
"""
Prefer ``Authorization: Bearer <access_token>`` (mobile / jukebox).
Fallback to Flask session + stored refresh flow (browser).
"""
bearer = spotify_access_token_from_authorization_header()
if bearer:
return bearer
spotify_user_id = session.get("spotify_user_id")
if not spotify_user_id:
return None
return get_valid_access_token(spotify_user_id)
def require_auth(f):
@wraps(f)
def wrapped(*args, **kwargs):
token = get_request_spotify_access_token()
if not token:
return jsonify({"ok": False, "error": "Not logged in"}), 401
g.spotify_access_token = token
return f(*args, **kwargs)
return wrapped
@app.route("/me")
@require_auth
def me():
access_token = g.spotify_access_token
profile = spotify_get_me(access_token)
row = get_token_record(profile["id"])
return jsonify({
"ok": True,
"profile": profile,
"stored_expires_at": row["expires_at"] if row else None,
})
@app.route("/playlists")
@require_auth
def playlists():
access_token = g.spotify_access_token
"""
user_id = "Sara"
url = f"https://api.spotify.com/v1/users/{user_id}/playlists"
# -> 403 Forbidden
"""
items = spotify_get_paginated(
"https://api.spotify.com/v1/me/playlists",
access_token,
)
return jsonify({
"ok": True,
"total": len(items),
"items": items,
})
@app.route("/playlists/<playlist_id>")
@require_auth
def playlist(playlist_id):
access_token = g.spotify_access_token
playlist_data = spotify_get(
f"https://api.spotify.com/v1/playlists/{playlist_id}",
access_token,
)
# Full playlist objects use a paging object at `items` (current Spotify shape)
# or legacy `tracks`. Follow `next` on whichever is present.
paging_key = (
"items"
if isinstance(playlist_data.get("items"), dict)
else "tracks"
)
paging = playlist_data.get(paging_key) or {}
if paging.get("next"):
all_items = spotify_get_paginated(
f"https://api.spotify.com/v1/playlists/{playlist_id}/tracks",
access_token,
limit=100,
)
playlist_data[paging_key] = {
**paging,
"items": all_items,
"offset": 0,
"limit": len(all_items),
"next": None,
"previous": None,
}
return jsonify({
"ok": True,
"playlist": playlist_data,
})
@app.route("/metadata", methods=["POST"])
@require_auth
def upload_metadata():
access_token = g.spotify_access_token
if not request.is_json:
return jsonify({"ok": False, "error": "Expected application/json"}), 400
body = request.get_json(silent=True)
if not isinstance(body, dict):
return jsonify({"ok": False, "error": "Invalid JSON body"}), 400
track_id = body.get("trackId")
meta_type = body.get("type")
version = body.get("version")
collection = body.get("collection")
if not track_id or not meta_type or version is None or collection is None:
return jsonify({
"ok": False,
"error": "Missing trackId, type, version, or collection",
}), 400
try:
version = int(version)
except (TypeError, ValueError):
return jsonify({"ok": False, "error": "version must be an integer"}), 400
if not isinstance(collection, dict):
return jsonify({"ok": False, "error": "collection must be a JSON object"}), 400
profile = spotify_get_me(access_token)
spotify_user_id = profile["id"]
os.makedirs(METADATA_UPLOAD_DIR, exist_ok=True)
ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S%fZ")
safe_track = re.sub(r"[^\w\-]", "_", str(track_id))[:120]
file_name = f"{spotify_user_id}_{safe_track}_{meta_type}_{version}_{ts}.json"
file_path = os.path.join(METADATA_UPLOAD_DIR, file_name)
with open(file_path, "w", encoding="utf-8") as f:
json.dump(collection, f, ensure_ascii=False)
now = datetime.now(timezone.utc).isoformat()
conn = db()
conn.execute("""
INSERT INTO uploaded_metadata (
spotify_user_id, track_id, type, version, file_name, created_at
) VALUES (?, ?, ?, ?, ?, ?)
""", (spotify_user_id, track_id, meta_type, version, file_name, now))
conn.commit()
conn.close()
return jsonify({"ok": True, "file_name": file_name}), 201
if __name__ == "__main__":
init_db()
app.run(host="127.0.0.1", port=8000, debug=True)