diff --git a/server/openai-codex.js b/server/openai-codex.js index 5a7a9007..825ee179 100644 --- a/server/openai-codex.js +++ b/server/openai-codex.js @@ -143,7 +143,7 @@ function transformCodexEvent(event) { case 'thread.started': return { type: 'thread_started', - threadId: event.id + threadId: event.thread_id || event.id }; case 'error': @@ -207,7 +207,8 @@ export async function queryCodex(command, options = {}, ws) { let codex; let thread; - let currentSessionId = sessionId; + let capturedSessionId = sessionId; + let sessionCreatedSent = false; let terminalFailure = null; const abortController = new AbortController(); @@ -231,20 +232,27 @@ export async function queryCodex(command, options = {}, ws) { thread = codex.startThread(threadOptions); } - // Get the thread ID - currentSessionId = thread.id || sessionId || `codex-${Date.now()}`; + console.log(`[Codex] Started thread, sandboxMode: ${sandboxMode}, approvalPolicy: ${approvalPolicy}`); - // Track the session - activeCodexSessions.set(currentSessionId, { - thread, - codex, - status: 'running', - abortController, - startedAt: new Date().toISOString() - }); + const registerSession = (id) => { + if (!id) { + return; + } + if (!activeCodexSessions.has(id)) { + activeCodexSessions.set(id, { + thread, + codex, + status: 'running', + abortController, + startedAt: new Date().toISOString() + }); + } + }; - // Send session created event - sendMessage(ws, createNormalizedMessage({ kind: 'session_created', newSessionId: currentSessionId, sessionId: currentSessionId, provider: 'codex' })); + // Existing sessions can be tracked immediately; new sessions are tracked after thread.started. + if (capturedSessionId) { + registerSession(capturedSessionId); + } // Execute with streaming const streamedTurn = await thread.runStreamed(command, { @@ -252,11 +260,34 @@ export async function queryCodex(command, options = {}, ws) { }); for await (const event of streamedTurn.events) { + // Capture thread/session id lazily from the stream (Codex emits this asynchronously). + if (event.type === 'thread.started') { + const discoveredSessionId = event.thread_id || event.id || null; + if (discoveredSessionId && !capturedSessionId) { + capturedSessionId = discoveredSessionId; + registerSession(capturedSessionId); + + if (ws.setSessionId && typeof ws.setSessionId === 'function') { + ws.setSessionId(capturedSessionId); + } + + if (!sessionId && !sessionCreatedSent) { + sessionCreatedSent = true; + sendMessage(ws, createNormalizedMessage({ kind: 'session_created', newSessionId: capturedSessionId, sessionId: capturedSessionId, provider: 'codex' })); + } + } + } + // Check if session was aborted - const session = activeCodexSessions.get(currentSessionId); - if (!session || session.status === 'aborted') { + if (abortController.signal.aborted) { break; } + if (capturedSessionId) { + const session = activeCodexSessions.get(capturedSessionId); + if (session?.status === 'aborted') { + break; + } + } if (event.type === 'item.started' || event.type === 'item.updated') { continue; @@ -265,7 +296,7 @@ export async function queryCodex(command, options = {}, ws) { const transformed = transformCodexEvent(event); // Normalize the transformed event into NormalizedMessage(s) via adapter - const normalizedMsgs = sessionsService.normalizeMessage('codex', transformed, currentSessionId); + const normalizedMsgs = sessionsService.normalizeMessage('codex', transformed, capturedSessionId || sessionId || null); for (const msg of normalizedMsgs) { sendMessage(ws, msg); } @@ -275,7 +306,7 @@ export async function queryCodex(command, options = {}, ws) { notifyRunFailed({ userId: ws?.userId || null, provider: 'codex', - sessionId: currentSessionId, + sessionId: capturedSessionId || sessionId || null, sessionName: sessionSummary, error: terminalFailure }); @@ -284,24 +315,29 @@ export async function queryCodex(command, options = {}, ws) { // Extract and send token usage if available (normalized to match Claude format) if (event.type === 'turn.completed' && event.usage) { const totalTokens = (event.usage.input_tokens || 0) + (event.usage.output_tokens || 0); - sendMessage(ws, createNormalizedMessage({ kind: 'status', text: 'token_budget', tokenBudget: { used: totalTokens, total: 200000 }, sessionId: currentSessionId, provider: 'codex' })); + sendMessage(ws, createNormalizedMessage({ kind: 'status', text: 'token_budget', tokenBudget: { used: totalTokens, total: 200000 }, sessionId: capturedSessionId || sessionId || null, provider: 'codex' })); } } // Send completion event if (!terminalFailure) { - sendMessage(ws, createNormalizedMessage({ kind: 'complete', actualSessionId: thread.id, sessionId: currentSessionId, provider: 'codex' })); + sendMessage(ws, createNormalizedMessage({ + kind: 'complete', + actualSessionId: capturedSessionId || thread.id || sessionId || null, + sessionId: capturedSessionId || sessionId || null, + provider: 'codex' + })); notifyRunStopped({ userId: ws?.userId || null, provider: 'codex', - sessionId: currentSessionId, + sessionId: capturedSessionId || sessionId || null, sessionName: sessionSummary, stopReason: 'completed' }); } } catch (error) { - const session = currentSessionId ? activeCodexSessions.get(currentSessionId) : null; + const session = capturedSessionId ? activeCodexSessions.get(capturedSessionId) : null; const wasAborted = session?.status === 'aborted' || error?.name === 'AbortError' || @@ -316,12 +352,12 @@ export async function queryCodex(command, options = {}, ws) { ? 'Codex CLI is not configured. Please set up authentication first.' : error.message; - sendMessage(ws, createNormalizedMessage({ kind: 'error', content: errorContent, sessionId: currentSessionId, provider: 'codex' })); + sendMessage(ws, createNormalizedMessage({ kind: 'error', content: errorContent, sessionId: capturedSessionId || sessionId || null, provider: 'codex' })); if (!terminalFailure) { notifyRunFailed({ userId: ws?.userId || null, provider: 'codex', - sessionId: currentSessionId, + sessionId: capturedSessionId || sessionId || null, sessionName: sessionSummary, error }); @@ -330,8 +366,8 @@ export async function queryCodex(command, options = {}, ws) { } finally { // Update session status - if (currentSessionId) { - const session = activeCodexSessions.get(currentSessionId); + if (capturedSessionId) { + const session = activeCodexSessions.get(capturedSessionId); if (session) { session.status = session.status === 'aborted' ? 'aborted' : 'completed'; }