/**
 * Session-keyed message store.
 *
 * Holds per-session state in a Map keyed by sessionId.
 * Session switch = change activeSessionId pointer. No clearing. Old data stays.
 * WebSocket handler = store.appendRealtime(msg.sessionId, msg). One line.
 * No localStorage for messages. Backend JSONL is the source of truth.
 */

import { useCallback, useMemo, useRef, useState } from 'react';
import { authenticatedFetch } from '../utils/api';
import type { LLMProvider } from '../types/app';

// ─── NormalizedMessage (mirrors server/adapters/types.js) ────────────────────

/** Discriminator carried by every {@link NormalizedMessage}. */
export type MessageKind =
  | 'text'
  | 'tool_use'
  | 'tool_result'
  | 'thinking'
  | 'stream_delta'
  | 'stream_end'
  | 'error'
  | 'complete'
  | 'status'
  | 'permission_request'
  | 'permission_cancelled'
  | 'session_created'
  | 'interactive_prompt'
  | 'task_notification';

/**
 * One row of a session transcript. The shape is deliberately flat: the five
 * leading fields are always present, everything else is optional and only
 * meaningful for some values of `kind`.
 */
export interface NormalizedMessage {
  id: string;
  sessionId: string;
  timestamp: string;
  provider: LLMProvider;
  kind: MessageKind;
  /**
   * Per-run monotonic sequence number assigned by the backend to live
   * websocket events. Used to compute `lastSeq` for `chat.subscribe` replay;
   * REST history messages do not carry it.
   */
  seq?: number;

  // kind-specific fields (flat for simplicity)
  role?: 'user' | 'assistant';
  content?: string;
  /**
   * Mirrors optional transcript metadata from the server.
   *
   * These fields are currently used by Claude history normalization so local
   * slash commands, local stdout, and compact summaries do not disappear when
   * the session store hydrates from REST history.
   */
  displayText?: string;
  commandName?: string;
  commandMessage?: string;
  commandArgs?: string;
  isLocalCommand?: boolean;
  isLocalCommandStdout?: boolean;
  isCompactSummary?: boolean;
  images?: string[];
  toolName?: string;
  toolInput?: unknown;
  toolId?: string;
  toolResult?: { content: string; isError: boolean; toolUseResult?: unknown } | null;
  isError?: boolean;
  text?: string;
  tokens?: number;
  canInterrupt?: boolean;
  tokenBudget?: unknown;
  requestId?: string;
  input?: unknown;
  context?: unknown;
  newSessionId?: string;
  status?: string;
  summary?: string;
  exitCode?: number;
  actualSessionId?: string;
  parentToolUseId?: string;
  subagentTools?: unknown[];
  isFinal?: boolean;

  // Cursor-specific ordering
  sequence?: number;
  rowid?: number;
}

// ─── Per-session slot ────────────────────────────────────────────────────────

/** Lifecycle state recorded on a session slot. */
export type SessionStatus = 'idle' | 'loading' | 'streaming' | 'error';

/** Everything the store keeps for a single session id. */
export interface SessionSlot {
  serverMessages: NormalizedMessage[];
  realtimeMessages: NormalizedMessage[];
  merged: NormalizedMessage[];
  /** @internal Cache-invalidation refs for computeMerged */
  _lastServerRef: NormalizedMessage[];
  _lastRealtimeRef: NormalizedMessage[];
  status: SessionStatus;
  fetchedAt: number;
  total: number;
  hasMore: boolean;
  offset: number;
  tokenUsage: unknown;
}

/** Single empty list shared by every freshly created slot. */
const EMPTY: NormalizedMessage[] = [];

/**
 * Build the initial state for a session: all five list fields point at the
 * shared `EMPTY` array, status is `'idle'`, counters are zero and there is no
 * token usage yet.
 */
function createEmptySlot(): SessionSlot {
  const emptyLists = {
    serverMessages: EMPTY,
    realtimeMessages: EMPTY,
    merged: EMPTY,
    _lastServerRef: EMPTY,
    _lastRealtimeRef: EMPTY,
  };
  return {
    ...emptyLists,
    status: 'idle',
    fetchedAt: 0,
    total: 0,
    hasMore: false,
    offset: 0,
    tokenUsage: null,
  };
}

