2 Commits

Author SHA1 Message Date
e823fc9d94 docs: ai-generated runbuddy mode plan 2026-05-31 20:00:54 +02:00
965bec72d2 feat: NTP client 2026-05-31 20:00:41 +02:00
2 changed files with 426 additions and 0 deletions

View File

@@ -0,0 +1,102 @@
package at.lockstep.player.util
import java.net.DatagramPacket
import java.net.DatagramSocket
import java.net.InetAddress
import java.net.SocketTimeoutException
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReference
/**
 * Minimal SNTP client that measures how far the local wall clock is from NTP time.
 *
 * One 48-byte client request is sent over UDP to [NTP_HOST]; the reply's receive and
 * transmit timestamps are combined with the local send and receive times.
 */
object NtpClient {
    private const val NTP_HOST = "pool.ntp.org"
    private const val NTP_PORT = 123
    private const val NTP_PACKET_SIZE = 48
    private const val TIMEOUT_MS = 5000
    private const val NTP_EPOCH_OFFSET_SECONDS = 2208988800L

    /** Number of seconds in one NTP era (the seconds field is an unsigned 32-bit counter). */
    private const val NTP_ERA_SECONDS = 4294967296L

    /** Scale of the 32-bit fractional-seconds field (2^32 units per second). */
    private const val NTP_FRACTION_SCALE = 4294967296.0

    // Byte offsets of the 64-bit timestamps inside the NTP packet.
    private const val ORIGINATE_OFFSET = 24
    private const val RECEIVE_OFFSET = 32
    private const val TRANSMIT_OFFSET = 40
    private const val TIMESTAMP_SIZE = 8

    /**
     * Queries [NTP_HOST] and returns how many seconds the system clock is ahead of the server.
     * Positive = local clock is fast; negative = local clock is slow.
     *
     * This call blocks on DNS resolution and on the UDP exchange (the exchange itself is
     * bounded by roughly [TIMEOUT_MS] + 1 s), so it should be called from a background thread.
     *
     * @return local-minus-server clock offset in seconds.
     * @throws java.net.SocketTimeoutException if no reply arrives in time.
     * @throws java.io.IOException if the reply is malformed, unsynchronised, or does not
     *   belong to this request.
     */
    @Throws(Exception::class)
    fun clockOffsetSeconds(): Double {
        // Resolve the host before taking any timestamp: a slow DNS lookup must not be
        // counted as network round-trip time, otherwise the offset is skewed by half of it.
        val address = InetAddress.getByName(NTP_HOST)
        val socket = DatagramSocket()
        try {
            socket.soTimeout = TIMEOUT_MS

            val requestBuffer = ByteArray(NTP_PACKET_SIZE)
            requestBuffer[0] = 0x1B // LI=0, VN=3, mode=client
            val requestPacket = DatagramPacket(requestBuffer, requestBuffer.size, address, NTP_PORT)

            // t1 is the wall-clock send time; the monotonic clock measures the elapsed time
            // so a wall-clock adjustment during the query cannot corrupt the round trip.
            val t1Millis = System.currentTimeMillis()
            val sentAtNanos = System.nanoTime()
            writeNtpTimestamp(requestBuffer, TRANSMIT_OFFSET, t1Millis)
            socket.send(requestPacket)

            val responseBuffer = ByteArray(NTP_PACKET_SIZE)
            val responsePacket = DatagramPacket(responseBuffer, responseBuffer.size)
            val errorRef = AtomicReference<Exception?>(null)
            val receivedAtNanosRef = AtomicReference<Long?>(null)
            val latch = CountDownLatch(1)

            // Receive on a helper thread so the caller's wait is bounded by the latch below;
            // closing the socket in the outer finally unblocks a receive that is still pending.
            Thread {
                try {
                    socket.receive(responsePacket)
                    receivedAtNanosRef.set(System.nanoTime())
                } catch (e: Exception) {
                    errorRef.set(e)
                } finally {
                    latch.countDown()
                }
            }.apply {
                isDaemon = true
                name = "ntp-receive"
            }.start()

            if (!latch.await((TIMEOUT_MS + 1000).toLong(), TimeUnit.MILLISECONDS)) {
                throw SocketTimeoutException("NTP request timed out")
            }
            errorRef.get()?.let { throw it }
            val receivedAtNanos = receivedAtNanosRef.get()
                ?: throw IllegalStateException("NTP response received without timestamp")

            validateResponse(requestBuffer, responseBuffer, responsePacket.length)

            val t1 = t1Millis / 1000.0
            val t2 = ntpBytesToSeconds(responseBuffer, RECEIVE_OFFSET)
            val t3 = ntpBytesToSeconds(responseBuffer, TRANSMIT_OFFSET)
            val t4 = t1 + (receivedAtNanos - sentAtNanos) / 1_000_000_000.0

            // Negated form of the standard NTP offset: local minus server.
            return ((t1 - t2) + (t4 - t3)) / 2.0
        } finally {
            socket.close()
        }
    }

