# app.py

# running:
# $ cd /var/sites/api.lockstep.at
# $ source .venv/bin/activate
# $ source .env
# $ flask --app api run --port 8000

# Authomatic 1.3.0 is not compatible with Python 3.12+
# patch: /var/sites/api.lockstep.at/.venv/lib/python3.13/site-packages/authomatic/providers/__init__.py
# def _fetch()
#   HTTPSConnection(
#     # cert_file=cert_file,  # <-- comment this

# sync continuously:
# $ while sleep 1; do diff -q api.py /tmp/api.py; if [ $? -ne 0 ]; then scp api.py lockstep@api.lockstep.at:/var/sites/api.lockstep.at/; cp api.py /tmp/api.py; fi; done

import json
import os
import sqlite3
from base64 import b64encode
from datetime import datetime, timedelta, timezone
from urllib.parse import urlencode
from urllib.request import Request, urlopen

from flask import Flask, request, session, jsonify, make_response
from werkzeug.middleware.proxy_fix import ProxyFix

from authomatic import Authomatic
from authomatic.adapters import WerkzeugAdapter
from authomatic.providers import oauth2
from authomatic.providers import PROVIDER_ID_MAP as AUTHOMATIC_PROVIDER_ID_MAP


# ----------------------------
# Custom Spotify provider
# ----------------------------
class Spotify(oauth2.OAuth2):
    """
    Custom Authomatic provider for Spotify Web API Authorization Code Flow.

    Spotify docs:
    - Authorization endpoint: https://accounts.spotify.com/authorize
    - Token endpoint: https://accounts.spotify.com/api/token
    - Current user profile: https://api.spotify.com/v1/me
    """

    name = "spotify"

    # Provider endpoints
    user_authorization_url = "https://accounts.spotify.com/authorize"
    access_token_url = "https://accounts.spotify.com/api/token"
    user_info_url = "https://api.spotify.com/v1/me"

    # Helpful preset scopes
    spotify_scopes = ["playlist-read-private"]

    def update_user(self):
        """
        Fetch Spotify profile and map it onto Authomatic's generic user object.

        Returns:
            The response object produced by ``self.access``. When its status
            is not 200 it is returned immediately and ``self.user`` is left
            untouched.
        """
        response = self.access(self.user_info_url)

        if response.status != 200:
            return response

        data = response.data or {}

        # Common Authomatic user fields
        self.user.id = data.get("id")
        # Fall back to the id when the profile has no display name.
        self.user.name = data.get("display_name") or data.get("id")
        # self.user.email = data.get("email")

        # Spotify has images as a list; use the first one if present.
        images = data.get("images") or []
        if images:
            self.user.picture = images[0].get("url")

        # Optional extra fields if your Authomatic version exposes them
        self.user.username = data.get("id")
        self.user.link = (data.get("external_urls") or {}).get("spotify")

        return response


# "must exist in the same module as Spotify (Authomatic provider class)"
PROVIDER_ID_MAP = list(AUTHOMATIC_PROVIDER_ID_MAP) + [Spotify]


# ----------------------------
# App config
# ----------------------------
SPOTIFY_CLIENT_ID = os.environ["SPOTIFY_CLIENT_ID"]
SPOTIFY_CLIENT_SECRET = os.environ["SPOTIFY_CLIENT_SECRET"]

# Must exactly match a Redirect URI configured in your Spotify app settings.
REDIRECT_URI = os.environ.get(
    "SPOTIFY_REDIRECT_URI",
    # "https://api.lockstep.at/spotify/callback"
    "https://api.lockstep.at/login/spotify/",
)

DB_PATH = os.environ.get("TOKEN_DB_PATH", "spotify_tokens.db")

# Upper bound, in seconds, for each outbound HTTP call to Spotify so a stalled
# connection cannot block a request worker indefinitely.
HTTP_TIMEOUT_SECONDS = 10

app = Flask(__name__)
app.secret_key = os.environ["FLASK_SECRET_KEY"]
app.wsgi_app = ProxyFix(
    app.wsgi_app,
    x_for=1,
    x_proto=1,
    x_host=1,
    x_prefix=1,
)

CONFIG = {
    "spotify": {
        "class_": Spotify,
        "consumer_key": SPOTIFY_CLIENT_ID,
        "consumer_secret": SPOTIFY_CLIENT_SECRET,
        "scope": Spotify.spotify_scopes,
    }
}

