diff --git a/server/claude-sdk.js b/server/claude-sdk.js index 67ba103..1abf708 100644 --- a/server/claude-sdk.js +++ b/server/claude-sdk.js @@ -398,10 +398,10 @@ async function queryClaudeSDK(command, options = {}, ws) { // Send session-created event only once for new sessions if (!sessionId && !sessionCreatedSent) { sessionCreatedSent = true; - ws.send(JSON.stringify({ + ws.send({ type: 'session-created', sessionId: capturedSessionId - })); + }); } else { console.log('Not sending session-created. sessionId:', sessionId, 'sessionCreatedSent:', sessionCreatedSent); } @@ -411,20 +411,20 @@ async function queryClaudeSDK(command, options = {}, ws) { // Transform and send message to WebSocket const transformedMessage = transformMessage(message); - ws.send(JSON.stringify({ + ws.send({ type: 'claude-response', data: transformedMessage - })); + }); // Extract and send token budget updates from result messages if (message.type === 'result') { const tokenBudget = extractTokenBudget(message); if (tokenBudget) { console.log('Token budget from modelUsage:', tokenBudget); - ws.send(JSON.stringify({ + ws.send({ type: 'token-budget', data: tokenBudget - })); + }); } } } @@ -439,12 +439,12 @@ async function queryClaudeSDK(command, options = {}, ws) { // Send completion event console.log('Streaming complete, sending claude-complete event'); - ws.send(JSON.stringify({ + ws.send({ type: 'claude-complete', sessionId: capturedSessionId, exitCode: 0, isNewSession: !sessionId && !!command - })); + }); console.log('claude-complete event sent'); } catch (error) { @@ -459,10 +459,10 @@ async function queryClaudeSDK(command, options = {}, ws) { await cleanupTempFiles(tempImagePaths, tempDir); // Send error to WebSocket - ws.send(JSON.stringify({ + ws.send({ type: 'claude-error', error: error.message - })); + }); throw error; } diff --git a/server/cursor-cli.js b/server/cursor-cli.js index 30b892b..1e5c2e9 100644 --- a/server/cursor-cli.js +++ b/server/cursor-cli.js @@ -102,29 +102,29 @@ async function spawnCursor(command, options = {}, ws) { // Send session-created event only once for new sessions if (!sessionId && !sessionCreatedSent) { sessionCreatedSent = true; - ws.send(JSON.stringify({ + ws.send({ type: 'session-created', sessionId: capturedSessionId, model: response.model, cwd: response.cwd - })); + }); } } // Send system info to frontend - ws.send(JSON.stringify({ + ws.send({ type: 'cursor-system', data: response - })); + }); } break; case 'user': // Forward user message - ws.send(JSON.stringify({ + ws.send({ type: 'cursor-user', data: response - })); + }); break; case 'assistant': @@ -134,7 +134,7 @@ async function spawnCursor(command, options = {}, ws) { messageBuffer += textContent; // Send as Claude-compatible format for frontend - ws.send(JSON.stringify({ + ws.send({ type: 'claude-response', data: { type: 'content_block_delta', @@ -143,7 +143,7 @@ async function spawnCursor(command, options = {}, ws) { text: textContent } } - })); + }); } break; @@ -153,37 +153,37 @@ async function spawnCursor(command, options = {}, ws) { // Send final message if we have buffered content if (messageBuffer) { - ws.send(JSON.stringify({ + ws.send({ type: 'claude-response', data: { type: 'content_block_stop' } - })); + }); } // Send completion event - ws.send(JSON.stringify({ + ws.send({ type: 'cursor-result', sessionId: capturedSessionId || sessionId, data: response, success: response.subtype === 'success' - })); + }); break; default: // Forward any other message types - ws.send(JSON.stringify({ + ws.send({ type: 'cursor-response', data: response - })); + }); } } catch (parseError) { console.log('📄 Non-JSON response:', line); // If not JSON, send as raw text - ws.send(JSON.stringify({ + ws.send({ type: 'cursor-output', data: line - })); + }); } } }); @@ -191,10 +191,10 @@ async function spawnCursor(command, options = {}, ws) { // Handle stderr cursorProcess.stderr.on('data', (data) => { console.error('Cursor CLI stderr:', data.toString()); - ws.send(JSON.stringify({ + ws.send({ type: 'cursor-error', error: data.toString() - })); + }); }); // Handle process completion @@ -205,12 +205,12 @@ async function spawnCursor(command, options = {}, ws) { const finalSessionId = capturedSessionId || sessionId || processKey; activeCursorProcesses.delete(finalSessionId); - ws.send(JSON.stringify({ + ws.send({ type: 'claude-complete', sessionId: finalSessionId, exitCode: code, isNewSession: !sessionId && !!command // Flag to indicate this was a new session - })); + }); if (code === 0) { resolve(); @@ -226,12 +226,12 @@ async function spawnCursor(command, options = {}, ws) { // Clean up process reference on error const finalSessionId = capturedSessionId || sessionId || processKey; activeCursorProcesses.delete(finalSessionId); - - ws.send(JSON.stringify({ + + ws.send({ type: 'cursor-error', error: error.message - })); - + }); + reject(error); }); diff --git a/server/index.js b/server/index.js index b273037..240d456 100755 --- a/server/index.js +++ b/server/index.js @@ -717,6 +717,32 @@ wss.on('connection', (ws, request) => { } }); +/** + * WebSocket Writer - Wrapper for WebSocket to match SSEStreamWriter interface + */ +class WebSocketWriter { + constructor(ws) { + this.ws = ws; + this.sessionId = null; + this.isWebSocketWriter = true; // Marker for transport detection + } + + send(data) { + if (this.ws.readyState === 1) { // WebSocket.OPEN + // Providers send raw objects, we stringify for WebSocket + this.ws.send(JSON.stringify(data)); + } + } + + setSessionId(sessionId) { + this.sessionId = sessionId; + } + + getSessionId() { + return this.sessionId; + } +} + // Handle chat WebSocket connections function handleChatConnection(ws) { console.log('[INFO] Chat WebSocket connected'); @@ -724,6 +750,9 @@ function handleChatConnection(ws) { // Add to connected clients for project updates connectedClients.add(ws); + // Wrap WebSocket with writer for consistent interface with SSEStreamWriter + const writer = new WebSocketWriter(ws); + ws.on('message', async (message) => { try { const data = JSON.parse(message); @@ -734,19 +763,19 @@ function handleChatConnection(ws) { console.log('🔄 Session:', data.options?.sessionId ? 'Resume' : 'New'); // Use Claude Agents SDK - await queryClaudeSDK(data.command, data.options, ws); + await queryClaudeSDK(data.command, data.options, writer); } else if (data.type === 'cursor-command') { console.log('[DEBUG] Cursor message:', data.command || '[Continue/Resume]'); console.log('📁 Project:', data.options?.cwd || 'Unknown'); console.log('🔄 Session:', data.options?.sessionId ? 'Resume' : 'New'); console.log('🤖 Model:', data.options?.model || 'default'); - await spawnCursor(data.command, data.options, ws); + await spawnCursor(data.command, data.options, writer); } else if (data.type === 'codex-command') { console.log('[DEBUG] Codex message:', data.command || '[Continue/Resume]'); console.log('📁 Project:', data.options?.projectPath || data.options?.cwd || 'Unknown'); console.log('🔄 Session:', data.options?.sessionId ? 'Resume' : 'New'); console.log('🤖 Model:', data.options?.model || 'default'); - await queryCodex(data.command, data.options, ws); + await queryCodex(data.command, data.options, writer); } else if (data.type === 'cursor-resume') { // Backward compatibility: treat as cursor-command with resume and no prompt console.log('[DEBUG] Cursor resume session (compat):', data.sessionId); @@ -754,7 +783,7 @@ function handleChatConnection(ws) { sessionId: data.sessionId, resume: true, cwd: data.options?.cwd - }, ws); + }, writer); } else if (data.type === 'abort-session') { console.log('[DEBUG] Abort session request:', data.sessionId); const provider = data.provider || 'claude'; @@ -769,21 +798,21 @@ function handleChatConnection(ws) { success = await abortClaudeSDKSession(data.sessionId); } - ws.send(JSON.stringify({ + writer.send({ type: 'session-aborted', sessionId: data.sessionId, provider, success - })); + }); } else if (data.type === 'cursor-abort') { console.log('[DEBUG] Abort Cursor session:', data.sessionId); const success = abortCursorSession(data.sessionId); - ws.send(JSON.stringify({ + writer.send({ type: 'session-aborted', sessionId: data.sessionId, provider: 'cursor', success - })); + }); } else if (data.type === 'check-session-status') { // Check if a specific session is currently processing const provider = data.provider || 'claude'; @@ -799,12 +828,12 @@ function handleChatConnection(ws) { isActive = isClaudeSDKSessionActive(sessionId); } - ws.send(JSON.stringify({ + writer.send({ type: 'session-status', sessionId, provider, isProcessing: isActive - })); + }); } else if (data.type === 'get-active-sessions') { // Get all currently active sessions const activeSessions = { @@ -812,17 +841,17 @@ function handleChatConnection(ws) { cursor: getActiveCursorSessions(), codex: getActiveCodexSessions() }; - ws.send(JSON.stringify({ + writer.send({ type: 'active-sessions', sessions: activeSessions - })); + }); } } catch (error) { console.error('[ERROR] Chat WebSocket error:', error.message); - ws.send(JSON.stringify({ + writer.send({ type: 'error', error: error.message - })); + }); } }); diff --git a/server/openai-codex.js b/server/openai-codex.js index f4f00ef..ddc023f 100644 --- a/server/openai-codex.js +++ b/server/openai-codex.js @@ -360,12 +360,12 @@ export function getActiveCodexSessions() { */ function sendMessage(ws, data) { try { - if (typeof ws.send === 'function') { - // WebSocket + if (ws.isSSEStreamWriter || ws.isWebSocketWriter) { + // Writer handles stringification (SSEStreamWriter or WebSocketWriter) + ws.send(data); + } else if (typeof ws.send === 'function') { + // Raw WebSocket - stringify here ws.send(JSON.stringify(data)); - } else if (typeof ws.write === 'function') { - // SSE writer (for agent API) - ws.write(`data: ${JSON.stringify(data)}\n\n`); } } catch (error) { console.error('[Codex] Error sending message:', error); diff --git a/server/routes/agent.js b/server/routes/agent.js index 04e6339..f633034 100644 --- a/server/routes/agent.js +++ b/server/routes/agent.js @@ -451,6 +451,7 @@ class SSEStreamWriter { constructor(res) { this.res = res; this.sessionId = null; + this.isSSEStreamWriter = true; // Marker for transport detection } send(data) { @@ -458,7 +459,7 @@ class SSEStreamWriter { return; } - // Format as SSE + // Format as SSE - providers send raw objects, we stringify this.res.write(`data: ${JSON.stringify(data)}\n\n`); }