mirror of
https://github.com/siteboon/claudecodeui.git
synced 2026-06-11 16:23:03 +08:00
Replace the chat processing banner with a minimal activity indicator and
rebuild the state model underneath it. The old banner was driven by five
overlapping pieces of state (isLoading, canAbortSession, claudeStatus in the
chat, plus two app-level Sets updated in lockstep through four callbacks)
that had to be kept in sync imperatively. Because completion and status
events mutated the *viewed* session's flags regardless of which session they
belonged to, a background session finishing could hide the indicator for a
still-running session, returning to a finished session could briefly show a
stale banner, and a late status reply could override a newer request.
The fix is structural rather than patch-by-patch: a single
Map<sessionId, {statusText, canInterrupt, startedAt}> in useSessionProtection
is now the only source of truth for "this session is working". The indicator,
stop button, composer streaming state, and session protection are all derived
from the viewed session's entry on render, so there is no stale local copy to
restore or reset when switching sessions. A PENDING_SESSION_ID sentinel
covers the window before a new conversation receives its real session id.
Terminal events delete the entry atomically, which is why the indicator
disappears the instant the final chunk arrives. Stale check-session-status
replies are discarded via an ifStartedBefore guard (an idle reply older than
the entry's startedAt describes a previous request, not the current one).
The second half unifies the provider lifecycle contract, because the frontend
could not be made race-free while each provider terminated differently:
- cursor emitted complete twice per run (result line + process close), which
double-played the completion sound and let a late close-complete clear a
newer request's indicator
- aborts produced two completes (the abort-session reply plus the provider's
own non-aborted one), so cancelling a run played the celebration sound
- codex omitted exitCode; others attached ad-hoc fields (resultText, isError,
isNewSession) the client had to know about
- claude/codex failures ended with only an error event while gemini/cursor
also emit kind:'error' for mid-run stderr noise, so 'error' was ambiguous
between "the run died" and "a process wrote to stderr"
Every run now ends with exactly one `complete` event built by
createCompleteMessage() ({sessionId, actualSessionId, exitCode, success,
aborted}); abort-session sends it on behalf of cancelled runs, and providers
detect the abort and skip their own. The `error` event is demoted to an
informational row, so stderr noise no longer kills the indicator mid-run, and
the client celebrates only `complete` events with success: true.
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
506 lines
14 KiB
JavaScript
506 lines
14 KiB
JavaScript
/**
|
|
* OpenAI Codex SDK Integration
|
|
* =============================
|
|
*
|
|
* This module provides integration with the OpenAI Codex SDK for non-interactive
|
|
* chat sessions. It mirrors the pattern used in claude-sdk.js for consistency.
|
|
*
|
|
* ## Usage
|
|
*
|
|
* - queryCodex(command, options, ws) - Execute a prompt with streaming via WebSocket
|
|
* - abortCodexSession(sessionId) - Cancel an active session
|
|
* - isCodexSessionActive(sessionId) - Check if a session is running
|
|
* - getActiveCodexSessions() - List all active sessions
|
|
*/
|
|
|
|
import { Codex } from '@openai/codex-sdk';
|
|
import { notifyRunFailed, notifyRunStopped } from './services/notification-orchestrator.js';
|
|
import { sessionsService } from './modules/providers/services/sessions.service.js';
|
|
import { providerAuthService } from './modules/providers/services/provider-auth.service.js';
|
|
import { providerModelsService } from './modules/providers/services/provider-models.service.js';
|
|
import { createCompleteMessage, createNormalizedMessage } from './shared/utils.js';
|
|
|
|
// Track active sessions
|
|
// Registry of Codex runs keyed by session (thread) id. Each entry holds
// { thread, codex, status, abortController, startedAt }; status is set to
// 'running', 'aborted', or 'completed' by the functions in this module.
const activeCodexSessions = new Map();
|
|
|
|
/**
 * Coerce a raw usage counter into a finite number.
 * @param {*} value - Raw value taken from a usage payload
 * @returns {number} - Number(value) when it is finite, otherwise 0
 */
function readUsageNumber(value) {
  const numeric = Number(value);
  if (!Number.isFinite(numeric)) {
    return 0;
  }
  return numeric;
}
|
|
|
|
/**
 * Derive a token budget summary from the usage data carried by a Codex event.
 * @param {object} event - SDK event that may carry usage information
 * @returns {object|null} - { used, total, inputTokens, outputTokens, breakdown },
 *   or null when the event has no usage object
 */
function extractCodexTokenBudget(event) {
  const fallbackContextWindow = 200000;

  // Usage details may sit on the event itself, under `payload`, or under `usage`.
  const info = event?.info || event?.payload?.info || event?.usage?.info;
  const usage = info?.total_token_usage || event?.usage?.total_token_usage || event?.usage;

  const hasUsageObject = Boolean(usage) && typeof usage === 'object';
  if (!hasUsageObject) {
    return null;
  }

  const inputTokens = readUsageNumber(usage.input_tokens);
  const outputTokens = readUsageNumber(usage.output_tokens);

  // Prefer the reported total; otherwise fall back to input + output.
  const reportedTotal = readUsageNumber(usage.total_tokens);
  const used = reportedTotal || inputTokens + outputTokens;

  const contextWindow = readUsageNumber(info?.model_context_window || event?.usage?.model_context_window);
  const total = contextWindow || fallbackContextWindow;

  return {
    used,
    total,
    inputTokens,
    outputTokens,
    breakdown: {
      input: inputTokens,
      output: outputTokens,
    },
  };
}
|
|
|
|
/**
 * Build the WebSocket payload for a single Codex thread item.
 * @param {object} item - Item carried by an item.* SDK event
 * @returns {object} - Payload of type 'item' tagged with the item's type
 */
function buildCodexItemPayload(item) {
  const itemType = item.type;

  if (itemType === 'agent_message') {
    return {
      type: 'item',
      itemType: 'agent_message',
      message: {
        role: 'assistant',
        content: item.text,
      },
    };
  }

  if (itemType === 'reasoning') {
    return {
      type: 'item',
      itemType: 'reasoning',
      message: {
        role: 'assistant',
        content: item.text,
        isReasoning: true,
      },
    };
  }

  if (itemType === 'command_execution') {
    return {
      type: 'item',
      itemType: 'command_execution',
      command: item.command,
      output: item.aggregated_output,
      exitCode: item.exit_code,
      status: item.status,
    };
  }

  if (itemType === 'file_change') {
    return {
      type: 'item',
      itemType: 'file_change',
      changes: item.changes,
      status: item.status,
    };
  }

  if (itemType === 'mcp_tool_call') {
    return {
      type: 'item',
      itemType: 'mcp_tool_call',
      server: item.server,
      tool: item.tool,
      arguments: item.arguments,
      result: item.result,
      error: item.error,
      status: item.status,
    };
  }

  if (itemType === 'web_search') {
    return {
      type: 'item',
      itemType: 'web_search',
      query: item.query,
    };
  }

  if (itemType === 'todo_list') {
    return {
      type: 'item',
      itemType: 'todo_list',
      items: item.items,
    };
  }

  if (itemType === 'error') {
    return {
      type: 'item',
      itemType: 'error',
      message: {
        role: 'error',
        content: item.message,
      },
    };
  }

  // Unrecognized item types are forwarded untouched under `item`.
  return {
    type: 'item',
    itemType,
    item,
  };
}

/**
 * Transform Codex SDK event to WebSocket message format
 * @param {object} event - SDK event
 * @returns {object} - Transformed event for WebSocket
 */
function transformCodexEvent(event) {
  const eventType = event.type;

  // All three item lifecycle events share the same item-based mapping.
  const isItemEvent =
    eventType === 'item.started' ||
    eventType === 'item.updated' ||
    eventType === 'item.completed';
  if (isItemEvent) {
    const { item } = event;
    return item ? buildCodexItemPayload(item) : { type: eventType, item: null };
  }

  if (eventType === 'turn.started') {
    return { type: 'turn_started' };
  }

  if (eventType === 'turn.completed') {
    return { type: 'turn_complete', usage: event.usage };
  }

  if (eventType === 'turn.failed') {
    return { type: 'turn_failed', error: event.error };
  }

  if (eventType === 'thread.started') {
    return { type: 'thread_started', threadId: event.thread_id || event.id };
  }

  if (eventType === 'error') {
    return { type: 'error', message: event.message };
  }

  // Unknown event types are passed through with the raw event attached.
  return { type: eventType, data: event };
}
|
|
|
|
/**
 * Translate a UI permission mode into Codex SDK sandbox/approval settings.
 * Any unrecognized mode is treated like 'default'.
 * @param {string} permissionMode - 'default', 'acceptEdits', or 'bypassPermissions'
 * @returns {object} - { sandboxMode, approvalPolicy }
 */
function mapPermissionModeToCodexOptions(permissionMode) {
  if (permissionMode === 'bypassPermissions') {
    return {
      sandboxMode: 'danger-full-access',
      approvalPolicy: 'never',
    };
  }

  // Both remaining cases write to the workspace; only the approval policy differs.
  const approvalPolicy = permissionMode === 'acceptEdits' ? 'never' : 'untrusted';
  return {
    sandboxMode: 'workspace-write',
    approvalPolicy,
  };
}
|
|
|
|
/**
 * Execute a Codex query with streaming.
 *
 * Starts a new thread (or resumes `options.sessionId`), forwards each streamed
 * SDK event to `ws` as normalized messages, and finishes the run with a single
 * `complete` message. Aborted runs send no `complete` from here because
 * abort-session has already sent one on their behalf.
 *
 * @param {string} command - The prompt to send
 * @param {object} options - Options including cwd, projectPath, sessionId, sessionSummary, model, permissionMode
 * @param {WebSocket|object} ws - WebSocket connection or response writer
 * @returns {Promise<void>} - Settles once the run has finished, failed, or been aborted
 */
export async function queryCodex(command, options = {}, ws) {
  const {
    sessionId,
    sessionSummary,
    cwd,
    projectPath,
    model,
    permissionMode = 'default'
  } = options;

  const resolvedModel = await providerModelsService.resolveResumeModel(
    'codex',
    sessionId,
    model,
  );

  const workingDirectory = cwd || projectPath || process.cwd();
  const { sandboxMode, approvalPolicy } = mapPermissionModeToCodexOptions(permissionMode);

  let codex;
  let thread;
  let capturedSessionId = sessionId;
  let sessionCreatedSent = false;
  let terminalFailure = null;
  const abortController = new AbortController();

  try {
    // Initialize Codex SDK
    codex = new Codex();

    // Thread options with sandbox and approval settings
    const threadOptions = {
      workingDirectory,
      skipGitRepoCheck: true,
      sandboxMode,
      approvalPolicy,
      model: resolvedModel
    };

    // Start or resume thread
    if (sessionId) {
      thread = codex.resumeThread(sessionId, threadOptions);
    } else {
      thread = codex.startThread(threadOptions);
    }

    const registerSession = (id) => {
      if (!id) {
        return;
      }
      activeCodexSessions.set(id, {
        thread,
        codex,
        status: 'running',
        abortController,
        startedAt: new Date().toISOString()
      });
    };

    // Existing sessions can be tracked immediately; new sessions are tracked after thread.started.
    if (capturedSessionId) {
      registerSession(capturedSessionId);
    }

    // Execute with streaming
    const streamedTurn = await thread.runStreamed(command, {
      signal: abortController.signal
    });

    for await (const event of streamedTurn.events) {
      // Capture thread/session id lazily from the stream (Codex emits this asynchronously).
      if (event.type === 'thread.started') {
        const discoveredSessionId = event.thread_id || event.id || null;
        if (discoveredSessionId && !capturedSessionId) {
          capturedSessionId = discoveredSessionId;
          registerSession(capturedSessionId);

          // `ws` is treated as optional elsewhere (ws?.userId), so guard it here too.
          if (typeof ws?.setSessionId === 'function') {
            ws.setSessionId(capturedSessionId);
          }

          if (!sessionId && !sessionCreatedSent) {
            sessionCreatedSent = true;
            sendMessage(ws, createNormalizedMessage({ kind: 'session_created', newSessionId: capturedSessionId, sessionId: capturedSessionId, provider: 'codex' }));
          }
        }
      }

      // Check if session was aborted
      if (abortController.signal.aborted) {
        break;
      }
      if (capturedSessionId) {
        const session = activeCodexSessions.get(capturedSessionId);
        if (session?.status === 'aborted') {
          break;
        }
      }

      // Only completed items are forwarded; started/updated item events are dropped.
      if (event.type === 'item.started' || event.type === 'item.updated') {
        continue;
      }

      const transformed = transformCodexEvent(event);

      // Normalize the transformed event into NormalizedMessage(s) via adapter
      const normalizedMsgs = sessionsService.normalizeMessage('codex', transformed, capturedSessionId || sessionId || null);
      for (const msg of normalizedMsgs) {
        sendMessage(ws, msg);
      }

      // Remember the first failed turn so the terminal `complete` reports exitCode 1.
      if (event.type === 'turn.failed' && !terminalFailure) {
        terminalFailure = event.error || new Error('Turn failed');
        notifyRunFailed({
          userId: ws?.userId || null,
          provider: 'codex',
          sessionId: capturedSessionId || sessionId || null,
          sessionName: sessionSummary,
          error: terminalFailure
        });
      }

      // Extract and send token usage if available (normalized to match Claude format)
      if (event.type === 'turn.completed') {
        const tokenBudget = extractCodexTokenBudget(event);
        if (tokenBudget) {
          sendMessage(ws, createNormalizedMessage({ kind: 'status', text: 'token_budget', tokenBudget, sessionId: capturedSessionId || sessionId || null, provider: 'codex' }));
        }
      }
    }

    // Send the terminal completion event — skipped for aborted runs, whose
    // terminal `complete` (aborted: true) was already sent by abort-session.
    const runSession = capturedSessionId ? activeCodexSessions.get(capturedSessionId) : null;
    const runAborted = runSession?.status === 'aborted' || abortController.signal.aborted;
    if (!runAborted) {
      sendMessage(ws, createCompleteMessage({
        provider: 'codex',
        sessionId: capturedSessionId || sessionId || null,
        actualSessionId: capturedSessionId || thread.id || sessionId || null,
        exitCode: terminalFailure ? 1 : 0,
      }));
      if (!terminalFailure) {
        notifyRunStopped({
          userId: ws?.userId || null,
          provider: 'codex',
          sessionId: capturedSessionId || sessionId || null,
          sessionName: sessionSummary,
          stopReason: 'completed'
        });
      }
    }

  } catch (error) {
    const session = capturedSessionId ? activeCodexSessions.get(capturedSessionId) : null;
    // Same abort signals as the success path, plus the error shapes an aborted stream can throw.
    const wasAborted =
      session?.status === 'aborted' ||
      abortController.signal.aborted ||
      error?.name === 'AbortError' ||
      String(error?.message || '').toLowerCase().includes('aborted');

    if (!wasAborted) {
      console.error('[Codex] Error:', error);

      // Check if Codex SDK is available for a clearer error message. The probe
      // only refines the text, so a failing probe must not prevent the terminal
      // `complete` below from being sent.
      let installed = true;
      try {
        installed = await providerAuthService.isProviderInstalled('codex');
      } catch (probeError) {
        console.warn('[Codex] Failed to check provider installation:', probeError);
      }

      // A thrown value is not guaranteed to be an Error; fall back to its string form.
      const errorContent = !installed
        ? 'Codex CLI is not configured. Please set up authentication first.'
        : (error?.message || String(error));

      sendMessage(ws, createNormalizedMessage({ kind: 'error', content: errorContent, sessionId: capturedSessionId || sessionId || null, provider: 'codex' }));
      sendMessage(ws, createCompleteMessage({
        provider: 'codex',
        sessionId: capturedSessionId || sessionId || null,
        exitCode: 1,
      }));
      // A failed turn was already reported from inside the stream loop.
      if (!terminalFailure) {
        notifyRunFailed({
          userId: ws?.userId || null,
          provider: 'codex',
          sessionId: capturedSessionId || sessionId || null,
          sessionName: sessionSummary,
          error
        });
      }
    }

  } finally {
    // Update session status
    if (capturedSessionId) {
      const session = activeCodexSessions.get(capturedSessionId);
      if (session) {
        session.status = session.status === 'aborted' ? 'aborted' : 'completed';
      }
    }
  }
}
|
|
|
|
/**
 * Abort an active Codex session.
 * Marks the tracked entry as 'aborted' and signals its AbortController.
 * @param {string} sessionId - Session ID to abort
 * @returns {boolean} - true when a tracked session was found, false otherwise
 */
export function abortCodexSession(sessionId) {
  const entry = activeCodexSessions.get(sessionId);
  const found = Boolean(entry);

  if (found) {
    // Flag the entry first so the streaming loop sees the abort even if abort() throws.
    entry.status = 'aborted';
    try {
      entry.abortController?.abort();
    } catch (abortError) {
      console.warn(`[Codex] Failed to abort session ${sessionId}:`, abortError);
    }
  }

  return found;
}
|
|
|
|
/**
 * Check if a session is active.
 * @param {string} sessionId - Session ID to check
 * @returns {boolean} - true only when the session is tracked and its status is 'running'
 */
export function isCodexSessionActive(sessionId) {
  const { status } = activeCodexSessions.get(sessionId) ?? {};
  return status === 'running';
}
|
|
|
|
/**
 * Get all active sessions.
 * @returns {Array} - One { id, status, startedAt } record per session whose status is 'running'
 */
export function getActiveCodexSessions() {
  return Array.from(activeCodexSessions.entries())
    .filter(([, entry]) => entry.status === 'running')
    .map(([id, entry]) => ({
      id,
      status: entry.status,
      startedAt: entry.startedAt,
    }));
}
|
|
|
|
/**
 * Helper to send message via WebSocket or writer.
 * Send failures are logged and never propagate to the caller.
 * @param {WebSocket|object} ws - WebSocket or response writer
 * @param {object} data - Data to send
 */
function sendMessage(ws, data) {
  try {
    // SSEStreamWriter / WebSocketWriter stringify on their own.
    const writerSerializes = ws.isSSEStreamWriter || ws.isWebSocketWriter;
    if (!writerSerializes && typeof ws.send !== 'function') {
      return;
    }
    // Raw WebSocket expects a string, so stringify here.
    ws.send(writerSerializes ? data : JSON.stringify(data));
  } catch (sendError) {
    console.error('[Codex] Error sending message:', sendError);
  }
}
|
|
|
|
// Clean up old completed sessions periodically: every 5 minutes, drop entries
// that are no longer running and whose startedAt is more than 30 minutes old.
// The timer is unref'd so this housekeeping alone never keeps the Node process
// alive (e.g. in scripts or tests that merely import this module).
setInterval(() => {
  const now = Date.now();
  const maxAge = 30 * 60 * 1000; // 30 minutes

  for (const [id, session] of activeCodexSessions.entries()) {
    if (session.status === 'running') {
      continue;
    }
    const startedAt = new Date(session.startedAt).getTime();
    if (now - startedAt > maxAge) {
      activeCodexSessions.delete(id);
    }
  }
}, 5 * 60 * 1000).unref?.(); // Every 5 minutes
|