# app.py # running: # $ cd /var/sites/api.lockstep.at # $ source .venv/bin/activate # $ source .env # $ # NO. flask --app api run --port 8000 # $ python3 api.py # runs __main__ and allows code reloading. # Authomatic 1.3.0 is not compatible with Python 3.12+ # patch: /var/sites/api.lockstep.at/.venv/lib/python3.13/site-packages/authomatic/providers/__init__.py # def _fetch() # HTTPSConnection( # # cert_file=cert_file, # <-- comment this # sync continuously: # $ while sleep 1; do diff -q api.py /tmp/api.py; if [ $? -ne 0 ]; then scp api.py lockstep@api.lockstep.at:/var/sites/api.lockstep.at/; cp api.py /tmp/api.py; fi; done import os import re import sqlite3 from base64 import b64encode from datetime import datetime, timedelta, timezone import json from functools import wraps from urllib.parse import urlencode from flask import Flask, g, request, session, jsonify, make_response, redirect, url_for from werkzeug.middleware.proxy_fix import ProxyFix from authomatic import Authomatic from authomatic.adapters import WerkzeugAdapter from authomatic.providers import oauth2 from authomatic.providers import PROVIDER_ID_MAP as AUTHOMATIC_PROVIDER_ID_MAP import random import time import requests # ---------------------------- # Custom Spotify provider # ---------------------------- class Spotify(oauth2.OAuth2): """ Custom Authomatic provider for Spotify Web API Authorization Code Flow. Spotify docs: - Authorization endpoint: https://accounts.spotify.com/authorize - Token endpoint: https://accounts.spotify.com/api/token - Current user profile: https://api.spotify.com/v1/me """ name = "spotify" # Provider endpoints user_authorization_url = "https://accounts.spotify.com/authorize" access_token_url = "https://accounts.spotify.com/api/token" user_info_url = "https://api.spotify.com/v1/me" # Helpful preset scopes spotify_scopes = ["playlist-read-private"] def update_user(self): """ Fetch Spotify profile and map it onto Authomatic's generic user object. """ response = self.access(self.user_info_url) if response.status != 200: return response data = response.data or {} # Common Authomatic user fields self.user.id = data.get("id") self.user.name = data.get("display_name") or data.get("id") #self.user.email = data.get("email") # Spotify has images as a list; use the first one if present. images = data.get("images") or [] if images: self.user.picture = images[0].get("url") # Optional extra fields if your Authomatic version exposes them self.user.username = data.get("id") self.user.link = (data.get("external_urls") or {}).get("spotify") return response # "must exist in the same module as Spotify (Authomatic provider class)" PROVIDER_ID_MAP = list(AUTHOMATIC_PROVIDER_ID_MAP) + [Spotify] # ---------------------------- # App config # ---------------------------- SPOTIFY_CLIENT_ID = os.environ["SPOTIFY_CLIENT_ID"] SPOTIFY_CLIENT_SECRET = os.environ["SPOTIFY_CLIENT_SECRET"] # OAuth redirect registered in the Spotify Developer Dashboard (Authorization Code # flow). Spotify sends the browser here with ?code=&state= — not to the Android app scheme. REDIRECT_URI = os.environ.get( "SPOTIFY_REDIRECT_URI", #"https://api.lockstep.at/spotify/callback" "https://api.lockstep.at/login/spotify/" ) # After server-side token exchange, the browser may be redirected to this URL with # tokens in the query string (mobile / Custom Tab). This is NOT registered with Spotify; # only REDIRECT_URI above goes in the dashboard for this flow. ALLOWED_SPOTIFY_APP_POST_LOGIN_REDIRECT = os.environ.get( "SPOTIFY_APP_POST_LOGIN_REDIRECT_URI", "at.lockstep.player://spotify/callback", ) DB_PATH = os.environ.get("TOKEN_DB_PATH", "spotify_tokens.db") METADATA_UPLOAD_DIR = os.environ.get("METADATA_UPLOAD_DIR", "uploaded_collections") app = Flask(__name__) app.secret_key = os.environ["FLASK_SECRET_KEY"] app.wsgi_app = ProxyFix( app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_prefix=1 ) CONFIG = { "spotify": { "class_": Spotify, "consumer_key": SPOTIFY_CLIENT_ID, "consumer_secret": SPOTIFY_CLIENT_SECRET, "scope": Spotify.spotify_scopes } } authomatic = Authomatic(CONFIG, app.secret_key) # ---------------------------- # SQLite helpers # ---------------------------- def db(): conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row return conn def init_db(): conn = db() conn.execute(""" CREATE TABLE IF NOT EXISTS spotify_tokens ( spotify_user_id TEXT PRIMARY KEY, access_token TEXT NOT NULL, refresh_token TEXT NOT NULL, token_type TEXT, scope TEXT, expires_at TEXT NOT NULL, updated_at TEXT NOT NULL ) """) conn.execute(""" CREATE TABLE IF NOT EXISTS uploaded_metadata ( id INTEGER PRIMARY KEY AUTOINCREMENT, spotify_user_id TEXT NOT NULL, track_id TEXT NOT NULL, type TEXT NOT NULL, version INTEGER NOT NULL, file_name TEXT NOT NULL, created_at TEXT NOT NULL ) """) conn.commit() conn.close() def save_token_record( spotify_user_id: str, access_token: str, refresh_token: str, expires_in: int, token_type: str | None = None, scope: str | None = None, ): now = datetime.now(timezone.utc) expires_at = now + timedelta(seconds=int(expires_in)) conn = db() conn.execute(""" INSERT INTO spotify_tokens ( spotify_user_id, access_token, refresh_token, token_type, scope, expires_at, updated_at ) VALUES (?, ?, ?, ?, ?, ?, ?) ON CONFLICT(spotify_user_id) DO UPDATE SET access_token = excluded.access_token, refresh_token = excluded.refresh_token, token_type = excluded.token_type, scope = excluded.scope, expires_at = excluded.expires_at, updated_at = excluded.updated_at """, ( spotify_user_id, access_token, refresh_token, token_type, scope, expires_at.isoformat(), now.isoformat(), )) conn.commit() conn.close() def get_token_record(spotify_user_id: str): conn = db() row = conn.execute(""" SELECT * FROM spotify_tokens WHERE spotify_user_id = ? """, (spotify_user_id,)).fetchone() conn.close() return row # skew_seconds: refresh every 10 min def token_is_expired_or_soon(expires_at_iso: str, skew_seconds: int = 3600-600) -> bool: expires_at = datetime.fromisoformat(expires_at_iso) now = datetime.now(timezone.utc) return now + timedelta(seconds=skew_seconds) >= expires_at # ---------------------------- # Spotify token refresh # ---------------------------- def spotify_basic_auth_header() -> str: raw = f"{SPOTIFY_CLIENT_ID}:{SPOTIFY_CLIENT_SECRET}".encode("utf-8") return "Basic " + b64encode(raw).decode("ascii") def refresh_spotify_token(refresh_token: str) -> dict: r = requests.post( "https://accounts.spotify.com/api/token", data={ "grant_type": "refresh_token", "refresh_token": refresh_token, }, headers={ "Authorization": spotify_basic_auth_header(), "Content-Type": "application/x-www-form-urlencoded", }, ) if not r.ok: # The token endpoint returns errors as flat # {"error": "", "error_description": ""}, which differs from # the Web API's {"error": {"status": ..., "message": ...}} shape. Rewrite # the body so our error handler surfaces a consistent envelope to # clients regardless of which Spotify endpoint failed. try: body = r.json() except ValueError: body = {} message = ( body.get("error_description") or body.get("error") or r.reason or "Token refresh failed" ) r._content = json.dumps({ "error": {"status": r.status_code, "message": message} }).encode("utf-8") r.headers["Content-Type"] = "application/json" r.raise_for_status() return r.json() def get_valid_access_token(spotify_user_id: str) -> str: row = get_token_record(spotify_user_id) if not row: raise RuntimeError(f"No stored token for spotify_user_id={spotify_user_id}") if not token_is_expired_or_soon(row["expires_at"]): return row["access_token"] refreshed = refresh_spotify_token(row["refresh_token"]) new_access_token = refreshed["access_token"] #print("refreshed token. new_access_token={}".format(new_access_token), flush=True) new_refresh_token = refreshed.get("refresh_token") or row["refresh_token"] new_expires_in = int(refreshed.get("expires_in", 3600)) new_token_type = refreshed.get("token_type") or row["token_type"] new_scope = refreshed.get("scope") or row["scope"] save_token_record( spotify_user_id=spotify_user_id, access_token=new_access_token, refresh_token=new_refresh_token, expires_in=new_expires_in, token_type=new_token_type, scope=new_scope, ) return new_access_token # ---------------------------- # Spotify Web API helpers # ---------------------------- def spotify_get(url: str, access_token: str, params: dict | None = None) -> dict: """ GET a Spotify Web API endpoint and return the parsed JSON body. Retries on HTTP 429 and 503 with exponential backoff and optional Retry-After, so brief Spotify rate limits often clear before we surface an error to the app. """ headers = {"Authorization": f"Bearer {access_token}"} backoff_sec = 1.0 max_attempts = 8 params_eff = params for attempt in range(max_attempts): r = requests.get(url, headers=headers, params=params_eff) if r.status_code in (429, 503) and attempt < max_attempts - 1: wait = backoff_sec ra = r.headers.get("Retry-After") if ra: try: wait = max(wait, float(ra)) except ValueError: pass wait = min(wait, 120.0) time.sleep(wait + random.random() * 0.25 * wait) backoff_sec = min(backoff_sec * 2.0, 60.0) continue r.raise_for_status() return r.json() def spotify_get_paginated( url: str, access_token: str, limit: int = 50, max_items: int | None = None, ) -> list: """ Fetch all items from a paginated Spotify Web API endpoint. Spotify paging objects return up to `limit` items per page (50 max for most endpoints) and a `next` URL. We follow `next` until it is null, collecting items along the way. """ items: list = [] params: dict | None = {"limit": limit, "offset": 0} next_url: str | None = url while next_url is not None: page = spotify_get(next_url, access_token, params=params) items.extend(page.get("items", [])) if max_items is not None and len(items) >= max_items: return items[:max_items] # `next` is a fully-qualified URL with limit/offset already encoded, # so we must not pass `params` again after the first request. next_url = page.get("next") params = None return items def spotify_get_me(access_token: str) -> dict: return spotify_get("https://api.spotify.com/v1/me", access_token) # ---------------------------- # Error handling # ---------------------------- @app.errorhandler(requests.HTTPError) def handle_spotify_http_error(e: requests.HTTPError): """ Translate a non-2xx upstream response from Spotify into a JSON envelope, passing through the upstream HTTP status code. Spotify error bodies look like {"error": {"status": ..., "message": ...}} when JSON is returned; we surface that message when available. """ resp = e.response status = resp.status_code if resp is not None else 502 spotify_error = None error_message = str(e) if resp is not None: try: spotify_error = resp.json() except ValueError: spotify_error = None if ( isinstance(spotify_error, dict) and isinstance(spotify_error.get("error"), dict) ): error_message = ( spotify_error["error"].get("message") or error_message ) return jsonify({ "ok": False, "error": error_message, "spotify": spotify_error, }), status @app.errorhandler(requests.RequestException) def handle_spotify_request_error(e: requests.RequestException): """Network-level failures (DNS, connection, timeout) have no upstream status.""" return jsonify({ "ok": False, "error": f"Upstream request failed: {e}", }), 502 # ---------------------------- # Routes # ---------------------------- @app.route("/") def index(): return """