    /**
     * Rejects replies that cannot be trusted as a time source for this request.
     *
     * @param request the request bytes that were sent (its transmit timestamp must be echoed).
     * @param response the received bytes.
     * @param length number of valid bytes in [response].
     * @throws java.io.IOException if any sanity check fails.
     */
    private fun validateResponse(request: ByteArray, response: ByteArray, length: Int) {
        if (length < NTP_PACKET_SIZE) {
            throw java.io.IOException("NTP response too short: $length bytes")
        }
        val header = response[0].toInt() and 0xFF
        val leapIndicator = header shr 6
        val mode = header and 0x07
        val stratum = response[1].toInt() and 0xFF

        // Mode 4 = server, 5 = broadcast; anything else is not a reply to a client request.
        if (mode != 4 && mode != 5) {
            throw java.io.IOException("Unexpected NTP mode: $mode")
        }
        // LI=3 means the server clock is not synchronised.
        if (leapIndicator == 3) {
            throw java.io.IOException("NTP server clock is unsynchronised")
        }
        // Stratum 0 is a kiss-of-death packet; values above 15 are reserved/invalid.
        if (stratum !in 1..15) {
            throw java.io.IOException("Untrusted NTP stratum: $stratum")
        }
        // The server must echo our transmit timestamp as the originate timestamp;
        // a mismatch indicates a stale or foreign packet.
        val echoesRequest = (0 until TIMESTAMP_SIZE).all { i ->
            response[ORIGINATE_OFFSET + i] == request[TRANSMIT_OFFSET + i]
        }
        if (!echoesRequest) {
            throw java.io.IOException("NTP originate timestamp does not match request")
        }
        if (isZeroTimestamp(response, RECEIVE_OFFSET) || isZeroTimestamp(response, TRANSMIT_OFFSET)) {
            throw java.io.IOException("NTP response contains a zero timestamp")
        }
    }

    /** Returns true when the 8-byte timestamp at [offset] is all zero bytes. */
    private fun isZeroTimestamp(buffer: ByteArray, offset: Int): Boolean =
        (0 until TIMESTAMP_SIZE).all { i -> buffer[offset + i] == 0.toByte() }

    /**
     * Decodes the 64-bit NTP timestamp at [offset] into Unix seconds (with fraction).
     *
     * The seconds field wraps in 2036. A value with the top bit clear is interpreted as
     * belonging to the following era (2036 onwards) rather than to 1900-1968.
     */
    private fun ntpBytesToSeconds(buffer: ByteArray, offset: Int): Double {
        val seconds = readUint32(buffer, offset)
        val fraction = readUint32(buffer, offset + 4)
        val eraAdjusted = if ((seconds and 0x80000000L) == 0L) seconds + NTP_ERA_SECONDS else seconds
        return (eraAdjusted - NTP_EPOCH_OFFSET_SECONDS) + fraction / NTP_FRACTION_SCALE
    }

    /**
     * Encodes Unix epoch [millis] as a 64-bit NTP timestamp at [offset].
     * Integer arithmetic is used so no floating-point rounding enters the encoded value;
     * the seconds field is truncated to 32 bits by [writeUint32].
     */
    private fun writeNtpTimestamp(buffer: ByteArray, offset: Int, millis: Long) {
        val ntpSeconds = millis / 1000 + NTP_EPOCH_OFFSET_SECONDS
        val fraction = (millis % 1000) * NTP_ERA_SECONDS / 1000
        writeUint32(buffer, offset, ntpSeconds)
        writeUint32(buffer, offset + 4, fraction)
    }