authomatic = Authomatic(CONFIG, app.secret_key)


# ----------------------------
# SQLite helpers
# ----------------------------
def db():
    """Open a new SQLite connection to DB_PATH whose rows are ``sqlite3.Row``.

    The caller owns the connection and must close it.
    """
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    return conn


def init_db():
    """Create the ``spotify_tokens`` table if it does not exist yet."""
    conn = db()
    try:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS spotify_tokens (
                spotify_user_id TEXT PRIMARY KEY,
                access_token TEXT NOT NULL,
                refresh_token TEXT NOT NULL,
                token_type TEXT,
                scope TEXT,
                expires_at TEXT NOT NULL,
                updated_at TEXT NOT NULL
            )
        """)
        conn.commit()
    finally:
        # Close even when the statement fails so the handle is not leaked.
        conn.close()


def save_token_record(
    spotify_user_id: str,
    access_token: str,
    refresh_token: str,
    expires_in: int,
    token_type: str | None = None,
    scope: str | None = None,
):
    """Insert or update the stored token row for one Spotify user.

    Args:
        spotify_user_id: Primary key of the row.
        access_token: Current access token.
        refresh_token: Refresh token used to obtain later access tokens.
        expires_in: Access-token lifetime in seconds, counted from now.
        token_type: Token type reported by the token endpoint, if any.
        scope: Granted scope string reported by the token endpoint, if any.
    """
    now = datetime.now(timezone.utc)
    expires_at = now + timedelta(seconds=int(expires_in))

    conn = db()
    try:
        conn.execute("""
            INSERT INTO spotify_tokens (
                spotify_user_id, access_token, refresh_token,
                token_type, scope, expires_at, updated_at
            )
            VALUES (?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(spotify_user_id) DO UPDATE SET
                access_token = excluded.access_token,
                refresh_token = excluded.refresh_token,
                token_type = excluded.token_type,
                scope = excluded.scope,
                expires_at = excluded.expires_at,
                updated_at = excluded.updated_at
        """, (
            spotify_user_id,
            access_token,
            refresh_token,
            token_type,
            scope,
            expires_at.isoformat(),
            now.isoformat(),
        ))
        conn.commit()
    finally:
        conn.close()


def get_token_record(spotify_user_id: str):
    """Return the stored token row for *spotify_user_id*, or ``None``."""
    conn = db()
    try:
        return conn.execute("""
            SELECT * FROM spotify_tokens
            WHERE spotify_user_id = ?
        """, (spotify_user_id,)).fetchone()
    finally:
        conn.close()


# skew_seconds: refresh every 10 min
def token_is_expired_or_soon(expires_at_iso: str, skew_seconds: int = 3600-600) -> bool:
    """Return True when the token expires within *skew_seconds* from now.

    With the default skew of 3000 s and a 3600 s token lifetime, a token
    counts as "expiring soon" once it is about 10 minutes old.

    Args:
        expires_at_iso: Expiry timestamp in ISO 8601 format.
        skew_seconds: How far ahead of the actual expiry to report True.
    """
    expires_at = datetime.fromisoformat(expires_at_iso)
    if expires_at.tzinfo is None:
        # Rows written by save_token_record are UTC-aware; treat a naive
        # timestamp as UTC rather than failing on a naive/aware comparison.
        expires_at = expires_at.replace(tzinfo=timezone.utc)
    now = datetime.now(timezone.utc)
    return now + timedelta(seconds=skew_seconds) >= expires_at


# ----------------------------
# Spotify token refresh
# ----------------------------
def spotify_basic_auth_header() -> str:
    """Build the HTTP Basic ``Authorization`` value from the client id/secret."""
    raw = f"{SPOTIFY_CLIENT_ID}:{SPOTIFY_CLIENT_SECRET}".encode("utf-8")
    return "Basic " + b64encode(raw).decode("ascii")


def refresh_spotify_token(refresh_token: str) -> dict:
    """Exchange *refresh_token* for a new access token at Spotify.

    Returns:
        The decoded JSON body of the token endpoint response.

    Raises:
        urllib.error.URLError: On network failure, timeout, or (as its
            ``HTTPError`` subclass) a non-2xx response.
    """
    body = urlencode({
        "grant_type": "refresh_token",
        "refresh_token": refresh_token,
    }).encode("utf-8")

    req = Request(
        "https://accounts.spotify.com/api/token",
        data=body,
        method="POST",
        headers={
            "Authorization": spotify_basic_auth_header(),
            "Content-Type": "application/x-www-form-urlencoded",
        },
    )

    with urlopen(req, timeout=HTTP_TIMEOUT_SECONDS) as resp:
        return json.loads(resp.read().decode("utf-8"))


def get_valid_access_token(spotify_user_id: str) -> str:
    """Return a usable access token for the user, refreshing it when needed.

    When a refresh happens, the new values are persisted; fields missing from
    the refresh response keep their previously stored values.

    Raises:
        RuntimeError: If no token row is stored for *spotify_user_id*.
    """
    row = get_token_record(spotify_user_id)
    if not row:
        raise RuntimeError(f"No stored token for spotify_user_id={spotify_user_id}")

    if not token_is_expired_or_soon(row["expires_at"]):
        return row["access_token"]

    refreshed = refresh_spotify_token(row["refresh_token"])

    new_access_token = refreshed["access_token"]
    # print("refreshed token. new_access_token={}".format(new_access_token), flush=True)
    # The refresh response may omit refresh_token; keep the stored one then.
    new_refresh_token = refreshed.get("refresh_token") or row["refresh_token"]
    new_expires_in = int(refreshed.get("expires_in", 3600))
    new_token_type = refreshed.get("token_type") or row["token_type"]
    new_scope = refreshed.get("scope") or row["scope"]

    save_token_record(
        spotify_user_id=spotify_user_id,
        access_token=new_access_token,
        refresh_token=new_refresh_token,
        expires_in=new_expires_in,
        token_type=new_token_type,
        scope=new_scope,
    )

    return new_access_token


# ----------------------------
# Simple Spotify API call
# ----------------------------
def spotify_get_me(access_token: str) -> dict:
    """GET https://api.spotify.com/v1/me with a Bearer token; return the JSON body."""
    req = Request(
        "https://api.spotify.com/v1/me",
        headers={"Authorization": f"Bearer {access_token}"},
        method="GET",
    )
    with urlopen(req, timeout=HTTP_TIMEOUT_SECONDS) as resp:
        return json.loads(resp.read().decode("utf-8"))


# ----------------------------
# Routes
# ----------------------------
@app.route("/")
def index():
    """Landing page with a link that starts the Spotify login flow."""
    # NOTE(review): the original markup was lost; the tags below are a
    # reconstruction around the surviving text - confirm against the
    # intended page.
    return """
<h1>Spotify + Authomatic example</h1>
<p><a href="/login/spotify/">Log in with Spotify</a></p>
<p>After login, this app fetches your Spotify profile and one extra API call.</p>
"""


@app.route("/login/<provider_name>/", methods=["GET", "POST"])
def login(provider_name):
    """Run the OAuth2 login for *provider_name* and store the resulting tokens.

    On success the token row is saved, the Spotify user id is kept in the
    session, and a JSON summary is returned. Failures return a JSON body with
    ``"ok": False`` and an error status.
    """
    if provider_name not in CONFIG:
        # The path segment is user-controlled; reject providers we have not
        # configured instead of letting Authomatic raise.
        return jsonify({
            "ok": False,
            "stage": "auth",
            "error": f"Unknown provider: {provider_name}",
        }), 404

    response = make_response()

    def save_session():
        # Flag the Flask session as changed so it is written to the cookie.
        session.modified = True

    # Let Authomatic handle the OAuth2 handshake.
    result = authomatic.login(
        WerkzeugAdapter(request, response),
        provider_name,
        session=session,
        session_saver=save_session,
    )

    # If result is None, Authomatic is still redirecting/processing.
    if result is None:
        return response

    if result.error:
        return jsonify({
            "ok": False,
            "stage": "auth",
            "error": getattr(result.error, "message", str(result.error)),
        }), 400

    if not result.user:
        return jsonify({
            "ok": False,
            "error": "No user returned from Spotify",
        }), 400

    # Fetch Spotify user profile from /v1/me
    result.user.update()

    if not result.user.id:
        # Without an id there is no key to store the tokens under.
        return jsonify({
            "ok": False,
            "error": "Could not fetch Spotify user profile",
        }), 502

    if result.user.credentials is None:
        return jsonify({
            "ok": False,
            "error": "Login succeeded, but no credentials were returned.",
        }), 400

    # DAVID: EXAMPLE 2
    # DAVID ----------
    # Raw parsed JSON returned by the Spotify token endpoint; default to an
    # empty dict so the lookups below cannot fail on None.
    token_response = {}
    if getattr(result.provider, "access_token_response", None):
        token_response = result.provider.access_token_response.data or {}

    access_token = token_response.get("access_token")
    refresh_token = token_response.get("refresh_token")
    expires_in = int(token_response.get("expires_in", 3600))
    token_type = token_response.get("token_type")
    scope = token_response.get("scope")

    if not access_token or not refresh_token:
        return jsonify({
            "ok": False,
            "error": "Spotify token response missing access_token or refresh_token",
            "token_response": token_response,
        }), 400

    save_token_record(
        spotify_user_id=result.user.id,
        access_token=access_token,
        refresh_token=refresh_token,
        expires_in=expires_in,
        token_type=token_type,
        scope=scope,
    )

    # keep the Spotify user id in session
    session["spotify_user_id"] = result.user.id

    # NOTE(review): token_response contains the access and refresh tokens and
    # is sent to the browser here; keep this for debugging only.
    return jsonify({
        "ok": True,
        "spotify_user_id": result.user.id,
        "display_name": result.user.name,
        "token_response": token_response,
    })


"""
old example 1:

    # Example protected-resource call using the OAuth access token:
    # Spotify current user's top tracks (requires user-top-read scope if enabled)
    # For a scope-free example, use /v1/me only, which we already used in update().
    me_response = result.provider.access("https://api.spotify.com/v1/me")

    payload = {
        "ok": True,
        "provider": provider_name,
        "user": {
            "id": result.user.id,
            "name": result.user.name,
            "email": result.user.email,
            "picture": getattr(result.user, "picture", None),
            "profile_url": getattr(result.user, "link", None),
        },
        "credentials": {
            # Keep this server-side in real apps. Returned here only as a demo.
            "access_token": result.user.credentials.token,
            "refresh_token": getattr(result.user.credentials, "refresh_token", None),
            "expires": str(getattr(result.user.credentials, "expires", None)),
            "token_http_status": token_http_status,
            "raw_token_data": raw_token_data,
        },
        "spotify_me_status": me_response.status,
        "spotify_me_data": me_response.data,
    }

    return jsonify(payload)
"""

"""
old example 1:

{
  "credentials": {
    "access_token": "__TOKEN__",
    "expires": "None",
    "raw_token_data": {
      "access_token": "__TOKEN__",
      "expires_in": 3600,
      "refresh_token": "__TOKEN2__",
      "scope": "playlist-read-private",
      "token_type": "Bearer"
    },
    "refresh_token": "__TOKEN2__",
    "token_http_status": 200
  },
  "ok": true,
  "provider": "spotify",
  "spotify_me_data": {
    "display_name": "cidermole",
    "external_urls": {
      "spotify": "https://open.spotify.com/user/cidermole"
    },
    "followers": {
      "href": null,
      "total": 2
    },
    "href": "https://api.spotify.com/v1/users/cidermole",
    "id": "cidermole",
    "images": [],
    "type": "user",
    "uri": "spotify:user:cidermole"
  },
  "spotify_me_status": 200,
  "user": {
    "email": null,
    "id": "cidermole",
    "name": "cidermole",
    "picture": null,
    "profile_url": "https://open.spotify.com/user/cidermole"
  }
}
"""


@app.route("/me")
def me():
    """Return the logged-in user's Spotify profile and stored token expiry."""
    spotify_user_id = session.get("spotify_user_id")
    if not spotify_user_id:
        return jsonify({"ok": False, "error": "Not logged in"}), 401

    try:
        access_token = get_valid_access_token(spotify_user_id)
    except RuntimeError as exc:
        # The session names a user whose token row is missing; ask the client
        # to log in again instead of failing with a server error.
        return jsonify({"ok": False, "error": str(exc)}), 401

    profile = spotify_get_me(access_token)

    # Re-read the row so the reported expiry reflects any refresh done above.
    row = get_token_record(spotify_user_id)
    return jsonify({
        "ok": True,
        "profile": profile,
        "stored_expires_at": row["expires_at"] if row else None,
    })


# Create the schema at import time: when the app is started through the
# `flask` command line the module is imported and the __main__ guard below
# never runs. The statement is CREATE TABLE IF NOT EXISTS, so repeating it
# is harmless.
init_db()

if __name__ == "__main__":
    app.run(host="127.0.0.1", port=8000, debug=True)