feat: unified message architecture with provider adapters and session store

- Add provider adapter layer (server/providers/) with registry pattern
    - Claude, Cursor, Codex, Gemini adapters normalize native formats to NormalizedMessage
    - Shared types.js defines ProviderAdapter interface and message kinds
    - Registry enables polymorphic provider lookup

  - Add unified REST endpoint: GET /api/sessions/:id/messages?provider=...
    - Replaces four provider-specific message endpoints with one
    - Delegates to provider adapters via registry

  - Add frontend session-keyed store (useSessionStore)
    - Per-session Map with serverMessages/realtimeMessages/merged
    - Dedup by ID, stale threshold for re-fetch, background session accumulation
    - No localStorage for messages — backend JSONL is source of truth

  - Add normalizedToChatMessages converter (useChatMessages)
    - Converts NormalizedMessage[] to existing ChatMessage[] UI format

  - Wire unified store into ChatInterface, useChatSessionState, useChatRealtimeHandlers
    - Session switch uses store cache for instant render
    - Background WebSocket messages routed to correct session slot
This commit is contained in:
simosmik
2026-03-19 00:03:07 +00:00
parent 612390db53
commit 7b3bc54d1b
26 changed files with 2664 additions and 2549 deletions

View File

@@ -24,6 +24,8 @@ import {
notifyRunStopped,
notifyUserIfEnabled
} from './services/notification-orchestrator.js';
import { claudeAdapter } from './providers/claude/adapter.js';
import { createNormalizedMessage } from './providers/types.js';
const activeSessions = new Map();
const pendingToolApprovals = new Map();
@@ -142,7 +144,7 @@ function matchesToolPermission(entry, toolName, input) {
* @returns {Object} SDK-compatible options
*/
function mapCliOptionsToSDK(options = {}) {
const { sessionId, cwd, toolsSettings, permissionMode, images } = options;
const { sessionId, cwd, toolsSettings, permissionMode } = options;
const sdkOptions = {};
@@ -193,7 +195,7 @@ function mapCliOptionsToSDK(options = {}) {
// Map model (default to sonnet)
// Valid models: sonnet, opus, haiku, opusplan, sonnet[1m]
sdkOptions.model = options.model || CLAUDE_MODELS.DEFAULT;
console.log(`Using model: ${sdkOptions.model}`);
// Model logged at query start below
// Map system prompt configuration
sdkOptions.systemPrompt = {
@@ -304,7 +306,7 @@ function extractTokenBudget(resultMessage) {
// This is the user's budget limit, not the model's context window
const contextWindow = parseInt(process.env.CONTEXT_WINDOW) || 160000;
console.log(`Token calculation: input=${inputTokens}, output=${outputTokens}, cache=${cacheReadTokens + cacheCreationTokens}, total=${totalUsed}/${contextWindow}`);
// Token calc logged via token-budget WS event
return {
used: totalUsed,
@@ -360,7 +362,7 @@ async function handleImages(command, images, cwd) {
modifiedCommand = command + imageNote;
}
console.log(`Processed ${tempImagePaths.length} images to temp directory: ${tempDir}`);
// Images processed
return { modifiedCommand, tempImagePaths, tempDir };
} catch (error) {
console.error('Error processing images for SDK:', error);
@@ -393,7 +395,7 @@ async function cleanupTempFiles(tempImagePaths, tempDir) {
);
}
console.log(`Cleaned up ${tempImagePaths.length} temp image files`);
// Temp files cleaned
} catch (error) {
console.error('Error during temp file cleanup:', error);
}
@@ -413,7 +415,7 @@ async function loadMcpConfig(cwd) {
await fs.access(claudeConfigPath);
} catch (error) {
// File doesn't exist, return null
console.log('No ~/.claude.json found, proceeding without MCP servers');
// No config file
return null;
}
@@ -433,7 +435,7 @@ async function loadMcpConfig(cwd) {
// Add global MCP servers
if (claudeConfig.mcpServers && typeof claudeConfig.mcpServers === 'object') {
mcpServers = { ...claudeConfig.mcpServers };
console.log(`Loaded ${Object.keys(mcpServers).length} global MCP servers`);
// Global MCP servers loaded
}
// Add/override with project-specific MCP servers
@@ -441,17 +443,14 @@ async function loadMcpConfig(cwd) {
const projectConfig = claudeConfig.claudeProjects[cwd];
if (projectConfig && projectConfig.mcpServers && typeof projectConfig.mcpServers === 'object') {
mcpServers = { ...mcpServers, ...projectConfig.mcpServers };
console.log(`Loaded ${Object.keys(projectConfig.mcpServers).length} project-specific MCP servers`);
// Project MCP servers merged
}
}
// Return null if no servers found
if (Object.keys(mcpServers).length === 0) {
console.log('No MCP servers configured');
return null;
}
console.log(`Total MCP servers loaded: ${Object.keys(mcpServers).length}`);
return mcpServers;
} catch (error) {
console.error('Error loading MCP config:', error.message);
@@ -541,13 +540,7 @@ async function queryClaudeSDK(command, options = {}, ws) {
}
const requestId = createRequestId();
ws.send({
type: 'claude-permission-request',
requestId,
toolName,
input,
sessionId: capturedSessionId || sessionId || null
});
ws.send(createNormalizedMessage({ kind: 'permission_request', requestId, toolName, input, sessionId: capturedSessionId || sessionId || null, provider: 'claude' }));
emitNotification(createNotificationEvent({
provider: 'claude',
sessionId: capturedSessionId || sessionId || null,
@@ -569,12 +562,7 @@ async function queryClaudeSDK(command, options = {}, ws) {
_receivedAt: new Date(),
},
onCancel: (reason) => {
ws.send({
type: 'claude-permission-cancelled',
requestId,
reason,
sessionId: capturedSessionId || sessionId || null
});
ws.send(createNormalizedMessage({ kind: 'permission_cancelled', requestId, reason, sessionId: capturedSessionId || sessionId || null, provider: 'claude' }));
}
});
if (!decision) {
@@ -650,39 +638,35 @@ async function queryClaudeSDK(command, options = {}, ws) {
// Send session-created event only once for new sessions
if (!sessionId && !sessionCreatedSent) {
sessionCreatedSent = true;
ws.send({
type: 'session-created',
sessionId: capturedSessionId
});
} else {
console.log('Not sending session-created. sessionId:', sessionId, 'sessionCreatedSent:', sessionCreatedSent);
ws.send(createNormalizedMessage({ kind: 'session_created', newSessionId: capturedSessionId, sessionId: capturedSessionId, provider: 'claude' }));
}
} else {
console.log('No session_id in message or already captured. message.session_id:', message.session_id, 'capturedSessionId:', capturedSessionId);
// session_id already captured
}
// Transform and send message to WebSocket
// Transform and normalize message via adapter
const transformedMessage = transformMessage(message);
ws.send({
type: 'claude-response',
data: transformedMessage,
sessionId: capturedSessionId || sessionId || null
});
const sid = capturedSessionId || sessionId || null;
// Use adapter to normalize SDK events into NormalizedMessage[]
const normalized = claudeAdapter.normalizeMessage(transformedMessage, sid);
for (const msg of normalized) {
// Preserve parentToolUseId from SDK wrapper for subagent tool grouping
if (transformedMessage.parentToolUseId && !msg.parentToolUseId) {
msg.parentToolUseId = transformedMessage.parentToolUseId;
}
ws.send(msg);
}
// Extract and send token budget updates from result messages
if (message.type === 'result') {
const models = Object.keys(message.modelUsage || {});
if (models.length > 0) {
console.log("---> Model was sent using:", models);
// Model info available in result message
}
const tokenBudget = extractTokenBudget(message);
if (tokenBudget) {
console.log('Token budget from modelUsage:', tokenBudget);
ws.send({
type: 'token-budget',
data: tokenBudget,
sessionId: capturedSessionId || sessionId || null
});
const tokenBudgetData = extractTokenBudget(message);
if (tokenBudgetData) {
ws.send(createNormalizedMessage({ kind: 'status', text: 'token_budget', tokenBudget: tokenBudgetData, sessionId: capturedSessionId || sessionId || null, provider: 'claude' }));
}
}
}
@@ -696,13 +680,7 @@ async function queryClaudeSDK(command, options = {}, ws) {
await cleanupTempFiles(tempImagePaths, tempDir);
// Send completion event
console.log('Streaming complete, sending claude-complete event');
ws.send({
type: 'claude-complete',
sessionId: capturedSessionId,
exitCode: 0,
isNewSession: !sessionId && !!command
});
ws.send(createNormalizedMessage({ kind: 'complete', exitCode: 0, isNewSession: !sessionId && !!command, sessionId: capturedSessionId, provider: 'claude' }));
notifyRunStopped({
userId: ws?.userId || null,
provider: 'claude',
@@ -710,7 +688,7 @@ async function queryClaudeSDK(command, options = {}, ws) {
sessionName: sessionSummary,
stopReason: 'completed'
});
console.log('claude-complete event sent');
// Complete
} catch (error) {
console.error('SDK query error:', error);
@@ -724,11 +702,7 @@ async function queryClaudeSDK(command, options = {}, ws) {
await cleanupTempFiles(tempImagePaths, tempDir);
// Send error to WebSocket
ws.send({
type: 'claude-error',
error: error.message,
sessionId: capturedSessionId || sessionId || null
});
ws.send(createNormalizedMessage({ kind: 'error', content: error.message, sessionId: capturedSessionId || sessionId || null, provider: 'claude' }));
notifyRunFailed({
userId: ws?.userId || null,
provider: 'claude',

View File

@@ -1,6 +1,8 @@
import { spawn } from 'child_process';
import crossSpawn from 'cross-spawn';
import { notifyRunFailed, notifyRunStopped } from './services/notification-orchestrator.js';
import { cursorAdapter } from './providers/cursor/adapter.js';
import { createNormalizedMessage } from './providers/types.js';
// Use cross-spawn on Windows for better command execution
const spawnFunction = process.platform === 'win32' ? crossSpawn : spawn;
@@ -172,75 +174,42 @@ async function spawnCursor(command, options = {}, ws) {
// Send session-created event only once for new sessions
if (!sessionId && !sessionCreatedSent) {
sessionCreatedSent = true;
ws.send({
type: 'session-created',
sessionId: capturedSessionId,
model: response.model,
cwd: response.cwd
});
ws.send(createNormalizedMessage({ kind: 'session_created', newSessionId: capturedSessionId, model: response.model, cwd: response.cwd, sessionId: capturedSessionId, provider: 'cursor' }));
}
}
// Send system info to frontend
ws.send({
type: 'cursor-system',
data: response,
sessionId: capturedSessionId || sessionId || null
});
// System info — no longer needed by the frontend (session-lifecycle 'created' handles nav).
}
break;
case 'user':
// Forward user message
ws.send({
type: 'cursor-user',
data: response,
sessionId: capturedSessionId || sessionId || null
});
// User messages are not displayed in the UI — skip.
break;
case 'assistant':
// Accumulate assistant message chunks
if (response.message && response.message.content && response.message.content.length > 0) {
const textContent = response.message.content[0].text;
// Send as Claude-compatible format for frontend
ws.send({
type: 'claude-response',
data: {
type: 'content_block_delta',
delta: {
type: 'text_delta',
text: textContent
}
},
sessionId: capturedSessionId || sessionId || null
});
const normalized = cursorAdapter.normalizeMessage(response, capturedSessionId || sessionId || null);
for (const msg of normalized) ws.send(msg);
}
break;
case 'result':
// Session complete
case 'result': {
// Session complete — send stream end + lifecycle complete with result payload
console.log('Cursor session result:', response);
// Do not emit an extra content_block_stop here.
// The UI already finalizes the streaming message in cursor-result handling,
// and emitting both can produce duplicate assistant messages.
ws.send({
type: 'cursor-result',
sessionId: capturedSessionId || sessionId,
data: response,
success: response.subtype === 'success'
});
const resultText = typeof response.result === 'string' ? response.result : '';
ws.send(createNormalizedMessage({
kind: 'complete',
exitCode: response.subtype === 'success' ? 0 : 1,
resultText,
isError: response.subtype !== 'success',
sessionId: capturedSessionId || sessionId, provider: 'cursor',
}));
break;
}
default:
// Forward any other message types
ws.send({
type: 'cursor-response',
data: response,
sessionId: capturedSessionId || sessionId || null
});
// Unknown message types — ignore.
}
} catch (parseError) {
console.log('Non-JSON response:', line);
@@ -249,12 +218,9 @@ async function spawnCursor(command, options = {}, ws) {
return;
}
// If not JSON, send as raw text
ws.send({
type: 'cursor-output',
data: line,
sessionId: capturedSessionId || sessionId || null
});
// If not JSON, send as stream delta via adapter
const normalized = cursorAdapter.normalizeMessage(line, capturedSessionId || sessionId || null);
for (const msg of normalized) ws.send(msg);
}
};
@@ -282,12 +248,7 @@ async function spawnCursor(command, options = {}, ws) {
return;
}
ws.send({
type: 'cursor-error',
error: stderrText,
sessionId: capturedSessionId || sessionId || null,
provider: 'cursor'
});
ws.send(createNormalizedMessage({ kind: 'error', content: stderrText, sessionId: capturedSessionId || sessionId || null, provider: 'cursor' }));
});
// Handle process completion
@@ -314,13 +275,7 @@ async function spawnCursor(command, options = {}, ws) {
return;
}
ws.send({
type: 'claude-complete',
sessionId: finalSessionId,
exitCode: code,
provider: 'cursor',
isNewSession: !sessionId && !!command // Flag to indicate this was a new session
});
ws.send(createNormalizedMessage({ kind: 'complete', exitCode: code, isNewSession: !sessionId && !!command, sessionId: finalSessionId, provider: 'cursor' }));
if (code === 0) {
notifyTerminalState({ code });
@@ -339,12 +294,7 @@ async function spawnCursor(command, options = {}, ws) {
const finalSessionId = capturedSessionId || sessionId || processKey;
activeCursorProcesses.delete(finalSessionId);
ws.send({
type: 'cursor-error',
error: error.message,
sessionId: capturedSessionId || sessionId || null,
provider: 'cursor'
});
ws.send(createNormalizedMessage({ kind: 'error', content: error.message, sessionId: capturedSessionId || sessionId || null, provider: 'cursor' }));
notifyTerminalState({ error });
settleOnce(() => reject(error));

View File

@@ -6,15 +6,15 @@ const spawnFunction = process.platform === 'win32' ? crossSpawn : spawn;
import { promises as fs } from 'fs';
import path from 'path';
import os from 'os';
import { getSessions, getSessionMessages } from './projects.js';
import sessionManager from './sessionManager.js';
import GeminiResponseHandler from './gemini-response-handler.js';
import { notifyRunFailed, notifyRunStopped } from './services/notification-orchestrator.js';
import { createNormalizedMessage } from './providers/types.js';
let activeGeminiProcesses = new Map(); // Track active processes by session ID
async function spawnGemini(command, options = {}, ws) {
const { sessionId, projectPath, cwd, resume, toolsSettings, permissionMode, images, sessionSummary } = options;
const { sessionId, projectPath, cwd, toolsSettings, permissionMode, images, sessionSummary } = options;
let capturedSessionId = sessionId; // Track session ID throughout the process
let sessionCreatedSent = false; // Track if we've already sent session-created event
let assistantBlocks = []; // Accumulate the full response blocks including tools
@@ -219,7 +219,6 @@ async function spawnGemini(command, options = {}, ws) {
geminiProcess.stdin.end();
// Add timeout handler
let hasReceivedOutput = false;
const timeoutMs = 120000; // 120 seconds for slower models
let timeout;
@@ -228,12 +227,7 @@ async function spawnGemini(command, options = {}, ws) {
timeout = setTimeout(() => {
const socketSessionId = typeof ws.getSessionId === 'function' ? ws.getSessionId() : (capturedSessionId || sessionId || processKey);
terminalFailureReason = `Gemini CLI timeout - no response received for ${timeoutMs / 1000} seconds`;
ws.send({
type: 'gemini-error',
sessionId: socketSessionId,
error: terminalFailureReason,
provider: 'gemini'
});
ws.send(createNormalizedMessage({ kind: 'error', content: terminalFailureReason, sessionId: socketSessionId, provider: 'gemini' }));
try {
geminiProcess.kill('SIGTERM');
} catch (e) { }
@@ -295,7 +289,6 @@ async function spawnGemini(command, options = {}, ws) {
// Handle stdout
geminiProcess.stdout.on('data', (data) => {
const rawOutput = data.toString();
hasReceivedOutput = true;
startTimeout(); // Re-arm the timeout
// For new sessions, create a session ID FIRST
@@ -319,21 +312,7 @@ async function spawnGemini(command, options = {}, ws) {
ws.setSessionId && typeof ws.setSessionId === 'function' && ws.setSessionId(capturedSessionId);
ws.send({
type: 'session-created',
sessionId: capturedSessionId
});
// Emit fake system init so the frontend immediately navigates and saves the session
ws.send({
type: 'claude-response',
sessionId: capturedSessionId,
data: {
type: 'system',
subtype: 'init',
session_id: capturedSessionId
}
});
ws.send(createNormalizedMessage({ kind: 'session_created', newSessionId: capturedSessionId, sessionId: capturedSessionId, provider: 'gemini' }));
}
if (responseHandler) {
@@ -346,14 +325,7 @@ async function spawnGemini(command, options = {}, ws) {
assistantBlocks.push({ type: 'text', text: rawOutput });
}
const socketSessionId = typeof ws.getSessionId === 'function' ? ws.getSessionId() : (capturedSessionId || sessionId);
ws.send({
type: 'gemini-response',
sessionId: socketSessionId,
data: {
type: 'message',
content: rawOutput
}
});
ws.send(createNormalizedMessage({ kind: 'stream_delta', content: rawOutput, sessionId: socketSessionId, provider: 'gemini' }));
}
});
@@ -370,12 +342,7 @@ async function spawnGemini(command, options = {}, ws) {
}
const socketSessionId = typeof ws.getSessionId === 'function' ? ws.getSessionId() : (capturedSessionId || sessionId);
ws.send({
type: 'gemini-error',
sessionId: socketSessionId,
error: errorMsg,
provider: 'gemini'
});
ws.send(createNormalizedMessage({ kind: 'error', content: errorMsg, sessionId: socketSessionId, provider: 'gemini' }));
});
// Handle process completion
@@ -397,13 +364,7 @@ async function spawnGemini(command, options = {}, ws) {
sessionManager.addMessage(finalSessionId, 'assistant', assistantBlocks);
}
ws.send({
type: 'claude-complete', // Use claude-complete for compatibility with UI
sessionId: finalSessionId,
exitCode: code,
provider: 'gemini',
isNewSession: !sessionId && !!command // Flag to indicate this was a new session
});
ws.send(createNormalizedMessage({ kind: 'complete', exitCode: code, isNewSession: !sessionId && !!command, sessionId: finalSessionId, provider: 'gemini' }));
// Clean up temporary image files if any
if (geminiProcess.tempImagePaths && geminiProcess.tempImagePaths.length > 0) {
@@ -434,12 +395,7 @@ async function spawnGemini(command, options = {}, ws) {
activeGeminiProcesses.delete(finalSessionId);
const errorSessionId = typeof ws.getSessionId === 'function' ? ws.getSessionId() : finalSessionId;
ws.send({
type: 'gemini-error',
sessionId: errorSessionId,
error: error.message,
provider: 'gemini'
});
ws.send(createNormalizedMessage({ kind: 'error', content: error.message, sessionId: errorSessionId, provider: 'gemini' }));
notifyTerminalState({ error });
reject(error);

View File

@@ -1,4 +1,6 @@
// Gemini Response Handler - JSON Stream processing
import { geminiAdapter } from './providers/gemini/adapter.js';
class GeminiResponseHandler {
constructor(ws, options = {}) {
this.ws = ws;
@@ -27,13 +29,12 @@ class GeminiResponseHandler {
this.handleEvent(event);
} catch (err) {
// Not a JSON line, probably debug output or CLI warnings
// console.error('[Gemini Handler] Non-JSON line ignored:', line);
}
}
}
handleEvent(event) {
const socketSessionId = typeof this.ws.getSessionId === 'function' ? this.ws.getSessionId() : null;
const sid = typeof this.ws.getSessionId === 'function' ? this.ws.getSessionId() : null;
if (event.type === 'init') {
if (this.onInit) {
@@ -42,88 +43,26 @@ class GeminiResponseHandler {
return;
}
// Invoke per-type callbacks for session tracking
if (event.type === 'message' && event.role === 'assistant') {
const content = event.content || '';
// Notify the parent CLI handler of accumulated text
if (this.onContentFragment && content) {
this.onContentFragment(content);
}
} else if (event.type === 'tool_use' && this.onToolUse) {
this.onToolUse(event);
} else if (event.type === 'tool_result' && this.onToolResult) {
this.onToolResult(event);
}
let payload = {
type: 'gemini-response',
data: {
type: 'message',
content: content,
isPartial: event.delta === true
}
};
if (socketSessionId) payload.sessionId = socketSessionId;
this.ws.send(payload);
}
else if (event.type === 'tool_use') {
if (this.onToolUse) {
this.onToolUse(event);
}
let payload = {
type: 'gemini-tool-use',
toolName: event.tool_name,
toolId: event.tool_id,
parameters: event.parameters || {}
};
if (socketSessionId) payload.sessionId = socketSessionId;
this.ws.send(payload);
}
else if (event.type === 'tool_result') {
if (this.onToolResult) {
this.onToolResult(event);
}
let payload = {
type: 'gemini-tool-result',
toolId: event.tool_id,
status: event.status,
output: event.output || ''
};
if (socketSessionId) payload.sessionId = socketSessionId;
this.ws.send(payload);
}
else if (event.type === 'result') {
// Send a finalize message string
let payload = {
type: 'gemini-response',
data: {
type: 'message',
content: '',
isPartial: false
}
};
if (socketSessionId) payload.sessionId = socketSessionId;
this.ws.send(payload);
if (event.stats && event.stats.total_tokens) {
let statsPayload = {
type: 'claude-status',
data: {
status: 'Complete',
tokens: event.stats.total_tokens
}
};
if (socketSessionId) statsPayload.sessionId = socketSessionId;
this.ws.send(statsPayload);
}
}
else if (event.type === 'error') {
let payload = {
type: 'gemini-error',
error: event.error || event.message || 'Unknown Gemini streaming error'
};
if (socketSessionId) payload.sessionId = socketSessionId;
this.ws.send(payload);
// Normalize via adapter and send all resulting messages
const normalized = geminiAdapter.normalizeMessage(event, sid);
for (const msg of normalized) {
this.ws.send(msg);
}
}
forceFlush() {
// If the buffer has content, try to parse it one last time
if (this.buffer.trim()) {
try {
const event = JSON.parse(this.buffer);

View File

@@ -44,7 +44,7 @@ import pty from 'node-pty';
import fetch from 'node-fetch';
import mime from 'mime-types';
import { getProjects, getSessions, getSessionMessages, renameProject, deleteSession, deleteProject, addProjectManually, extractProjectDirectory, clearProjectDirectoryCache, searchConversations } from './projects.js';
import { getProjects, getSessions, renameProject, deleteSession, deleteProject, addProjectManually, extractProjectDirectory, clearProjectDirectoryCache, searchConversations } from './projects.js';
import { queryClaudeSDK, abortClaudeSDKSession, isClaudeSDKSessionActive, getActiveClaudeSDKSessions, resolveToolApproval, getPendingApprovalsForSession, reconnectSessionWriter } from './claude-sdk.js';
import { spawnCursor, abortCursorSession, isCursorSessionActive, getActiveCursorSessions } from './cursor-cli.js';
import { queryCodex, abortCodexSession, isCodexSessionActive, getActiveCodexSessions } from './openai-codex.js';
@@ -65,6 +65,8 @@ import userRoutes from './routes/user.js';
import codexRoutes from './routes/codex.js';
import geminiRoutes from './routes/gemini.js';
import pluginsRoutes from './routes/plugins.js';
import messagesRoutes from './routes/messages.js';
import { createNormalizedMessage } from './providers/types.js';
import { startEnabledPluginServers, stopAllPlugins, getPluginPort } from './utils/plugin-process-manager.js';
import { initializeDatabase, sessionNamesDb, applyCustomSessionNames } from './database/db.js';
import { configureWebPush } from './services/vapid-keys.js';
@@ -396,6 +398,9 @@ app.use('/api/gemini', authenticateToken, geminiRoutes);
// Plugins API Routes (protected)
app.use('/api/plugins', authenticateToken, pluginsRoutes);
// Unified session messages route (protected)
app.use('/api/sessions', authenticateToken, messagesRoutes);
// Agent API Routes (uses API key authentication)
app.use('/api/agent', agentRoutes);
@@ -509,31 +514,6 @@ app.get('/api/projects/:projectName/sessions', authenticateToken, async (req, re
}
});
// Get messages for a specific session
app.get('/api/projects/:projectName/sessions/:sessionId/messages', authenticateToken, async (req, res) => {
try {
const { projectName, sessionId } = req.params;
const { limit, offset } = req.query;
// Parse limit and offset if provided
const parsedLimit = limit ? parseInt(limit, 10) : null;
const parsedOffset = offset ? parseInt(offset, 10) : 0;
const result = await getSessionMessages(projectName, sessionId, parsedLimit, parsedOffset);
// Handle both old and new response formats
if (Array.isArray(result)) {
// Backward compatibility: no pagination parameters were provided
res.json({ messages: result });
} else {
// New format with pagination info
res.json(result);
}
} catch (error) {
res.status(500).json({ error: error.message });
}
});
// Rename project endpoint
app.put('/api/projects/:projectName/rename', authenticateToken, async (req, res) => {
try {
@@ -958,7 +938,6 @@ app.get('/api/projects/:projectName/files', authenticateToken, async (req, res)
}
const files = await getFileTree(actualPath, 10, 0, true);
const hiddenFiles = files.filter(f => f.name.startsWith('.'));
res.json(files);
} catch (error) {
console.error('[ERROR] File tree error:', error.message);
@@ -1463,6 +1442,10 @@ wss.on('connection', (ws, request) => {
/**
* WebSocket Writer - Wrapper for WebSocket to match SSEStreamWriter interface
*
* Provider files use `createNormalizedMessage()` from `providers/types.js` and
* adapter `normalizeMessage()` to produce unified NormalizedMessage events.
* The writer simply serialises and sends.
*/
class WebSocketWriter {
constructor(ws, userId = null) {
@@ -1474,7 +1457,6 @@ class WebSocketWriter {
send(data) {
if (this.ws.readyState === 1) { // WebSocket.OPEN
// Providers send raw objects, we stringify for WebSocket
this.ws.send(JSON.stringify(data));
}
}
@@ -1555,12 +1537,7 @@ function handleChatConnection(ws, request) {
success = await abortClaudeSDKSession(data.sessionId);
}
writer.send({
type: 'session-aborted',
sessionId: data.sessionId,
provider,
success
});
writer.send(createNormalizedMessage({ kind: 'complete', exitCode: success ? 0 : 1, aborted: true, success, sessionId: data.sessionId, provider }));
} else if (data.type === 'claude-permission-response') {
// Relay UI approval decisions back into the SDK control flow.
// This does not persist permissions; it only resolves the in-flight request,
@@ -1576,12 +1553,7 @@ function handleChatConnection(ws, request) {
} else if (data.type === 'cursor-abort') {
console.log('[DEBUG] Abort Cursor session:', data.sessionId);
const success = abortCursorSession(data.sessionId);
writer.send({
type: 'session-aborted',
sessionId: data.sessionId,
provider: 'cursor',
success
});
writer.send(createNormalizedMessage({ kind: 'complete', exitCode: success ? 0 : 1, aborted: true, success, sessionId: data.sessionId, provider: 'cursor' }));
} else if (data.type === 'check-session-status') {
// Check if a specific session is currently processing
const provider = data.provider || 'claude';

View File

@@ -15,6 +15,8 @@
import { Codex } from '@openai/codex-sdk';
import { notifyRunFailed, notifyRunStopped } from './services/notification-orchestrator.js';
import { codexAdapter } from './providers/codex/adapter.js';
import { createNormalizedMessage } from './providers/types.js';
// Track active sessions
const activeCodexSessions = new Map();
@@ -241,11 +243,7 @@ export async function queryCodex(command, options = {}, ws) {
});
// Send session created event
sendMessage(ws, {
type: 'session-created',
sessionId: currentSessionId,
provider: 'codex'
});
sendMessage(ws, createNormalizedMessage({ kind: 'session_created', newSessionId: currentSessionId, sessionId: currentSessionId, provider: 'codex' }));
// Execute with streaming
const streamedTurn = await thread.runStreamed(command, {
@@ -265,11 +263,11 @@ export async function queryCodex(command, options = {}, ws) {
const transformed = transformCodexEvent(event);
sendMessage(ws, {
type: 'codex-response',
data: transformed,
sessionId: currentSessionId
});
// Normalize the transformed event into NormalizedMessage(s) via adapter
const normalizedMsgs = codexAdapter.normalizeMessage(transformed, currentSessionId);
for (const msg of normalizedMsgs) {
sendMessage(ws, msg);
}
if (event.type === 'turn.failed' && !terminalFailure) {
terminalFailure = event.error || new Error('Turn failed');
@@ -285,25 +283,13 @@ export async function queryCodex(command, options = {}, ws) {
// Extract and send token usage if available (normalized to match Claude format)
if (event.type === 'turn.completed' && event.usage) {
const totalTokens = (event.usage.input_tokens || 0) + (event.usage.output_tokens || 0);
sendMessage(ws, {
type: 'token-budget',
data: {
used: totalTokens,
total: 200000 // Default context window for Codex models
},
sessionId: currentSessionId
});
sendMessage(ws, createNormalizedMessage({ kind: 'status', text: 'token_budget', tokenBudget: { used: totalTokens, total: 200000 }, sessionId: currentSessionId, provider: 'codex' }));
}
}
// Send completion event
if (!terminalFailure) {
sendMessage(ws, {
type: 'codex-complete',
sessionId: currentSessionId,
actualSessionId: thread.id,
provider: 'codex'
});
sendMessage(ws, createNormalizedMessage({ kind: 'complete', actualSessionId: thread.id, sessionId: currentSessionId, provider: 'codex' }));
notifyRunStopped({
userId: ws?.userId || null,
provider: 'codex',
@@ -322,12 +308,7 @@ export async function queryCodex(command, options = {}, ws) {
if (!wasAborted) {
console.error('[Codex] Error:', error);
sendMessage(ws, {
type: 'codex-error',
error: error.message,
sessionId: currentSessionId,
provider: 'codex'
});
sendMessage(ws, createNormalizedMessage({ kind: 'error', content: error.message, sessionId: currentSessionId, provider: 'codex' }));
if (!terminalFailure) {
notifyRunFailed({
userId: ws?.userId || null,

View File

@@ -1014,7 +1014,7 @@ async function getSessionMessages(projectName, sessionId, limit = null, offset =
messages.push(entry);
}
} catch (parseError) {
console.warn('Error parsing line:', parseError.message);
// Silently skip malformed JSONL lines (common with concurrent writes)
}
}
}

View File

@@ -0,0 +1,278 @@
/**
* Claude provider adapter.
*
* Normalizes Claude SDK session history into NormalizedMessage format.
* @module adapters/claude
*/
import { getSessionMessages } from '../../projects.js';
import { createNormalizedMessage, generateMessageId } from '../types.js';
import { isInternalContent } from '../utils.js';
const PROVIDER = 'claude';
/**
* Normalize a raw JSONL message or realtime SDK event into NormalizedMessage(s).
* Handles both history entries (JSONL `{ message: { role, content } }`) and
* realtime streaming events (`content_block_delta`, `content_block_stop`, etc.).
* @param {object} raw - A single entry from JSONL or a live SDK event
* @param {string} sessionId
* @returns {import('../types.js').NormalizedMessage[]}
*/
export function normalizeMessage(raw, sessionId) {
// ── Streaming events (realtime) ──────────────────────────────────────────
if (raw.type === 'content_block_delta' && raw.delta?.text) {
return [createNormalizedMessage({ kind: 'stream_delta', content: raw.delta.text, sessionId, provider: PROVIDER })];
}
if (raw.type === 'content_block_stop') {
return [createNormalizedMessage({ kind: 'stream_end', sessionId, provider: PROVIDER })];
}
// ── History / full-message events ────────────────────────────────────────
const messages = [];
const ts = raw.timestamp || new Date().toISOString();
const baseId = raw.uuid || generateMessageId('claude');
// User message
if (raw.message?.role === 'user' && raw.message?.content) {
if (Array.isArray(raw.message.content)) {
// Handle tool_result parts
for (const part of raw.message.content) {
if (part.type === 'tool_result') {
messages.push(createNormalizedMessage({
id: `${baseId}_tr_${part.tool_use_id}`,
sessionId,
timestamp: ts,
provider: PROVIDER,
kind: 'tool_result',
toolId: part.tool_use_id,
content: typeof part.content === 'string' ? part.content : JSON.stringify(part.content),
isError: Boolean(part.is_error),
subagentTools: raw.subagentTools,
toolUseResult: raw.toolUseResult,
}));
} else if (part.type === 'text') {
// Regular text parts from user
const text = part.text || '';
if (text && !isInternalContent(text)) {
messages.push(createNormalizedMessage({
id: `${baseId}_text`,
sessionId,
timestamp: ts,
provider: PROVIDER,
kind: 'text',
role: 'user',
content: text,
}));
}
}
}
// If no text parts were found, check if it's a pure user message
if (messages.length === 0) {
const textParts = raw.message.content
.filter(p => p.type === 'text')
.map(p => p.text)
.filter(Boolean)
.join('\n');
if (textParts && !isInternalContent(textParts)) {
messages.push(createNormalizedMessage({
id: `${baseId}_text`,
sessionId,
timestamp: ts,
provider: PROVIDER,
kind: 'text',
role: 'user',
content: textParts,
}));
}
}
} else if (typeof raw.message.content === 'string') {
const text = raw.message.content;
if (text && !isInternalContent(text)) {
messages.push(createNormalizedMessage({
id: baseId,
sessionId,
timestamp: ts,
provider: PROVIDER,
kind: 'text',
role: 'user',
content: text,
}));
}
}
return messages;
}
// Thinking message
if (raw.type === 'thinking' && raw.message?.content) {
messages.push(createNormalizedMessage({
id: baseId,
sessionId,
timestamp: ts,
provider: PROVIDER,
kind: 'thinking',
content: raw.message.content,
}));
return messages;
}
// Tool use result (codex-style in Claude)
if (raw.type === 'tool_use' && raw.toolName) {
messages.push(createNormalizedMessage({
id: baseId,
sessionId,
timestamp: ts,
provider: PROVIDER,
kind: 'tool_use',
toolName: raw.toolName,
toolInput: raw.toolInput,
toolId: raw.toolCallId || baseId,
}));
return messages;
}
if (raw.type === 'tool_result') {
messages.push(createNormalizedMessage({
id: baseId,
sessionId,
timestamp: ts,
provider: PROVIDER,
kind: 'tool_result',
toolId: raw.toolCallId || '',
content: raw.output || '',
isError: false,
}));
return messages;
}
// Assistant message
if (raw.message?.role === 'assistant' && raw.message?.content) {
if (Array.isArray(raw.message.content)) {
let partIndex = 0;
for (const part of raw.message.content) {
if (part.type === 'text' && part.text) {
messages.push(createNormalizedMessage({
id: `${baseId}_${partIndex}`,
sessionId,
timestamp: ts,
provider: PROVIDER,
kind: 'text',
role: 'assistant',
content: part.text,
}));
} else if (part.type === 'tool_use') {
messages.push(createNormalizedMessage({
id: `${baseId}_${partIndex}`,
sessionId,
timestamp: ts,
provider: PROVIDER,
kind: 'tool_use',
toolName: part.name,
toolInput: part.input,
toolId: part.id,
}));
} else if (part.type === 'thinking' && part.thinking) {
messages.push(createNormalizedMessage({
id: `${baseId}_${partIndex}`,
sessionId,
timestamp: ts,
provider: PROVIDER,
kind: 'thinking',
content: part.thinking,
}));
}
partIndex++;
}
} else if (typeof raw.message.content === 'string') {
messages.push(createNormalizedMessage({
id: baseId,
sessionId,
timestamp: ts,
provider: PROVIDER,
kind: 'text',
role: 'assistant',
content: raw.message.content,
}));
}
return messages;
}
return messages;
}
/**
* @type {import('../types.js').ProviderAdapter}
*/
export const claudeAdapter = {
normalizeMessage,
/**
* Fetch session history from JSONL files, returning normalized messages.
*/
async fetchHistory(sessionId, opts = {}) {
const { projectName, limit = null, offset = 0 } = opts;
if (!projectName) {
return { messages: [], total: 0, hasMore: false, offset: 0, limit: null };
}
let result;
try {
result = await getSessionMessages(projectName, sessionId, limit, offset);
} catch (error) {
console.warn(`[ClaudeAdapter] Failed to load session ${sessionId}:`, error.message);
return { messages: [], total: 0, hasMore: false, offset: 0, limit: null };
}
// getSessionMessages returns either an array (no limit) or { messages, total, hasMore }
const rawMessages = Array.isArray(result) ? result : (result.messages || []);
const total = Array.isArray(result) ? rawMessages.length : (result.total || 0);
const hasMore = Array.isArray(result) ? false : Boolean(result.hasMore);
// First pass: collect tool results for attachment to tool_use messages
const toolResultMap = new Map();
for (const raw of rawMessages) {
if (raw.message?.role === 'user' && Array.isArray(raw.message?.content)) {
for (const part of raw.message.content) {
if (part.type === 'tool_result') {
toolResultMap.set(part.tool_use_id, {
content: part.content,
isError: Boolean(part.is_error),
timestamp: raw.timestamp,
subagentTools: raw.subagentTools,
toolUseResult: raw.toolUseResult,
});
}
}
}
}
// Second pass: normalize all messages
const normalized = [];
for (const raw of rawMessages) {
const entries = normalizeMessage(raw, sessionId);
normalized.push(...entries);
}
// Attach tool results to their corresponding tool_use messages
for (const msg of normalized) {
if (msg.kind === 'tool_use' && msg.toolId && toolResultMap.has(msg.toolId)) {
const tr = toolResultMap.get(msg.toolId);
msg.toolResult = {
content: typeof tr.content === 'string' ? tr.content : JSON.stringify(tr.content),
isError: tr.isError,
toolUseResult: tr.toolUseResult,
};
msg.subagentTools = tr.subagentTools;
}
}
return {
messages: normalized,
total,
hasMore,
offset,
limit,
};
},
};

View File

@@ -0,0 +1,248 @@
/**
* Codex (OpenAI) provider adapter.
*
* Normalizes Codex SDK session history into NormalizedMessage format.
* @module adapters/codex
*/
import { getCodexSessionMessages } from '../../projects.js';
import { createNormalizedMessage, generateMessageId } from '../types.js';
const PROVIDER = 'codex';
/**
* Normalize a raw Codex JSONL message into NormalizedMessage(s).
* @param {object} raw - A single parsed message from Codex JSONL
* @param {string} sessionId
* @returns {import('../types.js').NormalizedMessage[]}
*/
function normalizeCodexHistoryEntry(raw, sessionId) {
const ts = raw.timestamp || new Date().toISOString();
const baseId = raw.uuid || generateMessageId('codex');
// User message
if (raw.message?.role === 'user') {
const content = typeof raw.message.content === 'string'
? raw.message.content
: Array.isArray(raw.message.content)
? raw.message.content.map(p => typeof p === 'string' ? p : p?.text || '').filter(Boolean).join('\n')
: String(raw.message.content || '');
if (!content.trim()) return [];
return [createNormalizedMessage({
id: baseId,
sessionId,
timestamp: ts,
provider: PROVIDER,
kind: 'text',
role: 'user',
content,
})];
}
// Assistant message
if (raw.message?.role === 'assistant') {
const content = typeof raw.message.content === 'string'
? raw.message.content
: Array.isArray(raw.message.content)
? raw.message.content.map(p => typeof p === 'string' ? p : p?.text || '').filter(Boolean).join('\n')
: '';
if (!content.trim()) return [];
return [createNormalizedMessage({
id: baseId,
sessionId,
timestamp: ts,
provider: PROVIDER,
kind: 'text',
role: 'assistant',
content,
})];
}
// Thinking/reasoning
if (raw.type === 'thinking' || raw.isReasoning) {
return [createNormalizedMessage({
id: baseId,
sessionId,
timestamp: ts,
provider: PROVIDER,
kind: 'thinking',
content: raw.message?.content || '',
})];
}
// Tool use
if (raw.type === 'tool_use' || raw.toolName) {
return [createNormalizedMessage({
id: baseId,
sessionId,
timestamp: ts,
provider: PROVIDER,
kind: 'tool_use',
toolName: raw.toolName || 'Unknown',
toolInput: raw.toolInput,
toolId: raw.toolCallId || baseId,
})];
}
// Tool result
if (raw.type === 'tool_result') {
return [createNormalizedMessage({
id: baseId,
sessionId,
timestamp: ts,
provider: PROVIDER,
kind: 'tool_result',
toolId: raw.toolCallId || '',
content: raw.output || '',
isError: Boolean(raw.isError),
})];
}
return [];
}
/**
* Normalize a raw Codex event (history JSONL or transformed SDK event) into NormalizedMessage(s).
* @param {object} raw - A history entry (has raw.message.role) or transformed SDK event (has raw.type)
* @param {string} sessionId
* @returns {import('../types.js').NormalizedMessage[]}
*/
export function normalizeMessage(raw, sessionId) {
// History format: has message.role
if (raw.message?.role) {
return normalizeCodexHistoryEntry(raw, sessionId);
}
const ts = raw.timestamp || new Date().toISOString();
const baseId = raw.uuid || generateMessageId('codex');
// SDK event format (output of transformCodexEvent)
if (raw.type === 'item') {
switch (raw.itemType) {
case 'agent_message':
return [createNormalizedMessage({
id: baseId, sessionId, timestamp: ts, provider: PROVIDER,
kind: 'text', role: 'assistant', content: raw.message?.content || '',
})];
case 'reasoning':
return [createNormalizedMessage({
id: baseId, sessionId, timestamp: ts, provider: PROVIDER,
kind: 'thinking', content: raw.message?.content || '',
})];
case 'command_execution':
return [createNormalizedMessage({
id: baseId, sessionId, timestamp: ts, provider: PROVIDER,
kind: 'tool_use', toolName: 'Bash', toolInput: { command: raw.command },
toolId: baseId,
output: raw.output, exitCode: raw.exitCode, status: raw.status,
})];
case 'file_change':
return [createNormalizedMessage({
id: baseId, sessionId, timestamp: ts, provider: PROVIDER,
kind: 'tool_use', toolName: 'FileChanges', toolInput: raw.changes,
toolId: baseId, status: raw.status,
})];
case 'mcp_tool_call':
return [createNormalizedMessage({
id: baseId, sessionId, timestamp: ts, provider: PROVIDER,
kind: 'tool_use', toolName: raw.tool || 'MCP', toolInput: raw.arguments,
toolId: baseId, server: raw.server, result: raw.result,
error: raw.error, status: raw.status,
})];
case 'web_search':
return [createNormalizedMessage({
id: baseId, sessionId, timestamp: ts, provider: PROVIDER,
kind: 'tool_use', toolName: 'WebSearch', toolInput: { query: raw.query },
toolId: baseId,
})];
case 'todo_list':
return [createNormalizedMessage({
id: baseId, sessionId, timestamp: ts, provider: PROVIDER,
kind: 'tool_use', toolName: 'TodoList', toolInput: { items: raw.items },
toolId: baseId,
})];
case 'error':
return [createNormalizedMessage({
id: baseId, sessionId, timestamp: ts, provider: PROVIDER,
kind: 'error', content: raw.message?.content || 'Unknown error',
})];
default:
// Unknown item type — pass through as generic tool_use
return [createNormalizedMessage({
id: baseId, sessionId, timestamp: ts, provider: PROVIDER,
kind: 'tool_use', toolName: raw.itemType || 'Unknown',
toolInput: raw.item || raw, toolId: baseId,
})];
}
}
if (raw.type === 'turn_complete') {
return [createNormalizedMessage({
id: baseId, sessionId, timestamp: ts, provider: PROVIDER,
kind: 'complete',
})];
}
if (raw.type === 'turn_failed') {
return [createNormalizedMessage({
id: baseId, sessionId, timestamp: ts, provider: PROVIDER,
kind: 'error', content: raw.error?.message || 'Turn failed',
})];
}
return [];
}
/**
* @type {import('../types.js').ProviderAdapter}
*/
export const codexAdapter = {
normalizeMessage,
/**
* Fetch session history from Codex JSONL files.
*/
async fetchHistory(sessionId, opts = {}) {
const { limit = null, offset = 0 } = opts;
let result;
try {
result = await getCodexSessionMessages(sessionId, limit, offset);
} catch (error) {
console.warn(`[CodexAdapter] Failed to load session ${sessionId}:`, error.message);
return { messages: [], total: 0, hasMore: false, offset: 0, limit: null };
}
const rawMessages = Array.isArray(result) ? result : (result.messages || []);
const total = Array.isArray(result) ? rawMessages.length : (result.total || 0);
const hasMore = Array.isArray(result) ? false : Boolean(result.hasMore);
const tokenUsage = result.tokenUsage || null;
const normalized = [];
for (const raw of rawMessages) {
const entries = normalizeCodexHistoryEntry(raw, sessionId);
normalized.push(...entries);
}
// Attach tool results to tool_use messages
const toolResultMap = new Map();
for (const msg of normalized) {
if (msg.kind === 'tool_result' && msg.toolId) {
toolResultMap.set(msg.toolId, msg);
}
}
for (const msg of normalized) {
if (msg.kind === 'tool_use' && msg.toolId && toolResultMap.has(msg.toolId)) {
const tr = toolResultMap.get(msg.toolId);
msg.toolResult = { content: tr.content, isError: tr.isError };
}
}
return {
messages: normalized,
total,
hasMore,
offset,
limit,
tokenUsage,
};
},
};

View File

@@ -0,0 +1,353 @@
/**
* Cursor provider adapter.
*
* Normalizes Cursor CLI session history into NormalizedMessage format.
* @module adapters/cursor
*/
import path from 'path';
import os from 'os';
import crypto from 'crypto';
import { createNormalizedMessage, generateMessageId } from '../types.js';
const PROVIDER = 'cursor';
/**
* Load raw blobs from Cursor's SQLite store.db, parse the DAG structure,
* and return sorted message blobs in chronological order.
* @param {string} sessionId
* @param {string} projectPath - Absolute project path (used to compute cwdId hash)
* @returns {Promise<Array<{id: string, sequence: number, rowid: number, content: object}>>}
*/
async function loadCursorBlobs(sessionId, projectPath) {
// Lazy-import sqlite so the module doesn't fail if sqlite3 is unavailable
const { default: sqlite3 } = await import('sqlite3');
const { open } = await import('sqlite');
const cwdId = crypto.createHash('md5').update(projectPath || process.cwd()).digest('hex');
const storeDbPath = path.join(os.homedir(), '.cursor', 'chats', cwdId, sessionId, 'store.db');
const db = await open({
filename: storeDbPath,
driver: sqlite3.Database,
mode: sqlite3.OPEN_READONLY,
});
try {
const allBlobs = await db.all('SELECT rowid, id, data FROM blobs');
const blobMap = new Map();
const parentRefs = new Map();
const childRefs = new Map();
const jsonBlobs = [];
for (const blob of allBlobs) {
blobMap.set(blob.id, blob);
if (blob.data && blob.data[0] === 0x7B) {
try {
const parsed = JSON.parse(blob.data.toString('utf8'));
jsonBlobs.push({ ...blob, parsed });
} catch {
// skip unparseable blobs
}
} else if (blob.data) {
const parents = [];
let i = 0;
while (i < blob.data.length - 33) {
if (blob.data[i] === 0x0A && blob.data[i + 1] === 0x20) {
const parentHash = blob.data.slice(i + 2, i + 34).toString('hex');
if (blobMap.has(parentHash)) {
parents.push(parentHash);
}
i += 34;
} else {
i++;
}
}
if (parents.length > 0) {
parentRefs.set(blob.id, parents);
for (const parentId of parents) {
if (!childRefs.has(parentId)) childRefs.set(parentId, []);
childRefs.get(parentId).push(blob.id);
}
}
}
}
// Topological sort (DFS)
const visited = new Set();
const sorted = [];
function visit(nodeId) {
if (visited.has(nodeId)) return;
visited.add(nodeId);
for (const pid of (parentRefs.get(nodeId) || [])) visit(pid);
const b = blobMap.get(nodeId);
if (b) sorted.push(b);
}
for (const blob of allBlobs) {
if (!parentRefs.has(blob.id)) visit(blob.id);
}
for (const blob of allBlobs) visit(blob.id);
// Order JSON blobs by DAG appearance
const messageOrder = new Map();
let orderIndex = 0;
for (const blob of sorted) {
if (blob.data && blob.data[0] !== 0x7B) {
for (const jb of jsonBlobs) {
try {
const idBytes = Buffer.from(jb.id, 'hex');
if (blob.data.includes(idBytes) && !messageOrder.has(jb.id)) {
messageOrder.set(jb.id, orderIndex++);
}
} catch { /* skip */ }
}
}
}
const sortedJsonBlobs = jsonBlobs.sort((a, b) => {
const oa = messageOrder.get(a.id) ?? Number.MAX_SAFE_INTEGER;
const ob = messageOrder.get(b.id) ?? Number.MAX_SAFE_INTEGER;
return oa !== ob ? oa - ob : a.rowid - b.rowid;
});
const messages = [];
for (let idx = 0; idx < sortedJsonBlobs.length; idx++) {
const blob = sortedJsonBlobs[idx];
const parsed = blob.parsed;
if (!parsed) continue;
const role = parsed?.role || parsed?.message?.role;
if (role === 'system') continue;
messages.push({
id: blob.id,
sequence: idx + 1,
rowid: blob.rowid,
content: parsed,
});
}
return messages;
} finally {
await db.close();
}
}
/**
* Normalize a realtime NDJSON event from Cursor CLI into NormalizedMessage(s).
* History uses normalizeCursorBlobs (SQLite DAG), this handles streaming NDJSON.
* @param {object|string} raw - A parsed NDJSON event or a raw text line
* @param {string} sessionId
* @returns {import('../types.js').NormalizedMessage[]}
*/
export function normalizeMessage(raw, sessionId) {
// Structured assistant message with content array
if (raw && typeof raw === 'object' && raw.type === 'assistant' && raw.message?.content?.[0]?.text) {
return [createNormalizedMessage({ kind: 'stream_delta', content: raw.message.content[0].text, sessionId, provider: PROVIDER })];
}
// Plain string line (non-JSON output)
if (typeof raw === 'string' && raw.trim()) {
return [createNormalizedMessage({ kind: 'stream_delta', content: raw, sessionId, provider: PROVIDER })];
}
return [];
}
/**
* @type {import('../types.js').ProviderAdapter}
*/
export const cursorAdapter = {
normalizeMessage,
/**
* Fetch session history for Cursor from SQLite store.db.
*/
async fetchHistory(sessionId, opts = {}) {
const { projectPath = '', limit = null, offset = 0 } = opts;
try {
const blobs = await loadCursorBlobs(sessionId, projectPath);
const allNormalized = cursorAdapter.normalizeCursorBlobs(blobs, sessionId);
// Apply pagination
if (limit !== null && limit > 0) {
const start = offset;
const page = allNormalized.slice(start, start + limit);
return {
messages: page,
total: allNormalized.length,
hasMore: start + limit < allNormalized.length,
offset,
limit,
};
}
return {
messages: allNormalized,
total: allNormalized.length,
hasMore: false,
offset: 0,
limit: null,
};
} catch (error) {
// DB doesn't exist or is unreadable — return empty
console.warn(`[CursorAdapter] Failed to load session ${sessionId}:`, error.message);
return { messages: [], total: 0, hasMore: false, offset: 0, limit: null };
}
},
/**
* Normalize raw Cursor blob messages into NormalizedMessage[].
* @param {any[]} blobs - Raw cursor blobs from store.db ({id, sequence, rowid, content})
* @param {string} sessionId
* @returns {import('../types.js').NormalizedMessage[]}
*/
normalizeCursorBlobs(blobs, sessionId) {
const messages = [];
const toolUseMap = new Map();
// Use a fixed base timestamp so messages have stable, monotonically-increasing
// timestamps based on their sequence number rather than wall-clock time.
const baseTime = Date.now();
for (let i = 0; i < blobs.length; i++) {
const blob = blobs[i];
const content = blob.content;
const ts = new Date(baseTime + (blob.sequence ?? i) * 100).toISOString();
const baseId = blob.id || generateMessageId('cursor');
try {
if (!content?.role || !content?.content) {
// Try nested message format
if (content?.message?.role && content?.message?.content) {
if (content.message.role === 'system') continue;
const role = content.message.role === 'user' ? 'user' : 'assistant';
let text = '';
if (Array.isArray(content.message.content)) {
text = content.message.content
.map(p => typeof p === 'string' ? p : p?.text || '')
.filter(Boolean)
.join('\n');
} else if (typeof content.message.content === 'string') {
text = content.message.content;
}
if (text?.trim()) {
messages.push(createNormalizedMessage({
id: baseId,
sessionId,
timestamp: ts,
provider: PROVIDER,
kind: 'text',
role,
content: text,
sequence: blob.sequence,
rowid: blob.rowid,
}));
}
}
continue;
}
if (content.role === 'system') continue;
// Tool results
if (content.role === 'tool') {
const toolItems = Array.isArray(content.content) ? content.content : [];
for (const item of toolItems) {
if (item?.type !== 'tool-result') continue;
const toolCallId = item.toolCallId || content.id;
messages.push(createNormalizedMessage({
id: `${baseId}_tr`,
sessionId,
timestamp: ts,
provider: PROVIDER,
kind: 'tool_result',
toolId: toolCallId,
content: item.result || '',
isError: false,
}));
}
continue;
}
const role = content.role === 'user' ? 'user' : 'assistant';
if (Array.isArray(content.content)) {
for (let partIdx = 0; partIdx < content.content.length; partIdx++) {
const part = content.content[partIdx];
if (part?.type === 'text' && part?.text) {
messages.push(createNormalizedMessage({
id: `${baseId}_${partIdx}`,
sessionId,
timestamp: ts,
provider: PROVIDER,
kind: 'text',
role,
content: part.text,
sequence: blob.sequence,
rowid: blob.rowid,
}));
} else if (part?.type === 'reasoning' && part?.text) {
messages.push(createNormalizedMessage({
id: `${baseId}_${partIdx}`,
sessionId,
timestamp: ts,
provider: PROVIDER,
kind: 'thinking',
content: part.text,
}));
} else if (part?.type === 'tool-call' || part?.type === 'tool_use') {
const toolName = (part.toolName || part.name || 'Unknown Tool') === 'ApplyPatch'
? 'Edit' : (part.toolName || part.name || 'Unknown Tool');
const toolId = part.toolCallId || part.id || `tool_${i}_${partIdx}`;
messages.push(createNormalizedMessage({
id: `${baseId}_${partIdx}`,
sessionId,
timestamp: ts,
provider: PROVIDER,
kind: 'tool_use',
toolName,
toolInput: part.args || part.input,
toolId,
}));
toolUseMap.set(toolId, messages[messages.length - 1]);
}
}
} else if (typeof content.content === 'string' && content.content.trim()) {
messages.push(createNormalizedMessage({
id: baseId,
sessionId,
timestamp: ts,
provider: PROVIDER,
kind: 'text',
role,
content: content.content,
sequence: blob.sequence,
rowid: blob.rowid,
}));
}
} catch (error) {
console.warn('Error normalizing cursor blob:', error);
}
}
// Attach tool results to tool_use messages
for (const msg of messages) {
if (msg.kind === 'tool_result' && msg.toolId && toolUseMap.has(msg.toolId)) {
const toolUse = toolUseMap.get(msg.toolId);
toolUse.toolResult = {
content: msg.content,
isError: msg.isError,
};
}
}
// Sort by sequence/rowid
messages.sort((a, b) => {
if (a.sequence !== undefined && b.sequence !== undefined) return a.sequence - b.sequence;
if (a.rowid !== undefined && b.rowid !== undefined) return a.rowid - b.rowid;
return new Date(a.timestamp).getTime() - new Date(b.timestamp).getTime();
});
return messages;
},
};

View File

@@ -0,0 +1,186 @@
/**
* Gemini provider adapter.
*
* Normalizes Gemini CLI session history into NormalizedMessage format.
* @module adapters/gemini
*/
import sessionManager from '../../sessionManager.js';
import { getGeminiCliSessionMessages } from '../../projects.js';
import { createNormalizedMessage, generateMessageId } from '../types.js';
const PROVIDER = 'gemini';
/**
* Normalize a realtime NDJSON event from Gemini CLI into NormalizedMessage(s).
* Handles: message (delta/final), tool_use, tool_result, result, error.
* @param {object} raw - A parsed NDJSON event
* @param {string} sessionId
* @returns {import('../types.js').NormalizedMessage[]}
*/
export function normalizeMessage(raw, sessionId) {
const ts = raw.timestamp || new Date().toISOString();
const baseId = raw.uuid || generateMessageId('gemini');
if (raw.type === 'message' && raw.role === 'assistant') {
const content = raw.content || '';
const msgs = [];
if (content) {
msgs.push(createNormalizedMessage({ id: baseId, sessionId, timestamp: ts, provider: PROVIDER, kind: 'stream_delta', content }));
}
// If not a delta, also send stream_end
if (raw.delta !== true) {
msgs.push(createNormalizedMessage({ sessionId, timestamp: ts, provider: PROVIDER, kind: 'stream_end' }));
}
return msgs;
}
if (raw.type === 'tool_use') {
return [createNormalizedMessage({
id: baseId, sessionId, timestamp: ts, provider: PROVIDER,
kind: 'tool_use', toolName: raw.tool_name, toolInput: raw.parameters || {},
toolId: raw.tool_id || baseId,
})];
}
if (raw.type === 'tool_result') {
return [createNormalizedMessage({
id: baseId, sessionId, timestamp: ts, provider: PROVIDER,
kind: 'tool_result', toolId: raw.tool_id || '',
content: raw.output === undefined ? '' : String(raw.output),
isError: raw.status === 'error',
})];
}
if (raw.type === 'result') {
const msgs = [createNormalizedMessage({ sessionId, timestamp: ts, provider: PROVIDER, kind: 'stream_end' })];
if (raw.stats?.total_tokens) {
msgs.push(createNormalizedMessage({
sessionId, timestamp: ts, provider: PROVIDER,
kind: 'status', text: 'Complete', tokens: raw.stats.total_tokens, canInterrupt: false,
}));
}
return msgs;
}
if (raw.type === 'error') {
return [createNormalizedMessage({
id: baseId, sessionId, timestamp: ts, provider: PROVIDER,
kind: 'error', content: raw.error || raw.message || 'Unknown Gemini streaming error',
})];
}
return [];
}
/**
* @type {import('../types.js').ProviderAdapter}
*/
export const geminiAdapter = {
normalizeMessage,
/**
* Fetch session history for Gemini.
* First tries in-memory session manager, then falls back to CLI sessions on disk.
*/
async fetchHistory(sessionId, opts = {}) {
let rawMessages;
try {
rawMessages = sessionManager.getSessionMessages(sessionId);
// Fallback to Gemini CLI sessions on disk
if (rawMessages.length === 0) {
rawMessages = await getGeminiCliSessionMessages(sessionId);
}
} catch (error) {
console.warn(`[GeminiAdapter] Failed to load session ${sessionId}:`, error.message);
return { messages: [], total: 0, hasMore: false, offset: 0, limit: null };
}
const normalized = [];
for (let i = 0; i < rawMessages.length; i++) {
const raw = rawMessages[i];
const ts = raw.timestamp || new Date().toISOString();
const baseId = raw.uuid || generateMessageId('gemini');
// sessionManager format: { type: 'message', message: { role, content }, timestamp }
// CLI format: { role: 'user'|'gemini'|'assistant', content: string|array }
const role = raw.message?.role || raw.role;
const content = raw.message?.content || raw.content;
if (!role || !content) continue;
const normalizedRole = (role === 'user') ? 'user' : 'assistant';
if (Array.isArray(content)) {
for (let partIdx = 0; partIdx < content.length; partIdx++) {
const part = content[partIdx];
if (part.type === 'text' && part.text) {
normalized.push(createNormalizedMessage({
id: `${baseId}_${partIdx}`,
sessionId,
timestamp: ts,
provider: PROVIDER,
kind: 'text',
role: normalizedRole,
content: part.text,
}));
} else if (part.type === 'tool_use') {
normalized.push(createNormalizedMessage({
id: `${baseId}_${partIdx}`,
sessionId,
timestamp: ts,
provider: PROVIDER,
kind: 'tool_use',
toolName: part.name,
toolInput: part.input,
toolId: part.id || generateMessageId('gemini_tool'),
}));
} else if (part.type === 'tool_result') {
normalized.push(createNormalizedMessage({
id: `${baseId}_${partIdx}`,
sessionId,
timestamp: ts,
provider: PROVIDER,
kind: 'tool_result',
toolId: part.tool_use_id || '',
content: part.content === undefined ? '' : String(part.content),
isError: Boolean(part.is_error),
}));
}
}
} else if (typeof content === 'string' && content.trim()) {
normalized.push(createNormalizedMessage({
id: baseId,
sessionId,
timestamp: ts,
provider: PROVIDER,
kind: 'text',
role: normalizedRole,
content,
}));
}
}
// Attach tool results to tool_use messages
const toolResultMap = new Map();
for (const msg of normalized) {
if (msg.kind === 'tool_result' && msg.toolId) {
toolResultMap.set(msg.toolId, msg);
}
}
for (const msg of normalized) {
if (msg.kind === 'tool_use' && msg.toolId && toolResultMap.has(msg.toolId)) {
const tr = toolResultMap.get(msg.toolId);
msg.toolResult = { content: tr.content, isError: tr.isError };
}
}
return {
messages: normalized,
total: normalized.length,
hasMore: false,
offset: 0,
limit: null,
};
},
};

View File

@@ -0,0 +1,44 @@
/**
* Provider Registry
*
* Centralizes provider adapter lookup. All code that needs a provider adapter
* should go through this registry instead of importing individual adapters directly.
*
* @module providers/registry
*/
import { claudeAdapter } from './claude/adapter.js';
import { cursorAdapter } from './cursor/adapter.js';
import { codexAdapter } from './codex/adapter.js';
import { geminiAdapter } from './gemini/adapter.js';
/**
* @typedef {import('./types.js').ProviderAdapter} ProviderAdapter
* @typedef {import('./types.js').SessionProvider} SessionProvider
*/
/** @type {Map<string, ProviderAdapter>} */
const providers = new Map();
// Register built-in providers
providers.set('claude', claudeAdapter);
providers.set('cursor', cursorAdapter);
providers.set('codex', codexAdapter);
providers.set('gemini', geminiAdapter);
/**
* Get a provider adapter by name.
* @param {string} name - Provider name (e.g., 'claude', 'cursor', 'codex', 'gemini')
* @returns {ProviderAdapter | undefined}
*/
export function getProvider(name) {
return providers.get(name);
}
/**
* Get all registered provider names.
* @returns {string[]}
*/
export function getAllProviders() {
return Array.from(providers.keys());
}

119
server/providers/types.js Normal file
View File

@@ -0,0 +1,119 @@
/**
* Provider Types & Interface
*
* Defines the normalized message format and the provider adapter interface.
* All providers normalize their native formats into NormalizedMessage
* before sending over REST or WebSocket.
*
* @module providers/types
*/
// ─── Session Provider ────────────────────────────────────────────────────────
/**
* @typedef {'claude' | 'cursor' | 'codex' | 'gemini'} SessionProvider
*/
// ─── Message Kind ────────────────────────────────────────────────────────────
/**
* @typedef {'text' | 'tool_use' | 'tool_result' | 'thinking' | 'stream_delta' | 'stream_end'
* | 'error' | 'complete' | 'status' | 'permission_request' | 'permission_cancelled'
* | 'session_created' | 'interactive_prompt' | 'task_notification'} MessageKind
*/
// ─── NormalizedMessage ───────────────────────────────────────────────────────
/**
* @typedef {Object} NormalizedMessage
* @property {string} id - Unique message id (for dedup between server + realtime)
* @property {string} sessionId
* @property {string} timestamp - ISO 8601
* @property {SessionProvider} provider
* @property {MessageKind} kind
*
* Additional fields depending on kind:
* - text: role ('user'|'assistant'), content, images?
* - tool_use: toolName, toolInput, toolId
* - tool_result: toolId, content, isError
* - thinking: content
* - stream_delta: content
* - stream_end: (no extra fields)
* - error: content
* - complete: (no extra fields)
* - status: text, tokens?, canInterrupt?
* - permission_request: requestId, toolName, input, context?
* - permission_cancelled: requestId
* - session_created: newSessionId
* - interactive_prompt: content
* - task_notification: status, summary
*/
// ─── Fetch History ───────────────────────────────────────────────────────────
/**
* @typedef {Object} FetchHistoryOptions
* @property {string} [projectName] - Project name (required for Claude)
* @property {string} [projectPath] - Absolute project path (required for Cursor cwdId hash)
* @property {number|null} [limit] - Page size (null = all messages)
* @property {number} [offset] - Pagination offset (default: 0)
*/
/**
* @typedef {Object} FetchHistoryResult
* @property {NormalizedMessage[]} messages - Normalized messages
* @property {number} total - Total number of messages in the session
* @property {boolean} hasMore - Whether more messages exist before the current page
* @property {number} offset - Current offset
* @property {number|null} limit - Page size used
* @property {object} [tokenUsage] - Token usage data (provider-specific)
*/
// ─── Provider Adapter Interface ──────────────────────────────────────────────
/**
* Every provider adapter MUST implement this interface.
*
* @typedef {Object} ProviderAdapter
*
* @property {(sessionId: string, opts?: FetchHistoryOptions) => Promise<FetchHistoryResult>} fetchHistory
* Read persisted session messages from disk/database and return them as NormalizedMessage[].
* The backend calls this from the unified GET /api/sessions/:id/messages endpoint.
*
* Provider implementations:
* - Claude: reads ~/.claude/projects/{projectName}/*.jsonl
* - Cursor: reads from SQLite store.db (via normalizeCursorBlobs helper)
* - Codex: reads ~/.codex/sessions/*.jsonl
* - Gemini: reads from in-memory sessionManager or ~/.gemini/tmp/ JSON files
*
* @property {(raw: any, sessionId: string) => NormalizedMessage[]} normalizeMessage
* Normalize a provider-specific event (JSONL entry or live SDK event) into NormalizedMessage[].
* Used by provider files to convert both history and realtime events.
*/
// ─── Runtime Helpers ─────────────────────────────────────────────────────────
/**
* Generate a unique message ID.
* Uses crypto.randomUUID() to avoid collisions across server restarts and workers.
* @param {string} [prefix='msg'] - Optional prefix
* @returns {string}
*/
export function generateMessageId(prefix = 'msg') {
return `${prefix}_${crypto.randomUUID()}`;
}
/**
* Create a NormalizedMessage with common fields pre-filled.
* @param {Partial<NormalizedMessage> & {kind: MessageKind, provider: SessionProvider}} fields
* @returns {NormalizedMessage}
*/
export function createNormalizedMessage(fields) {
return {
...fields,
id: fields.id || generateMessageId(fields.kind),
sessionId: fields.sessionId || '',
timestamp: fields.timestamp || new Date().toISOString(),
provider: fields.provider,
};
}

29
server/providers/utils.js Normal file
View File

@@ -0,0 +1,29 @@
/**
* Shared provider utilities.
*
* @module providers/utils
*/
/**
* Prefixes that indicate internal/system content which should be hidden from the UI.
* @type {readonly string[]}
*/
export const INTERNAL_CONTENT_PREFIXES = Object.freeze([
'<command-name>',
'<command-message>',
'<command-args>',
'<local-command-stdout>',
'<system-reminder>',
'Caveat:',
'This session is being continued from a previous',
'[Request interrupted',
]);
/**
* Check if user text content is internal/system that should be skipped.
* @param {string} content
* @returns {boolean}
*/
export function isInternalContent(content) {
return INTERNAL_CONTENT_PREFIXES.some(prefix => content.startsWith(prefix));
}

View File

@@ -4,7 +4,7 @@ import { promises as fs } from 'fs';
import path from 'path';
import os from 'os';
import TOML from '@iarna/toml';
import { getCodexSessions, getCodexSessionMessages, deleteCodexSession } from '../projects.js';
import { getCodexSessions, deleteCodexSession } from '../projects.js';
import { applyCustomSessionNames, sessionNamesDb } from '../database/db.js';
const router = express.Router();
@@ -68,24 +68,6 @@ router.get('/sessions', async (req, res) => {
}
});
router.get('/sessions/:sessionId/messages', async (req, res) => {
try {
const { sessionId } = req.params;
const { limit, offset } = req.query;
const result = await getCodexSessionMessages(
sessionId,
limit ? parseInt(limit, 10) : null,
offset ? parseInt(offset, 10) : 0
);
res.json({ success: true, ...result });
} catch (error) {
console.error('Error fetching Codex session messages:', error);
res.status(500).json({ success: false, error: error.message });
}
});
router.delete('/sessions/:sessionId', async (req, res) => {
try {
const { sessionId } = req.params;

View File

@@ -1,39 +1,9 @@
import express from 'express';
import sessionManager from '../sessionManager.js';
import { sessionNamesDb } from '../database/db.js';
import { getGeminiCliSessionMessages } from '../projects.js';
const router = express.Router();
router.get('/sessions/:sessionId/messages', async (req, res) => {
try {
const { sessionId } = req.params;
if (!sessionId || typeof sessionId !== 'string' || !/^[a-zA-Z0-9_.-]{1,100}$/.test(sessionId)) {
return res.status(400).json({ success: false, error: 'Invalid session ID format' });
}
let messages = sessionManager.getSessionMessages(sessionId);
// Fallback to Gemini CLI sessions on disk
if (messages.length === 0) {
messages = await getGeminiCliSessionMessages(sessionId);
}
res.json({
success: true,
messages: messages,
total: messages.length,
hasMore: false,
offset: 0,
limit: messages.length
});
} catch (error) {
console.error('Error fetching Gemini session messages:', error);
res.status(500).json({ success: false, error: error.message });
}
});
router.delete('/sessions/:sessionId', async (req, res) => {
try {
const { sessionId } = req.params;

61
server/routes/messages.js Normal file
View File

@@ -0,0 +1,61 @@
/**
* Unified messages endpoint.
*
* GET /api/sessions/:sessionId/messages?provider=claude&projectName=foo&limit=50&offset=0
*
* Replaces the four provider-specific session message endpoints with a single route
* that delegates to the appropriate adapter via the provider registry.
*
* @module routes/messages
*/
import express from 'express';
import { getProvider, getAllProviders } from '../providers/registry.js';
const router = express.Router();
/**
* GET /api/sessions/:sessionId/messages
*
* Auth: authenticateToken applied at mount level in index.js
*
* Query params:
* provider - 'claude' | 'cursor' | 'codex' | 'gemini' (default: 'claude')
* projectName - required for claude provider
* projectPath - required for cursor provider (absolute path used for cwdId hash)
* limit - page size (omit or null for all)
* offset - pagination offset (default: 0)
*/
router.get('/:sessionId/messages', async (req, res) => {
try {
const { sessionId } = req.params;
const provider = req.query.provider || 'claude';
const projectName = req.query.projectName || '';
const projectPath = req.query.projectPath || '';
const limitParam = req.query.limit;
const limit = limitParam !== undefined && limitParam !== null && limitParam !== ''
? parseInt(limitParam, 10)
: null;
const offset = parseInt(req.query.offset || '0', 10);
const adapter = getProvider(provider);
if (!adapter) {
const available = getAllProviders().join(', ');
return res.status(400).json({ error: `Unknown provider: ${provider}. Available: ${available}` });
}
const result = await adapter.fetchHistory(sessionId, {
projectName,
projectPath,
limit,
offset,
});
return res.json(result);
} catch (error) {
console.error('Error fetching unified messages:', error);
return res.status(500).json({ error: 'Failed to fetch messages' });
}
});
export default router;