api: wrap Spotify playlists

This commit is contained in:
2026-05-13 11:40:50 +02:00
parent 4627786dc4
commit 378009f8b0
2 changed files with 271 additions and 27 deletions

194
api.py
View File

@@ -20,8 +20,6 @@ import os
import sqlite3
from base64 import b64encode
from datetime import datetime, timedelta, timezone
from urllib.parse import urlencode
from urllib.request import Request, urlopen
import json
from flask import Flask, request, session, jsonify, make_response
@@ -214,25 +212,41 @@ def spotify_basic_auth_header() -> str:
def refresh_spotify_token(refresh_token: str) -> dict:
body = urlencode({
"grant_type": "refresh_token",
"refresh_token": refresh_token,
}).encode("utf-8")
req = Request(
r = requests.post(
"https://accounts.spotify.com/api/token",
data=body,
method="POST",
data={
"grant_type": "refresh_token",
"refresh_token": refresh_token,
},
headers={
"Authorization": spotify_basic_auth_header(),
"Content-Type": "application/x-www-form-urlencoded",
},
)
with urlopen(req) as resp:
payload = json.loads(resp.read().decode("utf-8"))
if not r.ok:
# The token endpoint returns errors as flat
# {"error": "<code>", "error_description": "<msg>"}, which differs from
# the Web API's {"error": {"status": ..., "message": ...}} shape. Rewrite
# the body so our error handler surfaces a consistent envelope to
# clients regardless of which Spotify endpoint failed.
try:
body = r.json()
except ValueError:
body = {}
message = (
body.get("error_description")
or body.get("error")
or r.reason
or "Token refresh failed"
)
r._content = json.dumps({
"error": {"status": r.status_code, "message": message}
}).encode("utf-8")
r.headers["Content-Type"] = "application/json"
return payload
r.raise_for_status()
return r.json()
def get_valid_access_token(spotify_user_id: str) -> str:
@@ -266,17 +280,100 @@ def get_valid_access_token(spotify_user_id: str) -> str:
# ----------------------------
# Simple Spotify API call
# Spotify Web API helpers
# ----------------------------
def spotify_get(url: str, access_token: str, params: dict | None = None) -> dict:
"""GET a Spotify Web API endpoint and return the parsed JSON body."""
headers = {"Authorization": f"Bearer {access_token}"}
r = requests.get(url, headers=headers, params=params)
r.raise_for_status()
return r.json()
def spotify_get_paginated(
url: str,
access_token: str,
limit: int = 50,
max_items: int | None = None,
) -> list:
"""
Fetch all items from a paginated Spotify Web API endpoint.
Spotify paging objects return up to `limit` items per page (50 max for most
endpoints) and a `next` URL. We follow `next` until it is null, collecting
items along the way.
"""
items: list = []
params: dict | None = {"limit": limit, "offset": 0}
next_url: str | None = url
while next_url is not None:
page = spotify_get(next_url, access_token, params=params)
items.extend(page.get("items", []))
if max_items is not None and len(items) >= max_items:
return items[:max_items]
# `next` is a fully-qualified URL with limit/offset already encoded,
# so we must not pass `params` again after the first request.
next_url = page.get("next")
params = None
return items
def spotify_get_me(access_token: str) -> dict:
req = Request(
"https://api.spotify.com/v1/me",
headers={"Authorization": f"Bearer {access_token}"},
method="GET",
)
with urlopen(req) as resp:
return json.loads(resp.read().decode("utf-8"))
return spotify_get("https://api.spotify.com/v1/me", access_token)
# ----------------------------
# Error handling
# ----------------------------
@app.errorhandler(requests.HTTPError)
def handle_spotify_http_error(e: requests.HTTPError):
"""
Translate a non-2xx upstream response from Spotify into a JSON envelope,
passing through the upstream HTTP status code.
Spotify error bodies look like {"error": {"status": ..., "message": ...}}
when JSON is returned; we surface that message when available.
"""
resp = e.response
status = resp.status_code if resp is not None else 502
spotify_error = None
error_message = str(e)
if resp is not None:
try:
spotify_error = resp.json()
except ValueError:
spotify_error = None
if (
isinstance(spotify_error, dict)
and isinstance(spotify_error.get("error"), dict)
):
error_message = (
spotify_error["error"].get("message") or error_message
)
return jsonify({
"ok": False,
"error": error_message,
"spotify": spotify_error,
}), status
@app.errorhandler(requests.RequestException)
def handle_spotify_request_error(e: requests.RequestException):
"""Network-level failures (DNS, connection, timeout) have no upstream status."""
return jsonify({
"ok": False,
"error": f"Upstream request failed: {e}",
}), 502
# ----------------------------
@@ -474,7 +571,10 @@ def me():
@app.route("/playlists")
def playlists():
spotify_user_id = "cidermole"
spotify_user_id = session.get("spotify_user_id")
if not spotify_user_id:
return jsonify({"ok": False, "error": "Not logged in"}), 401
access_token = get_valid_access_token(spotify_user_id)
"""
@@ -482,14 +582,54 @@ def playlists():
url = f"https://api.spotify.com/v1/users/{user_id}/playlists"
# -> 403 Forbidden
"""
url = f"https://api.spotify.com/v1/me/playlists"
headers={"Authorization": f"Bearer {access_token}"}
r = requests.get(url, headers=headers)
# TODO: pagination (limit,offset)
items = spotify_get_paginated(
"https://api.spotify.com/v1/me/playlists",
access_token,
)
return jsonify({
"ok": True,
"response": r.json()
"total": len(items),
"items": items,
})
@app.route("/playlists/<playlist_id>")
def playlist(playlist_id):
spotify_user_id = session.get("spotify_user_id")
if not spotify_user_id:
return jsonify({"ok": False, "error": "Not logged in"}), 401
access_token = get_valid_access_token(spotify_user_id)
playlist_data = spotify_get(
f"https://api.spotify.com/v1/playlists/{playlist_id}",
access_token,
)
# The playlist response embeds a `tracks` paging object whose first page
# contains up to 100 items. For playlists larger than that, follow the
# dedicated tracks endpoint until exhausted and splice the full list back
# into the response.
tracks_obj = playlist_data.get("tracks") or {}
if tracks_obj.get("next"):
all_tracks = spotify_get_paginated(
f"https://api.spotify.com/v1/playlists/{playlist_id}/tracks",
access_token,
limit=100,
)
playlist_data["tracks"] = {
**tracks_obj,
"items": all_tracks,
"offset": 0,
"limit": len(all_tracks),
"next": None,
"previous": None,
}
return jsonify({
"ok": True,
"playlist": playlist_data,
})