import type { WebSocket } from 'ws'; import { sessionsDb } from '@/modules/database/index.js'; import { chatRunRegistry } from '@/modules/websocket/services/chat-run-registry.service.js'; import { connectedClients, WS_OPEN_STATE } from '@/modules/websocket/services/websocket-state.service.js'; import type { AnyRecord, AuthenticatedWebSocketRequest, LLMProvider, } from '@/shared/types.js'; import { parseIncomingJsonObject } from '@/shared/utils.js'; /** * One provider runtime entry point. All five runtimes share this signature, * which lets the chat handler dispatch through a provider-keyed map instead * of provider-specific branches. */ type ProviderSpawnFn = ( command: string, options: AnyRecord, writer: unknown ) => Promise; type ChatWebSocketDependencies = { /** Provider runtimes keyed by provider id. */ spawnFns: Record; /** * Abort functions keyed by provider id. They are addressed with the * provider-native session id (that is how runtimes key their process maps). * The Claude abort is async; the rest are sync — both shapes are accepted. */ abortFns: Record boolean | Promise>; resolveToolApproval: ( requestId: string, payload: { allow: boolean; updatedInput?: unknown; message?: string; rememberEntry?: unknown; } ) => void; /** Claude-only today: pending tool approvals included in `chat_subscribed`. */ getPendingApprovalsForSession: (providerSessionId: string) => unknown[]; }; /** * Extracts the authenticated request user id in the formats currently produced * by platform and OSS auth code paths. */ function readRequestUserId( request: AuthenticatedWebSocketRequest | undefined ): string | number | null { const user = request?.user; if (!user) { return null; } if (typeof user.id === 'string' || typeof user.id === 'number') { return user.id; } if (typeof user.userId === 'string' || typeof user.userId === 'number') { return user.userId; } return null; } function sendJson(ws: WebSocket, payload: unknown): void { if (ws.readyState === WS_OPEN_STATE) { ws.send(JSON.stringify(payload)); } } /** * Reports a protocol-level failure to the requesting client. * * Protocol errors deliberately use their own `kind` (instead of the provider * `error` message kind) so the frontend can distinguish "your request was * invalid" from "the model run produced an error" without inspecting text. */ function sendProtocolError( ws: WebSocket, code: string, error: string, sessionId?: string ): void { sendJson(ws, { kind: 'protocol_error', code, error, sessionId: sessionId ?? null, timestamp: new Date().toISOString(), }); } function readRequiredSessionId(data: AnyRecord): string | null { const sessionId = typeof data.sessionId === 'string' ? data.sessionId.trim() : ''; return sessionId.length > 0 ? sessionId : null; } /** * Handles `chat.send`: resolves the session row (provider, project path, and * provider-native id all come from the database — never from the client), * registers the run, and dispatches to the provider runtime. */ async function handleChatSend( ws: WebSocket, userId: string | number | null, data: AnyRecord, dependencies: ChatWebSocketDependencies ): Promise { const sessionId = readRequiredSessionId(data); if (!sessionId) { sendProtocolError(ws, 'SESSION_ID_REQUIRED', 'chat.send requires a sessionId.'); return; } const session = sessionsDb.getSessionById(sessionId); if (!session) { sendProtocolError( ws, 'SESSION_NOT_FOUND', `Session "${sessionId}" was not found. Create it via POST /api/providers/sessions first.`, sessionId ); return; } const provider = session.provider as LLMProvider; const spawnFn = dependencies.spawnFns[provider]; if (!spawnFn) { sendProtocolError(ws, 'UNSUPPORTED_PROVIDER', `Provider "${provider}" is not available.`, sessionId); return; } const run = chatRunRegistry.startRun({ appSessionId: sessionId, provider, providerSessionId: session.provider_session_id, connection: ws, userId, }); if (!run) { sendProtocolError( ws, 'RUN_IN_PROGRESS', `Session "${sessionId}" already has a run in progress.`, sessionId ); return; } const clientOptions = (data.options ?? {}) as AnyRecord; const command = typeof data.content === 'string' ? data.content : ''; // The provider runtimes receive the provider-native session id (that is the // id their CLI/SDK understands for resume). Brand-new sessions have no // provider id yet, so the runtime starts fresh and announces one, which the // gateway writer captures and maps back to the app session id. const runtimeOptions: AnyRecord = { ...clientOptions, sessionId: session.provider_session_id ?? undefined, resume: Boolean(session.provider_session_id), cwd: clientOptions.cwd ?? session.project_path ?? undefined, projectPath: session.project_path ?? clientOptions.projectPath, }; try { await spawnFn(command, runtimeOptions, run.writer); } catch (error) { const message = error instanceof Error ? error.message : String(error); console.error(`[Chat] Provider runtime "${provider}" failed`, { sessionId, error: message }); } finally { // Safety net: a runtime that crashed (or resolved) without emitting its // terminal `complete` would otherwise leave the session stuck in // "processing" forever on every connected client. chatRunRegistry.completeRun(sessionId, { exitCode: 1 }); } } /** * Handles `chat.abort`: cancels the run for one app session and emits the * terminal `complete` on its behalf (runtimes skip their own complete for * aborted runs, and the registry drops any duplicate). */ async function handleChatAbort( ws: WebSocket, data: AnyRecord, dependencies: ChatWebSocketDependencies ): Promise { const sessionId = readRequiredSessionId(data); if (!sessionId) { sendProtocolError(ws, 'SESSION_ID_REQUIRED', 'chat.abort requires a sessionId.'); return; } const run = chatRunRegistry.getRun(sessionId); if (!run || run.status !== 'running') { sendProtocolError(ws, 'NO_ACTIVE_RUN', `Session "${sessionId}" has no active run.`, sessionId); return; } const abortFn = dependencies.abortFns[run.provider]; let success = false; if (abortFn && run.providerSessionId) { success = Boolean(await abortFn(run.providerSessionId)); } chatRunRegistry.completeRun(sessionId, { exitCode: success ? 0 : 1, aborted: true, }); } /** * Handles `chat.subscribe`: for each requested session, reports whether a run * is processing, re-attaches the live stream to this socket, replays missed * events (seq > lastSeq), and includes pending permission requests. * * This single message replaces the old `check-session-status`, * `get-pending-permissions`, and Claude-only writer reconnect flows. */ function handleChatSubscribe( ws: WebSocket, data: AnyRecord, dependencies: ChatWebSocketDependencies ): void { const targets = Array.isArray(data.sessions) ? data.sessions : []; for (const target of targets) { if (!target || typeof target !== 'object') { continue; } const sessionId = typeof (target as AnyRecord).sessionId === 'string' ? ((target as AnyRecord).sessionId as string).trim() : ''; if (!sessionId) { continue; } const lastSeqRaw = (target as AnyRecord).lastSeq; const lastSeq = typeof lastSeqRaw === 'number' && Number.isFinite(lastSeqRaw) ? Math.max(0, Math.floor(lastSeqRaw)) : 0; const run = chatRunRegistry.getRun(sessionId); const isProcessing = chatRunRegistry.isProcessing(sessionId); // Future live events for this run should land on the socket that asked — // this is what makes mid-stream page refreshes work for all providers. if (isProcessing) { chatRunRegistry.attachConnection(sessionId, ws); } // Pending approvals are tracked under the provider-native id inside the // Claude runtime; remap their sessionId so the client only sees app ids. const pendingPermissions = (run?.providerSessionId ? dependencies.getPendingApprovalsForSession(run.providerSessionId) : [] ).map((approval) => approval && typeof approval === 'object' ? { ...(approval as AnyRecord), sessionId } : approval, ); sendJson(ws, { kind: 'chat_subscribed', sessionId, isProcessing, lastSeq: run?.lastSeq ?? 0, pendingPermissions, timestamp: new Date().toISOString(), }); // Replay only for RUNNING runs, strictly after the ack. Completed runs // are fully persisted to the provider transcript and served over REST — // replaying them (e.g. after a page reload where the client's lastSeq is // 0) would duplicate messages the history fetch already returned. if (isProcessing) { for (const event of chatRunRegistry.replayEvents(sessionId, lastSeq)) { sendJson(ws, event); } } } } /** * Handles `chat.permission-response`: forwards a tool-approval decision to the * pending approval resolver (Claude is the only provider with interactive * approvals today, but the message is intentionally provider-neutral). */ function handlePermissionResponse(data: AnyRecord, dependencies: ChatWebSocketDependencies): void { if (typeof data.requestId !== 'string' || data.requestId.length === 0) { return; } dependencies.resolveToolApproval(data.requestId, { allow: Boolean(data.allow), updatedInput: data.updatedInput, message: typeof data.message === 'string' ? data.message : undefined, rememberEntry: data.rememberEntry, }); } /** * Handles authenticated chat websocket messages used by the main chat panel. * * Inbound protocol (client to server): * - `chat.send` { sessionId, content, options? } * - `chat.abort` { sessionId } * - `chat.subscribe` { sessions: [{ sessionId, lastSeq? }] } * - `chat.permission-response` { requestId, allow, updatedInput?, message?, rememberEntry? } * * Outbound protocol (server to client): every frame is `kind`-based — either * a provider `NormalizedMessage` (with `seq`) or a gateway event * (`chat_subscribed`, `session_upserted`, `loading_progress`, * `protocol_error`). */ export function handleChatConnection( ws: WebSocket, request: AuthenticatedWebSocketRequest, dependencies: ChatWebSocketDependencies ): void { console.log('[INFO] Chat WebSocket connected'); connectedClients.add(ws); const userId = readRequestUserId(request); ws.on('message', async (rawMessage) => { try { const parsed = parseIncomingJsonObject(rawMessage); if (!parsed) { throw new Error('Invalid websocket payload'); } const data = parsed as AnyRecord; const messageType = typeof data.type === 'string' ? data.type : ''; switch (messageType) { case 'chat.send': await handleChatSend(ws, userId, data, dependencies); return; case 'chat.abort': await handleChatAbort(ws, data, dependencies); return; case 'chat.subscribe': handleChatSubscribe(ws, data, dependencies); return; case 'chat.permission-response': handlePermissionResponse(data, dependencies); return; default: sendProtocolError(ws, 'UNKNOWN_MESSAGE_TYPE', `Unknown message type "${messageType}".`); return; } } catch (error) { const message = error instanceof Error ? error.message : String(error); console.error('[ERROR] Chat WebSocket error:', message); sendProtocolError(ws, 'INTERNAL_ERROR', message); } }); ws.on('close', () => { console.log('[INFO] Chat client disconnected'); connectedClients.delete(ws); }); }