    /** Reads a big-endian unsigned 32-bit integer starting at [offset]. */
    private fun readUint32(buffer: ByteArray, offset: Int): Long =
        ((buffer[offset].toLong() and 0xFF) shl 24) or
            ((buffer[offset + 1].toLong() and 0xFF) shl 16) or
            ((buffer[offset + 2].toLong() and 0xFF) shl 8) or
            (buffer[offset + 3].toLong() and 0xFF)

    /** Writes the low 32 bits of [value] in big-endian order starting at [offset]. */
    private fun writeUint32(buffer: ByteArray, offset: Int, value: Long) {
        buffer[offset] = ((value shr 24) and 0xFF).toByte()
        buffer[offset + 1] = ((value shr 16) and 0xFF).toByte()
        buffer[offset + 2] = ((value shr 8) and 0xFF).toByte()
        buffer[offset + 3] = (value and 0xFF).toByte()
    }
}

324
plans/runbuddy-mode.md Normal file
View File

@@ -0,0 +1,324 @@
---
name: Runbuddy Mode Plan
overview: "Add 1:1 RunBuddy sessions: leader creates a public session on the server, follower picks it from an in-app billboard, and both stay in sync via WebSocket events timestamped with NTP-corrected wall clock. Both devices use the same Spotify playlist and local MP3 pairings."
todos:
  - id: server-schema-rest
    content: Add runbuddy_sessions/participants tables + REST endpoints (create, billboard, join, end, leave) to lockstep-2-api
    status: pending
  - id: server-websocket
    content: Add WebSocket room relay with token auth, snapshot persistence, and leader→follower fan-out
    status: pending
  - id: android-clients
    content: Implement RunBuddyApiClient, RunBuddyWebSocketClient (OkHttp), RunBuddyClock wrapping NtpClient
    status: pending
  - id: sync-protocol
    content: Define JSON event schema and follower drift correction math in RunBuddyCoordinator
    status: pending
  - id: playback-hooks
    content: Extend PlaybackService with attachRunBuddy, applyRemoteTransport, leader emit hooks on transport changes
    status: pending
  - id: prejoin-validation
    content: Follower pairing validation against session playlistId + pairedTrackCount before join
    status: pending
  - id: ui-flows
    content: Add RunBuddy billboard screen, leader start/waiting UI, Now Playing role banner and follower control lockout
    status: pending
  - id: e2e-test
    content: "Two-device test: create/join, play/pause/seek/skip/track-change, verify <50ms perceived drift while running"
    status: pending