/**
 * Compute merged messages: server + realtime, deduped by id and adjacent
 * assistant echo (same trimmed text), so finalized stream rows do not stack
 * on top of the persisted copy before realtime is cleared.
*/ const LOCAL_USER_DEDUPE_WINDOW_MS = 5 * 60 * 1000; const LOCAL_USER_DEDUPE_CLOCK_SKEW_MS = 10_000; function userTextFingerprint(m: NormalizedMessage): string | null { if (m.kind !== 'text' || m.role !== 'user') return null; const t = (m.content || '').trim(); return t.length > 0 ? t : null; } function readMessageTime(m: NormalizedMessage): number | null { const time = Date.parse(m.timestamp); return Number.isFinite(time) ? time : null; } function hasServerEchoForLocalUser( localMessage: NormalizedMessage, serverMessages: NormalizedMessage[], ): boolean { const localText = userTextFingerprint(localMessage); const localTime = readMessageTime(localMessage); if (!localText || localTime === null) { return false; } return serverMessages.some((serverMessage) => { if (userTextFingerprint(serverMessage) !== localText) { return false; } const serverTime = readMessageTime(serverMessage); return ( serverTime !== null && serverTime >= localTime - LOCAL_USER_DEDUPE_CLOCK_SKEW_MS && serverTime - localTime <= LOCAL_USER_DEDUPE_WINDOW_MS ); }); } function compareMessagesChronologically(a: NormalizedMessage, b: NormalizedMessage): number { const timeA = readMessageTime(a) ?? 0; const timeB = readMessageTime(b) ?? 0; if (timeA !== timeB) { return timeA - timeB; } return 0; } /** * Count how many user turns precede `message` in a chronologically merged view * of server + realtime rows. Used to match a realtime row to the correct turn * on disk when several turns share identical assistant text. 
*/ function getUserTurnOrdinalBefore( message: NormalizedMessage, serverMessages: NormalizedMessage[], realtimeMessages: NormalizedMessage[], ): number { const messageTime = readMessageTime(message); let userCount = 0; for (const candidate of [...serverMessages, ...realtimeMessages].sort(compareMessagesChronologically)) { if (candidate.id === message.id) { break; } const candidateTime = readMessageTime(candidate); if ( messageTime !== null && candidateTime !== null && candidateTime > messageTime ) { break; } if (candidate.kind === 'text' && candidate.role === 'user') { userCount++; } } return Math.max(0, userCount - 1); } function findServerTurnRangeByOrdinal( serverMessages: NormalizedMessage[], turnOrdinal: number, ): { start: number; end: number } | null { let userCount = -1; let start = -1; for (let index = 0; index < serverMessages.length; index++) { const message = serverMessages[index]; if (message.kind === 'text' && message.role === 'user') { userCount++; if (userCount === turnOrdinal) { start = index; break; } } } if (start < 0) { return null; } let end = serverMessages.length; for (let index = start + 1; index < serverMessages.length; index++) { if (serverMessages[index].kind === 'text' && serverMessages[index].role === 'user') { end = index; break; } } return { start, end }; } function isAssistantTextEchoedInSameTurnOnServer( message: NormalizedMessage, serverMessages: NormalizedMessage[], realtimeMessages: NormalizedMessage[], ): boolean { const assistantText = (message.content || '').trim(); if (!assistantText) { return false; } const turnOrdinal = getUserTurnOrdinalBefore(message, serverMessages, realtimeMessages); const turnRange = findServerTurnRangeByOrdinal(serverMessages, turnOrdinal); if (!turnRange) { return false; } return serverMessages .slice(turnRange.start + 1, turnRange.end) .some((serverMessage) => serverMessage.kind === 'text' && serverMessage.role === 'assistant' && (serverMessage.content || '').trim() === assistantText, ); } /** * 
Collapse back-to-back rows that show the same assistant reply twice.
 *
 * After `finalizeStreaming`, the client holds a synthetic assistant `text` row
 * while the sessions API soon returns the same reply with a different id.
 * Those sit back-to-back in merged order and look like duplicate bubbles until
 * `refreshFromServer` prunes realtime. Two adjacent shapes are collapsed, and
 * only when the trimmed content is non-empty and identical:
 *
 *  - `stream_delta` followed by assistant `text`: the `text` row replaces the
 *    `stream_delta` row;
 *  - assistant `text` followed by assistant `text`: the later row is dropped.
 *
 * The input array is not modified; a new array is returned.
 */
