# app.py
# running:
# $ cd /var/sites/api.lockstep.at
# $ source .venv/bin/activate
# $ source .env
# $ # NO. flask --app api run --port 8000
# $ python3 api.py
# runs __main__ and allows code reloading.

# Authomatic 1.3.0 is not compatible with Python 3.12+
# patch: /var/sites/api.lockstep.at/.venv/lib/python3.13/site-packages/authomatic/providers/__init__.py
# def _fetch()
#     HTTPSConnection(
#         # cert_file=cert_file,  # <-- comment this

# sync continuously:
# $ while sleep 1; do diff -q api.py /tmp/api.py; if [ $? -ne 0 ]; then scp api.py lockstep@api.lockstep.at:/var/sites/api.lockstep.at/; cp api.py /tmp/api.py; fi; done

import os
import sqlite3
from base64 import b64encode
from datetime import datetime, timedelta, timezone
import json
from urllib.parse import urlencode

from flask import Flask, request, session, jsonify, make_response, redirect, url_for
from werkzeug.middleware.proxy_fix import ProxyFix
from authomatic import Authomatic
from authomatic.adapters import WerkzeugAdapter
from authomatic.providers import oauth2
from authomatic.providers import PROVIDER_ID_MAP as AUTHOMATIC_PROVIDER_ID_MAP
import requests


# ----------------------------
# Custom Spotify provider
# ----------------------------

class Spotify(oauth2.OAuth2):
    """
    Custom Authomatic provider for Spotify Web API Authorization Code Flow.

    Spotify docs:
    - Authorization endpoint: https://accounts.spotify.com/authorize
    - Token endpoint: https://accounts.spotify.com/api/token
    - Current user profile: https://api.spotify.com/v1/me
    """

    name = "spotify"

    # Provider endpoints
    user_authorization_url = "https://accounts.spotify.com/authorize"
    access_token_url = "https://accounts.spotify.com/api/token"
    user_info_url = "https://api.spotify.com/v1/me"

    # Helpful preset scopes
    spotify_scopes = ["playlist-read-private"]

    def update_user(self):
        """
        Fetch the Spotify profile and map it onto Authomatic's generic user object.

        Returns the response of the profile request. When the status is not
        200 the user object is left untouched and the response is returned
        as-is so the caller can inspect the failure.
        """
        response = self.access(self.user_info_url)
        if response.status != 200:
            return response

        # Only a JSON object can be mapped; anything else (None, a string, a
        # list, ...) is treated as an empty profile instead of raising on .get().
        data = response.data if isinstance(response.data, dict) else {}

        # Common Authomatic user fields
        self.user.id = data.get("id")
        self.user.name = data.get("display_name") or data.get("id")
        # self.user.email = data.get("email")

        # Spotify has images as a list; use the first one if present.
        images = data.get("images") or []
        if images:
            self.user.picture = images[0].get("url")

        # Optional extra fields if your Authomatic version exposes them
        self.user.username = data.get("id")
        self.user.link = (data.get("external_urls") or {}).get("spotify")

        return response


# "must exist in the same module as Spotify (Authomatic provider class)"
PROVIDER_ID_MAP = list(AUTHOMATIC_PROVIDER_ID_MAP) + [Spotify]


# ----------------------------
# App config
# ----------------------------

SPOTIFY_CLIENT_ID = os.environ["SPOTIFY_CLIENT_ID"]
SPOTIFY_CLIENT_SECRET = os.environ["SPOTIFY_CLIENT_SECRET"]

# OAuth redirect registered in the Spotify Developer Dashboard (Authorization Code
# flow). Spotify sends the browser here with ?code=&state= — not to the Android app scheme.
REDIRECT_URI = os.environ.get(
    "SPOTIFY_REDIRECT_URI",
    # "https://api.lockstep.at/spotify/callback"
    "https://api.lockstep.at/login/spotify/",
)

# After server-side token exchange, the browser may be redirected to this URL with
# tokens in the query string (mobile / Custom Tab). This is NOT registered with Spotify;
# only REDIRECT_URI above goes in the dashboard for this flow.
ALLOWED_SPOTIFY_APP_POST_LOGIN_REDIRECT = os.environ.get(
    "SPOTIFY_APP_POST_LOGIN_REDIRECT_URI",
    "at.lockstep.player://spotify/callback",
)

DB_PATH = os.environ.get("TOKEN_DB_PATH", "spotify_tokens.db")