isProject: false
---
# RunBuddy Mode — Implementation Plan
## Scope (confirmed)
- **1 leader + 1 follower** per session (session locks when follower joins)
- **Same Spotify playlist + same local MP3 pairings** on both devices
- **Leader is source of truth** for playback transport and position
- **Position sync target:** millisecond-level via [`NtpClient.kt`](app/src/main/java/at/lockstep/player/util/NtpClient.kt) + timestamped realtime events
## Current baseline
| Layer | Today | Gap |
|-------|-------|-----|
| Playback | [`PlaybackService`](app/src/main/java/at/lockstep/player/playback/PlaybackService.kt) owns queue, play/pause/seek/skip locally | No remote command ingress |
| Networking | REST only ([`MetadataSyncClient`](app/src/main/java/at/lockstep/player/data/MetadataSyncClient.kt), jukebox) | No sessions, no WebSocket |
| Server | Flask monolith [`lockstep-2-api/api.py`](lockstep-2-api/api.py) | No session model, no realtime |
| Clock | `NtpClient` exists but unused | Wire into session lifecycle |
```mermaid
sequenceDiagram
participant Leader as LeaderApp
participant Server as api.lockstep.at
participant Follower as FollowerApp
participant NTP as pool.ntp.org
Leader->>NTP: NtpClient (offset)
Leader->>Server: POST /runbuddy/sessions
Server-->>Leader: sessionId, joinCode, wsToken
Leader->>Server: WS connect (leader)
Follower->>Server: GET /runbuddy/sessions (billboard)
Follower->>Server: POST /runbuddy/sessions/{id}/join
Follower->>NTP: NtpClient (offset)
Follower->>Server: WS connect (follower)
Leader->>Server: transport + position events (correctedMs)
Server->>Follower: fan-out events
Follower->>Follower: apply to PlaybackService
```
---
## 1. Server — session model + billboard + WebSocket
Extend [`lockstep-2-api/api.py`](lockstep-2-api/api.py) (or split into `runbuddy.py` imported by `api.py` to keep the monolith manageable).
### SQLite tables
**`runbuddy_sessions`**
- `id` (UUID, PK)
- `host_spotify_user_id`
- `playlist_id`, `playlist_name`
- `paired_track_count` (leader-reported, for follower pre-check)
- `status` (`waiting` | `active` | `ended`)
- `created_at`, `expires_at` (TTL, e.g. 2h idle)
**`runbuddy_participants`**
- `session_id`, `role` (`leader` | `follower`)
- `spotify_user_id`, `joined_at`
- Unique constraint: one follower per session
**`runbuddy_ws_tokens`**
- Short-lived join token per participant (avoid putting Spotify bearer in WS query string long-term)
### REST endpoints (all `@require_auth` with existing Bearer Spotify token)
| Method | Route | Purpose |
|--------|-------|---------|
| `POST` | `/runbuddy/sessions` | Leader creates session. Body: `{ playlistId, playlistName, pairedTrackCount }`. Returns `{ sessionId, joinCode, wsUrl, wsToken, snapshot }`. |
| `GET` | `/runbuddy/sessions` | **Billboard**: list `status=waiting` public sessions (host display name, playlist name, paired count, created_at). |
| `GET` | `/runbuddy/sessions/{id}` | Session snapshot for late join / reconnect. |
| `POST` | `/runbuddy/sessions/{id}/join` | Follower joins (409 if full). Returns `{ wsUrl, wsToken, snapshot }`. |
| `POST` | `/runbuddy/sessions/{id}/end` | Leader ends session. |
| `POST` | `/runbuddy/sessions/{id}/leave` | Follower leaves. |
**Snapshot** (stored server-side, updated by leader over WS) includes current transport state:
```json
{
"playlistId": "...",
"playlistPosition": 3,
"trackId": "spotify:track:...",
"positionMs": 45123,
"isPlaying": true,
"correctedMs": 1717171717171
}
```
### WebSocket channel
Add a realtime layer to the Flask deployment. Practical options:
- **Recommended for MVP:** [`flask-sock`](https://github.com/miguelgrinberg/flask-sock) (simple, works with existing Flask app) or a small **sidecar ASGI service** if production WSGI cannot hold long-lived connections
- Route: `GET /runbuddy/ws?sessionId=&token=`
- In-memory room registry keyed by `sessionId` (acceptable for 1:1 MVP; document Redis upgrade path if scaling)
**Server responsibilities:**
- Authenticate token → session + role
- Relay JSON messages leader → follower (follower messages limited to `ping`, `leave`, optional `ready`)
- Persist latest snapshot on leader `state` / `position` messages
- Close room and set `status=ended` on leader disconnect or explicit end
### Operational notes
- Add session TTL cleanup cron or lazy expiry on billboard GET
- Rate-limit session creation (already flagged in [`BUGS.md`](BUGS.md))
- WebSocket requires production config change (proxy timeout, upgrade headers) — document in deploy notes
---
## 2. Sync protocol — NTP + event timestamps
### Shared clock helper (new Android class)
[`RunBuddyClock`](app/src/main/java/at/lockstep/player/runbuddy/RunBuddyClock.kt) (proposed):
```kotlin
// offsetMs = NtpClient.clockOffsetSeconds() * 1000, i.e. how far the local clock is ahead of NTP (positive = local fast)
fun correctedNowMs(): Long = System.currentTimeMillis() - offsetMs
```
- Call `NtpClient.clockOffsetSeconds()` when **creating or joining a session** (on a background thread) and optionally **refresh every ~5 min** during an active session
- Both peers use the same reference (NTP), not the server's wall clock — this meets the NTP-based sync requirement and avoids adding a server time endpoint for the MVP
### Event types (WebSocket JSON)
| Type | Sender | Fields | Purpose |
|------|--------|--------|---------|
| `transport` | leader | `action`: `play`/`pause`/`seek`/`skip_next`/`skip_prev`/`track_change`, `correctedMs`, `positionMs`, `isPlaying`, `playlistPosition`, `trackId` | Immediate state changes |
| `position` | leader | `correctedMs`, `positionMs`, `isPlaying`, `playlistPosition`, `trackId` | Heartbeat every **250ms** (match existing [`UPDATE_INTERVAL_MS`](app/src/main/java/at/lockstep/player/playback/PlaybackService.kt)) |
| `snapshot` | server | full snapshot | On follower connect / reconnect |
| `session_end` | server | reason | Tear down |
All timestamps in events use **`correctedMs`** (NTP-aligned epoch millis).
### Follower position correction
On each `position` or `transport` event:
```
targetMs = event.positionMs + (event.isPlaying ? followerClock.correctedNowMs() - event.correctedMs : 0)
drift = targetMs - localEngine.getCurrentPositionMs()
if |drift| > 30ms → engine.seekTo(targetMs)
if track mismatch → publishCurrentTrack by playlistPosition/trackId first, then seek
```
- **30ms threshold** is a starting point; tune with real devices
- When paused, do not extrapolate — hold `positionMs` exactly
- Ignore heartbeats while a transport event is being applied (short debounce)
### Leader behavior
- Local UI actions (`requestTogglePause`, `requestSeek`, skip) **also emit** the corresponding `transport` event immediately after the change has been applied locally, so the event carries the resulting `positionMs` and `correctedMs`
- Position heartbeats read `getPlaybackPositionMs()` + `RunBuddyClock.correctedNowMs()`
- Leader local controls remain enabled; follower controls **disabled or no-op** (show "following leader" UI)
---
## 3. Android — new modules and PlaybackService integration
### New packages / files
| File | Role |
|------|------|
| `data/RunBuddyApiClient.kt` | REST: create, list, join, end, leave |
| `runbuddy/RunBuddyWebSocketClient.kt` | OkHttp WebSocket (already on classpath) with reconnect + backoff |
| `runbuddy/RunBuddyClock.kt` | NTP offset cache |
| `runbuddy/RunBuddyCoordinator.kt` | Session lifecycle, role, event encode/decode, snapshot apply |
| `runbuddy/RunBuddySessionState.kt` | `Idle`, `Leading(sessionId)`, `Following(sessionId)` |
Wire clients from [`LockstepApplication`](app/src/main/java/at/lockstep/player/LockstepApplication.kt) (shared `OkHttpClient`).
### PlaybackService changes ([`PlaybackService.kt`](app/src/main/java/at/lockstep/player/playback/PlaybackService.kt))
Add a **`RunBuddyCoordinator` reference** (injected via new binder methods or constructor set from NavHost when session starts):
**New public API:**
- `attachRunBuddy(coordinator: RunBuddyCoordinator)`
- `detachRunBuddy()`
- `applyRemoteTransport(...)` — internal mirror of play/pause/seek/skip/track change **without** re-emitting leader events
- `isRunBuddyFollower(): Boolean` — UI uses this to disable controls
**Hook points (existing methods):**
```403:452:app/src/main/java/at/lockstep/player/playback/PlaybackService.kt
private fun setPlaying(playing: Boolean) { ... }
private fun skipDelta(delta: Int) { ... }
fun requestSeek(fraction: Float) { ... }
```
- Leader path: after local mutation, notify coordinator → WS send
- Follower path: coordinator calls `applyRemoteTransport` instead of user `request*` methods
**Track identity:** sync on `currentPlaylistPosition` + `currentTrackId` (not `currentQueueIndex`) to stay stable across paired-only queue indexing — aligns with existing [`PlaybackUiState`](app/src/main/java/at/lockstep/player/playback/PlaybackService.kt) fields and avoids the unpaired-track index bug called out in [`BUGS.md`](BUGS.md).
### Pre-join validation (follower)
Before `POST .../join`:
1. User must be signed in (existing Spotify token)
2. Local `pairingDao.countPaired(playlistId) >= session.pairedTrackCount` (and ideally equal)
3. Optionally verify each Spotify track id in leader snapshot queue is paired locally — fail fast with clear error if not
### ViewModel ([`LockstepViewModel.kt`](app/src/main/java/at/lockstep/player/LockstepViewModel.kt))
Add thin session orchestration (not playback logic):
- `createRunBuddySession(playlistId)`
- `listPublicSessions()`
- `joinRunBuddySession(sessionId)`
- `endRunBuddySession()`
- Expose `runBuddyState: StateFlow<RunBuddySessionState>` for UI
Keep playback sync inside `PlaybackService` + coordinator (consistent with [`DESIGN.md`](DESIGN.md) foreground-service ownership).
---
## 4. UI flows
### Navigation ([`Routes.kt`](app/src/main/java/at/lockstep/player/ui/navigation/Routes.kt))
Add routes:
- `runbuddy/billboard` — public session list
- Reuse `nowPlaying/{playlistId}` for both roles once connected
### Leader flow
1. Library → select playlist (must be fully paired)
2. **"Start RunBuddy"** action (new button on Library row or playlist detail)
3. Create session → show join code + "Waiting for buddy…"
4. On follower join → navigate to Now Playing, start playlist via existing `startPlaylistPlayback()`
5. Coordinator attaches as **leader**; heartbeats begin
### Follower flow
1. New entry point: **RunBuddy** from Library top bar or Settings
2. [`RunBuddyBillboardScreen`](app/src/main/java/at/lockstep/player/ui/runbuddy/RunBuddyBillboardScreen.kt) — poll or pull-to-refresh `GET /runbuddy/sessions`
3. Tap session → validate pairings → join → navigate to Now Playing + start same playlist
4. Coordinator attaches as **follower**; apply snapshot immediately
### Now Playing UI ([`NowPlayingScreen.kt`](app/src/main/java/at/lockstep/player/ui/NowPlayingScreen.kt))
- Banner: "Leading RunBuddy" / "Following [host name]"
- Follower: disable play/pause/skip/seek slider (or show read-only progress)
- Leader: **End session** button
- Show sync status indicator (clock synced / WS connected / drift ms for debug builds)
### Settings ([`SettingsScreen.kt`](app/src/main/java/at/lockstep/player/ui/settings/SettingsScreen.kt))
Optional toggle: **"Enable RunBuddy mode"** (feature flag) — can defer if always-on is fine for MVP.
---
## 5. Suggested implementation phases
### Phase A — Server foundation
- DB migration + REST CRUD + billboard list
- Manual test with curl/Postman
### Phase B — WebSocket relay
- Token auth, leader→follower fan-out, snapshot persistence
- Test with `websocat` or a minimal HTML client
### Phase C — Android clients
- `RunBuddyApiClient`, `RunBuddyWebSocketClient`, `RunBuddyClock`
- Unit-test event parsing + follower drift math (pure Kotlin)
### Phase D — Playback integration
- `RunBuddyCoordinator` + `PlaybackService` hooks
- End-to-end two-emulator or two-device test
### Phase E — UI + polish
- Billboard + leader start/waiting screens
- Reconnect (follower re-joins, applies snapshot, resumes applying leader heartbeats)
- Session expiry, error toasts, 409 when session full
---
## 6. Risks and mitigations
| Risk | Mitigation |
|------|------------|
| ExoPlayer seek latency causes drift | Periodic 250ms heartbeats + 30ms threshold; no position extrapolation while paused |
| NTP blocked on mobile network | Fail gracefully; show "clock sync failed" and block join |
| Flask WS in production | Verify reverse-proxy WS upgrade; fallback sidecar if needed |
| Unpaired-track index bug | Sync on `playlistPosition`/`trackId`; fix BUGS.md issue in parallel |
| Leader backgrounded | Foreground service already running; WS kept alive in service process |
| Pairing mismatch | Pre-join validation against `pairedTrackCount` + track ids |
---
## 7. Out of scope (MVP)
- Group runs (1:N)
- Cross-playlist or streaming without local MP3s
- Syncing run-data sensor JSON over WS (only playback)
- Server-side beat/metadata sharing per session (reuse existing per-user metadata APIs later)