feat(chat): unify session gateway with stable IDs and a single WS protocol

The frontend previously juggled placeholder IDs, provider-native IDs, and session_created handoffs, which caused race conditions and provider-specific branching. This introduces app-allocated session IDs, a chat run registry with event replay, delta sidebar updates, and one kind-based websocket contract so the UI can treat every provider the same while JSONL remains the source of truth.
This commit is contained in:
Haileyesus
2026-06-11 18:47:19 +03:00
parent 3d948217ef
commit f5eac2ec12
40 changed files with 2451 additions and 1226 deletions

View File

@@ -22,35 +22,24 @@ import { findAppRoot, getModuleDir } from './utils/runtime-paths.js';
import {
queryClaudeSDK,
abortClaudeSDKSession,
isClaudeSDKSessionActive,
getActiveClaudeSDKSessions,
resolveToolApproval,
getPendingApprovalsForSession,
reconnectSessionWriter,
} from './claude-sdk.js';
import {
spawnCursor,
abortCursorSession,
isCursorSessionActive,
getActiveCursorSessions,
} from './cursor-cli.js';
import {
queryCodex,
abortCodexSession,
isCodexSessionActive,
getActiveCodexSessions,
} from './openai-codex.js';
import {
spawnGemini,
abortGeminiSession,
isGeminiSessionActive,
getActiveGeminiSessions,
} from './gemini-cli.js';
import {
spawnOpenCode,
abortOpenCodeSession,
isOpenCodeSessionActive,
getActiveOpenCodeSessions,
} from './opencode-cli.js';
import sessionManager from './sessionManager.js';
import {
@@ -105,29 +94,22 @@ const wss = createWebSocketServer(server, {
authenticateWebSocket,
},
chat: {
queryClaudeSDK,
spawnCursor,
queryCodex,
spawnGemini,
spawnOpenCode,
abortClaudeSDKSession,
abortCursorSession,
abortCodexSession,
abortGeminiSession,
abortOpenCodeSession,
spawnFns: {
claude: queryClaudeSDK,
cursor: spawnCursor,
codex: queryCodex,
gemini: spawnGemini,
opencode: spawnOpenCode,
},
abortFns: {
claude: abortClaudeSDKSession,
cursor: abortCursorSession,
codex: abortCodexSession,
gemini: abortGeminiSession,
opencode: abortOpenCodeSession,
},
resolveToolApproval,
isClaudeSDKSessionActive,
isCursorSessionActive,
isCodexSessionActive,
isGeminiSessionActive,
isOpenCodeSessionActive,
reconnectSessionWriter,
getPendingApprovalsForSession,
getActiveClaudeSDKSessions,
getActiveCursorSessions,
getActiveCodexSessions,
getActiveGeminiSessions,
getActiveOpenCodeSessions,
},
shell: {
getSessionById: (sessionId) => sessionManager.getSession(sessionId),
@@ -1152,6 +1134,12 @@ app.get('/api/projects/:projectId/sessions/:sessionId/token-usage', authenticate
return res.status(400).json({ error: 'Invalid sessionId' });
}
// Provider artifacts on disk (JSONL file names, OpenCode sqlite rows)
// are keyed by the provider-native session id, while the caller sends
// the app-facing id. Resolve the mapping once for all branches below.
const sessionRow = sessionsDb.getSessionById(safeSessionId);
const providerNativeSessionId = sessionRow?.provider_session_id || safeSessionId;
// Handle Cursor sessions - they use SQLite and don't have token usage info
if (provider === 'cursor') {
return res.json({
@@ -1252,7 +1240,7 @@ app.get('/api/projects/:projectId/sessions/:sessionId/token-usage', authenticate
tokens_cache_write AS cacheWriteTokens
FROM session
WHERE id = ?
`).get(safeSessionId);
`).get(providerNativeSessionId);
if (!row) {
return res.status(404).json({ error: 'OpenCode session not found', sessionId: safeSessionId });
@@ -1293,7 +1281,7 @@ app.get('/api/projects/:projectId/sessions/:sessionId/token-usage', authenticate
if (entry.isDirectory()) {
const found = await findSessionFile(fullPath);
if (found) return found;
} else if (entry.name.includes(safeSessionId) && entry.name.endsWith('.jsonl')) {
} else if (entry.name.includes(providerNativeSessionId) && entry.name.endsWith('.jsonl')) {
return fullPath;
}
}
@@ -1377,12 +1365,19 @@ app.get('/api/projects/:projectId/sessions/:sessionId/token-usage', authenticate
const encodedPath = projectPath.replace(/[^a-zA-Z0-9-]/g, '-');
const projectDir = path.join(homeDir, '.claude', 'projects', encodedPath);
const jsonlPath = path.join(projectDir, `${safeSessionId}.jsonl`);
// Prefer the indexed transcript path (already produced by the trusted
// session synchronizer); fall back to the conventional location
// derived from the provider-native session id.
let jsonlPath = sessionRow?.jsonl_path;
if (!jsonlPath) {
jsonlPath = path.join(projectDir, `${providerNativeSessionId}.jsonl`);
// Constrain to projectDir
const rel = path.relative(path.resolve(projectDir), path.resolve(jsonlPath));
if (rel.startsWith('..') || path.isAbsolute(rel)) {
return res.status(400).json({ error: 'Invalid path' });
// Constrain the constructed path to projectDir (the id is
// caller-influenced in this fallback branch).
const rel = path.relative(path.resolve(projectDir), path.resolve(jsonlPath));
if (rel.startsWith('..') || path.isAbsolute(rel)) {
return res.status(400).json({ error: 'Invalid path' });
}
}
// Read and parse the JSONL file

View File

@@ -382,6 +382,25 @@ const rebuildSessionsTableWithProjectSchema = (db: Database): void => {
}
};
/**
* Adds the `provider_session_id` mapping column used by the session gateway.
*
* Rows that existed before this migration were always keyed directly by the
* provider-native session id, so backfilling `provider_session_id` with
* `session_id` keeps every legacy row resolvable through the new mapping.
*/
const addProviderSessionIdMapping = (db: Database): void => {
const sessionsTableInfo = getTableInfo(db, 'sessions');
const columnNames = sessionsTableInfo.map((column) => column.name);
addColumnToTableIfNotExists(db, 'sessions', columnNames, 'provider_session_id', 'TEXT');
db.exec(`
UPDATE sessions
SET provider_session_id = session_id
WHERE provider_session_id IS NULL
`);
};
const ensureProjectsForSessionPaths = (db: Database): void => {
if (!tableExists(db, 'sessions')) {
return;
@@ -428,9 +447,11 @@ export const runMigrations = (db: Database) => {
migrateLegacyWorkspaceTableIntoProjects(db);
rebuildSessionsTableWithProjectSchema(db);
migrateLegacySessionNames(db);
addProviderSessionIdMapping(db);
ensureProjectsForSessionPaths(db);
db.exec('CREATE INDEX IF NOT EXISTS idx_session_ids_lookup ON sessions(session_id)');
db.exec('CREATE INDEX IF NOT EXISTS idx_sessions_provider_session_id ON sessions(provider_session_id)');
db.exec('CREATE INDEX IF NOT EXISTS idx_sessions_project_path ON sessions(project_path)');
db.exec('CREATE INDEX IF NOT EXISTS idx_sessions_is_archived ON sessions(isArchived)');
db.exec('CREATE INDEX IF NOT EXISTS idx_projects_is_starred ON projects(isStarred)');

View File

@@ -5,6 +5,7 @@ import { normalizeProjectPath } from '@/shared/utils.js';
type SessionRow = {
session_id: string;
provider: string;
provider_session_id: string | null;
project_path: string | null;
jsonl_path: string | null;
custom_name: string | null;
@@ -13,10 +14,8 @@ type SessionRow = {
updated_at: string;
};
type SessionMetadataLookupRow = Pick<
SessionRow,
'session_id' | 'provider' | 'project_path' | 'jsonl_path' | 'custom_name' | 'isArchived' | 'created_at' | 'updated_at'
>;
const SESSION_ROW_COLUMNS =
'session_id, provider, provider_session_id, project_path, jsonl_path, custom_name, isArchived, created_at, updated_at';
function normalizeTimestamp(value?: string): string | null {
if (!value) return null;
@@ -35,8 +34,16 @@ function normalizeProjectPathForProvider(provider: string, projectPath: string):
}
export const sessionsDb = {
/**
* Upserts one session row discovered on disk by a provider synchronizer.
*
* The given id is the provider-native session id. Rows are keyed by
* `provider_session_id` so a session that was first created by the app
* (with an app-allocated `session_id`) is updated in place once its
* transcript shows up on disk, instead of producing a duplicate row.
*/
createSession(
sessionId: string,
providerSessionId: string,
provider: string,
projectPath: string,
customName?: string,
@@ -53,19 +60,54 @@ export const sessionsDb = {
// since it's a foreign key in the sessions table.
projectsDb.createProjectPath(normalizedProjectPath);
const existing = db
.prepare(
`SELECT session_id FROM sessions
WHERE provider_session_id = ? AND provider = ?
LIMIT 1`
)
.get(providerSessionId, provider) as { session_id: string } | undefined;
if (existing) {
db.prepare(
`UPDATE sessions SET
provider = ?,
updated_at = COALESCE(?, CURRENT_TIMESTAMP),
project_path = ?,
jsonl_path = ?,
isArchived = 0,
custom_name = COALESCE(?, custom_name)
WHERE session_id = ?`
).run(
provider,
updatedAtValue,
normalizedProjectPath,
jsonlPath ?? null,
customName ?? null,
existing.session_id
);
return existing.session_id;
}
// Sessions created outside the app (directly via the provider CLI) are
// keyed by the provider-native id for both columns. The ON CONFLICT path
// covers legacy rows that predate the provider_session_id mapping.
db.prepare(
`INSERT INTO sessions (session_id, provider, custom_name, project_path, jsonl_path, isArchived, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, 0, COALESCE(?, CURRENT_TIMESTAMP), COALESCE(?, CURRENT_TIMESTAMP))
`INSERT INTO sessions (session_id, provider, provider_session_id, custom_name, project_path, jsonl_path, isArchived, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, 0, COALESCE(?, CURRENT_TIMESTAMP), COALESCE(?, CURRENT_TIMESTAMP))
ON CONFLICT(session_id) DO UPDATE SET
provider = excluded.provider,
provider_session_id = excluded.provider_session_id,
updated_at = excluded.updated_at,
project_path = excluded.project_path,
jsonl_path = excluded.jsonl_path,
isArchived = 0,
custom_name = COALESCE(excluded.custom_name, sessions.custom_name)`
).run(
sessionId,
providerSessionId,
provider,
providerSessionId,
customName ?? null,
normalizedProjectPath,
jsonlPath ?? null,
@@ -73,9 +115,77 @@ export const sessionsDb = {
updatedAtValue
);
return providerSessionId;
},
/**
* Inserts one app-allocated session row before any provider run happens.
*
* The session gateway uses this when the frontend starts a brand-new chat:
* `session_id` is the stable app-facing id, while `provider_session_id`
* stays NULL until the provider runtime announces its own id and
* `assignProviderSessionId` records the mapping.
*/
createAppSession(sessionId: string, provider: string, projectPath: string): string {
const db = getConnection();
const normalizedProjectPath = normalizeProjectPathForProvider(provider, projectPath);
projectsDb.createProjectPath(normalizedProjectPath);
db.prepare(
`INSERT INTO sessions (session_id, provider, provider_session_id, custom_name, project_path, jsonl_path, isArchived, created_at, updated_at)
VALUES (?, ?, NULL, NULL, ?, NULL, 0, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)`
).run(sessionId, provider, normalizedProjectPath);
return sessionId;
},
/**
* Records the provider-native session id for one app-allocated session.
*
* If the filesystem watcher indexed the provider transcript before this
* mapping was recorded (a duplicate row keyed by the provider id exists),
* the duplicate is merged into the app row: its transcript path and name
* are adopted and the duplicate row is removed. Runs in a transaction so
* the sidebar can never observe both rows at once.
*/
assignProviderSessionId(sessionId: string, providerSessionId: string): void {
const db = getConnection();
const merge = db.transaction(() => {
const duplicate = db
.prepare(
`SELECT ${SESSION_ROW_COLUMNS} FROM sessions
WHERE (session_id = ? OR provider_session_id = ?)
AND session_id <> ?
LIMIT 1`
)
.get(providerSessionId, providerSessionId, sessionId) as SessionRow | undefined;
if (duplicate) {
db.prepare('DELETE FROM sessions WHERE session_id = ?').run(duplicate.session_id);
db.prepare(
`UPDATE sessions SET
provider_session_id = ?,
jsonl_path = COALESCE(jsonl_path, ?),
custom_name = COALESCE(custom_name, ?),
updated_at = CURRENT_TIMESTAMP
WHERE session_id = ?`
).run(providerSessionId, duplicate.jsonl_path, duplicate.custom_name, sessionId);
return;
}
db.prepare(
`UPDATE sessions SET
provider_session_id = ?,
updated_at = CURRENT_TIMESTAMP
WHERE session_id = ?`
).run(providerSessionId, sessionId);
});
merge();
},
updateSessionCustomName(sessionId: string, customName: string): void {
const db = getConnection();
db.prepare(
@@ -85,17 +195,39 @@ export const sessionsDb = {
).run(customName, sessionId);
},
getSessionById(sessionId: string): SessionMetadataLookupRow | null {
getSessionById(sessionId: string): SessionRow | null {
const db = getConnection();
const row = db
.prepare(
`SELECT session_id, provider, project_path, jsonl_path, custom_name, isArchived, created_at, updated_at
`SELECT ${SESSION_ROW_COLUMNS}
FROM sessions
WHERE session_id = ?
ORDER BY updated_at DESC
LIMIT 1`
)
.get(sessionId) as SessionMetadataLookupRow | undefined;
.get(sessionId) as SessionRow | undefined;
return row ?? null;
},
/**
* Resolves one session row through the provider-native id.
*
* The filesystem watcher only knows provider ids (they come from transcript
* file names), so it uses this lookup to translate disk artifacts back to
* the app-facing session row before broadcasting sidebar updates.
*/
getSessionByProviderSessionId(providerSessionId: string): SessionRow | null {
const db = getConnection();
const row = db
.prepare(
`SELECT ${SESSION_ROW_COLUMNS}
FROM sessions
WHERE provider_session_id = ?
ORDER BY updated_at DESC
LIMIT 1`
)
.get(providerSessionId) as SessionRow | undefined;
return row ?? null;
},
@@ -104,7 +236,7 @@ export const sessionsDb = {
const db = getConnection();
return db
.prepare(
`SELECT session_id, provider, project_path, jsonl_path, custom_name, isArchived, created_at, updated_at
`SELECT ${SESSION_ROW_COLUMNS}
FROM sessions
WHERE isArchived = 0`
)
@@ -119,7 +251,7 @@ export const sessionsDb = {
const db = getConnection();
return db
.prepare(
`SELECT session_id, provider, project_path, jsonl_path, custom_name, isArchived, created_at, updated_at
`SELECT ${SESSION_ROW_COLUMNS}
FROM sessions
WHERE isArchived = 1
ORDER BY datetime(COALESCE(updated_at, created_at)) DESC, session_id DESC`
@@ -132,7 +264,7 @@ export const sessionsDb = {
const normalizedProjectPath = normalizeProjectPath(projectPath);
return db
.prepare(
`SELECT session_id, provider, project_path, jsonl_path, custom_name, isArchived, created_at, updated_at
`SELECT ${SESSION_ROW_COLUMNS}
FROM sessions
WHERE project_path = ?
AND isArchived = 0`
@@ -149,7 +281,7 @@ export const sessionsDb = {
const normalizedProjectPath = normalizeProjectPath(projectPath);
return db
.prepare(
`SELECT session_id, provider, project_path, jsonl_path, custom_name, isArchived, created_at, updated_at
`SELECT ${SESSION_ROW_COLUMNS}
FROM sessions
WHERE project_path = ?`
)
@@ -161,7 +293,7 @@ export const sessionsDb = {
const normalizedProjectPath = normalizeProjectPath(projectPath);
return db
.prepare(
`SELECT session_id, provider, project_path, jsonl_path, custom_name, isArchived, created_at, updated_at
`SELECT ${SESSION_ROW_COLUMNS}
FROM sessions
WHERE project_path = ?
AND isArchived = 0

View File

@@ -83,6 +83,12 @@ export const SESSIONS_TABLE_SCHEMA_SQL = `
CREATE TABLE IF NOT EXISTS sessions (
session_id TEXT NOT NULL,
provider TEXT NOT NULL DEFAULT 'claude',
-- The session id used by the provider CLI/SDK on disk (JSONL file name,
-- store.db folder, sqlite row id, ...). \`session_id\` is the stable
-- app-facing id that the frontend uses for the whole session lifetime;
-- \`provider_session_id\` is filled in once the provider announces its own
-- id mid-run, or equals \`session_id\` for sessions discovered on disk.
provider_session_id TEXT,
custom_name TEXT,
project_path TEXT,
jsonl_path TEXT,

View File

@@ -0,0 +1,108 @@
import assert from 'node:assert/strict';
import { mkdtemp, rm } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import path from 'node:path';
import test from 'node:test';
import { closeConnection } from '@/modules/database/connection.js';
import { initializeDatabase } from '@/modules/database/init-db.js';
import { sessionsDb } from '@/modules/database/repositories/sessions.db.js';
async function withIsolatedDatabase(runTest: () => void | Promise<void>): Promise<void> {
const previousDatabasePath = process.env.DATABASE_PATH;
const tempDirectory = await mkdtemp(path.join(tmpdir(), 'sessions-mapping-'));
const databasePath = path.join(tempDirectory, 'auth.db');
closeConnection();
process.env.DATABASE_PATH = databasePath;
await initializeDatabase();
try {
await runTest();
} finally {
closeConnection();
if (previousDatabasePath === undefined) {
delete process.env.DATABASE_PATH;
} else {
process.env.DATABASE_PATH = previousDatabasePath;
}
await rm(tempDirectory, { recursive: true, force: true });
}
}
test('disk-discovered sessions are keyed by the provider id for both columns', async () => {
await withIsolatedDatabase(() => {
sessionsDb.createSession('provider-abc', 'claude', '/workspace/demo', 'From Disk');
const row = sessionsDb.getSessionById('provider-abc');
assert.equal(row?.session_id, 'provider-abc');
assert.equal(row?.provider_session_id, 'provider-abc');
const byProviderId = sessionsDb.getSessionByProviderSessionId('provider-abc');
assert.equal(byProviderId?.session_id, 'provider-abc');
});
});
test('app sessions get the provider id assigned without creating a duplicate row', async () => {
await withIsolatedDatabase(() => {
sessionsDb.createAppSession('app-id-1', 'claude', '/workspace/demo');
sessionsDb.assignProviderSessionId('app-id-1', 'provider-xyz');
// A later synchronizer pass that discovers the transcript on disk must
// update the app row in place instead of inserting a provider-keyed row.
const returnedId = sessionsDb.createSession(
'provider-xyz',
'claude',
'/workspace/demo',
'Synced Name',
undefined,
undefined,
'/fake/path/provider-xyz.jsonl',
);
assert.equal(returnedId, 'app-id-1');
assert.equal(sessionsDb.getAllSessions().length, 1);
const row = sessionsDb.getSessionById('app-id-1');
assert.equal(row?.provider_session_id, 'provider-xyz');
assert.equal(row?.jsonl_path, '/fake/path/provider-xyz.jsonl');
});
});
test('assignProviderSessionId merges a watcher-created duplicate into the app row', async () => {
await withIsolatedDatabase(() => {
sessionsDb.createAppSession('app-id-2', 'codex', '/workspace/demo');
// Simulate the race: the filesystem watcher indexed the provider
// transcript before the runtime announced its session id to the gateway.
sessionsDb.createSession(
'provider-race',
'codex',
'/workspace/demo',
'Watcher Name',
undefined,
undefined,
'/fake/provider-race.jsonl',
);
assert.equal(sessionsDb.getAllSessions().length, 2);
sessionsDb.assignProviderSessionId('app-id-2', 'provider-race');
const rows = sessionsDb.getAllSessions();
assert.equal(rows.length, 1);
assert.equal(rows[0]?.session_id, 'app-id-2');
assert.equal(rows[0]?.provider_session_id, 'provider-race');
// Transcript path and name from the duplicate are adopted.
assert.equal(rows[0]?.jsonl_path, '/fake/provider-race.jsonl');
assert.equal(rows[0]?.custom_name, 'Watcher Name');
});
});
test('legacy provider-keyed rows stay resolvable through both lookups', async () => {
await withIsolatedDatabase(() => {
sessionsDb.createSession('legacy-1', 'gemini', '/workspace/demo');
assert.equal(sessionsDb.getSessionById('legacy-1')?.provider, 'gemini');
assert.equal(sessionsDb.getSessionByProviderSessionId('legacy-1')?.session_id, 'legacy-1');
});
});

View File

@@ -189,10 +189,11 @@ function readProjectSessionsPageByPath(
};
}
// Broadcast progress to all connected WebSocket clients
// Broadcast progress to all connected WebSocket clients.
// Uses the unified `kind` envelope like every other websocket frame.
function broadcastProgress(progress: ProgressUpdate) {
const message = JSON.stringify({
type: 'loading_progress',
kind: 'loading_progress',
...progress,
});

View File

@@ -111,7 +111,10 @@ export class ClaudeSessionSynchronizer implements IProviderSessionSynchronizer {
return null;
}
const existingSession = sessionsDb.getSessionById(parsed.sessionId);
// App-created sessions are keyed by an app id, so disk-discovered provider
// ids must be resolved through the provider-id mapping first.
const existingSession = sessionsDb.getSessionByProviderSessionId(parsed.sessionId)
?? sessionsDb.getSessionById(parsed.sessionId);
const existingSessionName = existingSession?.custom_name;
if (existingSessionName && existingSessionName !== 'Untitled Claude Session') {
return {

View File

@@ -5,7 +5,7 @@ import readline from 'node:readline';
import type { IProviderSessions } from '@/shared/interfaces.js';
import type { AnyRecord, FetchHistoryOptions, FetchHistoryResult, NormalizedMessage } from '@/shared/types.js';
import { createNormalizedMessage, generateMessageId, readObjectRecord } from '@/shared/utils.js';
import { createNormalizedMessage, generateMessageId, readObjectRecord, sliceTailPage } from '@/shared/utils.js';
import { sessionsDb } from '@/modules/database/index.js';
const PROVIDER = 'claude';
@@ -103,10 +103,13 @@ async function parseAgentTools(filePath: string): Promise<AnyRecord[]> {
async function getSessionMessages(
sessionId: string,
providerSessionId: string,
limit: number | null,
offset: number,
): Promise<ClaudeHistoryMessagesResult> {
try {
// The DB row is keyed by the app-facing session id, while the JSONL rows
// on disk carry the provider-native id — both ids are needed here.
const jsonLPath = sessionsDb.getSessionById(sessionId)?.jsonl_path;
if (!jsonLPath) {
@@ -133,7 +136,7 @@ async function getSessionMessages(
try {
const entry = JSON.parse(line) as AnyRecord;
if (entry.sessionId === sessionId) {
if (entry.sessionId === providerSessionId) {
messages.push(entry);
}
} catch {
@@ -553,12 +556,13 @@ export class ClaudeSessionsProvider implements IProviderSessions {
options: FetchHistoryOptions = {},
): Promise<FetchHistoryResult> {
const { limit = null, offset = 0 } = options;
const providerSessionId = options.providerSessionId ?? sessionId;
let result: ClaudeHistoryResult;
try {
// Load full history first so `total` reflects frontend-normalized messages,
// not raw JSONL records.
result = await getSessionMessages(sessionId, null, 0);
result = await getSessionMessages(sessionId, providerSessionId, null, 0);
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
console.warn(`[ClaudeProvider] Failed to load session ${sessionId}:`, message);
@@ -606,7 +610,6 @@ export class ClaudeSessionsProvider implements IProviderSessions {
}
}
const totalNormalized = normalized.length;
let total = 0;
for (const msg of normalized) {
if (msg.kind !== 'tool_result') {
@@ -615,18 +618,10 @@ export class ClaudeSessionsProvider implements IProviderSessions {
}
const normalizedOffset = Math.max(0, offset);
const normalizedLimit = limit === null ? null : Math.max(0, limit);
const messages = normalizedLimit === null
? normalized
: normalized.slice(
Math.max(0, totalNormalized - normalizedOffset - normalizedLimit),
Math.max(0, totalNormalized - normalizedOffset),
);
const hasMore = normalizedLimit === null
? false
: Math.max(0, totalNormalized - normalizedOffset - normalizedLimit) > 0;
const { page, hasMore } = sliceTailPage(normalized, normalizedLimit, normalizedOffset);
return {
messages,
messages: page,
total,
hasMore,
offset: normalizedOffset,

View File

@@ -43,11 +43,12 @@ export class CodexSessionSynchronizer implements IProviderSessionSynchronizer {
continue;
}
const existingSession = sessionsDb.getSessionById(parsed.sessionId);
const existingSession = sessionsDb.getSessionByProviderSessionId(parsed.sessionId)
?? sessionsDb.getSessionById(parsed.sessionId);
if (existingSession) {
// If session name is untitled and we now have a name, update it
if (existingSession.custom_name === 'Untitled Codex Session' && parsed.sessionName && parsed.sessionName !== 'Untitled Codex Session') {
sessionsDb.updateSessionCustomName(parsed.sessionId, parsed.sessionName);
sessionsDb.updateSessionCustomName(existingSession.session_id, parsed.sessionName);
}
}
@@ -120,7 +121,10 @@ export class CodexSessionSynchronizer implements IProviderSessionSynchronizer {
return null;
}
const existingSession = sessionsDb.getSessionById(parsed.sessionId);
// App-created sessions are keyed by an app id, so disk-discovered provider
// ids must be resolved through the provider-id mapping first.
const existingSession = sessionsDb.getSessionByProviderSessionId(parsed.sessionId)
?? sessionsDb.getSessionById(parsed.sessionId);
const existingSessionName = existingSession?.custom_name;
if (existingSessionName && existingSessionName !== 'Untitled Codex Session') {
return {

View File

@@ -4,7 +4,7 @@ import readline from 'node:readline';
import { sessionsDb } from '@/modules/database/index.js';
import type { IProviderSessions } from '@/shared/interfaces.js';
import type { AnyRecord, FetchHistoryOptions, FetchHistoryResult, NormalizedMessage } from '@/shared/types.js';
import { createNormalizedMessage, generateMessageId, readObjectRecord } from '@/shared/utils.js';
import { createNormalizedMessage, generateMessageId, readObjectRecord, sliceTailPage } from '@/shared/utils.js';
const PROVIDER = 'codex';
@@ -552,7 +552,6 @@ export class CodexSessionsProvider implements IProviderSessions {
}
}
const totalNormalized = normalized.length;
let total = 0;
for (const msg of normalized) {
if (msg.kind !== 'tool_result') {
@@ -561,18 +560,10 @@ export class CodexSessionsProvider implements IProviderSessions {
}
const normalizedOffset = Math.max(0, offset);
const normalizedLimit = limit === null ? null : Math.max(0, limit);
const messages = normalizedLimit === null
? normalized
: normalized.slice(
Math.max(0, totalNormalized - normalizedOffset - normalizedLimit),
Math.max(0, totalNormalized - normalizedOffset),
);
const hasMore = normalizedLimit === null
? false
: Math.max(0, totalNormalized - normalizedOffset - normalizedLimit) > 0;
const { page, hasMore } = sliceTailPage(normalized, normalizedLimit, normalizedOffset);
return {
messages,
messages: page,
total,
hasMore,
offset: normalizedOffset,

View File

@@ -9,6 +9,7 @@ import {
generateMessageId,
readObjectRecord,
sanitizeLeafDirectoryName,
sliceTailPage,
} from '@/shared/utils.js';
const PROVIDER = 'cursor';
@@ -363,42 +364,32 @@ export class CursorSessionsProvider implements IProviderSessions {
/**
* Fetches and paginates Cursor session history from its project-scoped store.db.
*
* Pagination follows the shared tail contract (`sliceTailPage`): offset 0 is
* the most recent page, matching every other provider.
*/
async fetchHistory(
sessionId: string,
options: FetchHistoryOptions = {},
): Promise<FetchHistoryResult> {
const { projectPath = '', limit = null, offset = 0 } = options;
// The store.db folder on disk is named after the provider-native id, not
// the app-facing session id this method is addressed with.
const providerSessionId = options.providerSessionId ?? sessionId;
try {
const blobs = await this.loadCursorBlobs(sessionId, projectPath);
const blobs = await this.loadCursorBlobs(providerSessionId, projectPath);
const allNormalized = this.normalizeCursorBlobs(blobs, sessionId);
const renderableMessages = allNormalized.filter((msg) => msg.kind !== 'tool_result');
const total = renderableMessages.length;
if (limit !== null) {
const start = offset;
const page = limit === 0
? []
: renderableMessages.slice(start, start + limit);
const hasMore = limit === 0
? start < total
: start + limit < total;
return {
messages: page,
total,
hasMore,
offset,
limit,
};
}
const { page, hasMore } = sliceTailPage(renderableMessages, limit, offset);
return {
messages: renderableMessages,
messages: page,
total,
hasMore: false,
offset: 0,
limit: null,
hasMore,
offset,
limit,
};
} catch (error) {
const message = error instanceof Error ? error.message : String(error);

View File

@@ -5,7 +5,7 @@ import readline from 'node:readline';
import { sessionsDb } from '@/modules/database/index.js';
import type { IProviderSessions } from '@/shared/interfaces.js';
import type { AnyRecord, FetchHistoryOptions, FetchHistoryResult, NormalizedMessage } from '@/shared/types.js';
import { createNormalizedMessage, generateMessageId, readObjectRecord } from '@/shared/utils.js';
import { createNormalizedMessage, generateMessageId, readObjectRecord, sliceTailPage } from '@/shared/utils.js';
const PROVIDER = 'gemini';
@@ -518,9 +518,9 @@ export class GeminiSessionsProvider implements IProviderSessions {
const start = Math.max(0, offset);
const pageLimit = limit === null ? null : Math.max(0, limit);
const messages = pageLimit === null
? normalized.slice(start)
: normalized.slice(start, start + pageLimit);
// Tail pagination via the shared contract: offset 0 returns the most
// recent page, matching every other provider.
const { page, hasMore } = sliceTailPage(normalized, pageLimit, start);
let total = 0;
for (const msg of normalized) {
if (msg.kind !== 'tool_result') {
@@ -529,9 +529,9 @@ export class GeminiSessionsProvider implements IProviderSessions {
}
return {
messages,
messages: page,
total,
hasMore: pageLimit === null ? false : start + pageLimit < normalized.length,
hasMore,
offset: start,
limit: pageLimit,
tokenUsage: result.tokenUsage,

View File

@@ -112,7 +112,10 @@ export class OpenCodeSessionSynchronizer implements IProviderSessionSynchronizer
}
const fallbackTitle = 'Untitled OpenCode Session';
const existingSession = sessionsDb.getSessionById(sessionId);
// App-created sessions are keyed by an app id, so disk-discovered provider
// ids must be resolved through the provider-id mapping first.
const existingSession = sessionsDb.getSessionByProviderSessionId(sessionId)
?? sessionsDb.getSessionById(sessionId);
const existingName = existingSession?.custom_name;
const nextName = existingName && existingName !== fallbackTitle
? existingName

View File

@@ -12,6 +12,7 @@ import {
readObjectRecord,
readJsonRecord,
readOptionalString,
sliceTailPage,
} from '@/shared/utils.js';
const PROVIDER = 'opencode';
@@ -325,6 +326,9 @@ export class OpenCodeSessionsProvider implements IProviderSessions {
options: FetchHistoryOptions = {},
): Promise<FetchHistoryResult> {
const { limit = null, offset = 0 } = options;
// OpenCode's shared sqlite database keys messages by the provider-native
// session id, not the app-facing id this method is addressed with.
const providerSessionId = options.providerSessionId ?? sessionId;
const db = openOpenCodeDatabase();
if (!db) {
return { messages: [], total: 0, hasMore: false, offset: 0, limit: null };
@@ -349,27 +353,20 @@ export class OpenCodeSessionsProvider implements IProviderSessions {
m.id,
COALESCE(p.time_created, 0),
p.id
`).all(sessionId) as OpenCodeHistoryRow[];
`).all(providerSessionId) as OpenCodeHistoryRow[];
const normalized = this.normalizeHistoryRows(rows, sessionId);
const tokenUsage = aggregateOpenCodeSessionTokenUsage(db, sessionId);
const tokenUsage = aggregateOpenCodeSessionTokenUsage(db, providerSessionId);
const normalizedOffset = Math.max(0, offset);
const normalizedLimit = limit === null ? null : Math.max(0, limit);
const total = normalized.length;
const messages = normalizedLimit === null
? normalized
: normalized.slice(
Math.max(0, total - normalizedOffset - normalizedLimit),
Math.max(0, total - normalizedOffset),
);
const { page, hasMore } = sliceTailPage(normalized, normalizedLimit, normalizedOffset);
return {
messages,
messages: page,
total,
hasMore: normalizedLimit === null
? false
: Math.max(0, total - normalizedOffset - normalizedLimit) > 0,
hasMore,
offset: normalizedOffset,
limit: normalizedLimit,
tokenUsage,

View File

@@ -1,6 +1,7 @@
import express, { type Request, type Response } from 'express';
import { providerAuthService } from '@/modules/providers/services/provider-auth.service.js';
import { providerCapabilitiesService } from '@/modules/providers/services/provider-capabilities.service.js';
import { providerMcpService } from '@/modules/providers/services/mcp.service.js';
import { providerModelsService } from '@/modules/providers/services/provider-models.service.js';
import { providerSkillsService } from '@/modules/providers/services/skills.service.js';
@@ -382,7 +383,43 @@ router.post(
}),
);
router.get(
'/capabilities',
asyncHandler(async (_req: Request, res: Response) => {
res.json(createApiSuccessResponse({
providers: providerCapabilitiesService.listAllProviderCapabilities(),
}));
}),
);
router.get(
'/:provider/capabilities',
asyncHandler(async (req: Request, res: Response) => {
const provider = parseProvider(req.params.provider);
res.json(createApiSuccessResponse(
providerCapabilitiesService.getProviderCapabilities(provider),
));
}),
);
// ----------------- Session routes -----------------
/**
* Session gateway entry point: allocates the stable app-facing session id for
* a brand-new chat. The frontend must call this before the first `chat.send`
* so the session id in the URL, the store, and the websocket all agree from
* the very first message — there is no client-visible session-id handoff.
*/
router.post(
'/sessions',
asyncHandler(async (req: Request, res: Response) => {
const body = (req.body ?? {}) as Record<string, unknown>;
const provider = parseProvider(body.provider);
const projectPath = typeof body.projectPath === 'string' ? body.projectPath : '';
const result = sessionsService.createAppSession(provider, projectPath);
res.status(201).json(createApiSuccessResponse(result));
}),
);
router.get(
'/sessions/archived',
asyncHandler(async (_req: Request, res: Response) => {
@@ -459,7 +496,7 @@ router.get(
limit,
offset,
});
res.json(result);
res.json(createApiSuccessResponse(result));
}),
);

View File

@@ -0,0 +1,91 @@
import type { LLMProvider } from '@/shared/types.js';
/**
* Static, backend-owned description of what one provider integration supports.
*
* The frontend renders its composer UI (permission mode picker, image upload,
* abort button, ...) purely from this shape, which is what keeps the frontend
* free of per-provider conditionals. New provider features should be exposed
* here instead of branching on the provider id in React components.
*/
type ProviderCapabilities = {
provider: LLMProvider;
/** Permission modes the provider runtime understands, in cycle order. */
permissionModes: string[];
defaultPermissionMode: string;
/** Whether image attachments can be included in a chat.send. */
supportsImages: boolean;
/** Whether an in-flight run can be cancelled via chat.abort. */
supportsAbort: boolean;
/** Whether interactive tool permission prompts can reach the UI. */
supportsPermissionRequests: boolean;
/** Whether the token-usage endpoint has data for this provider. */
supportsTokenUsage: boolean;
};
/**
* The capability matrix mirrors what each runtime actually implements today:
* - permission modes match the option sets accepted by each CLI/SDK.
* - only the Claude SDK integration surfaces interactive permission requests.
* - Cursor has no token usage endpoint support (its store.db has no usage rows).
*/
const PROVIDER_CAPABILITIES: Record<LLMProvider, ProviderCapabilities> = {
claude: {
provider: 'claude',
permissionModes: ['default', 'auto', 'acceptEdits', 'bypassPermissions', 'plan'],
defaultPermissionMode: 'default',
supportsImages: true,
supportsAbort: true,
supportsPermissionRequests: true,
supportsTokenUsage: true,
},
cursor: {
provider: 'cursor',
permissionModes: ['default', 'acceptEdits', 'bypassPermissions', 'plan'],
defaultPermissionMode: 'default',
supportsImages: false,
supportsAbort: true,
supportsPermissionRequests: false,
supportsTokenUsage: false,
},
codex: {
provider: 'codex',
permissionModes: ['default', 'acceptEdits', 'bypassPermissions'],
defaultPermissionMode: 'default',
supportsImages: false,
supportsAbort: true,
supportsPermissionRequests: false,
supportsTokenUsage: true,
},
gemini: {
provider: 'gemini',
permissionModes: ['default', 'acceptEdits', 'bypassPermissions', 'plan'],
defaultPermissionMode: 'default',
supportsImages: false,
supportsAbort: true,
supportsPermissionRequests: false,
supportsTokenUsage: true,
},
opencode: {
provider: 'opencode',
permissionModes: ['default'],
defaultPermissionMode: 'default',
supportsImages: false,
supportsAbort: true,
supportsPermissionRequests: false,
supportsTokenUsage: true,
},
};
/**
* Application service exposing the provider capability matrix.
*/
export const providerCapabilitiesService = {
getProviderCapabilities(provider: LLMProvider): ProviderCapabilities {
return PROVIDER_CAPABILITIES[provider];
},
listAllProviderCapabilities(): ProviderCapabilities[] {
return Object.values(PROVIDER_CAPABILITIES);
},
};

View File

@@ -4,10 +4,11 @@ import { promises as fsPromises } from 'node:fs';
import chokidar, { type FSWatcher } from 'chokidar';
import { projectsDb, sessionsDb } from '@/modules/database/index.js';
import { sessionSynchronizerService } from '@/modules/providers/services/session-synchronizer.service.js';
import { WS_OPEN_STATE, connectedClients } from '@/modules/websocket/index.js';
import type { LLMProvider } from '@/shared/types.js';
import { getProjectsWithSessions } from '@/modules/projects/index.js';
import { generateDisplayName } from '@/modules/projects/index.js';
type WatcherEventType = 'add' | 'change';
@@ -58,6 +59,11 @@ const watchers: FSWatcher[] = [];
type PendingWatcherUpdate = {
providers: Set<LLMProvider>;
changeTypes: Set<WatcherEventType>;
/**
* Provider-native session ids reported by the synchronizers. They are
* translated back to app-facing session rows at flush time, because the
* transcript file names on disk only ever contain provider ids.
*/
updatedSessionIds: Set<string>;
};
@@ -131,6 +137,50 @@ function queuePendingWatcherUpdate(
schedulePendingWatcherFlush();
}
/**
* Builds one `session_upserted` delta event for a provider-native session id.
*
* The event carries everything a sidebar needs to upsert the session in place
* (session summary plus owning-project metadata), so clients never need a full
* project-list refetch when a transcript file changes on disk. Returns `null`
* when the id cannot be resolved to an indexed session row.
*/
async function buildSessionUpsertedEvent(updatedProviderSessionId: string): Promise<string | null> {
const row = sessionsDb.getSessionByProviderSessionId(updatedProviderSessionId)
?? sessionsDb.getSessionById(updatedProviderSessionId);
if (!row || row.isArchived) {
return null;
}
const projectPath = row.project_path;
const project = projectPath ? projectsDb.getProjectPath(projectPath) : null;
const displayName = project?.custom_project_name?.trim()
? project.custom_project_name
: await generateDisplayName(path.basename(projectPath ?? '') || (projectPath ?? ''), projectPath);
return JSON.stringify({
kind: 'session_upserted',
sessionId: row.session_id,
provider: row.provider,
session: {
id: row.session_id,
summary: row.custom_name || '',
messageCount: 0,
lastActivity: row.updated_at ?? row.created_at ?? new Date().toISOString(),
},
project: project
? {
projectId: project.project_id,
path: project.project_path,
fullPath: project.project_path,
displayName,
isStarred: Boolean(project.isStarred),
}
: null,
timestamp: new Date().toISOString(),
});
}
async function flushPendingWatcherUpdate(): Promise<void> {
clearPendingWatcherFlushTimer();
@@ -149,33 +199,29 @@ async function flushPendingWatcherUpdate(): Promise<void> {
watcherRefreshInFlight = true;
try {
const updatedProjects = await getProjectsWithSessions({ skipSynchronization: true });
const changeTypes = Array.from(queuedUpdate.changeTypes);
const watchProviders = Array.from(queuedUpdate.providers);
const updatedSessionIds = Array.from(queuedUpdate.updatedSessionIds);
// Backward-compatible fields stay populated with the first queued values.
const updateMessage = JSON.stringify({
type: 'projects_updated',
projects: updatedProjects,
timestamp: new Date().toISOString(),
changeType: changeTypes[0] ?? 'change',
updatedSessionId: updatedSessionIds[0] ?? undefined,
watchProvider: watchProviders[0] ?? undefined,
changeTypes,
updatedSessionIds,
watchProviders,
batched: true,
});
connectedClients.forEach(client => {
if (client.readyState === WS_OPEN_STATE) {
client.send(updateMessage);
// Per-session deltas instead of full project snapshots: an upsert of one
// session can never clobber unrelated client state, so the frontend needs
// no "suppress updates while a run is active" protection logic.
const events: string[] = [];
for (const updatedSessionId of queuedUpdate.updatedSessionIds) {
const event = await buildSessionUpsertedEvent(updatedSessionId);
if (event) {
events.push(event);
}
});
}
if (events.length > 0) {
connectedClients.forEach(client => {
if (client.readyState === WS_OPEN_STATE) {
for (const event of events) {
client.send(event);
}
}
});
}
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
console.error('Session watcher refresh failed while broadcasting projects_updated', { error: message });
console.error('Session watcher refresh failed while broadcasting session_upserted', { error: message });
} finally {
watcherRefreshInFlight = false;

View File

@@ -1,3 +1,4 @@
import { randomUUID } from 'node:crypto';
import fsp from 'node:fs/promises';
import path from 'node:path';
@@ -11,6 +12,12 @@ import type {
} from '@/shared/types.js';
import { AppError } from '@/shared/utils.js';
type CreateAppSessionResult = {
sessionId: string;
provider: LLMProvider;
projectPath: string;
};
type ArchivedSessionListItem = {
sessionId: string;
provider: LLMProvider;
@@ -89,12 +96,43 @@ export const sessionsService = {
},
/**
* Fetches persisted history by session id.
* Allocates a stable app-facing session id before any provider run happens.
*
* This is the entry point of the session gateway: the frontend calls this
* (via `POST /api/providers/sessions`) when the user starts a brand-new
* chat, navigates to the returned id immediately, and the id never changes
* for the lifetime of the conversation. The provider-native id is mapped to
* this row later, when the provider runtime announces it mid-run.
*/
createAppSession(provider: LLMProvider, projectPath: string): CreateAppSessionResult {
const normalizedProjectPath = projectPath.trim();
if (!normalizedProjectPath) {
throw new AppError('projectPath is required.', {
code: 'PROJECT_PATH_REQUIRED',
statusCode: 400,
});
}
const sessionId = randomUUID();
sessionsDb.createAppSession(sessionId, provider, normalizedProjectPath);
return {
sessionId,
provider,
projectPath: normalizedProjectPath,
};
},
/**
* Fetches persisted history by app session id.
*
* Provider and provider-specific lookup hints are resolved from the indexed
* session metadata in the database.
* session metadata in the database. The provider adapter receives the
* provider-native session id (the one written into transcripts on disk),
* and every returned message is remapped back to the app session id so
* provider ids never reach the frontend.
*/
fetchHistory(
async fetchHistory(
sessionId: string,
options: Pick<FetchHistoryOptions, 'limit' | 'offset'> = {},
): Promise<FetchHistoryResult> {
@@ -106,12 +144,33 @@ export const sessionsService = {
});
}
// App-created sessions that never produced a provider transcript yet
// (e.g. first message still streaming) simply have no history.
if (!session.provider_session_id) {
return {
messages: [],
total: 0,
hasMore: false,
offset: options.offset ?? 0,
limit: options.limit ?? null,
};
}
const provider = session.provider as LLMProvider;
return providerRegistry.resolveProvider(provider).sessions.fetchHistory(sessionId, {
const result = await providerRegistry.resolveProvider(provider).sessions.fetchHistory(sessionId, {
limit: options.limit ?? null,
offset: options.offset ?? 0,
projectPath: session.project_path ?? '',
providerSessionId: session.provider_session_id,
});
return {
...result,
messages: result.messages.map((message) => ({
...message,
sessionId,
})),
};
},
/**

View File

@@ -33,10 +33,12 @@ Benefits:
|---|---|
| `services/websocket-server.service.ts` | Creates `WebSocketServer`, binds `verifyClient`, routes connection by pathname |
| `services/websocket-auth.service.ts` | Authenticates upgrade requests and attaches `request.user` |
| `services/chat-websocket.service.ts` | Handles `/ws` chat protocol and provider command/session control messages |
| `services/chat-websocket.service.ts` | Handles the `/ws` chat protocol (`chat.send` / `chat.abort` / `chat.subscribe` / `chat.permission-response`) |
| `services/chat-run-registry.service.ts` | Tracks live provider runs per app session id: seq numbering, event replay buffer, provider-id mapping, completion state |
| `services/chat-session-writer.service.ts` | Gateway writer handed to provider runtimes: remaps provider session ids to app ids, swallows `session_created`, assigns `seq` |
| `services/shell-websocket.service.ts` | Handles `/shell` PTY lifecycle, reconnect buffering, auth URL detection |
| `services/plugin-websocket-proxy.service.ts` | Bridges client socket to plugin socket |
| `services/websocket-writer.service.ts` | Adapts raw WebSocket to writer interface (`send`, `setSessionId`, `getSessionId`) |
| `services/websocket-writer.service.ts` | Adapts raw WebSocket to writer interface (`send`, `setSessionId`, `getSessionId`) for non-chat writer consumers |
| `services/websocket-state.service.ts` | Holds shared chat client set and open-state constant |
## High-Level Architecture
@@ -52,12 +54,12 @@ flowchart LR
D -->|other| H[close()]
E --> I[connectedClients Set]
E --> J[WebSocketWriter]
E --> J[chatRunRegistry + ChatSessionWriter]
F --> K[ptySessionsMap]
G --> L[Upstream Plugin ws://127.0.0.1:port/ws]
I --> M[projects.service broadcastProgress]
I --> N[sessions-watcher.service projects_updated]
I --> M[projects.service loading_progress]
I --> N[sessions-watcher.service session_upserted]
```
## Connection Handshake + Routing
@@ -105,38 +107,41 @@ sequenceDiagram
When a chat socket connects:
1. Add socket to `connectedClients`.
2. Build `WebSocketWriter` (captures `userId` from authenticated request).
3. Parse each incoming message with `parseIncomingJsonObject`.
4. Dispatch by `data.type`.
5. On close, remove socket from `connectedClients`.
2. Parse each incoming message with `parseIncomingJsonObject`.
3. Dispatch by `data.type` (four message types, none provider-specific).
4. On close, remove socket from `connectedClients`.
### Session identity model
The frontend only ever knows the **app session id** (allocated by
`POST /api/providers/sessions` or discovered via the session index). The
provider-native id (JSONL file name, CLI resume id) stays inside the backend:
1. `chat.send` resolves the app id to `{ provider, provider_session_id, project_path }` from the sessions DB.
2. The provider runtime receives the provider-native id for resume.
3. The `ChatSessionWriter` remaps every outbound event back to the app id, and turns `session_created` announcements into a DB mapping update instead of forwarding them.
### Chat Message Dispatch
```mermaid
flowchart TD
A[Incoming WS message] --> B[parseIncomingJsonObject]
B -->|invalid| C[send {type:error}]
B -->|invalid| C[send kind:protocol_error]
B -->|ok| D{data.type}
D -->|claude-command| E[queryClaudeSDK]
D -->|cursor-command| F[spawnCursor]
D -->|codex-command| G[queryCodex]
D -->|gemini-command| H[spawnGemini]
D -->|cursor-resume| I[spawnCursor resume]
D -->|abort-session| J[abort by provider]
D -->|claude-permission-response| K[resolveToolApproval]
D -->|cursor-abort| L[abortCursorSession]
D -->|check-session-status| M[is*SessionActive + optional reconnectSessionWriter]
D -->|get-pending-permissions| N[getPendingApprovalsForSession]
D -->|get-active-sessions| O[getActive*Sessions]
D -->|chat.send| E[resolve session row -> startRun -> spawnFns provider]
D -->|chat.abort| F[abortFns provider + synthetic complete]
D -->|chat.subscribe| G[chat_subscribed ack + attach socket + replay events seq > lastSeq]
D -->|chat.permission-response| H[resolveToolApproval]
D -->|other| I[send kind:protocol_error]
```
### Chat Notes
1. **Unified terminal lifecycle**: every provider run ends with exactly one `complete` message built by `createCompleteMessage()` (`server/shared/utils.ts`), regardless of provider: `{ kind: "complete", sessionId, actualSessionId, exitCode, success, aborted }`. Failed runs emit an informational `error` message first, then the terminal `complete` with `success: false`. Mid-run `error` messages (e.g. stderr output) are non-terminal; the frontend only treats `complete` as end-of-run.
2. `abort-session` sends the terminal `complete` (`aborted: true`) on behalf of the cancelled run; providers detect the abort and skip their own `complete` so the client sees exactly one.
3. `check-session-status` returns `{ type: "session-status", isProcessing }`.
4. Claude status checks can reconnect output stream to the new socket via `reconnectSessionWriter`.
1. **Unified envelope**: every server-to-client frame carries a `kind` — either a provider `NormalizedMessage` kind or a gateway kind (`chat_subscribed`, `session_upserted`, `loading_progress`, `protocol_error`). There is no second `type`-based protocol.
2. **Unified terminal lifecycle**: every provider run ends with exactly one `complete` message built by `createCompleteMessage()` (`server/shared/utils.ts`): `{ kind: "complete", sessionId, actualSessionId, exitCode, success, aborted }`. The chat handler emits a synthetic `complete` for runs that crash or get aborted, and the run registry drops duplicate completes.
3. **Per-run event log**: every live event gets a monotonically increasing `seq`. `chat.subscribe { sessions: [{ sessionId, lastSeq }] }` re-attaches the live stream to the requesting socket (any provider, not just Claude) and replays events with `seq > lastSeq`. If the buffer no longer covers `lastSeq`, the client refreshes over REST.
4. `chat_subscribed` includes `isProcessing` (replaces `check-session-status`) and `pendingPermissions` (replaces `get-pending-permissions`).
## `/shell` Terminal Flow
@@ -224,9 +229,9 @@ Only chat sockets (`/ws`) are tracked in `connectedClients`.
That shared set is consumed by:
1. `modules/projects/services/projects-with-sessions-fetch.service.ts`
Broadcasts `loading_progress` while project snapshots are being built.
Broadcasts `kind: loading_progress` while project snapshots are being built.
2. `modules/providers/services/sessions-watcher.service.ts`
Broadcasts `projects_updated` when provider session artifacts change.
Broadcasts per-session `kind: session_upserted` deltas when provider session artifacts change (no full project snapshots).
This design centralizes cross-module realtime fanout without requiring route-local references to WebSocket internals.
@@ -253,7 +258,7 @@ Current explicit close codes in this module:
Other errors:
1. Chat handler catches and emits `{ type: "error", error }`.
1. Chat handler catches and emits `{ kind: "protocol_error", code, error }`.
2. Shell handler catches and writes terminal-visible error output.
3. Unknown websocket paths are closed immediately.

View File

@@ -0,0 +1,257 @@
import { sessionsDb } from '@/modules/database/index.js';
import { ChatSessionWriter } from '@/modules/websocket/services/chat-session-writer.service.js';
import type {
LLMProvider,
NormalizedMessage,
RealtimeClientConnection,
} from '@/shared/types.js';
type ChatRunStatus = 'running' | 'completed';
/**
* One live (or recently finished) provider run for a single app session.
*
* State notes — why each mutable field is essential:
* - `providerSessionId`: the provider-native id captured mid-run. The abort
* handler needs it to address the provider runtime, and the DB mapping is
* written from it so history/resume work after the run.
* - `status`: drives `chat_subscribed.isProcessing`, prevents double sends
* into the same session, and guards the synthetic-complete fallback in the
* chat handler (only emitted when a runtime died without completing).
* - `lastSeq` / `events`: the per-run event log. Every live event gets a
* monotonically increasing `seq` and is buffered so a reconnecting client
* can replay exactly the events it missed via `chat.subscribe`.
*/
type ChatRun = {
appSessionId: string;
provider: LLMProvider;
providerSessionId: string | null;
status: ChatRunStatus;
lastSeq: number;
events: NormalizedMessage[];
writer: ChatSessionWriter;
startedAt: number;
completedAt: number | null;
};
/**
* How long a completed run stays available for replay. Covers the window
* between a run finishing and the client refreshing history over REST (for
* example when the browser tab was asleep while the run completed).
*/
const COMPLETED_RUN_RETENTION_MS = 5 * 60 * 1000;
/**
* Upper bound on buffered events per run so a very long tool-heavy run cannot
* grow memory unbounded. When exceeded, the oldest events are dropped —
* a reconnecting client whose `lastSeq` predates the buffer falls back to a
* REST history refresh, which is always the authoritative source.
*/
const MAX_BUFFERED_EVENTS_PER_RUN = 5000;
/**
* Active and recently-completed runs keyed by app session id.
*
* This map is the single in-memory source of truth for "is something running
* for this session" — the chat websocket handler, abort path, and subscribe
* path all consult it instead of asking each provider runtime individually.
*/
const runs = new Map<string, ChatRun>();
function evictRunLater(appSessionId: string): void {
const timer = setTimeout(() => {
const run = runs.get(appSessionId);
if (run && run.status === 'completed') {
runs.delete(appSessionId);
}
}, COMPLETED_RUN_RETENTION_MS);
// Never keep the process alive just to evict a buffered run.
timer.unref?.();
}
/**
* Decorates one outbound live event for a run and records it in the event log.
*
* Responsibilities:
* 1. Remap `sessionId` (and `actualSessionId` on `complete`) to the stable
* app session id — provider-native ids never leave the backend.
* 2. Assign the next `seq` so clients can detect/replay gaps.
* 3. Buffer the event for `chat.subscribe` replay.
* 4. Flip the run to `completed` when the terminal `complete` event passes by.
*/
function decorateAndRecordEvent(run: ChatRun, message: NormalizedMessage): NormalizedMessage | null {
// Exactly-one-complete contract: when a run is aborted the chat handler
// emits the terminal `complete` immediately, but the killed runtime may
// still emit its own `complete` from its exit handler moments later.
// Whichever arrives first wins; the duplicate is dropped here.
if (message.kind === 'complete' && run.status === 'completed') {
return null;
}
run.lastSeq += 1;
const outbound: NormalizedMessage = {
...message,
sessionId: run.appSessionId,
seq: run.lastSeq,
};
if (message.kind === 'complete') {
// The provider may report its own id here; the frontend only ever knows
// the app id, so the "actual" id is by definition the app id as well.
outbound.actualSessionId = run.appSessionId;
run.status = 'completed';
run.completedAt = Date.now();
evictRunLater(run.appSessionId);
}
run.events.push(outbound);
if (run.events.length > MAX_BUFFERED_EVENTS_PER_RUN) {
run.events.splice(0, run.events.length - MAX_BUFFERED_EVENTS_PER_RUN);
}
return outbound;
}
/**
* Records the provider-native session id for a run and persists the
* app-id-to-provider-id mapping so history fetches and future resumes can
* address the provider transcript.
*
* Called from the gateway writer when the runtime either calls
* `setSessionId(...)` or emits its `session_created` event — whichever
* happens first wins; later calls with the same id are no-ops.
*/
function recordProviderSessionId(run: ChatRun, providerSessionId: string): void {
if (!providerSessionId || run.providerSessionId === providerSessionId) {
return;
}
run.providerSessionId = providerSessionId;
try {
sessionsDb.assignProviderSessionId(run.appSessionId, providerSessionId);
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
console.error('[ChatRunRegistry] Failed to persist provider session id mapping', {
appSessionId: run.appSessionId,
providerSessionId,
error: message,
});
}
}
/**
* Registry of live provider runs keyed by the stable app session id.
*
* The registry is what makes the websocket protocol provider-independent:
* every run gets a `ChatSessionWriter` that remaps provider-native session
* ids to the app id, assigns `seq` numbers, and buffers events for replay —
* regardless of which provider runtime produced them.
*/
export const chatRunRegistry = {
/**
* Starts tracking a run and returns it, or `null` when a run is already in
* progress for the session (callers must reject the duplicate send).
*/
startRun(input: {
appSessionId: string;
provider: LLMProvider;
providerSessionId: string | null;
connection: RealtimeClientConnection;
userId: string | number | null;
}): ChatRun | null {
const existing = runs.get(input.appSessionId);
if (existing && existing.status === 'running') {
return null;
}
const run: ChatRun = {
appSessionId: input.appSessionId,
provider: input.provider,
providerSessionId: input.providerSessionId,
status: 'running',
lastSeq: 0,
events: [],
writer: null as unknown as ChatSessionWriter,
startedAt: Date.now(),
completedAt: null,
};
run.writer = new ChatSessionWriter({
connection: input.connection,
userId: input.userId,
provider: input.provider,
providerSessionId: input.providerSessionId,
onProviderSessionId: (providerSessionId) => {
recordProviderSessionId(run, providerSessionId);
},
decorateOutboundEvent: (message) => decorateAndRecordEvent(run, message),
});
runs.set(input.appSessionId, run);
return run;
},
getRun(appSessionId: string): ChatRun | undefined {
return runs.get(appSessionId);
},
isProcessing(appSessionId: string): boolean {
return runs.get(appSessionId)?.status === 'running';
},
/**
* Re-attaches a run's outbound stream to a (new) websocket connection.
*
* This is the generic replacement for the Claude-only writer reconnect:
* after a page refresh the new socket subscribes and immediately starts
* receiving the still-running stream, for every provider.
*/
attachConnection(appSessionId: string, connection: RealtimeClientConnection): boolean {
const run = runs.get(appSessionId);
if (!run) {
return false;
}
run.writer.updateWebSocket(connection);
return true;
},
/**
* Returns buffered events with `seq` greater than `afterSeq` for replay.
*
* An empty array with `run.lastSeq > afterSeq` not covered by the buffer
* means the buffer was truncated; the client should refresh over REST.
*/
replayEvents(appSessionId: string, afterSeq: number): NormalizedMessage[] {
const run = runs.get(appSessionId);
if (!run) {
return [];
}
return run.events.filter((event) => typeof event.seq === 'number' && event.seq > afterSeq);
},
/**
* Emits a synthetic terminal `complete` if (and only if) the run is still
* marked running. Used when a provider runtime throws or resolves without
* having produced its own terminal event, and by the abort path.
*/
completeRun(appSessionId: string, opts: { exitCode: number; aborted?: boolean }): void {
const run = runs.get(appSessionId);
if (!run || run.status !== 'running') {
return;
}
run.writer.sendComplete(opts);
},
/**
* Test-only escape hatch: clears every tracked run.
*/
clearAll(): void {
runs.clear();
},
};

View File

@@ -0,0 +1,145 @@
import { WS_OPEN_STATE } from '@/modules/websocket/services/websocket-state.service.js';
import type {
LLMProvider,
NormalizedMessage,
RealtimeClientConnection,
} from '@/shared/types.js';
import { createCompleteMessage, readObjectRecord } from '@/shared/utils.js';
type ChatSessionWriterOptions = {
connection: RealtimeClientConnection;
userId: string | number | null;
provider: LLMProvider;
/** Provider-native id when resuming an existing session, otherwise null. */
providerSessionId: string | null;
/**
* Invoked the moment the provider runtime reveals its native session id
* (either via `setSessionId` or a `session_created` event). The registry
* persists the app-id-to-provider-id mapping from this callback.
*/
onProviderSessionId: (providerSessionId: string) => void;
/**
* Remaps/sequences/buffers one outbound live event. Implemented by the chat
* run registry; the writer never forwards a provider event untouched.
* Returns `null` when the event must be dropped (duplicate terminal
* `complete` after an abort already completed the run).
*/
decorateOutboundEvent: (message: NormalizedMessage) => NormalizedMessage | null;
};
/**
* Gateway writer handed to provider runtimes instead of a raw websocket writer.
*
* It exposes the exact same surface as `WebSocketWriter` (`send`,
* `setSessionId`, `getSessionId`, `updateWebSocket`, `userId`,
* `isWebSocketWriter`) so the provider runtimes (`claude-sdk.js`,
* `cursor-cli.js`, ...) need zero changes — but everything that flows through
* it is translated from the provider's world into the app's protocol:
*
* - `session_created` events are swallowed and turned into a provider-id
* mapping; the frontend never learns provider-native ids.
* - every other event gets `sessionId` remapped to the app session id and a
* per-run `seq` assigned before being forwarded.
* - `setSessionId(...)` calls (used by runtimes to label captured ids) are
* intercepted and recorded as the provider-id mapping as well.
*/
export class ChatSessionWriter {
ws: RealtimeClientConnection;
userId: string | number | null;
/**
* Some runtimes feature-detect their writer with this flag; keep it so the
* gateway writer is a drop-in replacement for `WebSocketWriter`.
*/
isWebSocketWriter = true;
private readonly options: ChatSessionWriterOptions;
/**
* The provider-native session id as the runtime knows it. Kept locally
* (besides the registry) because runtimes read it back via `getSessionId()`
* to label their own outgoing events — those labels are remapped on send
* anyway, but the runtime-visible value must stay provider-native.
*/
private providerSessionId: string | null;
constructor(options: ChatSessionWriterOptions) {
this.options = options;
this.ws = options.connection;
this.userId = options.userId;
this.providerSessionId = options.providerSessionId;
}
send(data: unknown): void {
const record = readObjectRecord(data);
if (!record || typeof record.kind !== 'string') {
// Provider runtimes only emit kind-based normalized messages. Anything
// else indicates a programming error; drop it rather than leaking an
// un-remapped payload to the client.
console.error('[ChatSessionWriter] Dropping non-normalized outbound payload', data);
return;
}
const message = record as NormalizedMessage;
if (message.kind === 'session_created') {
const announcedId =
typeof message.newSessionId === 'string' && message.newSessionId
? message.newSessionId
: message.sessionId;
if (announcedId) {
this.captureProviderSessionId(announcedId);
}
// Swallowed on purpose: the frontend already has the stable app session
// id, so there is no client-side handoff to perform anymore.
return;
}
const outbound = this.options.decorateOutboundEvent(message);
if (outbound) {
this.forward(outbound);
}
}
/**
* Emits the synthetic terminal `complete` for runs that ended without one
* (runtime crash before completing, or user abort).
*/
sendComplete(opts: { exitCode: number; aborted?: boolean }): void {
const message = createCompleteMessage({
provider: this.options.provider,
sessionId: this.providerSessionId,
exitCode: opts.exitCode,
aborted: opts.aborted,
});
const outbound = this.options.decorateOutboundEvent(message);
if (outbound) {
this.forward(outbound);
}
}
updateWebSocket(newConnection: RealtimeClientConnection): void {
this.ws = newConnection;
}
setSessionId(sessionId: string): void {
this.captureProviderSessionId(sessionId);
}
getSessionId(): string | null {
return this.providerSessionId;
}
private captureProviderSessionId(providerSessionId: string): void {
if (!providerSessionId || this.providerSessionId === providerSessionId) {
return;
}
this.providerSessionId = providerSessionId;
this.options.onProviderSessionId(providerSessionId);
}
private forward(message: NormalizedMessage): void {
if (this.ws.readyState === WS_OPEN_STATE) {
this.ws.send(JSON.stringify(message));
}
}
}

View File

@@ -1,40 +1,35 @@
import type { WebSocket } from 'ws';
import { connectedClients } from '@/modules/websocket/services/websocket-state.service.js';
import { WebSocketWriter } from '@/modules/websocket/services/websocket-writer.service.js';
import { sessionsDb } from '@/modules/database/index.js';
import { chatRunRegistry } from '@/modules/websocket/services/chat-run-registry.service.js';
import { connectedClients, WS_OPEN_STATE } from '@/modules/websocket/services/websocket-state.service.js';
import type {
AnyRecord,
AuthenticatedWebSocketRequest,
LLMProvider,
} from '@/shared/types.js';
import { createCompleteMessage, parseIncomingJsonObject } from '@/shared/utils.js';
import { parseIncomingJsonObject } from '@/shared/utils.js';
type ChatIncomingMessage = AnyRecord & {
type?: string;
command?: string;
options?: AnyRecord;
provider?: string;
sessionId?: string;
requestId?: string;
allow?: unknown;
updatedInput?: unknown;
message?: unknown;
rememberEntry?: unknown;
};
const DEFAULT_PROVIDER: LLMProvider = 'claude';
/**
* One provider runtime entry point. All five runtimes share this signature,
* which lets the chat handler dispatch through a provider-keyed map instead
* of provider-specific branches.
*/
type ProviderSpawnFn = (
command: string,
options: AnyRecord,
writer: unknown
) => Promise<unknown>;
type ChatWebSocketDependencies = {
queryClaudeSDK: (command: string, options: unknown, writer: WebSocketWriter) => Promise<unknown>;
spawnCursor: (command: string, options: unknown, writer: WebSocketWriter) => Promise<unknown>;
queryCodex: (command: string, options: unknown, writer: WebSocketWriter) => Promise<unknown>;
spawnGemini: (command: string, options: unknown, writer: WebSocketWriter) => Promise<unknown>;
spawnOpenCode: (command: string, options: unknown, writer: WebSocketWriter) => Promise<unknown>;
abortClaudeSDKSession: (sessionId: string) => Promise<boolean>;
abortCursorSession: (sessionId: string) => boolean;
abortCodexSession: (sessionId: string) => boolean;
abortGeminiSession: (sessionId: string) => boolean;
abortOpenCodeSession: (sessionId: string) => boolean;
/** Provider runtimes keyed by provider id. */
spawnFns: Record<LLMProvider, ProviderSpawnFn>;
/**
* Abort functions keyed by provider id. They are addressed with the
* provider-native session id (that is how runtimes key their process maps).
* The Claude abort is async; the rest are sync — both shapes are accepted.
*/
abortFns: Record<LLMProvider, (providerSessionId: string) => boolean | Promise<boolean>>;
resolveToolApproval: (
requestId: string,
payload: {
@@ -44,31 +39,10 @@ type ChatWebSocketDependencies = {
rememberEntry?: unknown;
}
) => void;
isClaudeSDKSessionActive: (sessionId: string) => boolean;
isCursorSessionActive: (sessionId: string) => boolean;
isCodexSessionActive: (sessionId: string) => boolean;
isGeminiSessionActive: (sessionId: string) => boolean;
isOpenCodeSessionActive: (sessionId: string) => boolean;
reconnectSessionWriter: (sessionId: string, ws: WebSocket) => boolean;
getPendingApprovalsForSession: (sessionId: string) => unknown[];
getActiveClaudeSDKSessions: () => unknown;
getActiveCursorSessions: () => unknown;
getActiveCodexSessions: () => unknown;
getActiveGeminiSessions: () => unknown;
getActiveOpenCodeSessions: () => unknown;
/** Claude-only today: pending tool approvals included in `chat_subscribed`. */
getPendingApprovalsForSession: (providerSessionId: string) => unknown[];
};
/**
* Normalizes potentially invalid provider names coming from websocket payloads.
*/
function readProvider(value: unknown): LLMProvider {
if (value === 'claude' || value === 'cursor' || value === 'codex' || value === 'gemini' || value === 'opencode') {
return value;
}
return DEFAULT_PROVIDER;
}
/**
* Extracts the authenticated request user id in the formats currently produced
* by platform and OSS auth code paths.
@@ -92,8 +66,258 @@ function readRequestUserId(
return null;
}
function sendJson(ws: WebSocket, payload: unknown): void {
if (ws.readyState === WS_OPEN_STATE) {
ws.send(JSON.stringify(payload));
}
}
/**
* Reports a protocol-level failure to the requesting client.
*
* Protocol errors deliberately use their own `kind` (instead of the provider
* `error` message kind) so the frontend can distinguish "your request was
* invalid" from "the model run produced an error" without inspecting text.
*/
function sendProtocolError(
ws: WebSocket,
code: string,
error: string,
sessionId?: string
): void {
sendJson(ws, {
kind: 'protocol_error',
code,
error,
sessionId: sessionId ?? null,
timestamp: new Date().toISOString(),
});
}
function readRequiredSessionId(data: AnyRecord): string | null {
const sessionId = typeof data.sessionId === 'string' ? data.sessionId.trim() : '';
return sessionId.length > 0 ? sessionId : null;
}
/**
* Handles `chat.send`: resolves the session row (provider, project path, and
* provider-native id all come from the database — never from the client),
* registers the run, and dispatches to the provider runtime.
*/
async function handleChatSend(
ws: WebSocket,
userId: string | number | null,
data: AnyRecord,
dependencies: ChatWebSocketDependencies
): Promise<void> {
const sessionId = readRequiredSessionId(data);
if (!sessionId) {
sendProtocolError(ws, 'SESSION_ID_REQUIRED', 'chat.send requires a sessionId.');
return;
}
const session = sessionsDb.getSessionById(sessionId);
if (!session) {
sendProtocolError(
ws,
'SESSION_NOT_FOUND',
`Session "${sessionId}" was not found. Create it via POST /api/providers/sessions first.`,
sessionId
);
return;
}
const provider = session.provider as LLMProvider;
const spawnFn = dependencies.spawnFns[provider];
if (!spawnFn) {
sendProtocolError(ws, 'UNSUPPORTED_PROVIDER', `Provider "${provider}" is not available.`, sessionId);
return;
}
const run = chatRunRegistry.startRun({
appSessionId: sessionId,
provider,
providerSessionId: session.provider_session_id,
connection: ws,
userId,
});
if (!run) {
sendProtocolError(
ws,
'RUN_IN_PROGRESS',
`Session "${sessionId}" already has a run in progress.`,
sessionId
);
return;
}
const clientOptions = (data.options ?? {}) as AnyRecord;
const command = typeof data.content === 'string' ? data.content : '';
// The provider runtimes receive the provider-native session id (that is the
// id their CLI/SDK understands for resume). Brand-new sessions have no
// provider id yet, so the runtime starts fresh and announces one, which the
// gateway writer captures and maps back to the app session id.
const runtimeOptions: AnyRecord = {
...clientOptions,
sessionId: session.provider_session_id ?? undefined,
resume: Boolean(session.provider_session_id),
cwd: clientOptions.cwd ?? session.project_path ?? undefined,
projectPath: session.project_path ?? clientOptions.projectPath,
};
try {
await spawnFn(command, runtimeOptions, run.writer);
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
console.error(`[Chat] Provider runtime "${provider}" failed`, { sessionId, error: message });
} finally {
// Safety net: a runtime that crashed (or resolved) without emitting its
// terminal `complete` would otherwise leave the session stuck in
// "processing" forever on every connected client.
chatRunRegistry.completeRun(sessionId, { exitCode: 1 });
}
}
/**
* Handles `chat.abort`: cancels the run for one app session and emits the
* terminal `complete` on its behalf (runtimes skip their own complete for
* aborted runs, and the registry drops any duplicate).
*/
async function handleChatAbort(
ws: WebSocket,
data: AnyRecord,
dependencies: ChatWebSocketDependencies
): Promise<void> {
const sessionId = readRequiredSessionId(data);
if (!sessionId) {
sendProtocolError(ws, 'SESSION_ID_REQUIRED', 'chat.abort requires a sessionId.');
return;
}
const run = chatRunRegistry.getRun(sessionId);
if (!run || run.status !== 'running') {
sendProtocolError(ws, 'NO_ACTIVE_RUN', `Session "${sessionId}" has no active run.`, sessionId);
return;
}
const abortFn = dependencies.abortFns[run.provider];
let success = false;
if (abortFn && run.providerSessionId) {
success = Boolean(await abortFn(run.providerSessionId));
}
chatRunRegistry.completeRun(sessionId, {
exitCode: success ? 0 : 1,
aborted: true,
});
}
/**
* Handles `chat.subscribe`: for each requested session, reports whether a run
* is processing, re-attaches the live stream to this socket, replays missed
* events (seq > lastSeq), and includes pending permission requests.
*
* This single message replaces the old `check-session-status`,
* `get-pending-permissions`, and Claude-only writer reconnect flows.
*/
function handleChatSubscribe(
ws: WebSocket,
data: AnyRecord,
dependencies: ChatWebSocketDependencies
): void {
const targets = Array.isArray(data.sessions) ? data.sessions : [];
for (const target of targets) {
if (!target || typeof target !== 'object') {
continue;
}
const sessionId = typeof (target as AnyRecord).sessionId === 'string'
? ((target as AnyRecord).sessionId as string).trim()
: '';
if (!sessionId) {
continue;
}
const lastSeqRaw = (target as AnyRecord).lastSeq;
const lastSeq = typeof lastSeqRaw === 'number' && Number.isFinite(lastSeqRaw)
? Math.max(0, Math.floor(lastSeqRaw))
: 0;
const run = chatRunRegistry.getRun(sessionId);
const isProcessing = chatRunRegistry.isProcessing(sessionId);
// Future live events for this run should land on the socket that asked —
// this is what makes mid-stream page refreshes work for all providers.
if (isProcessing) {
chatRunRegistry.attachConnection(sessionId, ws);
}
// Pending approvals are tracked under the provider-native id inside the
// Claude runtime; remap their sessionId so the client only sees app ids.
const pendingPermissions = (run?.providerSessionId
? dependencies.getPendingApprovalsForSession(run.providerSessionId)
: []
).map((approval) =>
approval && typeof approval === 'object'
? { ...(approval as AnyRecord), sessionId }
: approval,
);
sendJson(ws, {
kind: 'chat_subscribed',
sessionId,
isProcessing,
lastSeq: run?.lastSeq ?? 0,
pendingPermissions,
timestamp: new Date().toISOString(),
});
// Replay only for RUNNING runs, strictly after the ack. Completed runs
// are fully persisted to the provider transcript and served over REST —
// replaying them (e.g. after a page reload where the client's lastSeq is
// 0) would duplicate messages the history fetch already returned.
if (isProcessing) {
for (const event of chatRunRegistry.replayEvents(sessionId, lastSeq)) {
sendJson(ws, event);
}
}
}
}
/**
* Handles `chat.permission-response`: forwards a tool-approval decision to the
* pending approval resolver (Claude is the only provider with interactive
* approvals today, but the message is intentionally provider-neutral).
*/
function handlePermissionResponse(data: AnyRecord, dependencies: ChatWebSocketDependencies): void {
if (typeof data.requestId !== 'string' || data.requestId.length === 0) {
return;
}
dependencies.resolveToolApproval(data.requestId, {
allow: Boolean(data.allow),
updatedInput: data.updatedInput,
message: typeof data.message === 'string' ? data.message : undefined,
rememberEntry: data.rememberEntry,
});
}
/**
* Handles authenticated chat websocket messages used by the main chat panel.
*
* Inbound protocol (client to server):
* - `chat.send` { sessionId, content, options? }
* - `chat.abort` { sessionId }
* - `chat.subscribe` { sessions: [{ sessionId, lastSeq? }] }
* - `chat.permission-response` { requestId, allow, updatedInput?, message?, rememberEntry? }
*
* Outbound protocol (server to client): every frame is `kind`-based — either
* a provider `NormalizedMessage` (with `seq`) or a gateway event
* (`chat_subscribed`, `session_upserted`, `loading_progress`,
* `protocol_error`).
*/
export function handleChatConnection(
ws: WebSocket,
@@ -103,7 +327,7 @@ export function handleChatConnection(
console.log('[INFO] Chat WebSocket connected');
connectedClients.add(ws);
const writer = new WebSocketWriter(ws, readRequestUserId(request));
const userId = readRequestUserId(request);
ws.on('message', async (rawMessage) => {
try {
@@ -112,167 +336,30 @@ export function handleChatConnection(
throw new Error('Invalid websocket payload');
}
const data = parsed as ChatIncomingMessage;
const messageType = data.type;
if (!messageType) {
throw new Error('Message type is required');
}
const data = parsed as AnyRecord;
const messageType = typeof data.type === 'string' ? data.type : '';
if (messageType === 'claude-command') {
await dependencies.queryClaudeSDK(data.command ?? '', data.options, writer);
return;
}
if (messageType === 'cursor-command') {
await dependencies.spawnCursor(data.command ?? '', data.options, writer);
return;
}
if (messageType === 'codex-command') {
await dependencies.queryCodex(data.command ?? '', data.options, writer);
return;
}
if (messageType === 'gemini-command') {
await dependencies.spawnGemini(data.command ?? '', data.options, writer);
return;
}
if (messageType === 'opencode-command') {
await dependencies.spawnOpenCode(data.command ?? '', data.options, writer);
return;
}
if (messageType === 'cursor-resume') {
await dependencies.spawnCursor(
'',
{
sessionId: data.sessionId,
resume: true,
cwd: data.options?.cwd,
},
writer
);
return;
}
if (messageType === 'abort-session') {
const provider = readProvider(data.provider);
const sessionId = typeof data.sessionId === 'string' ? data.sessionId : '';
let success = false;
if (provider === 'cursor') {
success = dependencies.abortCursorSession(sessionId);
} else if (provider === 'codex') {
success = dependencies.abortCodexSession(sessionId);
} else if (provider === 'gemini') {
success = dependencies.abortGeminiSession(sessionId);
} else if (provider === 'opencode') {
success = dependencies.abortOpenCodeSession(sessionId);
} else {
success = await dependencies.abortClaudeSDKSession(sessionId);
}
// Terminal complete on behalf of the cancelled run — providers skip
// their own complete for aborted runs so the client sees exactly one.
writer.send(
createCompleteMessage({
provider,
sessionId,
exitCode: success ? 0 : 1,
aborted: true,
})
);
return;
}
if (messageType === 'claude-permission-response') {
if (typeof data.requestId === 'string' && data.requestId.length > 0) {
dependencies.resolveToolApproval(data.requestId, {
allow: Boolean(data.allow),
updatedInput: data.updatedInput,
message: typeof data.message === 'string' ? data.message : undefined,
rememberEntry: data.rememberEntry,
});
}
return;
}
if (messageType === 'cursor-abort') {
const sessionId = typeof data.sessionId === 'string' ? data.sessionId : '';
const success = dependencies.abortCursorSession(sessionId);
writer.send(
createCompleteMessage({
provider: 'cursor',
sessionId,
exitCode: success ? 0 : 1,
aborted: true,
})
);
return;
}
if (messageType === 'check-session-status') {
const provider = readProvider(data.provider);
const sessionId = typeof data.sessionId === 'string' ? data.sessionId : '';
let isActive = false;
if (provider === 'cursor') {
isActive = dependencies.isCursorSessionActive(sessionId);
} else if (provider === 'codex') {
isActive = dependencies.isCodexSessionActive(sessionId);
} else if (provider === 'gemini') {
isActive = dependencies.isGeminiSessionActive(sessionId);
} else if (provider === 'opencode') {
isActive = dependencies.isOpenCodeSessionActive(sessionId);
} else {
isActive = dependencies.isClaudeSDKSessionActive(sessionId);
if (isActive) {
dependencies.reconnectSessionWriter(sessionId, ws);
}
}
writer.send({
type: 'session-status',
sessionId,
provider,
isProcessing: isActive,
});
return;
}
if (messageType === 'get-pending-permissions') {
const sessionId = typeof data.sessionId === 'string' ? data.sessionId : '';
if (sessionId && dependencies.isClaudeSDKSessionActive(sessionId)) {
const pending = dependencies.getPendingApprovalsForSession(sessionId);
writer.send({
type: 'pending-permissions-response',
sessionId,
data: pending,
});
}
return;
}
if (messageType === 'get-active-sessions') {
writer.send({
type: 'active-sessions',
sessions: {
claude: dependencies.getActiveClaudeSDKSessions(),
cursor: dependencies.getActiveCursorSessions(),
codex: dependencies.getActiveCodexSessions(),
gemini: dependencies.getActiveGeminiSessions(),
opencode: dependencies.getActiveOpenCodeSessions(),
},
});
switch (messageType) {
case 'chat.send':
await handleChatSend(ws, userId, data, dependencies);
return;
case 'chat.abort':
await handleChatAbort(ws, data, dependencies);
return;
case 'chat.subscribe':
handleChatSubscribe(ws, data, dependencies);
return;
case 'chat.permission-response':
handlePermissionResponse(data, dependencies);
return;
default:
sendProtocolError(ws, 'UNKNOWN_MESSAGE_TYPE', `Unknown message type "${messageType}".`);
return;
}
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
console.error('[ERROR] Chat WebSocket error:', message);
writer.send({
type: 'error',
error: message,
});
sendProtocolError(ws, 'INTERNAL_ERROR', message);
}
});

View File

@@ -0,0 +1,207 @@
import assert from 'node:assert/strict';
import { mkdtemp, rm } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import path from 'node:path';
import test from 'node:test';
import { closeConnection, initializeDatabase, sessionsDb } from '@/modules/database/index.js';
import { chatRunRegistry } from '@/modules/websocket/services/chat-run-registry.service.js';
import type { NormalizedMessage } from '@/shared/types.js';
/**
* Minimal stand-in for a websocket connection: collects every JSON frame the
* gateway writer forwards so assertions can inspect the outbound protocol.
*/
class FakeConnection {
readyState = 1; // WS_OPEN_STATE
frames: NormalizedMessage[] = [];
send(data: string): void {
this.frames.push(JSON.parse(data) as NormalizedMessage);
}
}
async function withIsolatedDatabase(runTest: () => void | Promise<void>): Promise<void> {
const previousDatabasePath = process.env.DATABASE_PATH;
const tempDirectory = await mkdtemp(path.join(tmpdir(), 'chat-run-registry-'));
const databasePath = path.join(tempDirectory, 'auth.db');
closeConnection();
process.env.DATABASE_PATH = databasePath;
await initializeDatabase();
try {
await runTest();
} finally {
chatRunRegistry.clearAll();
closeConnection();
if (previousDatabasePath === undefined) {
delete process.env.DATABASE_PATH;
} else {
process.env.DATABASE_PATH = previousDatabasePath;
}
await rm(tempDirectory, { recursive: true, force: true });
}
}
test('live events are remapped to the app session id and sequenced', async () => {
await withIsolatedDatabase(() => {
sessionsDb.createAppSession('app-run-1', 'claude', '/workspace/demo');
const connection = new FakeConnection();
const run = chatRunRegistry.startRun({
appSessionId: 'app-run-1',
provider: 'claude',
providerSessionId: null,
connection,
userId: 'user-1',
});
assert.ok(run);
run.writer.send({ kind: 'stream_delta', provider: 'claude', sessionId: 'provider-id-9', content: 'hello' });
run.writer.send({ kind: 'text', provider: 'claude', sessionId: 'provider-id-9', content: 'hello world' });
assert.equal(connection.frames.length, 2);
assert.equal(connection.frames[0]?.sessionId, 'app-run-1');
assert.equal(connection.frames[0]?.seq, 1);
assert.equal(connection.frames[1]?.sessionId, 'app-run-1');
assert.equal(connection.frames[1]?.seq, 2);
});
});
test('session_created is swallowed and persisted as the provider-id mapping', async () => {
await withIsolatedDatabase(() => {
sessionsDb.createAppSession('app-run-2', 'cursor', '/workspace/demo');
const connection = new FakeConnection();
const run = chatRunRegistry.startRun({
appSessionId: 'app-run-2',
provider: 'cursor',
providerSessionId: null,
connection,
userId: null,
});
assert.ok(run);
run.writer.send({
kind: 'session_created',
provider: 'cursor',
sessionId: 'cursor-native-7',
newSessionId: 'cursor-native-7',
});
// Never forwarded to the client...
assert.equal(connection.frames.length, 0);
// ...but recorded in the registry and persisted in the database.
assert.equal(run.providerSessionId, 'cursor-native-7');
assert.equal(sessionsDb.getSessionById('app-run-2')?.provider_session_id, 'cursor-native-7');
});
});
test('complete marks the run finished and duplicate completes are dropped', async () => {
await withIsolatedDatabase(() => {
sessionsDb.createAppSession('app-run-3', 'codex', '/workspace/demo');
const connection = new FakeConnection();
const run = chatRunRegistry.startRun({
appSessionId: 'app-run-3',
provider: 'codex',
providerSessionId: null,
connection,
userId: null,
});
assert.ok(run);
run.writer.send({ kind: 'complete', provider: 'codex', sessionId: 'native-3', exitCode: 0 });
// Late duplicate from a killed runtime's exit handler.
run.writer.send({ kind: 'complete', provider: 'codex', sessionId: 'native-3', exitCode: 1 });
const completes = connection.frames.filter((frame) => frame.kind === 'complete');
assert.equal(completes.length, 1);
assert.equal(completes[0]?.actualSessionId, 'app-run-3');
assert.equal(chatRunRegistry.isProcessing('app-run-3'), false);
// completeRun is also a no-op once the run already completed.
chatRunRegistry.completeRun('app-run-3', { exitCode: 1 });
assert.equal(connection.frames.filter((frame) => frame.kind === 'complete').length, 1);
});
});
test('replayEvents returns only events after the requested seq', async () => {
await withIsolatedDatabase(() => {
sessionsDb.createAppSession('app-run-4', 'claude', '/workspace/demo');
const connection = new FakeConnection();
const run = chatRunRegistry.startRun({
appSessionId: 'app-run-4',
provider: 'claude',
providerSessionId: null,
connection,
userId: null,
});
assert.ok(run);
run.writer.send({ kind: 'stream_delta', provider: 'claude', sessionId: 'x', content: 'a' });
run.writer.send({ kind: 'stream_delta', provider: 'claude', sessionId: 'x', content: 'b' });
run.writer.send({ kind: 'stream_delta', provider: 'claude', sessionId: 'x', content: 'c' });
const replayed = chatRunRegistry.replayEvents('app-run-4', 1);
assert.deepEqual(replayed.map((event) => event.content), ['b', 'c']);
assert.deepEqual(replayed.map((event) => event.seq), [2, 3]);
});
});
test('attachConnection reroutes the live stream to a new socket', async () => {
await withIsolatedDatabase(() => {
sessionsDb.createAppSession('app-run-5', 'gemini', '/workspace/demo');
const firstConnection = new FakeConnection();
const run = chatRunRegistry.startRun({
appSessionId: 'app-run-5',
provider: 'gemini',
providerSessionId: null,
connection: firstConnection,
userId: null,
});
assert.ok(run);
run.writer.send({ kind: 'stream_delta', provider: 'gemini', sessionId: 'g', content: 'before' });
const secondConnection = new FakeConnection();
assert.equal(chatRunRegistry.attachConnection('app-run-5', secondConnection), true);
run.writer.send({ kind: 'stream_delta', provider: 'gemini', sessionId: 'g', content: 'after' });
assert.deepEqual(firstConnection.frames.map((frame) => frame.content), ['before']);
assert.deepEqual(secondConnection.frames.map((frame) => frame.content), ['after']);
});
});
test('startRun rejects a second concurrent run for the same session', async () => {
await withIsolatedDatabase(() => {
sessionsDb.createAppSession('app-run-6', 'opencode', '/workspace/demo');
const connection = new FakeConnection();
const first = chatRunRegistry.startRun({
appSessionId: 'app-run-6',
provider: 'opencode',
providerSessionId: null,
connection,
userId: null,
});
assert.ok(first);
const second = chatRunRegistry.startRun({
appSessionId: 'app-run-6',
provider: 'opencode',
providerSessionId: null,
connection,
userId: null,
});
assert.equal(second, null);
// After the run finishes a new one is allowed again.
chatRunRegistry.completeRun('app-run-6', { exitCode: 0 });
const third = chatRunRegistry.startRun({
appSessionId: 'app-run-6',
provider: 'opencode',
providerSessionId: null,
connection,
userId: null,
});
assert.ok(third);
});
});

View File

@@ -0,0 +1,42 @@
import assert from 'node:assert/strict';
import test from 'node:test';
import { sliceTailPage } from '@/shared/utils.js';
const ITEMS = ['a', 'b', 'c', 'd', 'e'];
test('offset 0 returns the most recent page', () => {
const { page, hasMore } = sliceTailPage(ITEMS, 2, 0);
assert.deepEqual(page, ['d', 'e']);
assert.equal(hasMore, true);
});
test('increasing offsets walk backwards in time', () => {
const { page, hasMore } = sliceTailPage(ITEMS, 2, 2);
assert.deepEqual(page, ['b', 'c']);
assert.equal(hasMore, true);
});
test('the oldest page reports hasMore false', () => {
const { page, hasMore } = sliceTailPage(ITEMS, 2, 4);
assert.deepEqual(page, ['a']);
assert.equal(hasMore, false);
});
test('null limit returns everything', () => {
const { page, hasMore } = sliceTailPage(ITEMS, null, 0);
assert.deepEqual(page, ITEMS);
assert.equal(hasMore, false);
});
test('offsets past the start return an empty page', () => {
const { page, hasMore } = sliceTailPage(ITEMS, 3, 10);
assert.deepEqual(page, []);
assert.equal(hasMore, false);
});
test('zero limit returns an empty page but keeps hasMore accurate', () => {
const { page, hasMore } = sliceTailPage(ITEMS, 0, 0);
assert.deepEqual(page, []);
assert.equal(hasMore, true);
});

View File

@@ -175,6 +175,30 @@ export type MessageKind =
| 'interactive_prompt'
| 'task_notification';
/**
* Event kinds added by the chat gateway layer on top of provider message kinds.
*
* These are app-level realtime events (subscription acks, sidebar deltas,
* project loading progress, protocol failures) that are not produced by any
* provider adapter. Together with `MessageKind` they form the complete set of
* `kind` values a websocket client can receive, so the frontend only ever
* needs one kind-based switch.
*/
export type GatewayEventKind =
| 'chat_subscribed'
| 'session_upserted'
| 'loading_progress'
| 'protocol_error';
/**
* Complete set of `kind` values emitted to websocket clients.
*
* Every server-to-client websocket frame carries a `kind` from this union.
* Provider runtimes emit `MessageKind` values; gateway services emit
* `GatewayEventKind` values.
*/
export type ServerEventKind = MessageKind | GatewayEventKind;
/**
* Provider-neutral message envelope used in REST responses and realtime channels.
*
@@ -187,6 +211,13 @@ export type NormalizedMessage = {
timestamp: string;
provider: LLMProvider;
kind: MessageKind;
/**
* Monotonic per-run sequence number assigned by the chat run registry when a
* live event is forwarded to the websocket. History messages loaded over
* REST do not carry it. Clients use it with `chat.subscribe` to replay only
* the live events they missed across websocket reconnects.
*/
seq?: number;
role?: 'user' | 'assistant';
content?: string;
/**
@@ -237,11 +268,18 @@ export type NormalizedMessage = {
*
* Consumers should pass provider-specific lookup hints (`projectPath`) only
* when the selected provider requires them.
*
* `providerSessionId` is the provider-native session id from the sessions
* index (transcript file name / provider database key). Provider adapters
* must use it — never the app-facing session id they were called with — when
* matching transcript rows on disk, because app-created sessions use an
* app-allocated id that the provider has never seen.
*/
export type FetchHistoryOptions = {
projectPath?: string;
limit?: number | null;
offset?: number;
providerSessionId?: string;
};
/**

View File

@@ -383,6 +383,47 @@ export function createCompleteMessage(opts: {
});
}
// ---------------------------
//----------------- CONVERSATION HISTORY PAGINATION UTILITIES ------------
/**
* Slices one page from the END of a chronologically ordered message list.
*
* This is the single pagination contract for conversation history across all
* providers: `offset = 0` returns the most recent `limit` items, increasing
* offsets walk backwards in time (for "scroll up to load older" UIs), and a
* `null` limit returns everything. Items must already be sorted oldest-first;
* the returned page preserves that order.
*
* Every provider history reader must use this helper instead of slicing
* manually so `offset`/`limit` query params behave identically regardless of
* which provider produced the session.
*/
export function sliceTailPage<T>(
items: T[],
limit: number | null,
offset: number,
): { page: T[]; hasMore: boolean } {
const total = items.length;
const normalizedOffset = Math.max(0, offset);
if (limit === null) {
// A null limit returns the full list; offset still trims newest entries
// so "everything before the page I already have" stays expressible.
const end = Math.max(0, total - normalizedOffset);
return {
page: items.slice(0, end),
hasMore: false,
};
}
const end = Math.max(0, total - normalizedOffset);
const start = Math.max(0, end - Math.max(0, limit));
return {
page: items.slice(start, end),
hasMore: start > 0,
};
}
// ---------------------------
//----------------- MCP CONFIG PARSING UTILITIES ------------
/**