mirror of
https://github.com/siteboon/claudecodeui.git
synced 2026-05-11 23:34:42 +00:00
Fix/websocket streaming issues (#748)
This commit is contained in:
@@ -143,7 +143,7 @@ function transformCodexEvent(event) {
|
||||
case 'thread.started':
|
||||
return {
|
||||
type: 'thread_started',
|
||||
threadId: event.id
|
||||
threadId: event.thread_id || event.id
|
||||
};
|
||||
|
||||
case 'error':
|
||||
@@ -207,7 +207,8 @@ export async function queryCodex(command, options = {}, ws) {
|
||||
|
||||
let codex;
|
||||
let thread;
|
||||
let currentSessionId = sessionId;
|
||||
let capturedSessionId = sessionId;
|
||||
let sessionCreatedSent = false;
|
||||
let terminalFailure = null;
|
||||
const abortController = new AbortController();
|
||||
|
||||
@@ -231,20 +232,23 @@ export async function queryCodex(command, options = {}, ws) {
|
||||
thread = codex.startThread(threadOptions);
|
||||
}
|
||||
|
||||
// Get the thread ID
|
||||
currentSessionId = thread.id || sessionId || `codex-${Date.now()}`;
|
||||
const registerSession = (id) => {
|
||||
if (!id) {
|
||||
return;
|
||||
}
|
||||
activeCodexSessions.set(id, {
|
||||
thread,
|
||||
codex,
|
||||
status: 'running',
|
||||
abortController,
|
||||
startedAt: new Date().toISOString()
|
||||
});
|
||||
};
|
||||
|
||||
// Track the session
|
||||
activeCodexSessions.set(currentSessionId, {
|
||||
thread,
|
||||
codex,
|
||||
status: 'running',
|
||||
abortController,
|
||||
startedAt: new Date().toISOString()
|
||||
});
|
||||
|
||||
// Send session created event
|
||||
sendMessage(ws, createNormalizedMessage({ kind: 'session_created', newSessionId: currentSessionId, sessionId: currentSessionId, provider: 'codex' }));
|
||||
// Existing sessions can be tracked immediately; new sessions are tracked after thread.started.
|
||||
if (capturedSessionId) {
|
||||
registerSession(capturedSessionId);
|
||||
}
|
||||
|
||||
// Execute with streaming
|
||||
const streamedTurn = await thread.runStreamed(command, {
|
||||
@@ -252,11 +256,34 @@ export async function queryCodex(command, options = {}, ws) {
|
||||
});
|
||||
|
||||
for await (const event of streamedTurn.events) {
|
||||
// Capture thread/session id lazily from the stream (Codex emits this asynchronously).
|
||||
if (event.type === 'thread.started') {
|
||||
const discoveredSessionId = event.thread_id || event.id || null;
|
||||
if (discoveredSessionId && !capturedSessionId) {
|
||||
capturedSessionId = discoveredSessionId;
|
||||
registerSession(capturedSessionId);
|
||||
|
||||
if (ws.setSessionId && typeof ws.setSessionId === 'function') {
|
||||
ws.setSessionId(capturedSessionId);
|
||||
}
|
||||
|
||||
if (!sessionId && !sessionCreatedSent) {
|
||||
sessionCreatedSent = true;
|
||||
sendMessage(ws, createNormalizedMessage({ kind: 'session_created', newSessionId: capturedSessionId, sessionId: capturedSessionId, provider: 'codex' }));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Check if session was aborted
|
||||
const session = activeCodexSessions.get(currentSessionId);
|
||||
if (!session || session.status === 'aborted') {
|
||||
if (abortController.signal.aborted) {
|
||||
break;
|
||||
}
|
||||
if (capturedSessionId) {
|
||||
const session = activeCodexSessions.get(capturedSessionId);
|
||||
if (session?.status === 'aborted') {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (event.type === 'item.started' || event.type === 'item.updated') {
|
||||
continue;
|
||||
@@ -265,7 +292,7 @@ export async function queryCodex(command, options = {}, ws) {
|
||||
const transformed = transformCodexEvent(event);
|
||||
|
||||
// Normalize the transformed event into NormalizedMessage(s) via adapter
|
||||
const normalizedMsgs = sessionsService.normalizeMessage('codex', transformed, currentSessionId);
|
||||
const normalizedMsgs = sessionsService.normalizeMessage('codex', transformed, capturedSessionId || sessionId || null);
|
||||
for (const msg of normalizedMsgs) {
|
||||
sendMessage(ws, msg);
|
||||
}
|
||||
@@ -275,7 +302,7 @@ export async function queryCodex(command, options = {}, ws) {
|
||||
notifyRunFailed({
|
||||
userId: ws?.userId || null,
|
||||
provider: 'codex',
|
||||
sessionId: currentSessionId,
|
||||
sessionId: capturedSessionId || sessionId || null,
|
||||
sessionName: sessionSummary,
|
||||
error: terminalFailure
|
||||
});
|
||||
@@ -284,24 +311,29 @@ export async function queryCodex(command, options = {}, ws) {
|
||||
// Extract and send token usage if available (normalized to match Claude format)
|
||||
if (event.type === 'turn.completed' && event.usage) {
|
||||
const totalTokens = (event.usage.input_tokens || 0) + (event.usage.output_tokens || 0);
|
||||
sendMessage(ws, createNormalizedMessage({ kind: 'status', text: 'token_budget', tokenBudget: { used: totalTokens, total: 200000 }, sessionId: currentSessionId, provider: 'codex' }));
|
||||
sendMessage(ws, createNormalizedMessage({ kind: 'status', text: 'token_budget', tokenBudget: { used: totalTokens, total: 200000 }, sessionId: capturedSessionId || sessionId || null, provider: 'codex' }));
|
||||
}
|
||||
}
|
||||
|
||||
// Send completion event
|
||||
if (!terminalFailure) {
|
||||
sendMessage(ws, createNormalizedMessage({ kind: 'complete', actualSessionId: thread.id, sessionId: currentSessionId, provider: 'codex' }));
|
||||
sendMessage(ws, createNormalizedMessage({
|
||||
kind: 'complete',
|
||||
actualSessionId: capturedSessionId || thread.id || sessionId || null,
|
||||
sessionId: capturedSessionId || sessionId || null,
|
||||
provider: 'codex'
|
||||
}));
|
||||
notifyRunStopped({
|
||||
userId: ws?.userId || null,
|
||||
provider: 'codex',
|
||||
sessionId: currentSessionId,
|
||||
sessionId: capturedSessionId || sessionId || null,
|
||||
sessionName: sessionSummary,
|
||||
stopReason: 'completed'
|
||||
});
|
||||
}
|
||||
|
||||
} catch (error) {
|
||||
const session = currentSessionId ? activeCodexSessions.get(currentSessionId) : null;
|
||||
const session = capturedSessionId ? activeCodexSessions.get(capturedSessionId) : null;
|
||||
const wasAborted =
|
||||
session?.status === 'aborted' ||
|
||||
error?.name === 'AbortError' ||
|
||||
@@ -316,12 +348,12 @@ export async function queryCodex(command, options = {}, ws) {
|
||||
? 'Codex CLI is not configured. Please set up authentication first.'
|
||||
: error.message;
|
||||
|
||||
sendMessage(ws, createNormalizedMessage({ kind: 'error', content: errorContent, sessionId: currentSessionId, provider: 'codex' }));
|
||||
sendMessage(ws, createNormalizedMessage({ kind: 'error', content: errorContent, sessionId: capturedSessionId || sessionId || null, provider: 'codex' }));
|
||||
if (!terminalFailure) {
|
||||
notifyRunFailed({
|
||||
userId: ws?.userId || null,
|
||||
provider: 'codex',
|
||||
sessionId: currentSessionId,
|
||||
sessionId: capturedSessionId || sessionId || null,
|
||||
sessionName: sessionSummary,
|
||||
error
|
||||
});
|
||||
@@ -330,8 +362,8 @@ export async function queryCodex(command, options = {}, ws) {
|
||||
|
||||
} finally {
|
||||
// Update session status
|
||||
if (currentSessionId) {
|
||||
const session = activeCodexSessions.get(currentSessionId);
|
||||
if (capturedSessionId) {
|
||||
const session = activeCodexSessions.get(capturedSessionId);
|
||||
if (session) {
|
||||
session.status = session.status === 'aborted' ? 'aborted' : 'completed';
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user