# Upper bound (seconds) applied to every outbound HTTP call to Spotify made
# through `requests`. Without a timeout a stalled upstream connection would
# block the worker forever; with it, the failure surfaces as a
# requests.RequestException and is turned into a 502 by the error handler below.
SPOTIFY_HTTP_TIMEOUT = 15

app = Flask(__name__)
app.secret_key = os.environ["FLASK_SECRET_KEY"]

app.wsgi_app = ProxyFix(
    app.wsgi_app,
    x_for=1,
    x_proto=1,
    x_host=1,
    x_prefix=1,
)

CONFIG = {
    "spotify": {
        "class_": Spotify,
        "consumer_key": SPOTIFY_CLIENT_ID,
        "consumer_secret": SPOTIFY_CLIENT_SECRET,
        "scope": Spotify.spotify_scopes,
    }
}

authomatic = Authomatic(CONFIG, app.secret_key)


# ----------------------------
# SQLite helpers
# ----------------------------

def db():
    """Open a new SQLite connection to DB_PATH with rows addressable by column name."""
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    return conn


def init_db():
    """Create the `spotify_tokens` table if it does not exist yet."""
    conn = db()
    try:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS spotify_tokens (
                spotify_user_id TEXT PRIMARY KEY,
                access_token TEXT NOT NULL,
                refresh_token TEXT NOT NULL,
                token_type TEXT,
                scope TEXT,
                expires_at TEXT NOT NULL,
                updated_at TEXT NOT NULL
            )
        """)
        conn.commit()
    finally:
        # Close even when execute()/commit() raises so the handle is not leaked.
        conn.close()


def save_token_record(
    spotify_user_id: str,
    access_token: str,
    refresh_token: str,
    expires_in: int,
    token_type: str | None = None,
    scope: str | None = None,
):
    """
    Insert or overwrite the stored token set for one Spotify user.

    `expires_in` is a lifetime in seconds counted from now; it is stored as an
    absolute UTC timestamp (`expires_at`, ISO 8601) next to `updated_at`.
    """
    now = datetime.now(timezone.utc)
    expires_at = now + timedelta(seconds=int(expires_in))

    conn = db()
    try:
        conn.execute("""
            INSERT INTO spotify_tokens (
                spotify_user_id, access_token, refresh_token,
                token_type, scope, expires_at, updated_at
            )
            VALUES (?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(spotify_user_id) DO UPDATE SET
                access_token = excluded.access_token,
                refresh_token = excluded.refresh_token,
                token_type = excluded.token_type,
                scope = excluded.scope,
                expires_at = excluded.expires_at,
                updated_at = excluded.updated_at
        """, (
            spotify_user_id,
            access_token,
            refresh_token,
            token_type,
            scope,
            expires_at.isoformat(),
            now.isoformat(),
        ))
        conn.commit()
    finally:
        conn.close()


def get_token_record(spotify_user_id: str):
    """Return the stored token row for `spotify_user_id`, or None if there is none."""
    conn = db()
    try:
        return conn.execute("""
            SELECT *
            FROM spotify_tokens
            WHERE spotify_user_id = ?
        """, (spotify_user_id,)).fetchone()
    finally:
        conn.close()


# skew_seconds: refresh every 10 min
def token_is_expired_or_soon(expires_at_iso: str, skew_seconds: int = 3600-600) -> bool:
    """
    Return True when the token expires within `skew_seconds` from now.

    With the default skew of 3000 s and a 3600 s token lifetime (the fallback
    `expires_in` used elsewhere in this file), a token counts as "expiring
    soon" once it is about 10 minutes old.
    """
    expires_at = datetime.fromisoformat(expires_at_iso)
    if expires_at.tzinfo is None:
        # save_token_record() writes timezone-aware UTC timestamps; treat a
        # naive value as UTC rather than failing on an aware/naive comparison.
        expires_at = expires_at.replace(tzinfo=timezone.utc)
    now = datetime.now(timezone.utc)
    return now + timedelta(seconds=skew_seconds) >= expires_at


# ----------------------------
# Spotify token refresh
# ----------------------------

def spotify_basic_auth_header() -> str:
    """Build the HTTP Basic `Authorization` value from the client id and secret."""
    raw = f"{SPOTIFY_CLIENT_ID}:{SPOTIFY_CLIENT_SECRET}".encode("utf-8")
    return "Basic " + b64encode(raw).decode("ascii")


def refresh_spotify_token(refresh_token: str) -> dict:
    """
    Exchange a refresh token for a new access token at Spotify's token endpoint.

    Returns the parsed JSON body on success. Raises requests.HTTPError on a
    non-2xx answer (with the response body rewritten, see below) and other
    requests.RequestException subclasses on network failures or timeouts.
    """
    r = requests.post(
        "https://accounts.spotify.com/api/token",
        data={
            "grant_type": "refresh_token",
            "refresh_token": refresh_token,
        },
        headers={
            "Authorization": spotify_basic_auth_header(),
            "Content-Type": "application/x-www-form-urlencoded",
        },
        timeout=SPOTIFY_HTTP_TIMEOUT,
    )
    if not r.ok:
        # The token endpoint returns errors as flat
        # {"error": "<code>", "error_description": "<text>"}, which differs from
        # the Web API's {"error": {"status": ..., "message": ...}} shape. Rewrite
        # the body so our error handler surfaces a consistent envelope to
        # clients regardless of which Spotify endpoint failed.
        try:
            body = r.json()
        except ValueError:
            body = {}
        if not isinstance(body, dict):
            # Valid JSON that is not an object carries no usable error fields.
            body = {}
        message = (
            body.get("error_description")
            or body.get("error")
            or r.reason
            or "Token refresh failed"
        )
        # NOTE: `_content` is a private attribute of requests.Response; it is
        # overwritten on purpose so that e.response.json() in the error handler
        # sees the normalized envelope.
        r._content = json.dumps({
            "error": {"status": r.status_code, "message": message}
        }).encode("utf-8")
        r.headers["Content-Type"] = "application/json"
    r.raise_for_status()
    return r.json()


def get_valid_access_token(spotify_user_id: str) -> str:
    """
    Return an access token for the user, refreshing and persisting it if needed.

    Raises RuntimeError when no token row is stored for `spotify_user_id`.
    Fields missing from the refresh response (refresh token, token type,
    scope) fall back to the previously stored values.
    """
    row = get_token_record(spotify_user_id)
    if not row:
        raise RuntimeError(f"No stored token for spotify_user_id={spotify_user_id}")

    if not token_is_expired_or_soon(row["expires_at"]):
        return row["access_token"]

    refreshed = refresh_spotify_token(row["refresh_token"])

    new_access_token = refreshed["access_token"]
    new_refresh_token = refreshed.get("refresh_token") or row["refresh_token"]
    new_expires_in = int(refreshed.get("expires_in") or 3600)
    new_token_type = refreshed.get("token_type") or row["token_type"]
    new_scope = refreshed.get("scope") or row["scope"]

    save_token_record(
        spotify_user_id=spotify_user_id,
        access_token=new_access_token,
        refresh_token=new_refresh_token,
        expires_in=new_expires_in,
        token_type=new_token_type,
        scope=new_scope,
    )
    return new_access_token


# ----------------------------
# Spotify Web API helpers
# ----------------------------

def spotify_get(url: str, access_token: str, params: dict | None = None) -> dict:
    """
    GET a Spotify Web API endpoint and return the parsed JSON body.

    Raises requests.HTTPError on a non-2xx status and other
    requests.RequestException subclasses on network failures or timeouts.
    """
    headers = {"Authorization": f"Bearer {access_token}"}
    r = requests.get(
        url,
        headers=headers,
        params=params,
        timeout=SPOTIFY_HTTP_TIMEOUT,
    )
    r.raise_for_status()
    return r.json()


def spotify_get_paginated(
    url: str,
    access_token: str,
    limit: int = 50,
    max_items: int | None = None,
) -> list:
    """
    Fetch all items from a paginated Spotify Web API endpoint.

    Spotify paging objects return up to `limit` items per page (50 max for
    most endpoints) and a `next` URL. We follow `next` until it is null,
    collecting items along the way. When `max_items` is given, collection
    stops as soon as that many items are available and the list is truncated
    to exactly `max_items`.
    """
    items: list = []
    params: dict | None = {"limit": limit, "offset": 0}
    next_url: str | None = url

    while next_url is not None:
        page = spotify_get(next_url, access_token, params=params)
        # Tolerate both a missing key and an explicit null.
        items.extend(page.get("items") or [])

        if max_items is not None and len(items) >= max_items:
            return items[:max_items]

        # `next` is a fully-qualified URL with limit/offset already encoded,
        # so we must not pass `params` again after the first request.
        next_url = page.get("next")
        params = None

    return items


def spotify_get_me(access_token: str) -> dict:
    """Return the current user's Spotify profile (GET /v1/me)."""
    return spotify_get("https://api.spotify.com/v1/me", access_token)


# ----------------------------
# Error handling
# ----------------------------

@app.errorhandler(requests.HTTPError)
def handle_spotify_http_error(e: requests.HTTPError):
    """
    Translate a non-2xx upstream response from Spotify into a JSON envelope,
    passing through the upstream HTTP status code.

    Spotify error bodies look like {"error": {"status": ..., "message": ...}}
    when JSON is returned; we surface that message when available. If the
    exception carries no response at all, 502 is used.
    """
    resp = e.response
    status = resp.status_code if resp is not None else 502

    spotify_error = None
    error_message = str(e)
    if resp is not None:
        try:
            spotify_error = resp.json()
        except ValueError:
            spotify_error = None
        if (
            isinstance(spotify_error, dict)
            and isinstance(spotify_error.get("error"), dict)
        ):
            error_message = (
                spotify_error["error"].get("message") or error_message
            )

    return jsonify({
        "ok": False,
        "error": error_message,
        "spotify": spotify_error,
    }), status


@app.errorhandler(requests.RequestException)
def handle_spotify_request_error(e: requests.RequestException):
    """Network-level failures (DNS, connection, timeout) have no upstream status."""
    return jsonify({
        "ok": False,
        "error": f"Upstream request failed: {e}",
    }), 502


# ----------------------------
# Routes
# ----------------------------

@app.route("/")
def index():
    """Serve a minimal landing page that links to the Spotify login route."""
    login_url = url_for("login", provider_name="spotify")
    return f"""
    <h1>Spotify + Authomatic example</h1>
    <p><a href="{login_url}">Log in with Spotify</a></p>
    <p>After login, this app fetches your Spotify profile and one extra API call.</p>
    """


@app.route("/login/<provider_name>/", methods=["GET", "POST"])
def login(provider_name):
    """
    Run the OAuth2 login for `provider_name` and persist the resulting tokens.

    Flow:
    1. A client may first call this route with ``?redirect_uri=...``; the value
       must equal ALLOWED_SPOTIFY_APP_POST_LOGIN_REDIRECT and is stashed in the
       session, then the request is reloaded without query arguments.
    2. Authomatic performs the redirect to Spotify and the code exchange.
    3. On success the tokens are stored under the Spotify user id, the id is
       kept in the session, and the browser is either redirected to the stashed
       app URL (tokens in the query string) or given a JSON summary.
    """
    if provider_name not in CONFIG:
        # Only configured providers can be handled; answer with a clean 404
        # instead of passing an unknown name to Authomatic.
        return jsonify({
            "ok": False,
            "error": "Unknown login provider",
        }), 404

    # Authomatic 1.3.0 (oauth2.login) only runs "phase 1" — redirect to Spotify —
    # when there are no query parameters, or only ``user_state``:
    #   elif (not self.params or (len(self.params) == 1 and 'user_state' in self.params))
    # A mobile client opening ``/login/spotify/?redirect_uri=...`` therefore matches
    # no branch; login() returns without calling redirect() → empty body and HTTP 200
    # (white page). Stash the app callback in the session and reload without query args.
    requested = request.args.get("redirect_uri")
    if (
        provider_name == "spotify"
        and requested
        and "code" not in request.args
        and "error" not in request.args
    ):
        if requested != ALLOWED_SPOTIFY_APP_POST_LOGIN_REDIRECT:
            return jsonify({
                "ok": False,
                "error": "redirect_uri not allowed for this client",
            }), 400
        session["spotify_oauth_app_redirect_uri"] = requested
        session.modified = True
        return redirect(url_for("login", provider_name=provider_name), code=302)

    response = make_response()

    # Let Authomatic handle the OAuth2 handshake.
    result = authomatic.login(
        WerkzeugAdapter(request, response),
        provider_name,
        session=session,
        session_saver=lambda: setattr(session, "modified", True),
    )

    # If result is None, Authomatic is still redirecting/processing.
    if result is None:
        return response

    if result.error:
        return jsonify({
            "ok": False,
            "stage": "auth",
            "error": getattr(result.error, "message", str(result.error)),
        }), 400

    if not result.user:
        return jsonify({
            "ok": False,
            "error": "No user returned from Spotify",
        }), 400

    # Fetch Spotify user profile from /v1/me
    result.user.update()

    if result.user.credentials is None:
        return jsonify({
            "ok": False,
            "error": "Login succeeded, but no credentials were returned.",
        }), 400

    if not result.user.id:
        # The profile request did not yield an id (e.g. /v1/me failed), so
        # there is no key to store the tokens under.
        return jsonify({
            "ok": False,
            "error": "Could not load the Spotify profile for this login",
        }), 502

    # Raw parsed JSON returned by the Spotify token endpoint. An earlier debug
    # dump showed the keys: access_token, expires_in, refresh_token, scope,
    # token_type.
    token_response = {}
    if getattr(result.provider, "access_token_response", None):
        token_response = result.provider.access_token_response.data or {}

    access_token = token_response.get("access_token")
    refresh_token = token_response.get("refresh_token")
    expires_in = int(token_response.get("expires_in") or 3600)
    token_type = token_response.get("token_type")
    scope = token_response.get("scope")

    if not access_token or not refresh_token:
        return jsonify({
            "ok": False,
            "error": "Spotify token response missing access_token or refresh_token",
            "token_response": token_response,
        }), 400

    save_token_record(
        spotify_user_id=result.user.id,
        access_token=access_token,
        refresh_token=refresh_token,
        expires_in=expires_in,
        token_type=token_type,
        scope=scope,
    )

    # keep the Spotify user id in session
    session["spotify_user_id"] = result.user.id

    app_redirect = session.pop("spotify_oauth_app_redirect_uri", None)
    if app_redirect:
        # Re-check the allowlist: the value came from the session, but the
        # tokens must never be sent anywhere except the configured app URL.
        if app_redirect != ALLOWED_SPOTIFY_APP_POST_LOGIN_REDIRECT:
            return jsonify({
                "ok": False,
                "error": "Stored app redirect_uri does not match allowlist",
            }), 400
        sep = "&" if ("?" in app_redirect) else "?"
        target = f"{app_redirect}{sep}" + urlencode({
            "access_token": access_token,
            "refresh_token": refresh_token,
            "expires_in": str(expires_in),
            "token_type": token_type or "",
        })
        return redirect(target, code=302)

    return jsonify({
        "ok": True,
        "spotify_user_id": result.user.id,
        "display_name": result.user.name,
        "token_response": token_response,
    })


@app.route("/me")
def me():
    """Return the logged-in user's Spotify profile and the stored token expiry."""
    spotify_user_id = session.get("spotify_user_id")
    if not spotify_user_id:
        return jsonify({"ok": False, "error": "Not logged in"}), 401

    access_token = get_valid_access_token(spotify_user_id)
    profile = spotify_get_me(access_token)

    # Read the row after get_valid_access_token() so a refresh is reflected.
    row = get_token_record(spotify_user_id)
    return jsonify({
        "ok": True,
        "profile": profile,
        "stored_expires_at": row["expires_at"],
    })


@app.route("/playlists")
def playlists():
    """Return all playlists of the logged-in user (every page collected)."""
    spotify_user_id = session.get("spotify_user_id")
    if not spotify_user_id:
        return jsonify({"ok": False, "error": "Not logged in"}), 401

    access_token = get_valid_access_token(spotify_user_id)

    # NOTE: https://api.spotify.com/v1/users/{user_id}/playlists (tried with
    # user_id = "Sara") returned 403 Forbidden, hence /v1/me/playlists.
    items = spotify_get_paginated(
        "https://api.spotify.com/v1/me/playlists",
        access_token,
    )

    return jsonify({
        "ok": True,
        "total": len(items),
        "items": items,
    })


@app.route("/playlists/<playlist_id>")
def playlist(playlist_id):
    """Return one playlist, with its complete track list spliced in."""
    from urllib.parse import quote

    spotify_user_id = session.get("spotify_user_id")
    if not spotify_user_id:
        return jsonify({"ok": False, "error": "Not logged in"}), 401

    access_token = get_valid_access_token(spotify_user_id)

    # The id comes straight from the request path; percent-encode it so it can
    # only ever be a single path segment of the upstream URL.
    safe_id = quote(playlist_id, safe="")
    playlist_url = f"https://api.spotify.com/v1/playlists/{safe_id}"

    playlist_data = spotify_get(playlist_url, access_token)

    # The playlist response embeds a `tracks` paging object whose first page
    # contains up to 100 items. For playlists larger than that, follow the
    # dedicated tracks endpoint until exhausted and splice the full list back
    # into the response.
    tracks_obj = playlist_data.get("tracks") or {}
    if tracks_obj.get("next"):
        all_tracks = spotify_get_paginated(
            f"{playlist_url}/tracks",
            access_token,
            limit=100,
        )
        playlist_data["tracks"] = {
            **tracks_obj,
            "items": all_tracks,
            "offset": 0,
            "limit": len(all_tracks),
            "next": None,
            "previous": None,
        }

    return jsonify({
        "ok": True,
        "playlist": playlist_data,
    })


if __name__ == "__main__":
    init_db()
    app.run(host="127.0.0.1", port=8000, debug=True)