From fc0d35eb96a01ba66121659bc5c3bd82fb1af390 Mon Sep 17 00:00:00 2001 From: David Madl Date: Thu, 2 Apr 2026 00:51:11 +0200 Subject: [PATCH] feat: api token persistence and refreshing --- api.py | 245 +++++++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 240 insertions(+), 5 deletions(-) diff --git a/api.py b/api.py index 67c0250..36ecf39 100644 --- a/api.py +++ b/api.py @@ -16,6 +16,13 @@ # $ while sleep 1; do diff -q api.py /tmp/api.py; if [ $? -ne 0 ]; then scp api.py lockstep@api.lockstep.at:/var/sites/api.lockstep.at/; cp api.py /tmp/api.py; fi; done import os +import sqlite3 +from base64 import b64encode +from datetime import datetime, timedelta, timezone +from urllib.parse import urlencode +from urllib.request import Request, urlopen +import json + from flask import Flask, request, session, jsonify, make_response from werkzeug.middleware.proxy_fix import ProxyFix from authomatic import Authomatic @@ -24,6 +31,10 @@ from authomatic.providers import oauth2 from authomatic.providers import PROVIDER_ID_MAP as AUTHOMATIC_PROVIDER_ID_MAP +# ---------------------------- +# Custom Spotify provider +# ---------------------------- + class Spotify(oauth2.OAuth2): """ Custom Authomatic provider for Spotify Web API Authorization Code Flow. @@ -74,6 +85,10 @@ class Spotify(oauth2.OAuth2): PROVIDER_ID_MAP = list(AUTHOMATIC_PROVIDER_ID_MAP) + [Spotify] +# ---------------------------- +# App config +# ---------------------------- + SPOTIFY_CLIENT_ID = os.environ["SPOTIFY_CLIENT_ID"] SPOTIFY_CLIENT_SECRET = os.environ["SPOTIFY_CLIENT_SECRET"] @@ -84,6 +99,8 @@ REDIRECT_URI = os.environ.get( "https://api.lockstep.at/login/spotify/" ) +DB_PATH = os.environ.get("TOKEN_DB_PATH", "spotify_tokens.db") + app = Flask(__name__) app.secret_key = os.environ["FLASK_SECRET_KEY"] app.wsgi_app = ProxyFix( @@ -95,18 +112,174 @@ CONFIG = { "class_": Spotify, "consumer_key": SPOTIFY_CLIENT_ID, "consumer_secret": SPOTIFY_CLIENT_SECRET, - - # Spotify scopes are space-separated conceptually; Authomatic accepts a list. "scope": Spotify.spotify_scopes - - # Optional: force Spotify to show consent every time. - # "user_authorization_params": {"show_dialog": "true"}, } } authomatic = Authomatic(CONFIG, app.secret_key) +# ---------------------------- +# SQLite helpers +# ---------------------------- + +def db(): + conn = sqlite3.connect(DB_PATH) + conn.row_factory = sqlite3.Row + return conn + + +def init_db(): + conn = db() + conn.execute(""" + CREATE TABLE IF NOT EXISTS spotify_tokens ( + spotify_user_id TEXT PRIMARY KEY, + access_token TEXT NOT NULL, + refresh_token TEXT NOT NULL, + token_type TEXT, + scope TEXT, + expires_at TEXT NOT NULL, + updated_at TEXT NOT NULL + ) + """) + conn.commit() + conn.close() + + +def save_token_record( + spotify_user_id: str, + access_token: str, + refresh_token: str, + expires_in: int, + token_type: str | None = None, + scope: str | None = None, +): + now = datetime.now(timezone.utc) + expires_at = now + timedelta(seconds=int(expires_in)) + + conn = db() + conn.execute(""" + INSERT INTO spotify_tokens ( + spotify_user_id, access_token, refresh_token, + token_type, scope, expires_at, updated_at + ) + VALUES (?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(spotify_user_id) DO UPDATE SET + access_token = excluded.access_token, + refresh_token = excluded.refresh_token, + token_type = excluded.token_type, + scope = excluded.scope, + expires_at = excluded.expires_at, + updated_at = excluded.updated_at + """, ( + spotify_user_id, + access_token, + refresh_token, + token_type, + scope, + expires_at.isoformat(), + now.isoformat(), + )) + conn.commit() + conn.close() + + +def get_token_record(spotify_user_id: str): + conn = db() + row = conn.execute(""" + SELECT * FROM spotify_tokens + WHERE spotify_user_id = ? + """, (spotify_user_id,)).fetchone() + conn.close() + return row + + +# skew_seconds: refresh every 10 min +def token_is_expired_or_soon(expires_at_iso: str, skew_seconds: int = 3600-600) -> bool: + expires_at = datetime.fromisoformat(expires_at_iso) + now = datetime.now(timezone.utc) + return now + timedelta(seconds=skew_seconds) >= expires_at + + +# ---------------------------- +# Spotify token refresh +# ---------------------------- + +def spotify_basic_auth_header() -> str: + raw = f"{SPOTIFY_CLIENT_ID}:{SPOTIFY_CLIENT_SECRET}".encode("utf-8") + return "Basic " + b64encode(raw).decode("ascii") + + +def refresh_spotify_token(refresh_token: str) -> dict: + body = urlencode({ + "grant_type": "refresh_token", + "refresh_token": refresh_token, + }).encode("utf-8") + + req = Request( + "https://accounts.spotify.com/api/token", + data=body, + method="POST", + headers={ + "Authorization": spotify_basic_auth_header(), + "Content-Type": "application/x-www-form-urlencoded", + }, + ) + + with urlopen(req) as resp: + payload = json.loads(resp.read().decode("utf-8")) + + return payload + + +def get_valid_access_token(spotify_user_id: str) -> str: + row = get_token_record(spotify_user_id) + if not row: + raise RuntimeError(f"No stored token for spotify_user_id={spotify_user_id}") + + if not token_is_expired_or_soon(row["expires_at"]): + return row["access_token"] + + refreshed = refresh_spotify_token(row["refresh_token"]) + + new_access_token = refreshed["access_token"] + #print("refreshed token. new_access_token={}".format(new_access_token), flush=True) + + new_refresh_token = refreshed.get("refresh_token") or row["refresh_token"] + new_expires_in = int(refreshed.get("expires_in", 3600)) + new_token_type = refreshed.get("token_type") or row["token_type"] + new_scope = refreshed.get("scope") or row["scope"] + + save_token_record( + spotify_user_id=spotify_user_id, + access_token=new_access_token, + refresh_token=new_refresh_token, + expires_in=new_expires_in, + token_type=new_token_type, + scope=new_scope, + ) + + return new_access_token + + +# ---------------------------- +# Simple Spotify API call +# ---------------------------- + +def spotify_get_me(access_token: str) -> dict: + req = Request( + "https://api.spotify.com/v1/me", + headers={"Authorization": f"Bearer {access_token}"}, + method="GET", + ) + with urlopen(req) as resp: + return json.loads(resp.read().decode("utf-8")) + + +# ---------------------------- +# Routes +# ---------------------------- + @app.route("/") def index(): return """ @@ -164,6 +337,47 @@ def login(provider_name): "error": "Login succeeded, but no credentials were returned." }), 400 + # DAVID: EXAMPLE 2 + # DAVID ---------- + + token_response = None + if getattr(result.provider, "access_token_response", None): + token_response = result.provider.access_token_response.data or {} + + access_token = token_response.get("access_token") + refresh_token = token_response.get("refresh_token") + expires_in = int(token_response.get("expires_in", 3600)) + token_type = token_response.get("token_type") + scope = token_response.get("scope") + + if not access_token or not refresh_token: + return jsonify({ + "ok": False, + "error": "Spotify token response missing access_token or refresh_token", + "token_response": token_response, + }), 400 + + save_token_record( + spotify_user_id=result.user.id, + access_token=access_token, + refresh_token=refresh_token, + expires_in=expires_in, + token_type=token_type, + scope=scope, + ) + + # keep the Spotify user id in session + session["spotify_user_id"] = result.user.id + + return jsonify({ + "ok": True, + "spotify_user_id": result.user.id, + "display_name": result.user.name, + "token_response": token_response, + }) + + """ + old example 1: # Example protected-resource call using the OAuth access token: # Spotify current user's top tracks (requires user-top-read scope if enabled) # For a scope-free example, use /v1/me only, which we already used in update(). @@ -192,8 +406,10 @@ def login(provider_name): } return jsonify(payload) + """ """ +old example 1: { "credentials": { "access_token": "__TOKEN__", @@ -236,5 +452,24 @@ def login(provider_name): } """ +@app.route("/me") +def me(): + spotify_user_id = session.get("spotify_user_id") + if not spotify_user_id: + return jsonify({"ok": False, "error": "Not logged in"}), 401 + + access_token = get_valid_access_token(spotify_user_id) + profile = spotify_get_me(access_token) + + row = get_token_record(spotify_user_id) + + return jsonify({ + "ok": True, + "profile": profile, + "stored_expires_at": row["expires_at"], + }) + + if __name__ == "__main__": + init_db() app.run(host="127.0.0.1", port=8000, debug=True)