From 78324290117c1a9bcfe87818a90893fe19df0223 Mon Sep 17 00:00:00 2001 From: Haileyesus Date: Fri, 17 Apr 2026 14:22:29 +0300 Subject: [PATCH] refactor(providers): centralize message handling in provider module Move provider-specific normalizeMessage and fetchHistory logic out of the legacy server/providers adapters and into the refactored provider classes so callers can depend on the main provider contract instead of parallel adapter plumbing. Add a providers service to resolve concrete providers through the registry and delegate message normalization/history loading from realtime handlers and the unified messages route. Add shared TypeScript message/history types and normalized message helpers so provider implementations and callers use the same contract. Remove the old adapter registry/files now that Claude, Codex, Cursor, and Gemini implement the required behavior directly. --- eslint.config.js | 10 + server/claude-sdk.js | 6 +- server/cursor-cli.js | 8 +- server/gemini-cli.js | 2 +- server/gemini-response-handler.js | 4 +- server/index.js | 5 +- .../providers/list/claude/claude.provider.ts | 308 ++++++++++++++ .../providers/list/codex/codex.provider.ts | 322 +++++++++++++++ .../providers/list/cursor/cursor.provider.ts | 390 ++++++++++++++++++ .../providers/list/gemini/gemini.provider.ts | 222 ++++++++++ server/modules/providers/provider.registry.ts | 2 +- .../providers/services/providers.service.ts | 36 ++ .../shared/base/abstract.provider.ts | 19 +- server/openai-codex.js | 6 +- server/providers/claude/adapter.js | 278 ------------- server/providers/codex/adapter.js | 248 ----------- server/providers/cursor/adapter.js | 353 ---------------- server/providers/gemini/adapter.js | 186 --------- server/providers/registry.js | 44 -- server/providers/types.js | 119 ------ server/providers/utils.js | 29 -- server/routes/messages.js | 12 +- server/shared/interfaces.ts | 14 +- server/shared/types.ts | 87 ++++ server/shared/utils.ts | 44 +- 25 files changed, 1468 insertions(+), 1286 deletions(-) create mode 100644 server/modules/providers/services/providers.service.ts delete mode 100644 server/providers/claude/adapter.js delete mode 100644 server/providers/codex/adapter.js delete mode 100644 server/providers/cursor/adapter.js delete mode 100644 server/providers/gemini/adapter.js delete mode 100644 server/providers/registry.js delete mode 100644 server/providers/types.js delete mode 100644 server/providers/utils.js diff --git a/eslint.config.js b/eslint.config.js index 6d1eac08..742f0c2b 100644 --- a/eslint.config.js +++ b/eslint.config.js @@ -160,6 +160,16 @@ export default tseslint.config( pattern: ["server/shared/utils.{js,ts}"], // classify the shared utils file so modules can depend on it explicitly mode: "file", }, + { + type: "backend-legacy-runtime", // legacy runtime persistence modules used while providers migrate into server/modules + pattern: [ + "server/projects.js", + "server/sessionManager.js", + "server/database/*.{js,ts}", + "server/utils/runtime-paths.js", + ], // provider history loading still resolves session data through these legacy runtime/database files + mode: "file", + }, { type: "backend-module", // logical element name used by boundaries rules below pattern: "server/modules/*", // each direct folder in server/modules is treated as one module boundary diff --git a/server/claude-sdk.js b/server/claude-sdk.js index 918a7bd6..d1e62090 100644 --- a/server/claude-sdk.js +++ b/server/claude-sdk.js @@ -24,8 +24,8 @@ import { notifyRunStopped, notifyUserIfEnabled } from './services/notification-orchestrator.js'; -import { claudeAdapter } from './providers/claude/adapter.js'; -import { createNormalizedMessage } from './providers/types.js'; +import { providersService } from './modules/providers/services/providers.service.js'; +import { createNormalizedMessage } from './shared/utils.js'; const activeSessions = new Map(); const pendingToolApprovals = new Map(); @@ -649,7 +649,7 @@ async function queryClaudeSDK(command, options = {}, ws) { const sid = capturedSessionId || sessionId || null; // Use adapter to normalize SDK events into NormalizedMessage[] - const normalized = claudeAdapter.normalizeMessage(transformedMessage, sid); + const normalized = providersService.normalizeMessage('claude', transformedMessage, sid); for (const msg of normalized) { // Preserve parentToolUseId from SDK wrapper for subagent tool grouping if (transformedMessage.parentToolUseId && !msg.parentToolUseId) { diff --git a/server/cursor-cli.js b/server/cursor-cli.js index aedd7e0b..76f643e6 100644 --- a/server/cursor-cli.js +++ b/server/cursor-cli.js @@ -1,8 +1,8 @@ import { spawn } from 'child_process'; import crossSpawn from 'cross-spawn'; import { notifyRunFailed, notifyRunStopped } from './services/notification-orchestrator.js'; -import { cursorAdapter } from './providers/cursor/adapter.js'; -import { createNormalizedMessage } from './providers/types.js'; +import { providersService } from './modules/providers/services/providers.service.js'; +import { createNormalizedMessage } from './shared/utils.js'; // Use cross-spawn on Windows for better command execution const spawnFunction = process.platform === 'win32' ? crossSpawn : spawn; @@ -189,7 +189,7 @@ async function spawnCursor(command, options = {}, ws) { case 'assistant': // Accumulate assistant message chunks if (response.message && response.message.content && response.message.content.length > 0) { - const normalized = cursorAdapter.normalizeMessage(response, capturedSessionId || sessionId || null); + const normalized = providersService.normalizeMessage('cursor', response, capturedSessionId || sessionId || null); for (const msg of normalized) ws.send(msg); } break; @@ -219,7 +219,7 @@ async function spawnCursor(command, options = {}, ws) { } // If not JSON, send as stream delta via adapter - const normalized = cursorAdapter.normalizeMessage(line, capturedSessionId || sessionId || null); + const normalized = providersService.normalizeMessage('cursor', line, capturedSessionId || sessionId || null); for (const msg of normalized) ws.send(msg); } }; diff --git a/server/gemini-cli.js b/server/gemini-cli.js index 86472707..673cde52 100644 --- a/server/gemini-cli.js +++ b/server/gemini-cli.js @@ -9,7 +9,7 @@ import os from 'os'; import sessionManager from './sessionManager.js'; import GeminiResponseHandler from './gemini-response-handler.js'; import { notifyRunFailed, notifyRunStopped } from './services/notification-orchestrator.js'; -import { createNormalizedMessage } from './providers/types.js'; +import { createNormalizedMessage } from './shared/utils.js'; let activeGeminiProcesses = new Map(); // Track active processes by session ID diff --git a/server/gemini-response-handler.js b/server/gemini-response-handler.js index 9da1f5cc..b5310142 100644 --- a/server/gemini-response-handler.js +++ b/server/gemini-response-handler.js @@ -1,5 +1,5 @@ // Gemini Response Handler - JSON Stream processing -import { geminiAdapter } from './providers/gemini/adapter.js'; +import { providersService } from './modules/providers/services/providers.service.js'; class GeminiResponseHandler { constructor(ws, options = {}) { @@ -56,7 +56,7 @@ class GeminiResponseHandler { } // Normalize via adapter and send all resulting messages - const normalized = geminiAdapter.normalizeMessage(event, sid); + const normalized = providersService.normalizeMessage('gemini', event, sid); for (const msg of normalized) { this.ws.send(msg); } diff --git a/server/index.js b/server/index.js index 28b9e184..1ea89612 100755 --- a/server/index.js +++ b/server/index.js @@ -5,7 +5,7 @@ import fs from 'fs'; import path from 'path'; import { findAppRoot, getModuleDir } from './utils/runtime-paths.js'; -import { AppError } from '@/shared/utils.js'; +import { AppError, createNormalizedMessage } from '@/shared/utils.js'; const __dirname = getModuleDir(import.meta.url); @@ -70,7 +70,6 @@ import geminiRoutes from './routes/gemini.js'; import pluginsRoutes from './routes/plugins.js'; import messagesRoutes from './routes/messages.js'; import providerRoutes from './modules/providers/provider.routes.js'; -import { createNormalizedMessage } from './providers/types.js'; import { startEnabledPluginServers, stopAllPlugins, getPluginPort } from './utils/plugin-process-manager.js'; import { initializeDatabase, sessionNamesDb, applyCustomSessionNames } from './database/db.js'; import { configureWebPush } from './services/vapid-keys.js'; @@ -1461,7 +1460,7 @@ wss.on('connection', (ws, request) => { /** * WebSocket Writer - Wrapper for WebSocket to match SSEStreamWriter interface * - * Provider files use `createNormalizedMessage()` from `providers/types.js` and + * Provider files use `createNormalizedMessage()` from `shared/utils.js` and * adapter `normalizeMessage()` to produce unified NormalizedMessage events. * The writer simply serialises and sends. */ diff --git a/server/modules/providers/list/claude/claude.provider.ts b/server/modules/providers/list/claude/claude.provider.ts index b620ad1f..0b971a93 100644 --- a/server/modules/providers/list/claude/claude.provider.ts +++ b/server/modules/providers/list/claude/claude.provider.ts @@ -1,5 +1,57 @@ +import { getSessionMessages } from '@/projects.js'; import { AbstractProvider } from '@/modules/providers/shared/base/abstract.provider.js'; import { ClaudeMcpProvider } from '@/modules/providers/list/claude/claude-mcp.provider.js'; +import type { FetchHistoryOptions, FetchHistoryResult, NormalizedMessage } from '@/shared/types.js'; +import { createNormalizedMessage, generateMessageId, readObjectRecord } from '@/shared/utils.js'; + +const PROVIDER = 'claude'; + +type RawProviderMessage = Record; + +type ClaudeToolResult = { + content: unknown; + isError: boolean; + subagentTools?: unknown; + toolUseResult?: unknown; +}; + +type ClaudeHistoryResult = + | RawProviderMessage[] + | { + messages?: RawProviderMessage[]; + total?: number; + hasMore?: boolean; + }; + +const loadClaudeSessionMessages = getSessionMessages as unknown as ( + projectName: string, + sessionId: string, + limit: number | null, + offset: number, +) => Promise; + +/** + * Claude writes internal command and system reminder entries into history. + * Those are useful for the CLI but should not appear in the user-facing chat. + */ +const INTERNAL_CONTENT_PREFIXES = [ + '', + '', + '', + '', + '', + 'Caveat:', + 'This session is being continued from a previous', + '[Request interrupted', +] as const; + +function isInternalContent(content: string): boolean { + return INTERNAL_CONTENT_PREFIXES.some((prefix) => content.startsWith(prefix)); +} + +function readRawProviderMessage(raw: unknown): RawProviderMessage | null { + return readObjectRecord(raw) as RawProviderMessage | null; +} export class ClaudeProvider extends AbstractProvider { readonly mcp = new ClaudeMcpProvider(); @@ -7,4 +59,260 @@ export class ClaudeProvider extends AbstractProvider { constructor() { super('claude'); } + + /** + * Normalizes one Claude JSONL entry or live SDK stream event into the shared + * message shape consumed by REST and WebSocket clients. + */ + normalizeMessage(rawMessage: unknown, sessionId: string | null): NormalizedMessage[] { + const raw = readRawProviderMessage(rawMessage); + if (!raw) { + return []; + } + + if (raw.type === 'content_block_delta' && raw.delta?.text) { + return [createNormalizedMessage({ kind: 'stream_delta', content: raw.delta.text, sessionId, provider: PROVIDER })]; + } + if (raw.type === 'content_block_stop') { + return [createNormalizedMessage({ kind: 'stream_end', sessionId, provider: PROVIDER })]; + } + + const messages: NormalizedMessage[] = []; + const ts = raw.timestamp || new Date().toISOString(); + const baseId = raw.uuid || generateMessageId('claude'); + + if (raw.message?.role === 'user' && raw.message?.content) { + if (Array.isArray(raw.message.content)) { + for (const part of raw.message.content) { + if (part.type === 'tool_result') { + messages.push(createNormalizedMessage({ + id: `${baseId}_tr_${part.tool_use_id}`, + sessionId, + timestamp: ts, + provider: PROVIDER, + kind: 'tool_result', + toolId: part.tool_use_id, + content: typeof part.content === 'string' ? part.content : JSON.stringify(part.content), + isError: Boolean(part.is_error), + subagentTools: raw.subagentTools, + toolUseResult: raw.toolUseResult, + })); + } else if (part.type === 'text') { + const text = part.text || ''; + if (text && !isInternalContent(text)) { + messages.push(createNormalizedMessage({ + id: `${baseId}_text`, + sessionId, + timestamp: ts, + provider: PROVIDER, + kind: 'text', + role: 'user', + content: text, + })); + } + } + } + + if (messages.length === 0) { + const textParts = raw.message.content + .filter((part: RawProviderMessage) => part.type === 'text') + .map((part: RawProviderMessage) => part.text) + .filter(Boolean) + .join('\n'); + if (textParts && !isInternalContent(textParts)) { + messages.push(createNormalizedMessage({ + id: `${baseId}_text`, + sessionId, + timestamp: ts, + provider: PROVIDER, + kind: 'text', + role: 'user', + content: textParts, + })); + } + } + } else if (typeof raw.message.content === 'string') { + const text = raw.message.content; + if (text && !isInternalContent(text)) { + messages.push(createNormalizedMessage({ + id: baseId, + sessionId, + timestamp: ts, + provider: PROVIDER, + kind: 'text', + role: 'user', + content: text, + })); + } + } + return messages; + } + + if (raw.type === 'thinking' && raw.message?.content) { + messages.push(createNormalizedMessage({ + id: baseId, + sessionId, + timestamp: ts, + provider: PROVIDER, + kind: 'thinking', + content: raw.message.content, + })); + return messages; + } + + if (raw.type === 'tool_use' && raw.toolName) { + messages.push(createNormalizedMessage({ + id: baseId, + sessionId, + timestamp: ts, + provider: PROVIDER, + kind: 'tool_use', + toolName: raw.toolName, + toolInput: raw.toolInput, + toolId: raw.toolCallId || baseId, + })); + return messages; + } + + if (raw.type === 'tool_result') { + messages.push(createNormalizedMessage({ + id: baseId, + sessionId, + timestamp: ts, + provider: PROVIDER, + kind: 'tool_result', + toolId: raw.toolCallId || '', + content: raw.output || '', + isError: false, + })); + return messages; + } + + if (raw.message?.role === 'assistant' && raw.message?.content) { + if (Array.isArray(raw.message.content)) { + let partIndex = 0; + for (const part of raw.message.content) { + if (part.type === 'text' && part.text) { + messages.push(createNormalizedMessage({ + id: `${baseId}_${partIndex}`, + sessionId, + timestamp: ts, + provider: PROVIDER, + kind: 'text', + role: 'assistant', + content: part.text, + })); + } else if (part.type === 'tool_use') { + messages.push(createNormalizedMessage({ + id: `${baseId}_${partIndex}`, + sessionId, + timestamp: ts, + provider: PROVIDER, + kind: 'tool_use', + toolName: part.name, + toolInput: part.input, + toolId: part.id, + })); + } else if (part.type === 'thinking' && part.thinking) { + messages.push(createNormalizedMessage({ + id: `${baseId}_${partIndex}`, + sessionId, + timestamp: ts, + provider: PROVIDER, + kind: 'thinking', + content: part.thinking, + })); + } + partIndex++; + } + } else if (typeof raw.message.content === 'string') { + messages.push(createNormalizedMessage({ + id: baseId, + sessionId, + timestamp: ts, + provider: PROVIDER, + kind: 'text', + role: 'assistant', + content: raw.message.content, + })); + } + return messages; + } + + return messages; + } + + /** + * Loads Claude JSONL history for a project/session and returns normalized + * messages, preserving the existing pagination behavior from projects.js. + */ + async fetchHistory( + sessionId: string, + options: FetchHistoryOptions = {}, + ): Promise { + const { projectName, limit = null, offset = 0 } = options; + if (!projectName) { + return { messages: [], total: 0, hasMore: false, offset: 0, limit: null }; + } + + let result: ClaudeHistoryResult; + try { + result = await loadClaudeSessionMessages(projectName, sessionId, limit, offset); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + console.warn(`[ClaudeProvider] Failed to load session ${sessionId}:`, message); + return { messages: [], total: 0, hasMore: false, offset: 0, limit: null }; + } + + const rawMessages = Array.isArray(result) ? result : (result.messages || []); + const total = Array.isArray(result) ? rawMessages.length : (result.total || 0); + const hasMore = Array.isArray(result) ? false : Boolean(result.hasMore); + + const toolResultMap = new Map(); + for (const raw of rawMessages) { + if (raw.message?.role === 'user' && Array.isArray(raw.message?.content)) { + for (const part of raw.message.content) { + if (part.type === 'tool_result' && part.tool_use_id) { + toolResultMap.set(part.tool_use_id, { + content: part.content, + isError: Boolean(part.is_error), + subagentTools: raw.subagentTools, + toolUseResult: raw.toolUseResult, + }); + } + } + } + } + + const normalized: NormalizedMessage[] = []; + for (const raw of rawMessages) { + normalized.push(...this.normalizeMessage(raw, sessionId)); + } + + for (const msg of normalized) { + if (msg.kind === 'tool_use' && msg.toolId && toolResultMap.has(msg.toolId)) { + const toolResult = toolResultMap.get(msg.toolId); + if (!toolResult) { + continue; + } + + msg.toolResult = { + content: typeof toolResult.content === 'string' + ? toolResult.content + : JSON.stringify(toolResult.content), + isError: toolResult.isError, + toolUseResult: toolResult.toolUseResult, + }; + msg.subagentTools = toolResult.subagentTools; + } + } + + return { + messages: normalized, + total, + hasMore, + offset, + limit, + }; + } } diff --git a/server/modules/providers/list/codex/codex.provider.ts b/server/modules/providers/list/codex/codex.provider.ts index 3abfa8d9..fad9e1bb 100644 --- a/server/modules/providers/list/codex/codex.provider.ts +++ b/server/modules/providers/list/codex/codex.provider.ts @@ -1,5 +1,31 @@ +import { getCodexSessionMessages } from '@/projects.js'; import { AbstractProvider } from '@/modules/providers/shared/base/abstract.provider.js'; import { CodexMcpProvider } from '@/modules/providers/list/codex/codex-mcp.provider.js'; +import type { FetchHistoryOptions, FetchHistoryResult, NormalizedMessage } from '@/shared/types.js'; +import { createNormalizedMessage, generateMessageId, readObjectRecord } from '@/shared/utils.js'; + +const PROVIDER = 'codex'; + +type RawProviderMessage = Record; + +type CodexHistoryResult = + | RawProviderMessage[] + | { + messages?: RawProviderMessage[]; + total?: number; + hasMore?: boolean; + tokenUsage?: unknown; + }; + +const loadCodexSessionMessages = getCodexSessionMessages as unknown as ( + sessionId: string, + limit: number | null, + offset: number, +) => Promise; + +function readRawProviderMessage(raw: unknown): RawProviderMessage | null { + return readObjectRecord(raw) as RawProviderMessage | null; +} export class CodexProvider extends AbstractProvider { readonly mcp = new CodexMcpProvider(); @@ -7,4 +33,300 @@ export class CodexProvider extends AbstractProvider { constructor() { super('codex'); } + + /** + * Normalizes a persisted Codex JSONL entry. + * + * Live Codex SDK events are transformed before they reach normalizeMessage(), + * while history entries already use a compact message/tool shape from projects.js. + */ + private normalizeHistoryEntry(raw: RawProviderMessage, sessionId: string | null): NormalizedMessage[] { + const ts = raw.timestamp || new Date().toISOString(); + const baseId = raw.uuid || generateMessageId('codex'); + + if (raw.message?.role === 'user') { + const content = typeof raw.message.content === 'string' + ? raw.message.content + : Array.isArray(raw.message.content) + ? raw.message.content + .map((part: string | RawProviderMessage) => typeof part === 'string' ? part : part?.text || '') + .filter(Boolean) + .join('\n') + : String(raw.message.content || ''); + if (!content.trim()) { + return []; + } + return [createNormalizedMessage({ + id: baseId, + sessionId, + timestamp: ts, + provider: PROVIDER, + kind: 'text', + role: 'user', + content, + })]; + } + + if (raw.message?.role === 'assistant') { + const content = typeof raw.message.content === 'string' + ? raw.message.content + : Array.isArray(raw.message.content) + ? raw.message.content + .map((part: string | RawProviderMessage) => typeof part === 'string' ? part : part?.text || '') + .filter(Boolean) + .join('\n') + : ''; + if (!content.trim()) { + return []; + } + return [createNormalizedMessage({ + id: baseId, + sessionId, + timestamp: ts, + provider: PROVIDER, + kind: 'text', + role: 'assistant', + content, + })]; + } + + if (raw.type === 'thinking' || raw.isReasoning) { + return [createNormalizedMessage({ + id: baseId, + sessionId, + timestamp: ts, + provider: PROVIDER, + kind: 'thinking', + content: raw.message?.content || '', + })]; + } + + if (raw.type === 'tool_use' || raw.toolName) { + return [createNormalizedMessage({ + id: baseId, + sessionId, + timestamp: ts, + provider: PROVIDER, + kind: 'tool_use', + toolName: raw.toolName || 'Unknown', + toolInput: raw.toolInput, + toolId: raw.toolCallId || baseId, + })]; + } + + if (raw.type === 'tool_result') { + return [createNormalizedMessage({ + id: baseId, + sessionId, + timestamp: ts, + provider: PROVIDER, + kind: 'tool_result', + toolId: raw.toolCallId || '', + content: raw.output || '', + isError: Boolean(raw.isError), + })]; + } + + return []; + } + + /** + * Normalizes either a Codex history entry or a transformed live SDK event. + */ + normalizeMessage(rawMessage: unknown, sessionId: string | null): NormalizedMessage[] { + const raw = readRawProviderMessage(rawMessage); + if (!raw) { + return []; + } + + if (raw.message?.role) { + return this.normalizeHistoryEntry(raw, sessionId); + } + + const ts = raw.timestamp || new Date().toISOString(); + const baseId = raw.uuid || generateMessageId('codex'); + + if (raw.type === 'item') { + switch (raw.itemType) { + case 'agent_message': + return [createNormalizedMessage({ + id: baseId, + sessionId, + timestamp: ts, + provider: PROVIDER, + kind: 'text', + role: 'assistant', + content: raw.message?.content || '', + })]; + case 'reasoning': + return [createNormalizedMessage({ + id: baseId, + sessionId, + timestamp: ts, + provider: PROVIDER, + kind: 'thinking', + content: raw.message?.content || '', + })]; + case 'command_execution': + return [createNormalizedMessage({ + id: baseId, + sessionId, + timestamp: ts, + provider: PROVIDER, + kind: 'tool_use', + toolName: 'Bash', + toolInput: { command: raw.command }, + toolId: baseId, + output: raw.output, + exitCode: raw.exitCode, + status: raw.status, + })]; + case 'file_change': + return [createNormalizedMessage({ + id: baseId, + sessionId, + timestamp: ts, + provider: PROVIDER, + kind: 'tool_use', + toolName: 'FileChanges', + toolInput: raw.changes, + toolId: baseId, + status: raw.status, + })]; + case 'mcp_tool_call': + return [createNormalizedMessage({ + id: baseId, + sessionId, + timestamp: ts, + provider: PROVIDER, + kind: 'tool_use', + toolName: raw.tool || 'MCP', + toolInput: raw.arguments, + toolId: baseId, + server: raw.server, + result: raw.result, + error: raw.error, + status: raw.status, + })]; + case 'web_search': + return [createNormalizedMessage({ + id: baseId, + sessionId, + timestamp: ts, + provider: PROVIDER, + kind: 'tool_use', + toolName: 'WebSearch', + toolInput: { query: raw.query }, + toolId: baseId, + })]; + case 'todo_list': + return [createNormalizedMessage({ + id: baseId, + sessionId, + timestamp: ts, + provider: PROVIDER, + kind: 'tool_use', + toolName: 'TodoList', + toolInput: { items: raw.items }, + toolId: baseId, + })]; + case 'error': + return [createNormalizedMessage({ + id: baseId, + sessionId, + timestamp: ts, + provider: PROVIDER, + kind: 'error', + content: raw.message?.content || 'Unknown error', + })]; + default: + return [createNormalizedMessage({ + id: baseId, + sessionId, + timestamp: ts, + provider: PROVIDER, + kind: 'tool_use', + toolName: raw.itemType || 'Unknown', + toolInput: raw.item || raw, + toolId: baseId, + })]; + } + } + + if (raw.type === 'turn_complete') { + return [createNormalizedMessage({ + id: baseId, + sessionId, + timestamp: ts, + provider: PROVIDER, + kind: 'complete', + })]; + } + if (raw.type === 'turn_failed') { + return [createNormalizedMessage({ + id: baseId, + sessionId, + timestamp: ts, + provider: PROVIDER, + kind: 'error', + content: raw.error?.message || 'Turn failed', + })]; + } + + return []; + } + + /** + * Loads Codex JSONL history and keeps token usage metadata when projects.js + * provides it. + */ + async fetchHistory( + sessionId: string, + options: FetchHistoryOptions = {}, + ): Promise { + const { limit = null, offset = 0 } = options; + + let result: CodexHistoryResult; + try { + result = await loadCodexSessionMessages(sessionId, limit, offset); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + console.warn(`[CodexProvider] Failed to load session ${sessionId}:`, message); + return { messages: [], total: 0, hasMore: false, offset: 0, limit: null }; + } + + const rawMessages = Array.isArray(result) ? result : (result.messages || []); + const total = Array.isArray(result) ? rawMessages.length : (result.total || 0); + const hasMore = Array.isArray(result) ? false : Boolean(result.hasMore); + const tokenUsage = Array.isArray(result) ? undefined : result.tokenUsage; + + const normalized: NormalizedMessage[] = []; + for (const raw of rawMessages) { + normalized.push(...this.normalizeHistoryEntry(raw, sessionId)); + } + + const toolResultMap = new Map(); + for (const msg of normalized) { + if (msg.kind === 'tool_result' && msg.toolId) { + toolResultMap.set(msg.toolId, msg); + } + } + for (const msg of normalized) { + if (msg.kind === 'tool_use' && msg.toolId && toolResultMap.has(msg.toolId)) { + const toolResult = toolResultMap.get(msg.toolId); + if (toolResult) { + msg.toolResult = { content: toolResult.content, isError: toolResult.isError }; + } + } + } + + return { + messages: normalized, + total, + hasMore, + offset, + limit, + tokenUsage, + }; + } } diff --git a/server/modules/providers/list/cursor/cursor.provider.ts b/server/modules/providers/list/cursor/cursor.provider.ts index 7fe7163c..254b1e84 100644 --- a/server/modules/providers/list/cursor/cursor.provider.ts +++ b/server/modules/providers/list/cursor/cursor.provider.ts @@ -1,5 +1,36 @@ +import crypto from 'node:crypto'; +import os from 'node:os'; +import path from 'node:path'; + import { AbstractProvider } from '@/modules/providers/shared/base/abstract.provider.js'; import { CursorMcpProvider } from '@/modules/providers/list/cursor/cursor-mcp.provider.js'; +import type { FetchHistoryOptions, FetchHistoryResult, NormalizedMessage } from '@/shared/types.js'; +import { createNormalizedMessage, generateMessageId, readObjectRecord } from '@/shared/utils.js'; + +const PROVIDER = 'cursor'; + +type RawProviderMessage = Record; + +type CursorDbBlob = { + rowid: number; + id: string; + data?: Buffer; +}; + +type CursorJsonBlob = CursorDbBlob & { + parsed: RawProviderMessage; +}; + +type CursorMessageBlob = { + id: string; + sequence: number; + rowid: number; + content: RawProviderMessage; +}; + +function readRawProviderMessage(raw: unknown): RawProviderMessage | null { + return readObjectRecord(raw) as RawProviderMessage | null; +} export class CursorProvider extends AbstractProvider { readonly mcp = new CursorMcpProvider(); @@ -7,4 +38,363 @@ export class CursorProvider extends AbstractProvider { constructor() { super('cursor'); } + + /** + * Loads Cursor's SQLite blob DAG and returns message blobs in conversation + * order. Cursor history is stored as content-addressed blobs rather than JSONL. + */ + private async loadCursorBlobs(sessionId: string, projectPath: string): Promise { + const sqlite3Module = await import('sqlite3'); + const sqlite3 = sqlite3Module.default; + const { open } = await import('sqlite'); + + const cwdId = crypto.createHash('md5').update(projectPath || process.cwd()).digest('hex'); + const storeDbPath = path.join(os.homedir(), '.cursor', 'chats', cwdId, sessionId, 'store.db'); + + const db = await open({ + filename: storeDbPath, + driver: sqlite3.Database, + mode: sqlite3.OPEN_READONLY, + }); + + try { + const allBlobs = await db.all('SELECT rowid, id, data FROM blobs') as CursorDbBlob[]; + + const blobMap = new Map(); + const parentRefs = new Map(); + const childRefs = new Map(); + const jsonBlobs: CursorJsonBlob[] = []; + + for (const blob of allBlobs) { + blobMap.set(blob.id, blob); + + if (blob.data && blob.data[0] === 0x7B) { + try { + const parsed = JSON.parse(blob.data.toString('utf8')) as RawProviderMessage; + jsonBlobs.push({ ...blob, parsed }); + } catch { + // Cursor can include binary or partial blobs; only JSON blobs become messages. + } + } else if (blob.data) { + const parents: string[] = []; + let i = 0; + while (i < blob.data.length - 33) { + if (blob.data[i] === 0x0A && blob.data[i + 1] === 0x20) { + const parentHash = blob.data.slice(i + 2, i + 34).toString('hex'); + if (blobMap.has(parentHash)) { + parents.push(parentHash); + } + i += 34; + } else { + i++; + } + } + if (parents.length > 0) { + parentRefs.set(blob.id, parents); + for (const parentId of parents) { + if (!childRefs.has(parentId)) { + childRefs.set(parentId, []); + } + childRefs.get(parentId)?.push(blob.id); + } + } + } + } + + const visited = new Set(); + const sorted: CursorDbBlob[] = []; + const visit = (nodeId: string): void => { + if (visited.has(nodeId)) { + return; + } + visited.add(nodeId); + for (const parentId of parentRefs.get(nodeId) || []) { + visit(parentId); + } + const blob = blobMap.get(nodeId); + if (blob) { + sorted.push(blob); + } + }; + + for (const blob of allBlobs) { + if (!parentRefs.has(blob.id)) { + visit(blob.id); + } + } + for (const blob of allBlobs) { + visit(blob.id); + } + + const messageOrder = new Map(); + let orderIndex = 0; + for (const blob of sorted) { + if (blob.data && blob.data[0] !== 0x7B) { + for (const jsonBlob of jsonBlobs) { + try { + const idBytes = Buffer.from(jsonBlob.id, 'hex'); + if (blob.data.includes(idBytes) && !messageOrder.has(jsonBlob.id)) { + messageOrder.set(jsonBlob.id, orderIndex++); + } + } catch { + // Ignore malformed blob ids that cannot be decoded as hex. + } + } + } + } + + const sortedJsonBlobs = jsonBlobs.sort((a, b) => { + const aOrder = messageOrder.get(a.id) ?? Number.MAX_SAFE_INTEGER; + const bOrder = messageOrder.get(b.id) ?? Number.MAX_SAFE_INTEGER; + return aOrder !== bOrder ? aOrder - bOrder : a.rowid - b.rowid; + }); + + const messages: CursorMessageBlob[] = []; + for (let idx = 0; idx < sortedJsonBlobs.length; idx++) { + const blob = sortedJsonBlobs[idx]; + const parsed = blob.parsed; + const role = parsed?.role || parsed?.message?.role; + if (role === 'system') { + continue; + } + messages.push({ + id: blob.id, + sequence: idx + 1, + rowid: blob.rowid, + content: parsed, + }); + } + + return messages; + } finally { + await db.close(); + } + } + + /** + * Normalizes live Cursor CLI NDJSON events. Persisted Cursor history is + * normalized from SQLite blobs in fetchHistory(). + */ + normalizeMessage(rawMessage: unknown, sessionId: string | null): NormalizedMessage[] { + const raw = readRawProviderMessage(rawMessage); + if (raw?.type === 'assistant' && raw.message?.content?.[0]?.text) { + return [createNormalizedMessage({ + kind: 'stream_delta', + content: raw.message.content[0].text, + sessionId, + provider: PROVIDER, + })]; + } + + if (typeof rawMessage === 'string' && rawMessage.trim()) { + return [createNormalizedMessage({ + kind: 'stream_delta', + content: rawMessage, + sessionId, + provider: PROVIDER, + })]; + } + + return []; + } + + /** + * Fetches and paginates Cursor session history from its project-scoped store.db. + */ + async fetchHistory( + sessionId: string, + options: FetchHistoryOptions = {}, + ): Promise { + const { projectPath = '', limit = null, offset = 0 } = options; + + try { + const blobs = await this.loadCursorBlobs(sessionId, projectPath); + const allNormalized = this.normalizeCursorBlobs(blobs, sessionId); + + if (limit !== null && limit > 0) { + const start = offset; + const page = allNormalized.slice(start, start + limit); + return { + messages: page, + total: allNormalized.length, + hasMore: start + limit < allNormalized.length, + offset, + limit, + }; + } + + return { + messages: allNormalized, + total: allNormalized.length, + hasMore: false, + offset: 0, + limit: null, + }; + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + console.warn(`[CursorProvider] Failed to load session ${sessionId}:`, message); + return { messages: [], total: 0, hasMore: false, offset: 0, limit: null }; + } + } + + /** + * Converts Cursor SQLite message blobs into normalized messages and attaches + * matching tool results to their tool_use entries. + */ + private normalizeCursorBlobs(blobs: CursorMessageBlob[], sessionId: string | null): NormalizedMessage[] { + const messages: NormalizedMessage[] = []; + const toolUseMap = new Map(); + const baseTime = Date.now(); + + for (let i = 0; i < blobs.length; i++) { + const blob = blobs[i]; + const content = blob.content; + const ts = new Date(baseTime + (blob.sequence ?? i) * 100).toISOString(); + const baseId = blob.id || generateMessageId('cursor'); + + try { + if (!content?.role || !content?.content) { + if (content?.message?.role && content?.message?.content) { + if (content.message.role === 'system') { + continue; + } + const role = content.message.role === 'user' ? 'user' : 'assistant'; + let text = ''; + if (Array.isArray(content.message.content)) { + text = content.message.content + .map((part: string | RawProviderMessage) => typeof part === 'string' ? part : part?.text || '') + .filter(Boolean) + .join('\n'); + } else if (typeof content.message.content === 'string') { + text = content.message.content; + } + if (text?.trim()) { + messages.push(createNormalizedMessage({ + id: baseId, + sessionId, + timestamp: ts, + provider: PROVIDER, + kind: 'text', + role, + content: text, + sequence: blob.sequence, + rowid: blob.rowid, + })); + } + } + continue; + } + + if (content.role === 'system') { + continue; + } + + if (content.role === 'tool') { + const toolItems = Array.isArray(content.content) ? content.content : []; + for (const item of toolItems) { + if (item?.type !== 'tool-result') { + continue; + } + const toolCallId = item.toolCallId || content.id; + messages.push(createNormalizedMessage({ + id: `${baseId}_tr`, + sessionId, + timestamp: ts, + provider: PROVIDER, + kind: 'tool_result', + toolId: toolCallId, + content: item.result || '', + isError: false, + })); + } + continue; + } + + const role = content.role === 'user' ? 'user' : 'assistant'; + + if (Array.isArray(content.content)) { + for (let partIdx = 0; partIdx < content.content.length; partIdx++) { + const part = content.content[partIdx]; + + if (part?.type === 'text' && part?.text) { + messages.push(createNormalizedMessage({ + id: `${baseId}_${partIdx}`, + sessionId, + timestamp: ts, + provider: PROVIDER, + kind: 'text', + role, + content: part.text, + sequence: blob.sequence, + rowid: blob.rowid, + })); + } else if (part?.type === 'reasoning' && part?.text) { + messages.push(createNormalizedMessage({ + id: `${baseId}_${partIdx}`, + sessionId, + timestamp: ts, + provider: PROVIDER, + kind: 'thinking', + content: part.text, + })); + } else if (part?.type === 'tool-call' || part?.type === 'tool_use') { + const rawToolName = part.toolName || part.name || 'Unknown Tool'; + const toolName = rawToolName === 'ApplyPatch' ? 'Edit' : rawToolName; + const toolId = part.toolCallId || part.id || `tool_${i}_${partIdx}`; + const message = createNormalizedMessage({ + id: `${baseId}_${partIdx}`, + sessionId, + timestamp: ts, + provider: PROVIDER, + kind: 'tool_use', + toolName, + toolInput: part.args || part.input, + toolId, + }); + messages.push(message); + toolUseMap.set(toolId, message); + } + } + } else if (typeof content.content === 'string' && content.content.trim()) { + messages.push(createNormalizedMessage({ + id: baseId, + sessionId, + timestamp: ts, + provider: PROVIDER, + kind: 'text', + role, + content: content.content, + sequence: blob.sequence, + rowid: blob.rowid, + })); + } + } catch (error) { + console.warn('Error normalizing cursor blob:', error); + } + } + + for (const msg of messages) { + if (msg.kind === 'tool_result' && msg.toolId && toolUseMap.has(msg.toolId)) { + const toolUse = toolUseMap.get(msg.toolId); + if (toolUse) { + toolUse.toolResult = { + content: msg.content, + isError: msg.isError, + }; + } + } + } + + messages.sort((a, b) => { + if (a.sequence !== undefined && b.sequence !== undefined) { + return a.sequence - b.sequence; + } + if (a.rowid !== undefined && b.rowid !== undefined) { + return a.rowid - b.rowid; + } + return new Date(a.timestamp).getTime() - new Date(b.timestamp).getTime(); + }); + + return messages; + } } diff --git a/server/modules/providers/list/gemini/gemini.provider.ts b/server/modules/providers/list/gemini/gemini.provider.ts index 69a3510c..a2017653 100644 --- a/server/modules/providers/list/gemini/gemini.provider.ts +++ b/server/modules/providers/list/gemini/gemini.provider.ts @@ -1,5 +1,17 @@ +import sessionManager from '@/sessionManager.js'; +import { getGeminiCliSessionMessages } from '@/projects.js'; import { AbstractProvider } from '@/modules/providers/shared/base/abstract.provider.js'; import { GeminiMcpProvider } from '@/modules/providers/list/gemini/gemini-mcp.provider.js'; +import type { FetchHistoryOptions, FetchHistoryResult, NormalizedMessage } from '@/shared/types.js'; +import { createNormalizedMessage, generateMessageId, readObjectRecord } from '@/shared/utils.js'; + +const PROVIDER = 'gemini'; + +type RawProviderMessage = Record; + +function readRawProviderMessage(raw: unknown): RawProviderMessage | null { + return readObjectRecord(raw) as RawProviderMessage | null; +} export class GeminiProvider extends AbstractProvider { readonly mcp = new GeminiMcpProvider(); @@ -7,4 +19,214 @@ export class GeminiProvider extends AbstractProvider { constructor() { super('gemini'); } + + /** + * Normalizes live Gemini stream-json events into the shared message shape. + * + * Gemini history uses a different session file shape, so fetchHistory handles + * that separately after loading raw persisted messages. + */ + normalizeMessage(rawMessage: unknown, sessionId: string | null): NormalizedMessage[] { + const raw = readRawProviderMessage(rawMessage); + if (!raw) { + return []; + } + + const ts = raw.timestamp || new Date().toISOString(); + const baseId = raw.uuid || generateMessageId('gemini'); + + if (raw.type === 'message' && raw.role === 'assistant') { + const content = raw.content || ''; + const messages: NormalizedMessage[] = []; + if (content) { + messages.push(createNormalizedMessage({ + id: baseId, + sessionId, + timestamp: ts, + provider: PROVIDER, + kind: 'stream_delta', + content, + })); + } + if (raw.delta !== true) { + messages.push(createNormalizedMessage({ + sessionId, + timestamp: ts, + provider: PROVIDER, + kind: 'stream_end', + })); + } + return messages; + } + + if (raw.type === 'tool_use') { + return [createNormalizedMessage({ + id: baseId, + sessionId, + timestamp: ts, + provider: PROVIDER, + kind: 'tool_use', + toolName: raw.tool_name, + toolInput: raw.parameters || {}, + toolId: raw.tool_id || baseId, + })]; + } + + if (raw.type === 'tool_result') { + return [createNormalizedMessage({ + id: baseId, + sessionId, + timestamp: ts, + provider: PROVIDER, + kind: 'tool_result', + toolId: raw.tool_id || '', + content: raw.output === undefined ? '' : String(raw.output), + isError: raw.status === 'error', + })]; + } + + if (raw.type === 'result') { + const messages = [createNormalizedMessage({ + sessionId, + timestamp: ts, + provider: PROVIDER, + kind: 'stream_end', + })]; + if (raw.stats?.total_tokens) { + messages.push(createNormalizedMessage({ + sessionId, + timestamp: ts, + provider: PROVIDER, + kind: 'status', + text: 'Complete', + tokens: raw.stats.total_tokens, + canInterrupt: false, + })); + } + return messages; + } + + if (raw.type === 'error') { + return [createNormalizedMessage({ + id: baseId, + sessionId, + timestamp: ts, + provider: PROVIDER, + kind: 'error', + content: raw.error || raw.message || 'Unknown Gemini streaming error', + })]; + } + + return []; + } + + /** + * Loads Gemini history from the in-memory session manager first, then falls + * back to Gemini CLI session files on disk. + */ + async fetchHistory( + sessionId: string, + _options: FetchHistoryOptions = {}, + ): Promise { + let rawMessages: RawProviderMessage[]; + try { + rawMessages = sessionManager.getSessionMessages(sessionId) as RawProviderMessage[]; + + if (rawMessages.length === 0) { + rawMessages = await getGeminiCliSessionMessages(sessionId) as RawProviderMessage[]; + } + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + console.warn(`[GeminiProvider] Failed to load session ${sessionId}:`, message); + return { messages: [], total: 0, hasMore: false, offset: 0, limit: null }; + } + + const normalized: NormalizedMessage[] = []; + for (let i = 0; i < rawMessages.length; i++) { + const raw = rawMessages[i]; + const ts = raw.timestamp || new Date().toISOString(); + const baseId = raw.uuid || generateMessageId('gemini'); + + const role = raw.message?.role || raw.role; + const content = raw.message?.content || raw.content; + + if (!role || !content) { + continue; + } + + const normalizedRole = role === 'user' ? 'user' : 'assistant'; + + if (Array.isArray(content)) { + for (let partIdx = 0; partIdx < content.length; partIdx++) { + const part = content[partIdx]; + if (part.type === 'text' && part.text) { + normalized.push(createNormalizedMessage({ + id: `${baseId}_${partIdx}`, + sessionId, + timestamp: ts, + provider: PROVIDER, + kind: 'text', + role: normalizedRole, + content: part.text, + })); + } else if (part.type === 'tool_use') { + normalized.push(createNormalizedMessage({ + id: `${baseId}_${partIdx}`, + sessionId, + timestamp: ts, + provider: PROVIDER, + kind: 'tool_use', + toolName: part.name, + toolInput: part.input, + toolId: part.id || generateMessageId('gemini_tool'), + })); + } else if (part.type === 'tool_result') { + normalized.push(createNormalizedMessage({ + id: `${baseId}_${partIdx}`, + sessionId, + timestamp: ts, + provider: PROVIDER, + kind: 'tool_result', + toolId: part.tool_use_id || '', + content: part.content === undefined ? '' : String(part.content), + isError: Boolean(part.is_error), + })); + } + } + } else if (typeof content === 'string' && content.trim()) { + normalized.push(createNormalizedMessage({ + id: baseId, + sessionId, + timestamp: ts, + provider: PROVIDER, + kind: 'text', + role: normalizedRole, + content, + })); + } + } + + const toolResultMap = new Map(); + for (const msg of normalized) { + if (msg.kind === 'tool_result' && msg.toolId) { + toolResultMap.set(msg.toolId, msg); + } + } + for (const msg of normalized) { + if (msg.kind === 'tool_use' && msg.toolId && toolResultMap.has(msg.toolId)) { + const toolResult = toolResultMap.get(msg.toolId); + if (toolResult) { + msg.toolResult = { content: toolResult.content, isError: toolResult.isError }; + } + } + } + + return { + messages: normalized, + total: normalized.length, + hasMore: false, + offset: 0, + limit: null, + }; + } } diff --git a/server/modules/providers/provider.registry.ts b/server/modules/providers/provider.registry.ts index 4ae3ef78..2f959b22 100644 --- a/server/modules/providers/provider.registry.ts +++ b/server/modules/providers/provider.registry.ts @@ -14,7 +14,7 @@ const providers: Record = { }; /** - * Central registry for resolving provider MCP implementations by id. + * Central registry for resolving concrete provider implementations by id. */ export const providerRegistry = { listProviders(): IProvider[] { diff --git a/server/modules/providers/services/providers.service.ts b/server/modules/providers/services/providers.service.ts new file mode 100644 index 00000000..9ecfc65b --- /dev/null +++ b/server/modules/providers/services/providers.service.ts @@ -0,0 +1,36 @@ +import { providerRegistry } from '@/modules/providers/provider.registry.js'; +import type { + FetchHistoryOptions, + FetchHistoryResult, + LLMProvider, + NormalizedMessage, +} from '@/shared/types.js'; + +/** + * Application service for provider message operations. + * + * Callers pass a provider id and this service resolves the concrete provider + * class, keeping normalization/history call sites decoupled from implementation + * file layout. + */ +export const providersService = { + listProviderIds(): LLMProvider[] { + return providerRegistry.listProviders().map((provider) => provider.id); + }, + + normalizeMessage( + providerName: string, + raw: unknown, + sessionId: string | null, + ): NormalizedMessage[] { + return providerRegistry.resolveProvider(providerName).normalizeMessage(raw, sessionId); + }, + + fetchHistory( + providerName: string, + sessionId: string, + options?: FetchHistoryOptions, + ): Promise { + return providerRegistry.resolveProvider(providerName).fetchHistory(sessionId, options); + }, +}; diff --git a/server/modules/providers/shared/base/abstract.provider.ts b/server/modules/providers/shared/base/abstract.provider.ts index 2cd24591..dba18dac 100644 --- a/server/modules/providers/shared/base/abstract.provider.ts +++ b/server/modules/providers/shared/base/abstract.provider.ts @@ -1,8 +1,16 @@ import type { IProvider, IProviderMcpRuntime } from '@/shared/interfaces.js'; -import type { LLMProvider } from '@/shared/types.js'; +import type { + FetchHistoryOptions, + FetchHistoryResult, + LLMProvider, + NormalizedMessage, +} from '@/shared/types.js'; /** - * Shared MCP-only provider base. + * Shared provider base. + * + * Concrete providers must implement message normalization and history loading + * because both behaviors depend on each provider's native SDK/CLI event format. */ export abstract class AbstractProvider implements IProvider { readonly id: LLMProvider; @@ -11,4 +19,11 @@ export abstract class AbstractProvider implements IProvider { protected constructor(id: LLMProvider) { this.id = id; } + + abstract normalizeMessage(raw: unknown, sessionId: string | null): NormalizedMessage[]; + + abstract fetchHistory( + sessionId: string, + options?: FetchHistoryOptions, + ): Promise; } diff --git a/server/openai-codex.js b/server/openai-codex.js index 0169a3b6..6ea6d5ae 100644 --- a/server/openai-codex.js +++ b/server/openai-codex.js @@ -15,8 +15,8 @@ import { Codex } from '@openai/codex-sdk'; import { notifyRunFailed, notifyRunStopped } from './services/notification-orchestrator.js'; -import { codexAdapter } from './providers/codex/adapter.js'; -import { createNormalizedMessage } from './providers/types.js'; +import { providersService } from './modules/providers/services/providers.service.js'; +import { createNormalizedMessage } from './shared/utils.js'; // Track active sessions const activeCodexSessions = new Map(); @@ -264,7 +264,7 @@ export async function queryCodex(command, options = {}, ws) { const transformed = transformCodexEvent(event); // Normalize the transformed event into NormalizedMessage(s) via adapter - const normalizedMsgs = codexAdapter.normalizeMessage(transformed, currentSessionId); + const normalizedMsgs = providersService.normalizeMessage('codex', transformed, currentSessionId); for (const msg of normalizedMsgs) { sendMessage(ws, msg); } diff --git a/server/providers/claude/adapter.js b/server/providers/claude/adapter.js deleted file mode 100644 index d5f850ba..00000000 --- a/server/providers/claude/adapter.js +++ /dev/null @@ -1,278 +0,0 @@ -/** - * Claude provider adapter. - * - * Normalizes Claude SDK session history into NormalizedMessage format. - * @module adapters/claude - */ - -import { getSessionMessages } from '../../projects.js'; -import { createNormalizedMessage, generateMessageId } from '../types.js'; -import { isInternalContent } from '../utils.js'; - -const PROVIDER = 'claude'; - -/** - * Normalize a raw JSONL message or realtime SDK event into NormalizedMessage(s). - * Handles both history entries (JSONL `{ message: { role, content } }`) and - * realtime streaming events (`content_block_delta`, `content_block_stop`, etc.). - * @param {object} raw - A single entry from JSONL or a live SDK event - * @param {string} sessionId - * @returns {import('../types.js').NormalizedMessage[]} - */ -export function normalizeMessage(raw, sessionId) { - // ── Streaming events (realtime) ────────────────────────────────────────── - if (raw.type === 'content_block_delta' && raw.delta?.text) { - return [createNormalizedMessage({ kind: 'stream_delta', content: raw.delta.text, sessionId, provider: PROVIDER })]; - } - if (raw.type === 'content_block_stop') { - return [createNormalizedMessage({ kind: 'stream_end', sessionId, provider: PROVIDER })]; - } - - // ── History / full-message events ──────────────────────────────────────── - const messages = []; - const ts = raw.timestamp || new Date().toISOString(); - const baseId = raw.uuid || generateMessageId('claude'); - - // User message - if (raw.message?.role === 'user' && raw.message?.content) { - if (Array.isArray(raw.message.content)) { - // Handle tool_result parts - for (const part of raw.message.content) { - if (part.type === 'tool_result') { - messages.push(createNormalizedMessage({ - id: `${baseId}_tr_${part.tool_use_id}`, - sessionId, - timestamp: ts, - provider: PROVIDER, - kind: 'tool_result', - toolId: part.tool_use_id, - content: typeof part.content === 'string' ? part.content : JSON.stringify(part.content), - isError: Boolean(part.is_error), - subagentTools: raw.subagentTools, - toolUseResult: raw.toolUseResult, - })); - } else if (part.type === 'text') { - // Regular text parts from user - const text = part.text || ''; - if (text && !isInternalContent(text)) { - messages.push(createNormalizedMessage({ - id: `${baseId}_text`, - sessionId, - timestamp: ts, - provider: PROVIDER, - kind: 'text', - role: 'user', - content: text, - })); - } - } - } - - // If no text parts were found, check if it's a pure user message - if (messages.length === 0) { - const textParts = raw.message.content - .filter(p => p.type === 'text') - .map(p => p.text) - .filter(Boolean) - .join('\n'); - if (textParts && !isInternalContent(textParts)) { - messages.push(createNormalizedMessage({ - id: `${baseId}_text`, - sessionId, - timestamp: ts, - provider: PROVIDER, - kind: 'text', - role: 'user', - content: textParts, - })); - } - } - } else if (typeof raw.message.content === 'string') { - const text = raw.message.content; - if (text && !isInternalContent(text)) { - messages.push(createNormalizedMessage({ - id: baseId, - sessionId, - timestamp: ts, - provider: PROVIDER, - kind: 'text', - role: 'user', - content: text, - })); - } - } - return messages; - } - - // Thinking message - if (raw.type === 'thinking' && raw.message?.content) { - messages.push(createNormalizedMessage({ - id: baseId, - sessionId, - timestamp: ts, - provider: PROVIDER, - kind: 'thinking', - content: raw.message.content, - })); - return messages; - } - - // Tool use result (codex-style in Claude) - if (raw.type === 'tool_use' && raw.toolName) { - messages.push(createNormalizedMessage({ - id: baseId, - sessionId, - timestamp: ts, - provider: PROVIDER, - kind: 'tool_use', - toolName: raw.toolName, - toolInput: raw.toolInput, - toolId: raw.toolCallId || baseId, - })); - return messages; - } - - if (raw.type === 'tool_result') { - messages.push(createNormalizedMessage({ - id: baseId, - sessionId, - timestamp: ts, - provider: PROVIDER, - kind: 'tool_result', - toolId: raw.toolCallId || '', - content: raw.output || '', - isError: false, - })); - return messages; - } - - // Assistant message - if (raw.message?.role === 'assistant' && raw.message?.content) { - if (Array.isArray(raw.message.content)) { - let partIndex = 0; - for (const part of raw.message.content) { - if (part.type === 'text' && part.text) { - messages.push(createNormalizedMessage({ - id: `${baseId}_${partIndex}`, - sessionId, - timestamp: ts, - provider: PROVIDER, - kind: 'text', - role: 'assistant', - content: part.text, - })); - } else if (part.type === 'tool_use') { - messages.push(createNormalizedMessage({ - id: `${baseId}_${partIndex}`, - sessionId, - timestamp: ts, - provider: PROVIDER, - kind: 'tool_use', - toolName: part.name, - toolInput: part.input, - toolId: part.id, - })); - } else if (part.type === 'thinking' && part.thinking) { - messages.push(createNormalizedMessage({ - id: `${baseId}_${partIndex}`, - sessionId, - timestamp: ts, - provider: PROVIDER, - kind: 'thinking', - content: part.thinking, - })); - } - partIndex++; - } - } else if (typeof raw.message.content === 'string') { - messages.push(createNormalizedMessage({ - id: baseId, - sessionId, - timestamp: ts, - provider: PROVIDER, - kind: 'text', - role: 'assistant', - content: raw.message.content, - })); - } - return messages; - } - - return messages; -} - -/** - * @type {import('../types.js').ProviderAdapter} - */ -export const claudeAdapter = { - normalizeMessage, - - /** - * Fetch session history from JSONL files, returning normalized messages. - */ - async fetchHistory(sessionId, opts = {}) { - const { projectName, limit = null, offset = 0 } = opts; - if (!projectName) { - return { messages: [], total: 0, hasMore: false, offset: 0, limit: null }; - } - - let result; - try { - result = await getSessionMessages(projectName, sessionId, limit, offset); - } catch (error) { - console.warn(`[ClaudeAdapter] Failed to load session ${sessionId}:`, error.message); - return { messages: [], total: 0, hasMore: false, offset: 0, limit: null }; - } - - // getSessionMessages returns either an array (no limit) or { messages, total, hasMore } - const rawMessages = Array.isArray(result) ? result : (result.messages || []); - const total = Array.isArray(result) ? rawMessages.length : (result.total || 0); - const hasMore = Array.isArray(result) ? false : Boolean(result.hasMore); - - // First pass: collect tool results for attachment to tool_use messages - const toolResultMap = new Map(); - for (const raw of rawMessages) { - if (raw.message?.role === 'user' && Array.isArray(raw.message?.content)) { - for (const part of raw.message.content) { - if (part.type === 'tool_result') { - toolResultMap.set(part.tool_use_id, { - content: part.content, - isError: Boolean(part.is_error), - timestamp: raw.timestamp, - subagentTools: raw.subagentTools, - toolUseResult: raw.toolUseResult, - }); - } - } - } - } - - // Second pass: normalize all messages - const normalized = []; - for (const raw of rawMessages) { - const entries = normalizeMessage(raw, sessionId); - normalized.push(...entries); - } - - // Attach tool results to their corresponding tool_use messages - for (const msg of normalized) { - if (msg.kind === 'tool_use' && msg.toolId && toolResultMap.has(msg.toolId)) { - const tr = toolResultMap.get(msg.toolId); - msg.toolResult = { - content: typeof tr.content === 'string' ? tr.content : JSON.stringify(tr.content), - isError: tr.isError, - toolUseResult: tr.toolUseResult, - }; - msg.subagentTools = tr.subagentTools; - } - } - - return { - messages: normalized, - total, - hasMore, - offset, - limit, - }; - }, -}; diff --git a/server/providers/codex/adapter.js b/server/providers/codex/adapter.js deleted file mode 100644 index c9cae00f..00000000 --- a/server/providers/codex/adapter.js +++ /dev/null @@ -1,248 +0,0 @@ -/** - * Codex (OpenAI) provider adapter. - * - * Normalizes Codex SDK session history into NormalizedMessage format. - * @module adapters/codex - */ - -import { getCodexSessionMessages } from '../../projects.js'; -import { createNormalizedMessage, generateMessageId } from '../types.js'; - -const PROVIDER = 'codex'; - -/** - * Normalize a raw Codex JSONL message into NormalizedMessage(s). - * @param {object} raw - A single parsed message from Codex JSONL - * @param {string} sessionId - * @returns {import('../types.js').NormalizedMessage[]} - */ -function normalizeCodexHistoryEntry(raw, sessionId) { - const ts = raw.timestamp || new Date().toISOString(); - const baseId = raw.uuid || generateMessageId('codex'); - - // User message - if (raw.message?.role === 'user') { - const content = typeof raw.message.content === 'string' - ? raw.message.content - : Array.isArray(raw.message.content) - ? raw.message.content.map(p => typeof p === 'string' ? p : p?.text || '').filter(Boolean).join('\n') - : String(raw.message.content || ''); - if (!content.trim()) return []; - return [createNormalizedMessage({ - id: baseId, - sessionId, - timestamp: ts, - provider: PROVIDER, - kind: 'text', - role: 'user', - content, - })]; - } - - // Assistant message - if (raw.message?.role === 'assistant') { - const content = typeof raw.message.content === 'string' - ? raw.message.content - : Array.isArray(raw.message.content) - ? raw.message.content.map(p => typeof p === 'string' ? p : p?.text || '').filter(Boolean).join('\n') - : ''; - if (!content.trim()) return []; - return [createNormalizedMessage({ - id: baseId, - sessionId, - timestamp: ts, - provider: PROVIDER, - kind: 'text', - role: 'assistant', - content, - })]; - } - - // Thinking/reasoning - if (raw.type === 'thinking' || raw.isReasoning) { - return [createNormalizedMessage({ - id: baseId, - sessionId, - timestamp: ts, - provider: PROVIDER, - kind: 'thinking', - content: raw.message?.content || '', - })]; - } - - // Tool use - if (raw.type === 'tool_use' || raw.toolName) { - return [createNormalizedMessage({ - id: baseId, - sessionId, - timestamp: ts, - provider: PROVIDER, - kind: 'tool_use', - toolName: raw.toolName || 'Unknown', - toolInput: raw.toolInput, - toolId: raw.toolCallId || baseId, - })]; - } - - // Tool result - if (raw.type === 'tool_result') { - return [createNormalizedMessage({ - id: baseId, - sessionId, - timestamp: ts, - provider: PROVIDER, - kind: 'tool_result', - toolId: raw.toolCallId || '', - content: raw.output || '', - isError: Boolean(raw.isError), - })]; - } - - return []; -} - -/** - * Normalize a raw Codex event (history JSONL or transformed SDK event) into NormalizedMessage(s). - * @param {object} raw - A history entry (has raw.message.role) or transformed SDK event (has raw.type) - * @param {string} sessionId - * @returns {import('../types.js').NormalizedMessage[]} - */ -export function normalizeMessage(raw, sessionId) { - // History format: has message.role - if (raw.message?.role) { - return normalizeCodexHistoryEntry(raw, sessionId); - } - - const ts = raw.timestamp || new Date().toISOString(); - const baseId = raw.uuid || generateMessageId('codex'); - - // SDK event format (output of transformCodexEvent) - if (raw.type === 'item') { - switch (raw.itemType) { - case 'agent_message': - return [createNormalizedMessage({ - id: baseId, sessionId, timestamp: ts, provider: PROVIDER, - kind: 'text', role: 'assistant', content: raw.message?.content || '', - })]; - case 'reasoning': - return [createNormalizedMessage({ - id: baseId, sessionId, timestamp: ts, provider: PROVIDER, - kind: 'thinking', content: raw.message?.content || '', - })]; - case 'command_execution': - return [createNormalizedMessage({ - id: baseId, sessionId, timestamp: ts, provider: PROVIDER, - kind: 'tool_use', toolName: 'Bash', toolInput: { command: raw.command }, - toolId: baseId, - output: raw.output, exitCode: raw.exitCode, status: raw.status, - })]; - case 'file_change': - return [createNormalizedMessage({ - id: baseId, sessionId, timestamp: ts, provider: PROVIDER, - kind: 'tool_use', toolName: 'FileChanges', toolInput: raw.changes, - toolId: baseId, status: raw.status, - })]; - case 'mcp_tool_call': - return [createNormalizedMessage({ - id: baseId, sessionId, timestamp: ts, provider: PROVIDER, - kind: 'tool_use', toolName: raw.tool || 'MCP', toolInput: raw.arguments, - toolId: baseId, server: raw.server, result: raw.result, - error: raw.error, status: raw.status, - })]; - case 'web_search': - return [createNormalizedMessage({ - id: baseId, sessionId, timestamp: ts, provider: PROVIDER, - kind: 'tool_use', toolName: 'WebSearch', toolInput: { query: raw.query }, - toolId: baseId, - })]; - case 'todo_list': - return [createNormalizedMessage({ - id: baseId, sessionId, timestamp: ts, provider: PROVIDER, - kind: 'tool_use', toolName: 'TodoList', toolInput: { items: raw.items }, - toolId: baseId, - })]; - case 'error': - return [createNormalizedMessage({ - id: baseId, sessionId, timestamp: ts, provider: PROVIDER, - kind: 'error', content: raw.message?.content || 'Unknown error', - })]; - default: - // Unknown item type — pass through as generic tool_use - return [createNormalizedMessage({ - id: baseId, sessionId, timestamp: ts, provider: PROVIDER, - kind: 'tool_use', toolName: raw.itemType || 'Unknown', - toolInput: raw.item || raw, toolId: baseId, - })]; - } - } - - if (raw.type === 'turn_complete') { - return [createNormalizedMessage({ - id: baseId, sessionId, timestamp: ts, provider: PROVIDER, - kind: 'complete', - })]; - } - if (raw.type === 'turn_failed') { - return [createNormalizedMessage({ - id: baseId, sessionId, timestamp: ts, provider: PROVIDER, - kind: 'error', content: raw.error?.message || 'Turn failed', - })]; - } - - return []; -} - -/** - * @type {import('../types.js').ProviderAdapter} - */ -export const codexAdapter = { - normalizeMessage, - /** - * Fetch session history from Codex JSONL files. - */ - async fetchHistory(sessionId, opts = {}) { - const { limit = null, offset = 0 } = opts; - - let result; - try { - result = await getCodexSessionMessages(sessionId, limit, offset); - } catch (error) { - console.warn(`[CodexAdapter] Failed to load session ${sessionId}:`, error.message); - return { messages: [], total: 0, hasMore: false, offset: 0, limit: null }; - } - - const rawMessages = Array.isArray(result) ? result : (result.messages || []); - const total = Array.isArray(result) ? rawMessages.length : (result.total || 0); - const hasMore = Array.isArray(result) ? false : Boolean(result.hasMore); - const tokenUsage = result.tokenUsage || null; - - const normalized = []; - for (const raw of rawMessages) { - const entries = normalizeCodexHistoryEntry(raw, sessionId); - normalized.push(...entries); - } - - // Attach tool results to tool_use messages - const toolResultMap = new Map(); - for (const msg of normalized) { - if (msg.kind === 'tool_result' && msg.toolId) { - toolResultMap.set(msg.toolId, msg); - } - } - for (const msg of normalized) { - if (msg.kind === 'tool_use' && msg.toolId && toolResultMap.has(msg.toolId)) { - const tr = toolResultMap.get(msg.toolId); - msg.toolResult = { content: tr.content, isError: tr.isError }; - } - } - - return { - messages: normalized, - total, - hasMore, - offset, - limit, - tokenUsage, - }; - }, -}; diff --git a/server/providers/cursor/adapter.js b/server/providers/cursor/adapter.js deleted file mode 100644 index c86215ff..00000000 --- a/server/providers/cursor/adapter.js +++ /dev/null @@ -1,353 +0,0 @@ -/** - * Cursor provider adapter. - * - * Normalizes Cursor CLI session history into NormalizedMessage format. - * @module adapters/cursor - */ - -import path from 'path'; -import os from 'os'; -import crypto from 'crypto'; -import { createNormalizedMessage, generateMessageId } from '../types.js'; - -const PROVIDER = 'cursor'; - -/** - * Load raw blobs from Cursor's SQLite store.db, parse the DAG structure, - * and return sorted message blobs in chronological order. - * @param {string} sessionId - * @param {string} projectPath - Absolute project path (used to compute cwdId hash) - * @returns {Promise>} - */ -async function loadCursorBlobs(sessionId, projectPath) { - // Lazy-import sqlite so the module doesn't fail if sqlite3 is unavailable - const { default: sqlite3 } = await import('sqlite3'); - const { open } = await import('sqlite'); - - const cwdId = crypto.createHash('md5').update(projectPath || process.cwd()).digest('hex'); - const storeDbPath = path.join(os.homedir(), '.cursor', 'chats', cwdId, sessionId, 'store.db'); - - const db = await open({ - filename: storeDbPath, - driver: sqlite3.Database, - mode: sqlite3.OPEN_READONLY, - }); - - try { - const allBlobs = await db.all('SELECT rowid, id, data FROM blobs'); - - const blobMap = new Map(); - const parentRefs = new Map(); - const childRefs = new Map(); - const jsonBlobs = []; - - for (const blob of allBlobs) { - blobMap.set(blob.id, blob); - - if (blob.data && blob.data[0] === 0x7B) { - try { - const parsed = JSON.parse(blob.data.toString('utf8')); - jsonBlobs.push({ ...blob, parsed }); - } catch { - // skip unparseable blobs - } - } else if (blob.data) { - const parents = []; - let i = 0; - while (i < blob.data.length - 33) { - if (blob.data[i] === 0x0A && blob.data[i + 1] === 0x20) { - const parentHash = blob.data.slice(i + 2, i + 34).toString('hex'); - if (blobMap.has(parentHash)) { - parents.push(parentHash); - } - i += 34; - } else { - i++; - } - } - if (parents.length > 0) { - parentRefs.set(blob.id, parents); - for (const parentId of parents) { - if (!childRefs.has(parentId)) childRefs.set(parentId, []); - childRefs.get(parentId).push(blob.id); - } - } - } - } - - // Topological sort (DFS) - const visited = new Set(); - const sorted = []; - function visit(nodeId) { - if (visited.has(nodeId)) return; - visited.add(nodeId); - for (const pid of (parentRefs.get(nodeId) || [])) visit(pid); - const b = blobMap.get(nodeId); - if (b) sorted.push(b); - } - for (const blob of allBlobs) { - if (!parentRefs.has(blob.id)) visit(blob.id); - } - for (const blob of allBlobs) visit(blob.id); - - // Order JSON blobs by DAG appearance - const messageOrder = new Map(); - let orderIndex = 0; - for (const blob of sorted) { - if (blob.data && blob.data[0] !== 0x7B) { - for (const jb of jsonBlobs) { - try { - const idBytes = Buffer.from(jb.id, 'hex'); - if (blob.data.includes(idBytes) && !messageOrder.has(jb.id)) { - messageOrder.set(jb.id, orderIndex++); - } - } catch { /* skip */ } - } - } - } - - const sortedJsonBlobs = jsonBlobs.sort((a, b) => { - const oa = messageOrder.get(a.id) ?? Number.MAX_SAFE_INTEGER; - const ob = messageOrder.get(b.id) ?? Number.MAX_SAFE_INTEGER; - return oa !== ob ? oa - ob : a.rowid - b.rowid; - }); - - const messages = []; - for (let idx = 0; idx < sortedJsonBlobs.length; idx++) { - const blob = sortedJsonBlobs[idx]; - const parsed = blob.parsed; - if (!parsed) continue; - const role = parsed?.role || parsed?.message?.role; - if (role === 'system') continue; - messages.push({ - id: blob.id, - sequence: idx + 1, - rowid: blob.rowid, - content: parsed, - }); - } - - return messages; - } finally { - await db.close(); - } -} - -/** - * Normalize a realtime NDJSON event from Cursor CLI into NormalizedMessage(s). - * History uses normalizeCursorBlobs (SQLite DAG), this handles streaming NDJSON. - * @param {object|string} raw - A parsed NDJSON event or a raw text line - * @param {string} sessionId - * @returns {import('../types.js').NormalizedMessage[]} - */ -export function normalizeMessage(raw, sessionId) { - // Structured assistant message with content array - if (raw && typeof raw === 'object' && raw.type === 'assistant' && raw.message?.content?.[0]?.text) { - return [createNormalizedMessage({ kind: 'stream_delta', content: raw.message.content[0].text, sessionId, provider: PROVIDER })]; - } - // Plain string line (non-JSON output) - if (typeof raw === 'string' && raw.trim()) { - return [createNormalizedMessage({ kind: 'stream_delta', content: raw, sessionId, provider: PROVIDER })]; - } - return []; -} - -/** - * @type {import('../types.js').ProviderAdapter} - */ -export const cursorAdapter = { - normalizeMessage, - /** - * Fetch session history for Cursor from SQLite store.db. - */ - async fetchHistory(sessionId, opts = {}) { - const { projectPath = '', limit = null, offset = 0 } = opts; - - try { - const blobs = await loadCursorBlobs(sessionId, projectPath); - const allNormalized = cursorAdapter.normalizeCursorBlobs(blobs, sessionId); - - // Apply pagination - if (limit !== null && limit > 0) { - const start = offset; - const page = allNormalized.slice(start, start + limit); - return { - messages: page, - total: allNormalized.length, - hasMore: start + limit < allNormalized.length, - offset, - limit, - }; - } - - return { - messages: allNormalized, - total: allNormalized.length, - hasMore: false, - offset: 0, - limit: null, - }; - } catch (error) { - // DB doesn't exist or is unreadable — return empty - console.warn(`[CursorAdapter] Failed to load session ${sessionId}:`, error.message); - return { messages: [], total: 0, hasMore: false, offset: 0, limit: null }; - } - }, - - /** - * Normalize raw Cursor blob messages into NormalizedMessage[]. - * @param {any[]} blobs - Raw cursor blobs from store.db ({id, sequence, rowid, content}) - * @param {string} sessionId - * @returns {import('../types.js').NormalizedMessage[]} - */ - normalizeCursorBlobs(blobs, sessionId) { - const messages = []; - const toolUseMap = new Map(); - - // Use a fixed base timestamp so messages have stable, monotonically-increasing - // timestamps based on their sequence number rather than wall-clock time. - const baseTime = Date.now(); - - for (let i = 0; i < blobs.length; i++) { - const blob = blobs[i]; - const content = blob.content; - const ts = new Date(baseTime + (blob.sequence ?? i) * 100).toISOString(); - const baseId = blob.id || generateMessageId('cursor'); - - try { - if (!content?.role || !content?.content) { - // Try nested message format - if (content?.message?.role && content?.message?.content) { - if (content.message.role === 'system') continue; - const role = content.message.role === 'user' ? 'user' : 'assistant'; - let text = ''; - if (Array.isArray(content.message.content)) { - text = content.message.content - .map(p => typeof p === 'string' ? p : p?.text || '') - .filter(Boolean) - .join('\n'); - } else if (typeof content.message.content === 'string') { - text = content.message.content; - } - if (text?.trim()) { - messages.push(createNormalizedMessage({ - id: baseId, - sessionId, - timestamp: ts, - provider: PROVIDER, - kind: 'text', - role, - content: text, - sequence: blob.sequence, - rowid: blob.rowid, - })); - } - } - continue; - } - - if (content.role === 'system') continue; - - // Tool results - if (content.role === 'tool') { - const toolItems = Array.isArray(content.content) ? content.content : []; - for (const item of toolItems) { - if (item?.type !== 'tool-result') continue; - const toolCallId = item.toolCallId || content.id; - messages.push(createNormalizedMessage({ - id: `${baseId}_tr`, - sessionId, - timestamp: ts, - provider: PROVIDER, - kind: 'tool_result', - toolId: toolCallId, - content: item.result || '', - isError: false, - })); - } - continue; - } - - const role = content.role === 'user' ? 'user' : 'assistant'; - - if (Array.isArray(content.content)) { - for (let partIdx = 0; partIdx < content.content.length; partIdx++) { - const part = content.content[partIdx]; - - if (part?.type === 'text' && part?.text) { - messages.push(createNormalizedMessage({ - id: `${baseId}_${partIdx}`, - sessionId, - timestamp: ts, - provider: PROVIDER, - kind: 'text', - role, - content: part.text, - sequence: blob.sequence, - rowid: blob.rowid, - })); - } else if (part?.type === 'reasoning' && part?.text) { - messages.push(createNormalizedMessage({ - id: `${baseId}_${partIdx}`, - sessionId, - timestamp: ts, - provider: PROVIDER, - kind: 'thinking', - content: part.text, - })); - } else if (part?.type === 'tool-call' || part?.type === 'tool_use') { - const toolName = (part.toolName || part.name || 'Unknown Tool') === 'ApplyPatch' - ? 'Edit' : (part.toolName || part.name || 'Unknown Tool'); - const toolId = part.toolCallId || part.id || `tool_${i}_${partIdx}`; - messages.push(createNormalizedMessage({ - id: `${baseId}_${partIdx}`, - sessionId, - timestamp: ts, - provider: PROVIDER, - kind: 'tool_use', - toolName, - toolInput: part.args || part.input, - toolId, - })); - toolUseMap.set(toolId, messages[messages.length - 1]); - } - } - } else if (typeof content.content === 'string' && content.content.trim()) { - messages.push(createNormalizedMessage({ - id: baseId, - sessionId, - timestamp: ts, - provider: PROVIDER, - kind: 'text', - role, - content: content.content, - sequence: blob.sequence, - rowid: blob.rowid, - })); - } - } catch (error) { - console.warn('Error normalizing cursor blob:', error); - } - } - - // Attach tool results to tool_use messages - for (const msg of messages) { - if (msg.kind === 'tool_result' && msg.toolId && toolUseMap.has(msg.toolId)) { - const toolUse = toolUseMap.get(msg.toolId); - toolUse.toolResult = { - content: msg.content, - isError: msg.isError, - }; - } - } - - // Sort by sequence/rowid - messages.sort((a, b) => { - if (a.sequence !== undefined && b.sequence !== undefined) return a.sequence - b.sequence; - if (a.rowid !== undefined && b.rowid !== undefined) return a.rowid - b.rowid; - return new Date(a.timestamp).getTime() - new Date(b.timestamp).getTime(); - }); - - return messages; - }, -}; diff --git a/server/providers/gemini/adapter.js b/server/providers/gemini/adapter.js deleted file mode 100644 index df303c36..00000000 --- a/server/providers/gemini/adapter.js +++ /dev/null @@ -1,186 +0,0 @@ -/** - * Gemini provider adapter. - * - * Normalizes Gemini CLI session history into NormalizedMessage format. - * @module adapters/gemini - */ - -import sessionManager from '../../sessionManager.js'; -import { getGeminiCliSessionMessages } from '../../projects.js'; -import { createNormalizedMessage, generateMessageId } from '../types.js'; - -const PROVIDER = 'gemini'; - -/** - * Normalize a realtime NDJSON event from Gemini CLI into NormalizedMessage(s). - * Handles: message (delta/final), tool_use, tool_result, result, error. - * @param {object} raw - A parsed NDJSON event - * @param {string} sessionId - * @returns {import('../types.js').NormalizedMessage[]} - */ -export function normalizeMessage(raw, sessionId) { - const ts = raw.timestamp || new Date().toISOString(); - const baseId = raw.uuid || generateMessageId('gemini'); - - if (raw.type === 'message' && raw.role === 'assistant') { - const content = raw.content || ''; - const msgs = []; - if (content) { - msgs.push(createNormalizedMessage({ id: baseId, sessionId, timestamp: ts, provider: PROVIDER, kind: 'stream_delta', content })); - } - // If not a delta, also send stream_end - if (raw.delta !== true) { - msgs.push(createNormalizedMessage({ sessionId, timestamp: ts, provider: PROVIDER, kind: 'stream_end' })); - } - return msgs; - } - - if (raw.type === 'tool_use') { - return [createNormalizedMessage({ - id: baseId, sessionId, timestamp: ts, provider: PROVIDER, - kind: 'tool_use', toolName: raw.tool_name, toolInput: raw.parameters || {}, - toolId: raw.tool_id || baseId, - })]; - } - - if (raw.type === 'tool_result') { - return [createNormalizedMessage({ - id: baseId, sessionId, timestamp: ts, provider: PROVIDER, - kind: 'tool_result', toolId: raw.tool_id || '', - content: raw.output === undefined ? '' : String(raw.output), - isError: raw.status === 'error', - })]; - } - - if (raw.type === 'result') { - const msgs = [createNormalizedMessage({ sessionId, timestamp: ts, provider: PROVIDER, kind: 'stream_end' })]; - if (raw.stats?.total_tokens) { - msgs.push(createNormalizedMessage({ - sessionId, timestamp: ts, provider: PROVIDER, - kind: 'status', text: 'Complete', tokens: raw.stats.total_tokens, canInterrupt: false, - })); - } - return msgs; - } - - if (raw.type === 'error') { - return [createNormalizedMessage({ - id: baseId, sessionId, timestamp: ts, provider: PROVIDER, - kind: 'error', content: raw.error || raw.message || 'Unknown Gemini streaming error', - })]; - } - - return []; -} - -/** - * @type {import('../types.js').ProviderAdapter} - */ -export const geminiAdapter = { - normalizeMessage, - /** - * Fetch session history for Gemini. - * First tries in-memory session manager, then falls back to CLI sessions on disk. - */ - async fetchHistory(sessionId, opts = {}) { - let rawMessages; - try { - rawMessages = sessionManager.getSessionMessages(sessionId); - - // Fallback to Gemini CLI sessions on disk - if (rawMessages.length === 0) { - rawMessages = await getGeminiCliSessionMessages(sessionId); - } - } catch (error) { - console.warn(`[GeminiAdapter] Failed to load session ${sessionId}:`, error.message); - return { messages: [], total: 0, hasMore: false, offset: 0, limit: null }; - } - - const normalized = []; - for (let i = 0; i < rawMessages.length; i++) { - const raw = rawMessages[i]; - const ts = raw.timestamp || new Date().toISOString(); - const baseId = raw.uuid || generateMessageId('gemini'); - - // sessionManager format: { type: 'message', message: { role, content }, timestamp } - // CLI format: { role: 'user'|'gemini'|'assistant', content: string|array } - const role = raw.message?.role || raw.role; - const content = raw.message?.content || raw.content; - - if (!role || !content) continue; - - const normalizedRole = (role === 'user') ? 'user' : 'assistant'; - - if (Array.isArray(content)) { - for (let partIdx = 0; partIdx < content.length; partIdx++) { - const part = content[partIdx]; - if (part.type === 'text' && part.text) { - normalized.push(createNormalizedMessage({ - id: `${baseId}_${partIdx}`, - sessionId, - timestamp: ts, - provider: PROVIDER, - kind: 'text', - role: normalizedRole, - content: part.text, - })); - } else if (part.type === 'tool_use') { - normalized.push(createNormalizedMessage({ - id: `${baseId}_${partIdx}`, - sessionId, - timestamp: ts, - provider: PROVIDER, - kind: 'tool_use', - toolName: part.name, - toolInput: part.input, - toolId: part.id || generateMessageId('gemini_tool'), - })); - } else if (part.type === 'tool_result') { - normalized.push(createNormalizedMessage({ - id: `${baseId}_${partIdx}`, - sessionId, - timestamp: ts, - provider: PROVIDER, - kind: 'tool_result', - toolId: part.tool_use_id || '', - content: part.content === undefined ? '' : String(part.content), - isError: Boolean(part.is_error), - })); - } - } - } else if (typeof content === 'string' && content.trim()) { - normalized.push(createNormalizedMessage({ - id: baseId, - sessionId, - timestamp: ts, - provider: PROVIDER, - kind: 'text', - role: normalizedRole, - content, - })); - } - } - - // Attach tool results to tool_use messages - const toolResultMap = new Map(); - for (const msg of normalized) { - if (msg.kind === 'tool_result' && msg.toolId) { - toolResultMap.set(msg.toolId, msg); - } - } - for (const msg of normalized) { - if (msg.kind === 'tool_use' && msg.toolId && toolResultMap.has(msg.toolId)) { - const tr = toolResultMap.get(msg.toolId); - msg.toolResult = { content: tr.content, isError: tr.isError }; - } - } - - return { - messages: normalized, - total: normalized.length, - hasMore: false, - offset: 0, - limit: null, - }; - }, -}; diff --git a/server/providers/registry.js b/server/providers/registry.js deleted file mode 100644 index 236c909e..00000000 --- a/server/providers/registry.js +++ /dev/null @@ -1,44 +0,0 @@ -/** - * Provider Registry - * - * Centralizes provider adapter lookup. All code that needs a provider adapter - * should go through this registry instead of importing individual adapters directly. - * - * @module providers/registry - */ - -import { claudeAdapter } from './claude/adapter.js'; -import { cursorAdapter } from './cursor/adapter.js'; -import { codexAdapter } from './codex/adapter.js'; -import { geminiAdapter } from './gemini/adapter.js'; - -/** - * @typedef {import('./types.js').ProviderAdapter} ProviderAdapter - * @typedef {import('./types.js').SessionProvider} SessionProvider - */ - -/** @type {Map} */ -const providers = new Map(); - -// Register built-in providers -providers.set('claude', claudeAdapter); -providers.set('cursor', cursorAdapter); -providers.set('codex', codexAdapter); -providers.set('gemini', geminiAdapter); - -/** - * Get a provider adapter by name. - * @param {string} name - Provider name (e.g., 'claude', 'cursor', 'codex', 'gemini') - * @returns {ProviderAdapter | undefined} - */ -export function getProvider(name) { - return providers.get(name); -} - -/** - * Get all registered provider names. - * @returns {string[]} - */ -export function getAllProviders() { - return Array.from(providers.keys()); -} diff --git a/server/providers/types.js b/server/providers/types.js deleted file mode 100644 index 5541525b..00000000 --- a/server/providers/types.js +++ /dev/null @@ -1,119 +0,0 @@ -/** - * Provider Types & Interface - * - * Defines the normalized message format and the provider adapter interface. - * All providers normalize their native formats into NormalizedMessage - * before sending over REST or WebSocket. - * - * @module providers/types - */ - -// ─── Session Provider ──────────────────────────────────────────────────────── - -/** - * @typedef {'claude' | 'cursor' | 'codex' | 'gemini'} SessionProvider - */ - -// ─── Message Kind ──────────────────────────────────────────────────────────── - -/** - * @typedef {'text' | 'tool_use' | 'tool_result' | 'thinking' | 'stream_delta' | 'stream_end' - * | 'error' | 'complete' | 'status' | 'permission_request' | 'permission_cancelled' - * | 'session_created' | 'interactive_prompt' | 'task_notification'} MessageKind - */ - -// ─── NormalizedMessage ─────────────────────────────────────────────────────── - -/** - * @typedef {Object} NormalizedMessage - * @property {string} id - Unique message id (for dedup between server + realtime) - * @property {string} sessionId - * @property {string} timestamp - ISO 8601 - * @property {SessionProvider} provider - * @property {MessageKind} kind - * - * Additional fields depending on kind: - * - text: role ('user'|'assistant'), content, images? - * - tool_use: toolName, toolInput, toolId - * - tool_result: toolId, content, isError - * - thinking: content - * - stream_delta: content - * - stream_end: (no extra fields) - * - error: content - * - complete: (no extra fields) - * - status: text, tokens?, canInterrupt? - * - permission_request: requestId, toolName, input, context? - * - permission_cancelled: requestId - * - session_created: newSessionId - * - interactive_prompt: content - * - task_notification: status, summary - */ - -// ─── Fetch History ─────────────────────────────────────────────────────────── - -/** - * @typedef {Object} FetchHistoryOptions - * @property {string} [projectName] - Project name (required for Claude) - * @property {string} [projectPath] - Absolute project path (required for Cursor cwdId hash) - * @property {number|null} [limit] - Page size (null = all messages) - * @property {number} [offset] - Pagination offset (default: 0) - */ - -/** - * @typedef {Object} FetchHistoryResult - * @property {NormalizedMessage[]} messages - Normalized messages - * @property {number} total - Total number of messages in the session - * @property {boolean} hasMore - Whether more messages exist before the current page - * @property {number} offset - Current offset - * @property {number|null} limit - Page size used - * @property {object} [tokenUsage] - Token usage data (provider-specific) - */ - -// ─── Provider Adapter Interface ────────────────────────────────────────────── - -/** - * Every provider adapter MUST implement this interface. - * - * @typedef {Object} ProviderAdapter - * - * @property {(sessionId: string, opts?: FetchHistoryOptions) => Promise} fetchHistory - * Read persisted session messages from disk/database and return them as NormalizedMessage[]. - * The backend calls this from the unified GET /api/sessions/:id/messages endpoint. - * - * Provider implementations: - * - Claude: reads ~/.claude/projects/{projectName}/*.jsonl - * - Cursor: reads from SQLite store.db (via normalizeCursorBlobs helper) - * - Codex: reads ~/.codex/sessions/*.jsonl - * - Gemini: reads from in-memory sessionManager or ~/.gemini/tmp/ JSON files - * - * @property {(raw: any, sessionId: string) => NormalizedMessage[]} normalizeMessage - * Normalize a provider-specific event (JSONL entry or live SDK event) into NormalizedMessage[]. - * Used by provider files to convert both history and realtime events. - */ - -// ─── Runtime Helpers ───────────────────────────────────────────────────────── - -/** - * Generate a unique message ID. - * Uses crypto.randomUUID() to avoid collisions across server restarts and workers. - * @param {string} [prefix='msg'] - Optional prefix - * @returns {string} - */ -export function generateMessageId(prefix = 'msg') { - return `${prefix}_${crypto.randomUUID()}`; -} - -/** - * Create a NormalizedMessage with common fields pre-filled. - * @param {Partial & {kind: MessageKind, provider: SessionProvider}} fields - * @returns {NormalizedMessage} - */ -export function createNormalizedMessage(fields) { - return { - ...fields, - id: fields.id || generateMessageId(fields.kind), - sessionId: fields.sessionId || '', - timestamp: fields.timestamp || new Date().toISOString(), - provider: fields.provider, - }; -} diff --git a/server/providers/utils.js b/server/providers/utils.js deleted file mode 100644 index 1ec1382f..00000000 --- a/server/providers/utils.js +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Shared provider utilities. - * - * @module providers/utils - */ - -/** - * Prefixes that indicate internal/system content which should be hidden from the UI. - * @type {readonly string[]} - */ -export const INTERNAL_CONTENT_PREFIXES = Object.freeze([ - '', - '', - '', - '', - '', - 'Caveat:', - 'This session is being continued from a previous', - '[Request interrupted', -]); - -/** - * Check if user text content is internal/system that should be skipped. - * @param {string} content - * @returns {boolean} - */ -export function isInternalContent(content) { - return INTERNAL_CONTENT_PREFIXES.some(prefix => content.startsWith(prefix)); -} diff --git a/server/routes/messages.js b/server/routes/messages.js index 8eb14b37..b981d8ab 100644 --- a/server/routes/messages.js +++ b/server/routes/messages.js @@ -10,7 +10,7 @@ */ import express from 'express'; -import { getProvider, getAllProviders } from '../providers/registry.js'; +import { providersService } from '../modules/providers/services/providers.service.js'; const router = express.Router(); @@ -29,7 +29,7 @@ const router = express.Router(); router.get('/:sessionId/messages', async (req, res) => { try { const { sessionId } = req.params; - const provider = req.query.provider || 'claude'; + const provider = String(req.query.provider || 'claude').trim().toLowerCase(); const projectName = req.query.projectName || ''; const projectPath = req.query.projectPath || ''; const limitParam = req.query.limit; @@ -38,13 +38,13 @@ router.get('/:sessionId/messages', async (req, res) => { : null; const offset = parseInt(req.query.offset || '0', 10); - const adapter = getProvider(provider); - if (!adapter) { - const available = getAllProviders().join(', '); + const availableProviders = providersService.listProviderIds(); + if (!availableProviders.includes(provider)) { + const available = availableProviders.join(', '); return res.status(400).json({ error: `Unknown provider: ${provider}. Available: ${available}` }); } - const result = await adapter.fetchHistory(sessionId, { + const result = await providersService.fetchHistory(provider, sessionId, { projectName, projectPath, limit, diff --git a/server/shared/interfaces.ts b/server/shared/interfaces.ts index 697ac240..2bc3d7da 100644 --- a/server/shared/interfaces.ts +++ b/server/shared/interfaces.ts @@ -1,7 +1,10 @@ import type { + FetchHistoryOptions, + FetchHistoryResult, LLMProvider, McpScope, McpTransport, + NormalizedMessage, ProviderMcpServer, UpsertProviderMcpServerInput, } from '@/shared/types.js'; @@ -30,11 +33,16 @@ export interface IProviderMcpRuntime { }>; } - /** - * Provider contract that both SDK and CLI families implement. + * Main provider contract for CLI and SDK integrations. + * + * Each concrete provider owns its MCP runtime plus the provider-specific logic + * for converting native events/history into the app's normalized message shape. */ export interface IProvider { readonly id: LLMProvider; readonly mcp: IProviderMcpRuntime; -} \ No newline at end of file + + normalizeMessage(raw: unknown, sessionId: string | null): NormalizedMessage[]; + fetchHistory(sessionId: string, options?: FetchHistoryOptions): Promise; +} diff --git a/server/shared/types.ts b/server/shared/types.ts index 1a3859fe..2c24ece3 100644 --- a/server/shared/types.ts +++ b/server/shared/types.ts @@ -20,6 +20,93 @@ export type LLMProvider = 'claude' | 'codex' | 'gemini' | 'cursor'; // --------------------------------------------------------------------------------------------- +export type MessageKind = + | 'text' + | 'tool_use' + | 'tool_result' + | 'thinking' + | 'stream_delta' + | 'stream_end' + | 'error' + | 'complete' + | 'status' + | 'permission_request' + | 'permission_cancelled' + | 'session_created' + | 'interactive_prompt' + | 'task_notification'; + +/** + * Provider-neutral message event emitted over REST and realtime transports. + * + * Providers all produce their own native SDK/CLI event shapes, so this type keeps + * the common envelope strict while allowing provider-specific details to ride + * along as optional properties. + */ +export type NormalizedMessage = { + id: string; + sessionId: string; + timestamp: string; + provider: LLMProvider; + kind: MessageKind; + role?: 'user' | 'assistant'; + content?: string; + images?: unknown; + toolName?: string; + toolInput?: unknown; + toolId?: string; + toolResult?: { + content?: string; + isError?: boolean; + toolUseResult?: unknown; + }; + isError?: boolean; + text?: string; + tokens?: number; + canInterrupt?: boolean; + requestId?: string; + input?: unknown; + context?: unknown; + reason?: string; + newSessionId?: string; + status?: string; + summary?: string; + tokenBudget?: unknown; + subagentTools?: unknown; + toolUseResult?: unknown; + sequence?: number; + rowid?: number; + [key: string]: unknown; +}; + +/** + * Pagination and provider lookup options for reading persisted session history. + */ +export type FetchHistoryOptions = { + /** Claude project folder name. Required by Claude history lookup. */ + projectName?: string; + /** Absolute workspace path. Required by Cursor to compute its chat hash. */ + projectPath?: string; + /** Page size. `null` means all messages. */ + limit?: number | null; + /** Pagination offset from the newest messages. */ + offset?: number; +}; + +/** + * Provider-neutral history result returned by the unified messages endpoint. + */ +export type FetchHistoryResult = { + messages: NormalizedMessage[]; + total: number; + hasMore: boolean; + offset: number; + limit: number | null; + tokenUsage?: unknown; +}; + +// --------------------------------------------------------------------------------------------- + export type AppErrorOptions = { code?: string; statusCode?: number; diff --git a/server/shared/utils.ts b/server/shared/utils.ts index 3b6bd8dc..60eea254 100644 --- a/server/shared/utils.ts +++ b/server/shared/utils.ts @@ -1,10 +1,25 @@ +import { randomUUID } from 'node:crypto'; import { mkdir, readFile, writeFile } from 'node:fs/promises'; import path from 'node:path'; import type { NextFunction, Request, RequestHandler, Response } from 'express'; -import type { ApiErrorShape, ApiSuccessShape, AppErrorOptions } from '@/shared/types.js'; +import type { + ApiErrorShape, + ApiSuccessShape, + AppErrorOptions, + NormalizedMessage, +} from '@/shared/types.js'; + +type NormalizedMessageInput = + { + kind: NormalizedMessage['kind']; + provider: NormalizedMessage['provider']; + id?: string | null; + sessionId?: string | null; + timestamp?: string | null; + } & Record; export function createApiSuccessResponse( data: TData, @@ -55,6 +70,33 @@ export class AppError extends Error { // ------------------------------------------------------------------------------------------- +// ------------------------ Normalized provider message helpers ------------------------ +/** + * Generates a stable unique id for normalized provider messages. + */ +export function generateMessageId(prefix = 'msg'): string { + return `${prefix}_${randomUUID()}`; +} + +/** + * Creates a normalized provider message and fills the shared envelope fields. + * + * Provider adapters and live SDK handlers pass through provider-specific fields, + * while this helper guarantees every emitted event has an id, session id, + * timestamp, and provider marker. + */ +export function createNormalizedMessage(fields: NormalizedMessageInput): NormalizedMessage { + return { + ...fields, + id: fields.id || generateMessageId(fields.kind), + sessionId: fields.sessionId || '', + timestamp: fields.timestamp || new Date().toISOString(), + provider: fields.provider, + }; +} + +// ------------------------------------------------------------------------------------------- + // ------------------------ The following are mainly for provider MCP runtimes ------------------------ /** * Safely narrows an unknown value to a plain object record.