mirror of
https://github.com/siteboon/claudecodeui.git
synced 2026-05-09 22:18:19 +00:00
fix(codex): align session id lifecycle with async thread.start events
Codex session initialization assumed thread.id was immediately available at thread creation time. In practice, for new sessions the SDK discovers/emits the real thread id asynchronously via stream events (thread.started), matching Claude’s behavior. This early assumption caused the backend to create and publish session state before the canonical id existed. Why this matters: - Session identity is the join key for live streaming, abort routing, notifications, and history sync. - Emitting session_created with a guessed/fallback id risks mismatched client state and stale references. - Registering active sessions too early can make abort/lookup logic depend on non-canonical ids. - Divergence from Claude’s flow created provider inconsistency and made cross-provider behavior harder to reason about. What changed: - Stop treating thread.id as authoritative at startup for new Codex sessions. - Capture the real session id from thread.started (prefer event.thread_id, fallback event.id). - Emit session_created only after the canonical id is discovered (new sessions only). - Track active session state immediately only for resumed sessions (known sessionId), otherwise defer registration until id discovery. - Thread the captured id through normalization, token budget updates, completion payloads, and failure notifications. - Preserve writer/session wiring by setting ws.setSessionId once the id is known. Result: Codex now follows the same async session-id contract as Claude, removing a race around initial session identity and making session lifecycle handling deterministic across providers.
This commit is contained in:
@@ -143,7 +143,7 @@ function transformCodexEvent(event) {
|
||||
case 'thread.started':
|
||||
return {
|
||||
type: 'thread_started',
|
||||
threadId: event.id
|
||||
threadId: event.thread_id || event.id
|
||||
};
|
||||
|
||||
case 'error':
|
||||
@@ -207,7 +207,8 @@ export async function queryCodex(command, options = {}, ws) {
|
||||
|
||||
let codex;
|
||||
let thread;
|
||||
let currentSessionId = sessionId;
|
||||
let capturedSessionId = sessionId;
|
||||
let sessionCreatedSent = false;
|
||||
let terminalFailure = null;
|
||||
const abortController = new AbortController();
|
||||
|
||||
@@ -231,20 +232,27 @@ export async function queryCodex(command, options = {}, ws) {
|
||||
thread = codex.startThread(threadOptions);
|
||||
}
|
||||
|
||||
// Get the thread ID
|
||||
currentSessionId = thread.id || sessionId || `codex-${Date.now()}`;
|
||||
console.log(`[Codex] Started thread, sandboxMode: ${sandboxMode}, approvalPolicy: ${approvalPolicy}`);
|
||||
|
||||
// Track the session
|
||||
activeCodexSessions.set(currentSessionId, {
|
||||
thread,
|
||||
codex,
|
||||
status: 'running',
|
||||
abortController,
|
||||
startedAt: new Date().toISOString()
|
||||
});
|
||||
const registerSession = (id) => {
|
||||
if (!id) {
|
||||
return;
|
||||
}
|
||||
if (!activeCodexSessions.has(id)) {
|
||||
activeCodexSessions.set(id, {
|
||||
thread,
|
||||
codex,
|
||||
status: 'running',
|
||||
abortController,
|
||||
startedAt: new Date().toISOString()
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
// Send session created event
|
||||
sendMessage(ws, createNormalizedMessage({ kind: 'session_created', newSessionId: currentSessionId, sessionId: currentSessionId, provider: 'codex' }));
|
||||
// Existing sessions can be tracked immediately; new sessions are tracked after thread.started.
|
||||
if (capturedSessionId) {
|
||||
registerSession(capturedSessionId);
|
||||
}
|
||||
|
||||
// Execute with streaming
|
||||
const streamedTurn = await thread.runStreamed(command, {
|
||||
@@ -252,11 +260,34 @@ export async function queryCodex(command, options = {}, ws) {
|
||||
});
|
||||
|
||||
for await (const event of streamedTurn.events) {
|
||||
// Capture thread/session id lazily from the stream (Codex emits this asynchronously).
|
||||
if (event.type === 'thread.started') {
|
||||
const discoveredSessionId = event.thread_id || event.id || null;
|
||||
if (discoveredSessionId && !capturedSessionId) {
|
||||
capturedSessionId = discoveredSessionId;
|
||||
registerSession(capturedSessionId);
|
||||
|
||||
if (ws.setSessionId && typeof ws.setSessionId === 'function') {
|
||||
ws.setSessionId(capturedSessionId);
|
||||
}
|
||||
|
||||
if (!sessionId && !sessionCreatedSent) {
|
||||
sessionCreatedSent = true;
|
||||
sendMessage(ws, createNormalizedMessage({ kind: 'session_created', newSessionId: capturedSessionId, sessionId: capturedSessionId, provider: 'codex' }));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Check if session was aborted
|
||||
const session = activeCodexSessions.get(currentSessionId);
|
||||
if (!session || session.status === 'aborted') {
|
||||
if (abortController.signal.aborted) {
|
||||
break;
|
||||
}
|
||||
if (capturedSessionId) {
|
||||
const session = activeCodexSessions.get(capturedSessionId);
|
||||
if (session?.status === 'aborted') {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (event.type === 'item.started' || event.type === 'item.updated') {
|
||||
continue;
|
||||
@@ -265,7 +296,7 @@ export async function queryCodex(command, options = {}, ws) {
|
||||
const transformed = transformCodexEvent(event);
|
||||
|
||||
// Normalize the transformed event into NormalizedMessage(s) via adapter
|
||||
const normalizedMsgs = sessionsService.normalizeMessage('codex', transformed, currentSessionId);
|
||||
const normalizedMsgs = sessionsService.normalizeMessage('codex', transformed, capturedSessionId || sessionId || null);
|
||||
for (const msg of normalizedMsgs) {
|
||||
sendMessage(ws, msg);
|
||||
}
|
||||
@@ -275,7 +306,7 @@ export async function queryCodex(command, options = {}, ws) {
|
||||
notifyRunFailed({
|
||||
userId: ws?.userId || null,
|
||||
provider: 'codex',
|
||||
sessionId: currentSessionId,
|
||||
sessionId: capturedSessionId || sessionId || null,
|
||||
sessionName: sessionSummary,
|
||||
error: terminalFailure
|
||||
});
|
||||
@@ -284,24 +315,29 @@ export async function queryCodex(command, options = {}, ws) {
|
||||
// Extract and send token usage if available (normalized to match Claude format)
|
||||
if (event.type === 'turn.completed' && event.usage) {
|
||||
const totalTokens = (event.usage.input_tokens || 0) + (event.usage.output_tokens || 0);
|
||||
sendMessage(ws, createNormalizedMessage({ kind: 'status', text: 'token_budget', tokenBudget: { used: totalTokens, total: 200000 }, sessionId: currentSessionId, provider: 'codex' }));
|
||||
sendMessage(ws, createNormalizedMessage({ kind: 'status', text: 'token_budget', tokenBudget: { used: totalTokens, total: 200000 }, sessionId: capturedSessionId || sessionId || null, provider: 'codex' }));
|
||||
}
|
||||
}
|
||||
|
||||
// Send completion event
|
||||
if (!terminalFailure) {
|
||||
sendMessage(ws, createNormalizedMessage({ kind: 'complete', actualSessionId: thread.id, sessionId: currentSessionId, provider: 'codex' }));
|
||||
sendMessage(ws, createNormalizedMessage({
|
||||
kind: 'complete',
|
||||
actualSessionId: capturedSessionId || thread.id || sessionId || null,
|
||||
sessionId: capturedSessionId || sessionId || null,
|
||||
provider: 'codex'
|
||||
}));
|
||||
notifyRunStopped({
|
||||
userId: ws?.userId || null,
|
||||
provider: 'codex',
|
||||
sessionId: currentSessionId,
|
||||
sessionId: capturedSessionId || sessionId || null,
|
||||
sessionName: sessionSummary,
|
||||
stopReason: 'completed'
|
||||
});
|
||||
}
|
||||
|
||||
} catch (error) {
|
||||
const session = currentSessionId ? activeCodexSessions.get(currentSessionId) : null;
|
||||
const session = capturedSessionId ? activeCodexSessions.get(capturedSessionId) : null;
|
||||
const wasAborted =
|
||||
session?.status === 'aborted' ||
|
||||
error?.name === 'AbortError' ||
|
||||
@@ -316,12 +352,12 @@ export async function queryCodex(command, options = {}, ws) {
|
||||
? 'Codex CLI is not configured. Please set up authentication first.'
|
||||
: error.message;
|
||||
|
||||
sendMessage(ws, createNormalizedMessage({ kind: 'error', content: errorContent, sessionId: currentSessionId, provider: 'codex' }));
|
||||
sendMessage(ws, createNormalizedMessage({ kind: 'error', content: errorContent, sessionId: capturedSessionId || sessionId || null, provider: 'codex' }));
|
||||
if (!terminalFailure) {
|
||||
notifyRunFailed({
|
||||
userId: ws?.userId || null,
|
||||
provider: 'codex',
|
||||
sessionId: currentSessionId,
|
||||
sessionId: capturedSessionId || sessionId || null,
|
||||
sessionName: sessionSummary,
|
||||
error
|
||||
});
|
||||
@@ -330,8 +366,8 @@ export async function queryCodex(command, options = {}, ws) {
|
||||
|
||||
} finally {
|
||||
// Update session status
|
||||
if (currentSessionId) {
|
||||
const session = activeCodexSessions.get(currentSessionId);
|
||||
if (capturedSessionId) {
|
||||
const session = activeCodexSessions.get(capturedSessionId);
|
||||
if (session) {
|
||||
session.status = session.status === 'aborted' ? 'aborted' : 'completed';
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user