Compare commits
2 Commits
922cc5d82a
...
e823fc9d94
| Author | SHA1 | Date | |
|---|---|---|---|
| e823fc9d94 | |||
| 965bec72d2 |
102
app/src/main/java/at/lockstep/player/util/NtpClient.kt
Normal file
102
app/src/main/java/at/lockstep/player/util/NtpClient.kt
Normal file
@@ -0,0 +1,102 @@
|
||||
package at.lockstep.player.util
|
||||
|
||||
import java.net.DatagramPacket
|
||||
import java.net.DatagramSocket
|
||||
import java.net.InetAddress
|
||||
import java.net.SocketTimeoutException
|
||||
import java.util.concurrent.CountDownLatch
|
||||
import java.util.concurrent.TimeUnit
|
||||
import java.util.concurrent.atomic.AtomicReference
|
||||
|
||||
object NtpClient {
|
||||
|
||||
private const val NTP_HOST = "pool.ntp.org"
|
||||
private const val NTP_PORT = 123
|
||||
private const val NTP_PACKET_SIZE = 48
|
||||
private const val TIMEOUT_MS = 5000
|
||||
private const val NTP_EPOCH_OFFSET_SECONDS = 2208988800L
|
||||
|
||||
/**
|
||||
* Queries [NTP_HOST] and returns how many seconds the system clock is ahead of the server.
|
||||
* Positive = local clock is fast; negative = local clock is slow.
|
||||
*/
|
||||
@Throws(Exception::class)
|
||||
fun clockOffsetSeconds(): Double {
|
||||
val socket = DatagramSocket()
|
||||
try {
|
||||
socket.soTimeout = TIMEOUT_MS
|
||||
|
||||
val requestBuffer = ByteArray(NTP_PACKET_SIZE)
|
||||
requestBuffer[0] = 0x1B // LI=0, VN=3, mode=client
|
||||
|
||||
val t1Millis = System.currentTimeMillis()
|
||||
writeNtpTimestamp(requestBuffer, 40, t1Millis)
|
||||
|
||||
val address = InetAddress.getByName(NTP_HOST)
|
||||
val requestPacket = DatagramPacket(requestBuffer, requestBuffer.size, address, NTP_PORT)
|
||||
socket.send(requestPacket)
|
||||
|
||||
val responseBuffer = ByteArray(NTP_PACKET_SIZE)
|
||||
val responsePacket = DatagramPacket(responseBuffer, responseBuffer.size)
|
||||
|
||||
val errorRef = AtomicReference<Exception?>(null)
|
||||
val receiveTimeRef = AtomicReference<Long?>(null)
|
||||
val latch = CountDownLatch(1)
|
||||
|
||||
Thread {
|
||||
try {
|
||||
socket.receive(responsePacket)
|
||||
receiveTimeRef.set(System.currentTimeMillis())
|
||||
} catch (e: Exception) {
|
||||
errorRef.set(e)
|
||||
} finally {
|
||||
latch.countDown()
|
||||
}
|
||||
}.start()
|
||||
|
||||
if (!latch.await((TIMEOUT_MS + 1000).toLong(), TimeUnit.MILLISECONDS)) {
|
||||
throw SocketTimeoutException("NTP request timed out")
|
||||
}
|
||||
errorRef.get()?.let { throw it }
|
||||
|
||||
val t4Millis = receiveTimeRef.get()
|
||||
?: throw IllegalStateException("NTP response received without timestamp")
|
||||
|
||||
val t1 = t1Millis / 1000.0
|
||||
val t2 = ntpBytesToSeconds(responseBuffer, 32)
|
||||
val t3 = ntpBytesToSeconds(responseBuffer, 40)
|
||||
val t4 = t4Millis / 1000.0
|
||||
|
||||
return ((t1 - t2) + (t4 - t3)) / 2.0
|
||||
} finally {
|
||||
socket.close()
|
||||
}
|
||||
}
|
||||
|
||||
private fun ntpBytesToSeconds(buffer: ByteArray, offset: Int): Double {
|
||||
val seconds = readUint32(buffer, offset)
|
||||
val fraction = readUint32(buffer, offset + 4)
|
||||
return (seconds - NTP_EPOCH_OFFSET_SECONDS) + fraction / 4294967296.0
|
||||
}
|
||||
|
||||
private fun writeNtpTimestamp(buffer: ByteArray, offset: Int, millis: Long) {
|
||||
val unixSeconds = millis / 1000.0
|
||||
val ntpSeconds = (unixSeconds + NTP_EPOCH_OFFSET_SECONDS).toLong()
|
||||
val fraction = ((unixSeconds - unixSeconds.toLong()) * 4294967296.0).toLong()
|
||||
writeUint32(buffer, offset, ntpSeconds)
|
||||
writeUint32(buffer, offset + 4, fraction)
|
||||
}
|
||||
|
||||
private fun readUint32(buffer: ByteArray, offset: Int): Long =
|
||||
((buffer[offset].toLong() and 0xFF) shl 24) or
|
||||
((buffer[offset + 1].toLong() and 0xFF) shl 16) or
|
||||
((buffer[offset + 2].toLong() and 0xFF) shl 8) or
|
||||
(buffer[offset + 3].toLong() and 0xFF)
|
||||
|
||||
private fun writeUint32(buffer: ByteArray, offset: Int, value: Long) {
|
||||
buffer[offset] = ((value shr 24) and 0xFF).toByte()
|
||||
buffer[offset + 1] = ((value shr 16) and 0xFF).toByte()
|
||||
buffer[offset + 2] = ((value shr 8) and 0xFF).toByte()
|
||||
buffer[offset + 3] = (value and 0xFF).toByte()
|
||||
}
|
||||
}
|
||||
324
plans/runbuddy-mode.md
Normal file
324
plans/runbuddy-mode.md
Normal file
@@ -0,0 +1,324 @@
|
||||
---
|
||||
name: Runbuddy Mode Plan
|
||||
overview: "Add 1:1 RunBuddy sessions: leader creates a public session on the server, follower picks it from an in-app billboard, and both stay in sync via WebSocket events timestamped with NTP-corrected wall clock. Both devices use the same Spotify playlist and local MP3 pairings."
|
||||
todos:
|
||||
- id: server-schema-rest
|
||||
content: Add runbuddy_sessions/participants tables + REST endpoints (create, billboard, join, end, leave) to lockstep-2-api
|
||||
status: pending
|
||||
- id: server-websocket
|
||||
content: Add WebSocket room relay with token auth, snapshot persistence, and leader→follower fan-out
|
||||
status: pending
|
||||
- id: android-clients
|
||||
content: Implement RunBuddyApiClient, RunBuddyWebSocketClient (OkHttp), RunBuddyClock wrapping NtpClient
|
||||
status: pending
|
||||
- id: sync-protocol
|
||||
content: Define JSON event schema and follower drift correction math in RunBuddyCoordinator
|
||||
status: pending
|
||||
- id: playback-hooks
|
||||
content: Extend PlaybackService with attachRunBuddy, applyRemoteTransport, leader emit hooks on transport changes
|
||||
status: pending
|
||||
- id: prejoin-validation
|
||||
content: Follower pairing validation against session playlistId + pairedTrackCount before join
|
||||
status: pending
|
||||
- id: ui-flows
|
||||
content: Add RunBuddy billboard screen, leader start/waiting UI, Now Playing role banner and follower control lockout
|
||||
status: pending
|
||||
- id: e2e-test
|
||||
content: "Two-device test: create/join, play/pause/seek/skip/track-change, verify <50ms perceived drift while running"
|
||||
status: pending
|
||||
isProject: false
|
||||
---
|
||||
|
||||
# RunBuddy Mode — Implementation Plan
|
||||
|
||||
## Scope (confirmed)
|
||||
|
||||
- **1 leader + 1 follower** per session (session locks when follower joins)
|
||||
- **Same Spotify playlist + same local MP3 pairings** on both devices
|
||||
- **Leader is source of truth** for playback transport and position
|
||||
- **Position sync target:** millisecond-level via [`NtpClient.kt`](app/src/main/java/at/lockstep/player/util/NtpClient.kt) + timestamped realtime events
|
||||
|
||||
## Current baseline
|
||||
|
||||
| Layer | Today | Gap |
|
||||
|-------|-------|-----|
|
||||
| Playback | [`PlaybackService`](app/src/main/java/at/lockstep/player/playback/PlaybackService.kt) owns queue, play/pause/seek/skip locally | No remote command ingress |
|
||||
| Networking | REST only ([`MetadataSyncClient`](app/src/main/java/at/lockstep/player/data/MetadataSyncClient.kt), jukebox) | No sessions, no WebSocket |
|
||||
| Server | Flask monolith [`lockstep-2-api/api.py`](lockstep-2-api/api.py) | No session model, no realtime |
|
||||
| Clock | `NtpClient` exists but unused | Wire into session lifecycle |
|
||||
|
||||
```mermaid
|
||||
sequenceDiagram
|
||||
participant Leader as LeaderApp
|
||||
participant Server as api.lockstep.at
|
||||
participant Follower as FollowerApp
|
||||
participant NTP as pool.ntp.org
|
||||
|
||||
Leader->>NTP: NtpClient (offset)
|
||||
Leader->>Server: POST /runbuddy/sessions
|
||||
Server-->>Leader: sessionId, joinCode, wsToken
|
||||
Leader->>Server: WS connect (leader)
|
||||
Follower->>Server: GET /runbuddy/sessions (billboard)
|
||||
Follower->>Server: POST /runbuddy/sessions/{id}/join
|
||||
Follower->>NTP: NtpClient (offset)
|
||||
Follower->>Server: WS connect (follower)
|
||||
Leader->>Server: transport + position events (correctedMs)
|
||||
Server->>Follower: fan-out events
|
||||
Follower->>Follower: apply to PlaybackService
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 1. Server — session model + billboard + WebSocket
|
||||
|
||||
Extend [`lockstep-2-api/api.py`](lockstep-2-api/api.py) (or split into `runbuddy.py` imported by `api.py` to keep the monolith manageable).
|
||||
|
||||
### SQLite tables
|
||||
|
||||
**`runbuddy_sessions`**
|
||||
- `id` (UUID, PK)
|
||||
- `host_spotify_user_id`
|
||||
- `playlist_id`, `playlist_name`
|
||||
- `paired_track_count` (leader-reported, for follower pre-check)
|
||||
- `status` (`waiting` | `active` | `ended`)
|
||||
- `created_at`, `expires_at` (TTL, e.g. 2h idle)
|
||||
|
||||
**`runbuddy_participants`**
|
||||
- `session_id`, `role` (`leader` | `follower`)
|
||||
- `spotify_user_id`, `joined_at`
|
||||
- Unique constraint: one follower per session
|
||||
|
||||
**`runbuddy_ws_tokens`**
|
||||
- Short-lived join token per participant (avoid putting Spotify bearer in WS query string long-term)
|
||||
|
||||
### REST endpoints (all `@require_auth` with existing Bearer Spotify token)
|
||||
|
||||
| Method | Route | Purpose |
|
||||
|--------|-------|---------|
|
||||
| `POST` | `/runbuddy/sessions` | Leader creates session. Body: `{ playlistId, playlistName, pairedTrackCount }`. Returns `{ sessionId, joinCode, wsUrl, wsToken, snapshot }`. |
|
||||
| `GET` | `/runbuddy/sessions` | **Billboard**: list `status=waiting` public sessions (host display name, playlist name, paired count, created_at). |
|
||||
| `GET` | `/runbuddy/sessions/{id}` | Session snapshot for late join / reconnect. |
|
||||
| `POST` | `/runbuddy/sessions/{id}/join` | Follower joins (409 if full). Returns `{ wsUrl, wsToken, snapshot }`. |
|
||||
| `POST` | `/runbuddy/sessions/{id}/end` | Leader ends session. |
|
||||
| `POST` | `/runbuddy/sessions/{id}/leave` | Follower leaves. |
|
||||
|
||||
**Snapshot** (stored server-side, updated by leader over WS) includes current transport state:
|
||||
|
||||
```json
|
||||
{
|
||||
"playlistId": "...",
|
||||
"playlistPosition": 3,
|
||||
"trackId": "spotify:track:...",
|
||||
"positionMs": 45123,
|
||||
"isPlaying": true,
|
||||
"correctedMs": 1717171717171
|
||||
}
|
||||
```
|
||||
|
||||
### WebSocket channel
|
||||
|
||||
Add a realtime layer to the Flask deployment. Practical options:
|
||||
|
||||
- **Recommended for MVP:** [`flask-sock`](https://github.com/miguelgrinberg/flask-sock) (simple, works with existing Flask app) or a small **sidecar ASGI service** if production WSGI cannot hold long-lived connections
|
||||
- Route: `GET /runbuddy/ws?sessionId=&token=`
|
||||
- In-memory room registry keyed by `sessionId` (acceptable for 1:1 MVP; document Redis upgrade path if scaling)
|
||||
|
||||
**Server responsibilities:**
|
||||
- Authenticate token → session + role
|
||||
- Relay JSON messages leader → follower (follower messages limited to `ping`, `leave`, optional `ready`)
|
||||
- Persist latest snapshot on leader `state` / `position` messages
|
||||
- Close room and set `status=ended` on leader disconnect or explicit end
|
||||
|
||||
### Operational notes
|
||||
|
||||
- Add session TTL cleanup cron or lazy expiry on billboard GET
|
||||
- Rate-limit session creation (already flagged in [`BUGS.md`](BUGS.md))
|
||||
- WebSocket requires production config change (proxy timeout, upgrade headers) — document in deploy notes
|
||||
|
||||
---
|
||||
|
||||
## 2. Sync protocol — NTP + event timestamps
|
||||
|
||||
### Shared clock helper (new Android class)
|
||||
|
||||
[`RunBuddyClock`](app/src/main/java/at/lockstep/player/runbuddy/RunBuddyClock.kt) (proposed):
|
||||
|
||||
```kotlin
|
||||
// offsetMs = local ahead of NTP (from NtpClient)
|
||||
fun correctedNowMs(): Long = System.currentTimeMillis() - offsetMs
|
||||
```
|
||||
|
||||
- Call `NtpClient.clockOffsetSeconds()` on **session join** (background thread) and optionally **refresh every ~5 min** during active session
|
||||
- Both peers use the same reference (NTP), not server wall clock — satisfies your requirement and avoids adding a server time endpoint for MVP
|
||||
|
||||
### Event types (WebSocket JSON)
|
||||
|
||||
| Type | Sender | Fields | Purpose |
|
||||
|------|--------|--------|---------|
|
||||
| `transport` | leader | `action`: `play`/`pause`/`seek`/`skip_next`/`skip_prev`/`track_change`, `correctedMs`, `positionMs`, `playlistPosition`, `trackId` | Immediate state changes |
|
||||
| `position` | leader | `correctedMs`, `positionMs`, `isPlaying`, `playlistPosition`, `trackId` | Heartbeat every **250ms** (match existing [`UPDATE_INTERVAL_MS`](app/src/main/java/at/lockstep/player/playback/PlaybackService.kt)) |
|
||||
| `snapshot` | server | full snapshot | On follower connect / reconnect |
|
||||
| `session_end` | server | reason | Tear down |
|
||||
|
||||
All timestamps in events use **`correctedMs`** (NTP-aligned epoch millis).
|
||||
|
||||
### Follower position correction
|
||||
|
||||
On each `position` or `transport` event:
|
||||
|
||||
```
|
||||
targetMs = event.positionMs + (event.isPlaying ? followerClock.correctedNowMs() - event.correctedMs : 0)
|
||||
drift = targetMs - localEngine.getCurrentPositionMs()
|
||||
if |drift| > 30ms → engine.seekTo(targetMs)
|
||||
if track mismatch → publishCurrentTrack by playlistPosition/trackId first, then seek
|
||||
```
|
||||
|
||||
- **30ms threshold** is a starting point; tune with real devices
|
||||
- When paused, do not extrapolate — hold `positionMs` exactly
|
||||
- Ignore heartbeats while a transport event is being applied (short debounce)
|
||||
|
||||
### Leader behavior
|
||||
|
||||
- Local UI actions (`requestTogglePause`, `requestSeek`, skip) **also emit** corresponding `transport` events before/after applying locally
|
||||
- Position heartbeats read `getPlaybackPositionMs()` + `RunBuddyClock.correctedNowMs()`
|
||||
- Leader local controls remain enabled; follower controls **disabled or no-op** (show "following leader" UI)
|
||||
|
||||
---
|
||||
|
||||
## 3. Android — new modules and PlaybackService integration
|
||||
|
||||
### New packages / files
|
||||
|
||||
| File | Role |
|
||||
|------|------|
|
||||
| `data/RunBuddyApiClient.kt` | REST: create, list, join, end, leave |
|
||||
| `runbuddy/RunBuddyWebSocketClient.kt` | OkHttp WebSocket (already on classpath) with reconnect + backoff |
|
||||
| `runbuddy/RunBuddyClock.kt` | NTP offset cache |
|
||||
| `runbuddy/RunBuddyCoordinator.kt` | Session lifecycle, role, event encode/decode, snapshot apply |
|
||||
| `runbuddy/RunBuddySessionState.kt` | `Idle`, `Leading(sessionId)`, `Following(sessionId)` |
|
||||
|
||||
Wire clients from [`LockstepApplication`](app/src/main/java/at/lockstep/player/LockstepApplication.kt) (shared `OkHttpClient`).
|
||||
|
||||
### PlaybackService changes ([`PlaybackService.kt`](app/src/main/java/at/lockstep/player/playback/PlaybackService.kt))
|
||||
|
||||
Add a **`RunBuddyCoordinator` reference** (injected via new binder methods or constructor set from NavHost when session starts):
|
||||
|
||||
**New public API:**
|
||||
- `attachRunBuddy(coordinator: RunBuddyCoordinator)`
|
||||
- `detachRunBuddy()`
|
||||
- `applyRemoteTransport(...)` — internal mirror of play/pause/seek/skip/track change **without** re-emitting leader events
|
||||
- `isRunBuddyFollower(): Boolean` — UI uses this to disable controls
|
||||
|
||||
**Hook points (existing methods):**
|
||||
|
||||
```403:452:app/src/main/java/at/lockstep/player/playback/PlaybackService.kt
|
||||
private fun setPlaying(playing: Boolean) { ... }
|
||||
private fun skipDelta(delta: Int) { ... }
|
||||
fun requestSeek(fraction: Float) { ... }
|
||||
```
|
||||
|
||||
- Leader path: after local mutation, notify coordinator → WS send
|
||||
- Follower path: coordinator calls `applyRemoteTransport` instead of user `request*` methods
|
||||
|
||||
**Track identity:** sync on `currentPlaylistPosition` + `currentTrackId` (not `currentQueueIndex`) to stay stable across paired-only queue indexing — aligns with existing [`PlaybackUiState`](app/src/main/java/at/lockstep/player/playback/PlaybackService.kt) fields and avoids the unpaired-track index bug called out in [`BUGS.md`](BUGS.md).
|
||||
|
||||
### Pre-join validation (follower)
|
||||
|
||||
Before `POST .../join`:
|
||||
1. User must be signed in (existing Spotify token)
|
||||
2. Local `pairingDao.countPaired(playlistId) >= session.pairedTrackCount` (and ideally equal)
|
||||
3. Optionally verify each Spotify track id in leader snapshot queue is paired locally — fail fast with clear error if not
|
||||
|
||||
### ViewModel ([`LockstepViewModel.kt`](app/src/main/java/at/lockstep/player/LockstepViewModel.kt))
|
||||
|
||||
Add thin session orchestration (not playback logic):
|
||||
- `createRunBuddySession(playlistId)`
|
||||
- `listPublicSessions()`
|
||||
- `joinRunBuddySession(sessionId)`
|
||||
- `endRunBuddySession()`
|
||||
- Expose `runBuddyState: StateFlow<RunBuddySessionState>` for UI
|
||||
|
||||
Keep playback sync inside `PlaybackService` + coordinator (consistent with [`DESIGN.md`](DESIGN.md) foreground-service ownership).
|
||||
|
||||
---
|
||||
|
||||
## 4. UI flows
|
||||
|
||||
### Navigation ([`Routes.kt`](app/src/main/java/at/lockstep/player/ui/navigation/Routes.kt))
|
||||
|
||||
Add routes:
|
||||
- `runbuddy/billboard` — public session list
|
||||
- Reuse `nowPlaying/{playlistId}` for both roles once connected
|
||||
|
||||
### Leader flow
|
||||
|
||||
1. Library → select playlist (must be fully paired)
|
||||
2. **"Start RunBuddy"** action (new button on Library row or playlist detail)
|
||||
3. Create session → show join code + "Waiting for buddy…"
|
||||
4. On follower join → navigate to Now Playing, start playlist via existing `startPlaylistPlayback()`
|
||||
5. Coordinator attaches as **leader**; heartbeats begin
|
||||
|
||||
### Follower flow
|
||||
|
||||
1. New entry point: **RunBuddy** from Library top bar or Settings
|
||||
2. [`RunBuddyBillboardScreen`](app/src/main/java/at/lockstep/player/ui/runbuddy/RunBuddyBillboardScreen.kt) — poll or pull-to-refresh `GET /runbuddy/sessions`
|
||||
3. Tap session → validate pairings → join → navigate to Now Playing + start same playlist
|
||||
4. Coordinator attaches as **follower**; apply snapshot immediately
|
||||
|
||||
### Now Playing UI ([`NowPlayingScreen.kt`](app/src/main/java/at/lockstep/player/ui/NowPlayingScreen.kt))
|
||||
|
||||
- Banner: "Leading RunBuddy" / "Following [host name]"
|
||||
- Follower: disable play/pause/skip/seek slider (or show read-only progress)
|
||||
- Leader: **End session** button
|
||||
- Show sync status indicator (clock synced / WS connected / drift ms for debug builds)
|
||||
|
||||
### Settings ([`SettingsScreen.kt`](app/src/main/java/at/lockstep/player/ui/settings/SettingsScreen.kt))
|
||||
|
||||
Optional toggle: **"Enable RunBuddy mode"** (feature flag) — can defer if always-on is fine for MVP.
|
||||
|
||||
---
|
||||
|
||||
## 5. Suggested implementation phases
|
||||
|
||||
### Phase A — Server foundation
|
||||
- DB migration + REST CRUD + billboard list
|
||||
- Manual test with curl/Postman
|
||||
|
||||
### Phase B — WebSocket relay
|
||||
- Token auth, leader→follower fan-out, snapshot persistence
|
||||
- Test with `websocat` or a minimal HTML client
|
||||
|
||||
### Phase C — Android clients
|
||||
- `RunBuddyApiClient`, `RunBuddyWebSocketClient`, `RunBuddyClock`
|
||||
- Unit-test event parsing + follower drift math (pure Kotlin)
|
||||
|
||||
### Phase D — Playback integration
|
||||
- `RunBuddyCoordinator` + `PlaybackService` hooks
|
||||
- End-to-end two-emulator or two-device test
|
||||
|
||||
### Phase E — UI + polish
|
||||
- Billboard + leader start/waiting screens
|
||||
- Reconnect (follower re-joins, applies snapshot, resumes heartbeats)
|
||||
- Session expiry, error toasts, 409 when session full
|
||||
|
||||
---
|
||||
|
||||
## 6. Risks and mitigations
|
||||
|
||||
| Risk | Mitigation |
|
||||
|------|------------|
|
||||
| ExoPlayer seek latency causes drift | Periodic 250ms heartbeats + 30ms threshold; pause extrapolation |
|
||||
| NTP blocked on mobile network | Fail gracefully; show "clock sync failed" and block join |
|
||||
| Flask WS in production | Verify reverse-proxy WS upgrade; fallback sidecar if needed |
|
||||
| Unpaired-track index bug | Sync on `playlistPosition`/`trackId`; fix BUGS.md issue in parallel |
|
||||
| Leader backgrounded | Foreground service already running; WS kept alive in service process |
|
||||
| Pairing mismatch | Pre-join validation against `pairedTrackCount` + track ids |
|
||||
|
||||
---
|
||||
|
||||
## 7. Out of scope (MVP)
|
||||
|
||||
- Group runs (1:N)
|
||||
- Cross-playlist or streaming without local MP3s
|
||||
- Syncing run-data sensor JSON over WS (only playback)
|
||||
- Server-side beat/metadata sharing per session (reuse existing per-user metadata APIs later)
|
||||
Reference in New Issue
Block a user