mirror of
https://github.com/siteboon/claudecodeui.git
synced 2026-06-13 01:22:06 +08:00
The frontend previously juggled placeholder IDs, provider-native IDs, and session_created handoffs, which caused race conditions and provider-specific branching. This introduces app-allocated session IDs, a chat run registry with event replay, delta sidebar updates, and one kind-based websocket contract so the UI can treat every provider the same while JSONL remains the source of truth.
371 lines
12 KiB
TypeScript
371 lines
12 KiB
TypeScript
import type { WebSocket } from 'ws';
|
|
|
|
import { sessionsDb } from '@/modules/database/index.js';
|
|
import { chatRunRegistry } from '@/modules/websocket/services/chat-run-registry.service.js';
|
|
import { connectedClients, WS_OPEN_STATE } from '@/modules/websocket/services/websocket-state.service.js';
|
|
import type {
|
|
AnyRecord,
|
|
AuthenticatedWebSocketRequest,
|
|
LLMProvider,
|
|
} from '@/shared/types.js';
|
|
import { parseIncomingJsonObject } from '@/shared/utils.js';
|
|
|
|
/**
|
|
* One provider runtime entry point. All five runtimes share this signature,
|
|
* which lets the chat handler dispatch through a provider-keyed map instead
|
|
* of provider-specific branches.
|
|
*/
|
|
type ProviderSpawnFn = (
|
|
command: string,
|
|
options: AnyRecord,
|
|
writer: unknown
|
|
) => Promise<unknown>;
|
|
|
|
type ChatWebSocketDependencies = {
|
|
/** Provider runtimes keyed by provider id. */
|
|
spawnFns: Record<LLMProvider, ProviderSpawnFn>;
|
|
/**
|
|
* Abort functions keyed by provider id. They are addressed with the
|
|
* provider-native session id (that is how runtimes key their process maps).
|
|
* The Claude abort is async; the rest are sync — both shapes are accepted.
|
|
*/
|
|
abortFns: Record<LLMProvider, (providerSessionId: string) => boolean | Promise<boolean>>;
|
|
resolveToolApproval: (
|
|
requestId: string,
|
|
payload: {
|
|
allow: boolean;
|
|
updatedInput?: unknown;
|
|
message?: string;
|
|
rememberEntry?: unknown;
|
|
}
|
|
) => void;
|
|
/** Claude-only today: pending tool approvals included in `chat_subscribed`. */
|
|
getPendingApprovalsForSession: (providerSessionId: string) => unknown[];
|
|
};
|
|
|
|
/**
|
|
* Extracts the authenticated request user id in the formats currently produced
|
|
* by platform and OSS auth code paths.
|
|
*/
|
|
function readRequestUserId(
|
|
request: AuthenticatedWebSocketRequest | undefined
|
|
): string | number | null {
|
|
const user = request?.user;
|
|
if (!user) {
|
|
return null;
|
|
}
|
|
|
|
if (typeof user.id === 'string' || typeof user.id === 'number') {
|
|
return user.id;
|
|
}
|
|
|
|
if (typeof user.userId === 'string' || typeof user.userId === 'number') {
|
|
return user.userId;
|
|
}
|
|
|
|
return null;
|
|
}
|
|
|
|
function sendJson(ws: WebSocket, payload: unknown): void {
|
|
if (ws.readyState === WS_OPEN_STATE) {
|
|
ws.send(JSON.stringify(payload));
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Reports a protocol-level failure to the requesting client.
|
|
*
|
|
* Protocol errors deliberately use their own `kind` (instead of the provider
|
|
* `error` message kind) so the frontend can distinguish "your request was
|
|
* invalid" from "the model run produced an error" without inspecting text.
|
|
*/
|
|
function sendProtocolError(
|
|
ws: WebSocket,
|
|
code: string,
|
|
error: string,
|
|
sessionId?: string
|
|
): void {
|
|
sendJson(ws, {
|
|
kind: 'protocol_error',
|
|
code,
|
|
error,
|
|
sessionId: sessionId ?? null,
|
|
timestamp: new Date().toISOString(),
|
|
});
|
|
}
|
|
|
|
function readRequiredSessionId(data: AnyRecord): string | null {
|
|
const sessionId = typeof data.sessionId === 'string' ? data.sessionId.trim() : '';
|
|
return sessionId.length > 0 ? sessionId : null;
|
|
}
|
|
|
|
/**
|
|
* Handles `chat.send`: resolves the session row (provider, project path, and
|
|
* provider-native id all come from the database — never from the client),
|
|
* registers the run, and dispatches to the provider runtime.
|
|
*/
|
|
async function handleChatSend(
|
|
ws: WebSocket,
|
|
userId: string | number | null,
|
|
data: AnyRecord,
|
|
dependencies: ChatWebSocketDependencies
|
|
): Promise<void> {
|
|
const sessionId = readRequiredSessionId(data);
|
|
if (!sessionId) {
|
|
sendProtocolError(ws, 'SESSION_ID_REQUIRED', 'chat.send requires a sessionId.');
|
|
return;
|
|
}
|
|
|
|
const session = sessionsDb.getSessionById(sessionId);
|
|
if (!session) {
|
|
sendProtocolError(
|
|
ws,
|
|
'SESSION_NOT_FOUND',
|
|
`Session "${sessionId}" was not found. Create it via POST /api/providers/sessions first.`,
|
|
sessionId
|
|
);
|
|
return;
|
|
}
|
|
|
|
const provider = session.provider as LLMProvider;
|
|
const spawnFn = dependencies.spawnFns[provider];
|
|
if (!spawnFn) {
|
|
sendProtocolError(ws, 'UNSUPPORTED_PROVIDER', `Provider "${provider}" is not available.`, sessionId);
|
|
return;
|
|
}
|
|
|
|
const run = chatRunRegistry.startRun({
|
|
appSessionId: sessionId,
|
|
provider,
|
|
providerSessionId: session.provider_session_id,
|
|
connection: ws,
|
|
userId,
|
|
});
|
|
|
|
if (!run) {
|
|
sendProtocolError(
|
|
ws,
|
|
'RUN_IN_PROGRESS',
|
|
`Session "${sessionId}" already has a run in progress.`,
|
|
sessionId
|
|
);
|
|
return;
|
|
}
|
|
|
|
const clientOptions = (data.options ?? {}) as AnyRecord;
|
|
const command = typeof data.content === 'string' ? data.content : '';
|
|
|
|
// The provider runtimes receive the provider-native session id (that is the
|
|
// id their CLI/SDK understands for resume). Brand-new sessions have no
|
|
// provider id yet, so the runtime starts fresh and announces one, which the
|
|
// gateway writer captures and maps back to the app session id.
|
|
const runtimeOptions: AnyRecord = {
|
|
...clientOptions,
|
|
sessionId: session.provider_session_id ?? undefined,
|
|
resume: Boolean(session.provider_session_id),
|
|
cwd: clientOptions.cwd ?? session.project_path ?? undefined,
|
|
projectPath: session.project_path ?? clientOptions.projectPath,
|
|
};
|
|
|
|
try {
|
|
await spawnFn(command, runtimeOptions, run.writer);
|
|
} catch (error) {
|
|
const message = error instanceof Error ? error.message : String(error);
|
|
console.error(`[Chat] Provider runtime "${provider}" failed`, { sessionId, error: message });
|
|
} finally {
|
|
// Safety net: a runtime that crashed (or resolved) without emitting its
|
|
// terminal `complete` would otherwise leave the session stuck in
|
|
// "processing" forever on every connected client.
|
|
chatRunRegistry.completeRun(sessionId, { exitCode: 1 });
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Handles `chat.abort`: cancels the run for one app session and emits the
|
|
* terminal `complete` on its behalf (runtimes skip their own complete for
|
|
* aborted runs, and the registry drops any duplicate).
|
|
*/
|
|
async function handleChatAbort(
|
|
ws: WebSocket,
|
|
data: AnyRecord,
|
|
dependencies: ChatWebSocketDependencies
|
|
): Promise<void> {
|
|
const sessionId = readRequiredSessionId(data);
|
|
if (!sessionId) {
|
|
sendProtocolError(ws, 'SESSION_ID_REQUIRED', 'chat.abort requires a sessionId.');
|
|
return;
|
|
}
|
|
|
|
const run = chatRunRegistry.getRun(sessionId);
|
|
if (!run || run.status !== 'running') {
|
|
sendProtocolError(ws, 'NO_ACTIVE_RUN', `Session "${sessionId}" has no active run.`, sessionId);
|
|
return;
|
|
}
|
|
|
|
const abortFn = dependencies.abortFns[run.provider];
|
|
let success = false;
|
|
if (abortFn && run.providerSessionId) {
|
|
success = Boolean(await abortFn(run.providerSessionId));
|
|
}
|
|
|
|
chatRunRegistry.completeRun(sessionId, {
|
|
exitCode: success ? 0 : 1,
|
|
aborted: true,
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Handles `chat.subscribe`: for each requested session, reports whether a run
|
|
* is processing, re-attaches the live stream to this socket, replays missed
|
|
* events (seq > lastSeq), and includes pending permission requests.
|
|
*
|
|
* This single message replaces the old `check-session-status`,
|
|
* `get-pending-permissions`, and Claude-only writer reconnect flows.
|
|
*/
|
|
function handleChatSubscribe(
|
|
ws: WebSocket,
|
|
data: AnyRecord,
|
|
dependencies: ChatWebSocketDependencies
|
|
): void {
|
|
const targets = Array.isArray(data.sessions) ? data.sessions : [];
|
|
|
|
for (const target of targets) {
|
|
if (!target || typeof target !== 'object') {
|
|
continue;
|
|
}
|
|
|
|
const sessionId = typeof (target as AnyRecord).sessionId === 'string'
|
|
? ((target as AnyRecord).sessionId as string).trim()
|
|
: '';
|
|
if (!sessionId) {
|
|
continue;
|
|
}
|
|
|
|
const lastSeqRaw = (target as AnyRecord).lastSeq;
|
|
const lastSeq = typeof lastSeqRaw === 'number' && Number.isFinite(lastSeqRaw)
|
|
? Math.max(0, Math.floor(lastSeqRaw))
|
|
: 0;
|
|
|
|
const run = chatRunRegistry.getRun(sessionId);
|
|
const isProcessing = chatRunRegistry.isProcessing(sessionId);
|
|
|
|
// Future live events for this run should land on the socket that asked —
|
|
// this is what makes mid-stream page refreshes work for all providers.
|
|
if (isProcessing) {
|
|
chatRunRegistry.attachConnection(sessionId, ws);
|
|
}
|
|
|
|
// Pending approvals are tracked under the provider-native id inside the
|
|
// Claude runtime; remap their sessionId so the client only sees app ids.
|
|
const pendingPermissions = (run?.providerSessionId
|
|
? dependencies.getPendingApprovalsForSession(run.providerSessionId)
|
|
: []
|
|
).map((approval) =>
|
|
approval && typeof approval === 'object'
|
|
? { ...(approval as AnyRecord), sessionId }
|
|
: approval,
|
|
);
|
|
|
|
sendJson(ws, {
|
|
kind: 'chat_subscribed',
|
|
sessionId,
|
|
isProcessing,
|
|
lastSeq: run?.lastSeq ?? 0,
|
|
pendingPermissions,
|
|
timestamp: new Date().toISOString(),
|
|
});
|
|
|
|
// Replay only for RUNNING runs, strictly after the ack. Completed runs
|
|
// are fully persisted to the provider transcript and served over REST —
|
|
// replaying them (e.g. after a page reload where the client's lastSeq is
|
|
// 0) would duplicate messages the history fetch already returned.
|
|
if (isProcessing) {
|
|
for (const event of chatRunRegistry.replayEvents(sessionId, lastSeq)) {
|
|
sendJson(ws, event);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Handles `chat.permission-response`: forwards a tool-approval decision to the
|
|
* pending approval resolver (Claude is the only provider with interactive
|
|
* approvals today, but the message is intentionally provider-neutral).
|
|
*/
|
|
function handlePermissionResponse(data: AnyRecord, dependencies: ChatWebSocketDependencies): void {
|
|
if (typeof data.requestId !== 'string' || data.requestId.length === 0) {
|
|
return;
|
|
}
|
|
|
|
dependencies.resolveToolApproval(data.requestId, {
|
|
allow: Boolean(data.allow),
|
|
updatedInput: data.updatedInput,
|
|
message: typeof data.message === 'string' ? data.message : undefined,
|
|
rememberEntry: data.rememberEntry,
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Handles authenticated chat websocket messages used by the main chat panel.
|
|
*
|
|
* Inbound protocol (client to server):
|
|
* - `chat.send` { sessionId, content, options? }
|
|
* - `chat.abort` { sessionId }
|
|
* - `chat.subscribe` { sessions: [{ sessionId, lastSeq? }] }
|
|
* - `chat.permission-response` { requestId, allow, updatedInput?, message?, rememberEntry? }
|
|
*
|
|
* Outbound protocol (server to client): every frame is `kind`-based — either
|
|
* a provider `NormalizedMessage` (with `seq`) or a gateway event
|
|
* (`chat_subscribed`, `session_upserted`, `loading_progress`,
|
|
* `protocol_error`).
|
|
*/
|
|
export function handleChatConnection(
|
|
ws: WebSocket,
|
|
request: AuthenticatedWebSocketRequest,
|
|
dependencies: ChatWebSocketDependencies
|
|
): void {
|
|
console.log('[INFO] Chat WebSocket connected');
|
|
connectedClients.add(ws);
|
|
|
|
const userId = readRequestUserId(request);
|
|
|
|
ws.on('message', async (rawMessage) => {
|
|
try {
|
|
const parsed = parseIncomingJsonObject(rawMessage);
|
|
if (!parsed) {
|
|
throw new Error('Invalid websocket payload');
|
|
}
|
|
|
|
const data = parsed as AnyRecord;
|
|
const messageType = typeof data.type === 'string' ? data.type : '';
|
|
|
|
switch (messageType) {
|
|
case 'chat.send':
|
|
await handleChatSend(ws, userId, data, dependencies);
|
|
return;
|
|
case 'chat.abort':
|
|
await handleChatAbort(ws, data, dependencies);
|
|
return;
|
|
case 'chat.subscribe':
|
|
handleChatSubscribe(ws, data, dependencies);
|
|
return;
|
|
case 'chat.permission-response':
|
|
handlePermissionResponse(data, dependencies);
|
|
return;
|
|
default:
|
|
sendProtocolError(ws, 'UNKNOWN_MESSAGE_TYPE', `Unknown message type "${messageType}".`);
|
|
return;
|
|
}
|
|
} catch (error) {
|
|
const message = error instanceof Error ? error.message : String(error);
|
|
console.error('[ERROR] Chat WebSocket error:', message);
|
|
sendProtocolError(ws, 'INTERNAL_ERROR', message);
|
|
}
|
|
});
|
|
|
|
ws.on('close', () => {
|
|
console.log('[INFO] Chat client disconnected');
|
|
connectedClients.delete(ws);
|
|
});
|
|
}
|