Spotify + Authomatic example

Log in with Spotify

After login, this app fetches your Spotify profile and one extra API call.

""" @app.route("/login//", methods=["GET", "POST"]) def login(provider_name): # Authomatic 1.3.0 (oauth2.login) only runs "phase 1" — redirect to Spotify — # when there are no query parameters, or only ``user_state``: # elif (not self.params or (len(self.params) == 1 and 'user_state' in self.params)) # A mobile client opening ``/login/spotify/?redirect_uri=...`` therefore matches # no branch; login() returns without calling redirect() → empty body and HTTP 200 # (white page). Stash the app callback in the session and reload without query args. if ( provider_name == "spotify" and request.args.get("redirect_uri") and "code" not in request.args and "error" not in request.args ): requested = request.args.get("redirect_uri", "") if requested != ALLOWED_SPOTIFY_APP_POST_LOGIN_REDIRECT: return jsonify({ "ok": False, "error": "redirect_uri not allowed for this client", }), 400 session["spotify_oauth_app_redirect_uri"] = requested session.modified = True return redirect(url_for("login", provider_name=provider_name), code=302) response = make_response() # Let Authomatic handle the OAuth2 handshake. result = authomatic.login( WerkzeugAdapter(request, response), provider_name, session=session, session_saver=lambda: setattr(session, "modified", True), ) # If result is None, Authomatic is still redirecting/processing. if result is None: return response if result.error: return jsonify({ "ok": False, "stage": "auth", "error": getattr(result.error, "message", str(result.error)) }), 400 # Raw parsed JSON returned by Spotify token endpoint raw_token_data = None raw_token_body = None token_http_status = None if getattr(result.provider, "access_token_response", None): token_http_status = result.provider.access_token_response.status raw_token_data = result.provider.access_token_response.data raw_token_body = result.provider.access_token_response.content if not result.user: return jsonify({ "ok": False, "error": "No user returned from Spotify" }), 400 # Fetch Spotify user profile from /v1/me result.user.update() if result.user.credentials is None: return jsonify({ "ok": False, "error": "Login succeeded, but no credentials were returned." }), 400 # DAVID: EXAMPLE 2 # DAVID ---------- token_response = None if getattr(result.provider, "access_token_response", None): token_response = result.provider.access_token_response.data or {} access_token = token_response.get("access_token") refresh_token = token_response.get("refresh_token") expires_in = int(token_response.get("expires_in", 3600)) token_type = token_response.get("token_type") scope = token_response.get("scope") if not access_token or not refresh_token: return jsonify({ "ok": False, "error": "Spotify token response missing access_token or refresh_token", "token_response": token_response, }), 400 save_token_record( spotify_user_id=result.user.id, access_token=access_token, refresh_token=refresh_token, expires_in=expires_in, token_type=token_type, scope=scope, ) # keep the Spotify user id in session session["spotify_user_id"] = result.user.id app_redirect = session.pop("spotify_oauth_app_redirect_uri", None) if app_redirect: if app_redirect != ALLOWED_SPOTIFY_APP_POST_LOGIN_REDIRECT: return jsonify({ "ok": False, "error": "Stored app redirect_uri does not match allowlist", }), 400 sep = "&" if ("?" in app_redirect) else "?" target = ( f"{app_redirect}{sep}" + urlencode( { "access_token": access_token, "refresh_token": refresh_token, "expires_in": str(expires_in), "token_type": token_type or "", }, ) ) return redirect(target, code=302) return jsonify({ "ok": True, "spotify_user_id": result.user.id, "display_name": result.user.name, "token_response": token_response, }) """ old example 1: # Example protected-resource call using the OAuth access token: # Spotify current user's top tracks (requires user-top-read scope if enabled) # For a scope-free example, use /v1/me only, which we already used in update(). me_response = result.provider.access("https://api.spotify.com/v1/me") payload = { "ok": True, "provider": provider_name, "user": { "id": result.user.id, "name": result.user.name, "email": result.user.email, "picture": getattr(result.user, "picture", None), "profile_url": getattr(result.user, "link", None), }, "credentials": { # Keep this server-side in real apps. Returned here only as a demo. "access_token": result.user.credentials.token, "refresh_token": getattr(result.user.credentials, "refresh_token", None), "expires": str(getattr(result.user.credentials, "expires", None)), "token_http_status": token_http_status, "raw_token_data": raw_token_data, }, "spotify_me_status": me_response.status, "spotify_me_data": me_response.data, } return jsonify(payload) """ """ old example 1: { "credentials": { "access_token": "__TOKEN__", "expires": "None", "raw_token_data": { "access_token": "__TOKEN__", "expires_in": 3600, "refresh_token": "__TOKEN2__", "scope": "playlist-read-private", "token_type": "Bearer" }, "refresh_token": "__TOKEN2__", "token_http_status": 200 }, "ok": true, "provider": "spotify", "spotify_me_data": { "display_name": "cidermole", "external_urls": { "spotify": "https://open.spotify.com/user/cidermole" }, "followers": { "href": null, "total": 2 }, "href": "https://api.spotify.com/v1/users/cidermole", "id": "cidermole", "images": [], "type": "user", "uri": "spotify:user:cidermole" }, "spotify_me_status": 200, "user": { "email": null, "id": "cidermole", "name": "cidermole", "picture": null, "profile_url": "https://open.spotify.com/user/cidermole" } } """ def spotify_access_token_from_authorization_header(): auth = request.headers.get("Authorization", "") or "" if not auth.startswith("Bearer "): return None token = auth[7:].strip() return token or None def get_request_spotify_access_token(): """ Prefer ``Authorization: Bearer `` (mobile / jukebox). Fallback to Flask session + stored refresh flow (browser). """ bearer = spotify_access_token_from_authorization_header() if bearer: return bearer spotify_user_id = session.get("spotify_user_id") if not spotify_user_id: return None return get_valid_access_token(spotify_user_id) def require_auth(f): @wraps(f) def wrapped(*args, **kwargs): token = get_request_spotify_access_token() if not token: return jsonify({"ok": False, "error": "Not logged in"}), 401 g.spotify_access_token = token return f(*args, **kwargs) return wrapped @app.route("/me") @require_auth def me(): access_token = g.spotify_access_token profile = spotify_get_me(access_token) row = get_token_record(profile["id"]) return jsonify({ "ok": True, "profile": profile, "stored_expires_at": row["expires_at"] if row else None, }) @app.route("/playlists") @require_auth def playlists(): access_token = g.spotify_access_token """ user_id = "Sara" url = f"https://api.spotify.com/v1/users/{user_id}/playlists" # -> 403 Forbidden """ items = spotify_get_paginated( "https://api.spotify.com/v1/me/playlists", access_token, ) return jsonify({ "ok": True, "total": len(items), "items": items, }) @app.route("/playlists/") @require_auth def playlist(playlist_id): access_token = g.spotify_access_token playlist_data = spotify_get( f"https://api.spotify.com/v1/playlists/{playlist_id}", access_token, ) # Full playlist objects use a paging object at `items` (current Spotify shape) # or legacy `tracks`. Follow `next` on whichever is present. paging_key = ( "items" if isinstance(playlist_data.get("items"), dict) else "tracks" ) paging = playlist_data.get(paging_key) or {} if paging.get("next"): all_items = spotify_get_paginated( f"https://api.spotify.com/v1/playlists/{playlist_id}/tracks", access_token, limit=100, ) playlist_data[paging_key] = { **paging, "items": all_items, "offset": 0, "limit": len(all_items), "next": None, "previous": None, } return jsonify({ "ok": True, "playlist": playlist_data, }) @app.route("/metadata", methods=["GET"]) @require_auth def get_metadata(): track_id = request.args.get("trackId") meta_type = request.args.get("type", "beats") if not track_id: return jsonify({"ok": False, "error": "Missing trackId"}), 400 access_token = g.spotify_access_token profile = spotify_get_me(access_token) spotify_user_id = profile["id"] conn = db() row = conn.execute(""" SELECT file_name FROM uploaded_metadata WHERE spotify_user_id = ? AND track_id = ? AND type = ? ORDER BY created_at DESC LIMIT 1 """, (spotify_user_id, track_id, meta_type)).fetchone() conn.close() if not row: return jsonify({"ok": False, "error": "Not found"}), 404 file_path = os.path.join(METADATA_UPLOAD_DIR, row["file_name"]) if not os.path.isfile(file_path): return jsonify({"ok": False, "error": "Not found"}), 404 with open(file_path, "r", encoding="utf-8") as f: collection = json.load(f) if not isinstance(collection, dict): return jsonify({"ok": False, "error": "Invalid metadata file"}), 500 return jsonify({"ok": True, "collection": collection}) @app.route("/metadata", methods=["POST"]) @require_auth def upload_metadata(): access_token = g.spotify_access_token if not request.is_json: return jsonify({"ok": False, "error": "Expected application/json"}), 400 body = request.get_json(silent=True) if not isinstance(body, dict): return jsonify({"ok": False, "error": "Invalid JSON body"}), 400 track_id = body.get("trackId") meta_type = body.get("type") version = body.get("version") collection = body.get("collection") if not track_id or not meta_type or version is None or collection is None: return jsonify({ "ok": False, "error": "Missing trackId, type, version, or collection", }), 400 try: version = int(version) except (TypeError, ValueError): return jsonify({"ok": False, "error": "version must be an integer"}), 400 if not isinstance(collection, dict): return jsonify({"ok": False, "error": "collection must be a JSON object"}), 400 profile = spotify_get_me(access_token) spotify_user_id = profile["id"] os.makedirs(METADATA_UPLOAD_DIR, exist_ok=True) ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S%fZ") safe_track = re.sub(r"[^\w\-]", "_", str(track_id))[:120] file_name = f"{spotify_user_id}_{safe_track}_{meta_type}_{version}_{ts}.json" file_path = os.path.join(METADATA_UPLOAD_DIR, file_name) with open(file_path, "w", encoding="utf-8") as f: json.dump(collection, f, ensure_ascii=False) now = datetime.now(timezone.utc).isoformat() conn = db() conn.execute(""" INSERT INTO uploaded_metadata ( spotify_user_id, track_id, type, version, file_name, created_at ) VALUES (?, ?, ?, ?, ?, ?) """, (spotify_user_id, track_id, meta_type, version, file_name, now)) conn.commit() conn.close() return jsonify({"ok": True, "file_name": file_name}), 201 if __name__ == "__main__": init_db() app.run(host="127.0.0.1", port=8000, debug=True)