function dedupeAdjacentAssistantEchoes(merged: NormalizedMessage[]): NormalizedMessage[] {
  const textOf = (row: NormalizedMessage): string => (row.content || '').trim();
  const isAssistantText = (row: NormalizedMessage): boolean =>
    row.kind === 'text' && row.role === 'assistant';

  const kept: NormalizedMessage[] = [];
  merged.forEach((current) => {
    const lastIndex = kept.length - 1;
    const previous = kept[lastIndex];

    // Both collapse rules need: a previous row, the current row being
    // assistant text, and matching non-empty trimmed content.
    const repeatsPrevious =
      Boolean(previous) &&
      isAssistantText(current) &&
      textOf(current).length > 0 &&
      textOf(current) === textOf(previous);

    if (repeatsPrevious && previous.kind === 'stream_delta') {
      kept[lastIndex] = current;
      return;
    }
    if (repeatsPrevious && isAssistantText(previous)) {
      return;
    }
    kept.push(current);
  });
  return kept;
}

/**
 * After a server refresh, drop only the realtime rows the persisted transcript
 * already owns. Anything not yet on disk (common right after `complete`, while
 * JSONL indexing lags) stays in `realtimeMessages` so the chat pane never
 * flashes the empty "Continue your conversation" state.
*/ function pruneRealtimeSupersededByServer( serverMessages: NormalizedMessage[], realtimeMessages: NormalizedMessage[], ): NormalizedMessage[] { if (realtimeMessages.length === 0) { return realtimeMessages; } const serverIds = new Set(serverMessages.map((message) => message.id)); return realtimeMessages.filter((message) => { if (serverIds.has(message.id)) { return false; } if (message.id.startsWith('local_') && hasServerEchoForLocalUser(message, serverMessages)) { return false; } if (message.kind === 'stream_delta' || message.id === `__streaming_${message.sessionId}`) { if (isAssistantTextEchoedInSameTurnOnServer(message, serverMessages, realtimeMessages)) { return false; } return true; } if (message.kind === 'text' && message.role === 'assistant') { if (isAssistantTextEchoedInSameTurnOnServer(message, serverMessages, realtimeMessages)) { return false; } return true; } if (message.kind === 'text' && message.role === 'user') { return !hasServerEchoForLocalUser(message, serverMessages); } if (message.kind === 'tool_use' && message.toolId) { if (serverMessages.some((serverMessage) => serverMessage.kind === 'tool_use' && serverMessage.toolId === message.toolId)) { return false; } } return true; }); } function computeMerged(server: NormalizedMessage[], realtime: NormalizedMessage[]): NormalizedMessage[] { if (realtime.length === 0) { return dedupeAdjacentAssistantEchoes(server); } if (server.length === 0) { return dedupeAdjacentAssistantEchoes(realtime); } const serverIds = new Set(server.map((message) => message.id)); const extra = realtime.filter((message) => { if (serverIds.has(message.id)) { return false; } // Optimistic user rows use `local_*` ids; once the same text exists on the // server-backed copy from the same send window, drop the realtime echo to // avoid duplicate bubbles without hiding repeated prompts from history. 
if (message.id.startsWith('local_')) { if (hasServerEchoForLocalUser(message, server)) { return false; } } return true; }); if (extra.length === 0) { return dedupeAdjacentAssistantEchoes(server); } // Interleave by timestamp so live rows stay with their turn instead of // piling up at the bottom after every refresh. return dedupeAdjacentAssistantEchoes( [...server, ...extra].sort(compareMessagesChronologically), ); } /** * Recompute slot.merged only when the input arrays have actually changed * (by reference). Returns true if merged was recomputed. */ function recomputeMergedIfNeeded(slot: SessionSlot): boolean { if (slot.serverMessages === slot._lastServerRef && slot.realtimeMessages === slot._lastRealtimeRef) { return false; } slot._lastServerRef = slot.serverMessages; slot._lastRealtimeRef = slot.realtimeMessages; slot.merged = computeMerged(slot.serverMessages, slot.realtimeMessages); return true; } // ─── Stale threshold ───────────────────────────────────────────────────────── const STALE_THRESHOLD_MS = 30_000; const MAX_REALTIME_MESSAGES = 500; // ─── Hook ──────────────────────────────────────────────────────────────────── export function useSessionStore() { const storeRef = useRef(new Map()); const activeSessionIdRef = useRef(null); // Bump to force re-render — only when the active session's data changes. // Session ids are stable for the whole conversation lifetime (the backend // allocates them before the first send), so slots are keyed directly with // no alias/redirect indirection. 
const [, setTick] = useState(0); const notify = useCallback((sessionId: string) => { if (sessionId === activeSessionIdRef.current) { setTick(n => n + 1); } }, []); const setActiveSession = useCallback((sessionId: string | null) => { activeSessionIdRef.current = sessionId; }, []); const getSlot = useCallback((sessionId: string): SessionSlot => { const store = storeRef.current; if (!store.has(sessionId)) { store.set(sessionId, createEmptySlot()); } return store.get(sessionId)!; }, []); const has = useCallback((sessionId: string) => { return storeRef.current.has(sessionId); }, []); /** * Fetch messages from the provider sessions endpoint and populate serverMessages. * * Provider and project metadata are resolved server-side from `sessionId`. * The endpoint returns the standard `{ success, data }` envelope. */ const fetchFromServer = useCallback(async ( sessionId: string, opts: { limit?: number | null; offset?: number; } = {}, ) => { const slot = getSlot(sessionId); slot.status = 'loading'; notify(sessionId); try { const params = new URLSearchParams(); if (opts.limit !== null && opts.limit !== undefined) { params.append('limit', String(opts.limit)); params.append('offset', String(opts.offset ?? 0)); } const qs = params.toString(); const url = `/api/providers/sessions/${encodeURIComponent(sessionId)}/messages${qs ? `?${qs}` : ''}`; const response = await authenticatedFetch(url); if (!response.ok) { throw new Error(`HTTP ${response.status}`); } const body = await response.json(); const data = body?.data ?? body; const messages: NormalizedMessage[] = data.messages || []; slot.serverMessages = messages; slot.total = data.total ?? messages.length; slot.hasMore = Boolean(data.hasMore); slot.offset = (opts.offset ?? 
0) + messages.length; slot.fetchedAt = Date.now(); slot.status = 'idle'; recomputeMergedIfNeeded(slot); if (data.tokenUsage) { slot.tokenUsage = data.tokenUsage; } notify(sessionId); return slot; } catch (error) { console.error(`[SessionStore] fetch failed for ${sessionId}:`, error); slot.status = 'error'; notify(sessionId); return slot; } }, [getSlot, notify]); /** * Load older (paginated) messages and prepend to serverMessages. */ const fetchMore = useCallback(async ( sessionId: string, opts: { limit?: number; } = {}, ) => { const slot = getSlot(sessionId); if (!slot.hasMore) return slot; const params = new URLSearchParams(); const limit = opts.limit ?? 20; params.append('limit', String(limit)); params.append('offset', String(slot.offset)); const qs = params.toString(); const url = `/api/providers/sessions/${encodeURIComponent(sessionId)}/messages${qs ? `?${qs}` : ''}`; try { const response = await authenticatedFetch(url); if (!response.ok) throw new Error(`HTTP ${response.status}`); const body = await response.json(); const data = body?.data ?? body; const olderMessages: NormalizedMessage[] = data.messages || []; // Prepend older messages (they're earlier in the conversation) slot.serverMessages = [...olderMessages, ...slot.serverMessages]; slot.hasMore = Boolean(data.hasMore); slot.offset = slot.offset + olderMessages.length; recomputeMergedIfNeeded(slot); notify(sessionId); return slot; } catch (error) { console.error(`[SessionStore] fetchMore failed for ${sessionId}:`, error); return slot; } }, [getSlot, notify]); /** * Append a realtime (WebSocket) message to the correct session slot. * This works regardless of which session is actively viewed. */ const appendRealtime = useCallback((sessionId: string, msg: NormalizedMessage) => { const slot = getSlot(sessionId); const normalizedMessage = msg.sessionId === sessionId ? 
msg : { ...msg, sessionId }; let updated = [...slot.realtimeMessages, normalizedMessage]; if (updated.length > MAX_REALTIME_MESSAGES) { updated = updated.slice(-MAX_REALTIME_MESSAGES); } slot.realtimeMessages = updated; recomputeMergedIfNeeded(slot); notify(sessionId); }, [getSlot, notify]); /** * Append multiple realtime messages at once (batch). */ const appendRealtimeBatch = useCallback((sessionId: string, msgs: NormalizedMessage[]) => { if (msgs.length === 0) return; const slot = getSlot(sessionId); const normalizedMessages = msgs.map((msg) => msg.sessionId === sessionId ? msg : { ...msg, sessionId }, ); let updated = [...slot.realtimeMessages, ...normalizedMessages]; if (updated.length > MAX_REALTIME_MESSAGES) { updated = updated.slice(-MAX_REALTIME_MESSAGES); } slot.realtimeMessages = updated; recomputeMergedIfNeeded(slot); notify(sessionId); }, [getSlot, notify]); /** * Re-fetch serverMessages from the provider sessions endpoint. */ const refreshFromServer = useCallback(async ( sessionId: string, ) => { const slot = getSlot(sessionId); try { const url = `/api/providers/sessions/${encodeURIComponent(sessionId)}/messages`; const response = await authenticatedFetch(url); if (!response.ok) throw new Error(`HTTP ${response.status}`); const body = await response.json(); const data = body?.data ?? body; slot.serverMessages = data.messages || []; slot.total = data.total ?? slot.serverMessages.length; slot.hasMore = Boolean(data.hasMore); slot.fetchedAt = Date.now(); // Only drop realtime rows the server transcript now owns. A blind clear // here caused the chat pane to flash "Continue your conversation" after // `complete` while JSONL / provider_session_id indexing was still behind. 
slot.realtimeMessages = pruneRealtimeSupersededByServer( slot.serverMessages, slot.realtimeMessages, ); recomputeMergedIfNeeded(slot); notify(sessionId); } catch (error) { console.error(`[SessionStore] refresh failed for ${sessionId}:`, error); } }, [getSlot, notify]); /** * Update session status. */ const setStatus = useCallback((sessionId: string, status: SessionStatus) => { const slot = getSlot(sessionId); slot.status = status; notify(sessionId); }, [getSlot, notify]); /** * Check if a session's data is stale (>30s old). */ const isStale = useCallback((sessionId: string) => { const slot = storeRef.current.get(sessionId); if (!slot) return true; return Date.now() - slot.fetchedAt > STALE_THRESHOLD_MS; }, []); /** * Update or create a streaming message (accumulated text so far). * Uses a well-known ID so subsequent calls replace the same message. */ const updateStreaming = useCallback((sessionId: string, accumulatedText: string, msgProvider: LLMProvider) => { const slot = getSlot(sessionId); const streamId = `__streaming_${sessionId}`; const msg: NormalizedMessage = { id: streamId, sessionId, timestamp: new Date().toISOString(), provider: msgProvider, kind: 'stream_delta', content: accumulatedText, }; const idx = slot.realtimeMessages.findIndex(m => m.id === streamId); if (idx >= 0) { slot.realtimeMessages = [...slot.realtimeMessages]; slot.realtimeMessages[idx] = msg; } else { slot.realtimeMessages = [...slot.realtimeMessages, msg]; } recomputeMergedIfNeeded(slot); notify(sessionId); }, [getSlot, notify]); /** * Finalize streaming: convert the streaming message to a regular text message. * The well-known streaming ID is replaced with a unique text message ID. 
*/ const finalizeStreaming = useCallback((sessionId: string) => { const slot = storeRef.current.get(sessionId); if (!slot) return; const streamId = `__streaming_${sessionId}`; const idx = slot.realtimeMessages.findIndex(m => m.id === streamId); if (idx >= 0) { const stream = slot.realtimeMessages[idx]; slot.realtimeMessages = [...slot.realtimeMessages]; slot.realtimeMessages[idx] = { ...stream, id: `text_${Date.now()}_${Math.random().toString(36).slice(2, 8)}`, kind: 'text', role: 'assistant', }; recomputeMergedIfNeeded(slot); notify(sessionId); } }, [notify]); /** * Clear realtime messages for a session (e.g., after stream completes and server fetch catches up). */ const clearRealtime = useCallback((sessionId: string) => { const slot = storeRef.current.get(sessionId); if (slot) { slot.realtimeMessages = []; recomputeMergedIfNeeded(slot); notify(sessionId); } }, [notify]); /** * Get merged messages for a session (for rendering). */ const getMessages = useCallback((sessionId: string): NormalizedMessage[] => { return storeRef.current.get(sessionId)?.merged ?? []; }, []); /** * Get session slot (for status, pagination info, etc.). */ const getSessionSlot = useCallback((sessionId: string): SessionSlot | undefined => { return storeRef.current.get(sessionId); }, []); return useMemo(() => ({ getSlot, has, fetchFromServer, fetchMore, appendRealtime, appendRealtimeBatch, refreshFromServer, setActiveSession, setStatus, isStale, updateStreaming, finalizeStreaming, clearRealtime, getMessages, getSessionSlot, }), [ getSlot, has, fetchFromServer, fetchMore, appendRealtime, appendRealtimeBatch, refreshFromServer, setActiveSession, setStatus, isStale, updateStreaming, finalizeStreaming, clearRealtime, getMessages, getSessionSlot, ]); } export type SessionStore = ReturnType;