Files
claudecodeui/server/modules/providers/list/codex/codex-sessions.provider.ts

559 lines
16 KiB
TypeScript

import fsSync from 'node:fs';
import readline from 'node:readline';
import { sessionsDb } from '@/modules/database/index.js';
import type { IProviderSessions } from '@/shared/interfaces.js';
import type { AnyRecord, FetchHistoryOptions, FetchHistoryResult, NormalizedMessage } from '@/shared/types.js';
import { createNormalizedMessage, generateMessageId, readObjectRecord } from '@/shared/utils.js';
const PROVIDER = 'codex';
type CodexHistoryResult =
| AnyRecord[]
| {
messages?: AnyRecord[];
total?: number;
hasMore?: boolean;
offset?: number;
limit?: number | null;
tokenUsage?: unknown;
};
function isVisibleCodexUserMessage(payload: AnyRecord | null | undefined): boolean {
if (!payload || payload.type !== 'user_message') {
return false;
}
if (payload.kind && payload.kind !== 'plain') {
return false;
}
return typeof payload.message === 'string' && payload.message.trim().length > 0;
}
function extractCodexTextContent(content: unknown): string {
if (!Array.isArray(content)) {
return typeof content === 'string' ? content : '';
}
return content
.map((item) => {
if (!item || typeof item !== 'object') {
return '';
}
const record = item as AnyRecord;
if (
(record.type === 'input_text' || record.type === 'output_text' || record.type === 'text')
&& typeof record.text === 'string'
) {
return record.text;
}
return '';
})
.filter(Boolean)
.join('\n');
}
async function getCodexSessionMessages(
sessionId: string,
limit: number | null = null,
offset = 0,
): Promise<CodexHistoryResult> {
try {
const sessionFilePath = sessionsDb.getSessionById(sessionId)?.jsonl_path;
if (!sessionFilePath) {
console.warn(`Codex session file not found for session ${sessionId}`);
return { messages: [], total: 0, hasMore: false };
}
const messages: AnyRecord[] = [];
let tokenUsage: AnyRecord | null = null;
const fileStream = fsSync.createReadStream(sessionFilePath);
const rl = readline.createInterface({
input: fileStream,
crlfDelay: Infinity,
});
for await (const line of rl) {
if (!line.trim()) {
continue;
}
try {
const entry = JSON.parse(line) as AnyRecord;
if (entry.type === 'event_msg' && entry.payload?.type === 'token_count' && entry.payload?.info) {
const info = entry.payload.info as AnyRecord;
if (info.total_token_usage) {
const usage = info.total_token_usage as AnyRecord;
tokenUsage = {
used: usage.total_tokens || 0,
total: info.model_context_window || 200000,
};
}
}
if (entry.type === 'event_msg' && isVisibleCodexUserMessage(entry.payload as AnyRecord)) {
messages.push({
type: 'user',
timestamp: entry.timestamp,
message: {
role: 'user',
content: entry.payload.message,
},
});
}
if (
entry.type === 'response_item' &&
entry.payload?.type === 'message' &&
entry.payload.role === 'assistant'
) {
const textContent = extractCodexTextContent(entry.payload.content);
if (textContent.trim()) {
messages.push({
type: 'assistant',
timestamp: entry.timestamp,
message: {
role: 'assistant',
content: textContent,
},
});
}
}
if (entry.type === 'response_item' && entry.payload?.type === 'reasoning') {
const summaryText = Array.isArray(entry.payload.summary)
? entry.payload.summary
.map((item: AnyRecord) => item?.text)
.filter(Boolean)
.join('\n')
: '';
if (summaryText.trim()) {
messages.push({
type: 'thinking',
timestamp: entry.timestamp,
message: {
role: 'assistant',
content: summaryText,
},
});
}
}
if (entry.type === 'response_item' && entry.payload?.type === 'function_call') {
let toolName = entry.payload.name;
let toolInput = entry.payload.arguments;
if (toolName === 'shell_command') {
toolName = 'Bash';
try {
const args = JSON.parse(entry.payload.arguments) as AnyRecord;
toolInput = JSON.stringify({ command: args.command });
} catch {
// Keep original arguments when parsing fails.
}
}
messages.push({
type: 'tool_use',
timestamp: entry.timestamp,
toolName,
toolInput,
toolCallId: entry.payload.call_id,
});
}
if (entry.type === 'response_item' && entry.payload?.type === 'function_call_output') {
messages.push({
type: 'tool_result',
timestamp: entry.timestamp,
toolCallId: entry.payload.call_id,
output: entry.payload.output,
});
}
if (entry.type === 'response_item' && entry.payload?.type === 'custom_tool_call') {
const toolName = entry.payload.name || 'custom_tool';
const input = entry.payload.input || '';
if (toolName === 'apply_patch') {
const fileMatch = String(input).match(/\*\*\* Update File: (.+)/);
const filePath = fileMatch ? fileMatch[1].trim() : 'unknown';
const lines = String(input).split('\n');
const oldLines: string[] = [];
const newLines: string[] = [];
for (const lineContent of lines) {
if (lineContent.startsWith('-') && !lineContent.startsWith('---')) {
oldLines.push(lineContent.slice(1));
} else if (lineContent.startsWith('+') && !lineContent.startsWith('+++')) {
newLines.push(lineContent.slice(1));
}
}
messages.push({
type: 'tool_use',
timestamp: entry.timestamp,
toolName: 'Edit',
toolInput: JSON.stringify({
file_path: filePath,
old_string: oldLines.join('\n'),
new_string: newLines.join('\n'),
}),
toolCallId: entry.payload.call_id,
});
} else {
messages.push({
type: 'tool_use',
timestamp: entry.timestamp,
toolName,
toolInput: input,
toolCallId: entry.payload.call_id,
});
}
}
if (entry.type === 'response_item' && entry.payload?.type === 'custom_tool_call_output') {
messages.push({
type: 'tool_result',
timestamp: entry.timestamp,
toolCallId: entry.payload.call_id,
output: entry.payload.output || '',
});
}
} catch {
// Skip malformed lines.
}
}
messages.sort(
(a, b) => new Date(a.timestamp || 0).getTime() - new Date(b.timestamp || 0).getTime(),
);
const total = messages.length;
if (limit !== null) {
const startIndex = Math.max(0, total - offset - limit);
const endIndex = total - offset;
const paginatedMessages = messages.slice(startIndex, endIndex);
const hasMore = startIndex > 0;
return {
messages: paginatedMessages,
total,
hasMore,
offset,
limit,
tokenUsage,
};
}
return { messages, tokenUsage };
} catch (error) {
console.error(`Error reading Codex session messages for ${sessionId}:`, error);
return { messages: [], total: 0, hasMore: false };
}
}
export class CodexSessionsProvider implements IProviderSessions {
/**
* Normalizes a persisted Codex JSONL entry.
*
* Live Codex SDK events are transformed before they reach normalizeMessage(),
* while history entries already use a compact message/tool shape from projects.js.
*/
private normalizeHistoryEntry(raw: AnyRecord, sessionId: string | null): NormalizedMessage[] {
const ts = raw.timestamp || new Date().toISOString();
const baseId = raw.uuid || generateMessageId('codex');
if (raw.message?.role === 'user') {
const content = typeof raw.message.content === 'string'
? raw.message.content
: Array.isArray(raw.message.content)
? raw.message.content
.map((part: string | AnyRecord) => typeof part === 'string' ? part : part?.text || '')
.filter(Boolean)
.join('\n')
: String(raw.message.content || '');
if (!content.trim()) {
return [];
}
return [createNormalizedMessage({
id: baseId,
sessionId,
timestamp: ts,
provider: PROVIDER,
kind: 'text',
role: 'user',
content,
})];
}
if (raw.message?.role === 'assistant') {
const content = typeof raw.message.content === 'string'
? raw.message.content
: Array.isArray(raw.message.content)
? raw.message.content
.map((part: string | AnyRecord) => typeof part === 'string' ? part : part?.text || '')
.filter(Boolean)
.join('\n')
: '';
if (!content.trim()) {
return [];
}
return [createNormalizedMessage({
id: baseId,
sessionId,
timestamp: ts,
provider: PROVIDER,
kind: 'text',
role: 'assistant',
content,
})];
}
if (raw.type === 'thinking' || raw.isReasoning) {
return [createNormalizedMessage({
id: baseId,
sessionId,
timestamp: ts,
provider: PROVIDER,
kind: 'thinking',
content: raw.message?.content || '',
})];
}
if (raw.type === 'tool_use' || raw.toolName) {
return [createNormalizedMessage({
id: baseId,
sessionId,
timestamp: ts,
provider: PROVIDER,
kind: 'tool_use',
toolName: raw.toolName || 'Unknown',
toolInput: raw.toolInput,
toolId: raw.toolCallId || baseId,
})];
}
if (raw.type === 'tool_result') {
return [createNormalizedMessage({
id: baseId,
sessionId,
timestamp: ts,
provider: PROVIDER,
kind: 'tool_result',
toolId: raw.toolCallId || '',
content: raw.output || '',
isError: Boolean(raw.isError),
})];
}
return [];
}
/**
* Normalizes either a Codex history entry or a transformed live SDK event.
*/
normalizeMessage(rawMessage: unknown, sessionId: string | null): NormalizedMessage[] {
const raw = readObjectRecord(rawMessage);
if (!raw) {
return [];
}
if (raw.message?.role) {
return this.normalizeHistoryEntry(raw, sessionId);
}
const ts = raw.timestamp || new Date().toISOString();
const baseId = raw.uuid || generateMessageId('codex');
if (raw.type === 'item') {
switch (raw.itemType) {
case 'agent_message':
return [createNormalizedMessage({
id: baseId,
sessionId,
timestamp: ts,
provider: PROVIDER,
kind: 'text',
role: 'assistant',
content: raw.message?.content || '',
})];
case 'reasoning':
return [createNormalizedMessage({
id: baseId,
sessionId,
timestamp: ts,
provider: PROVIDER,
kind: 'thinking',
content: raw.message?.content || '',
})];
case 'command_execution':
return [createNormalizedMessage({
id: baseId,
sessionId,
timestamp: ts,
provider: PROVIDER,
kind: 'tool_use',
toolName: 'Bash',
toolInput: { command: raw.command },
toolId: baseId,
output: raw.output,
exitCode: raw.exitCode,
status: raw.status,
})];
case 'file_change':
return [createNormalizedMessage({
id: baseId,
sessionId,
timestamp: ts,
provider: PROVIDER,
kind: 'tool_use',
toolName: 'FileChanges',
toolInput: raw.changes,
toolId: baseId,
status: raw.status,
})];
case 'mcp_tool_call':
return [createNormalizedMessage({
id: baseId,
sessionId,
timestamp: ts,
provider: PROVIDER,
kind: 'tool_use',
toolName: raw.tool || 'MCP',
toolInput: raw.arguments,
toolId: baseId,
server: raw.server,
result: raw.result,
error: raw.error,
status: raw.status,
})];
case 'web_search':
return [createNormalizedMessage({
id: baseId,
sessionId,
timestamp: ts,
provider: PROVIDER,
kind: 'tool_use',
toolName: 'WebSearch',
toolInput: { query: raw.query },
toolId: baseId,
})];
case 'todo_list':
return [createNormalizedMessage({
id: baseId,
sessionId,
timestamp: ts,
provider: PROVIDER,
kind: 'tool_use',
toolName: 'TodoList',
toolInput: { items: raw.items },
toolId: baseId,
})];
case 'error':
return [createNormalizedMessage({
id: baseId,
sessionId,
timestamp: ts,
provider: PROVIDER,
kind: 'error',
content: raw.message?.content || 'Unknown error',
})];
default:
return [createNormalizedMessage({
id: baseId,
sessionId,
timestamp: ts,
provider: PROVIDER,
kind: 'tool_use',
toolName: raw.itemType || 'Unknown',
toolInput: raw.item || raw,
toolId: baseId,
})];
}
}
if (raw.type === 'turn_complete') {
return [createNormalizedMessage({
id: baseId,
sessionId,
timestamp: ts,
provider: PROVIDER,
kind: 'complete',
})];
}
if (raw.type === 'turn_failed') {
return [createNormalizedMessage({
id: baseId,
sessionId,
timestamp: ts,
provider: PROVIDER,
kind: 'error',
content: raw.error?.message || 'Turn failed',
})];
}
return [];
}
/**
* Loads Codex JSONL history and keeps token usage metadata when projects.js
* provides it.
*/
async fetchHistory(
sessionId: string,
options: FetchHistoryOptions = {},
): Promise<FetchHistoryResult> {
const { limit = null, offset = 0 } = options;
let result: CodexHistoryResult;
try {
result = await getCodexSessionMessages(sessionId, limit, offset);
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
console.warn(`[CodexProvider] Failed to load session ${sessionId}:`, message);
return { messages: [], total: 0, hasMore: false, offset: 0, limit: null };
}
const rawMessages = Array.isArray(result) ? result : (result.messages || []);
const total = Array.isArray(result) ? rawMessages.length : (result.total || 0);
const hasMore = Array.isArray(result) ? false : Boolean(result.hasMore);
const tokenUsage = Array.isArray(result) ? undefined : result.tokenUsage;
const normalized: NormalizedMessage[] = [];
for (const raw of rawMessages) {
normalized.push(...this.normalizeHistoryEntry(raw, sessionId));
}
const toolResultMap = new Map<string, NormalizedMessage>();
for (const msg of normalized) {
if (msg.kind === 'tool_result' && msg.toolId) {
toolResultMap.set(msg.toolId, msg);
}
}
for (const msg of normalized) {
if (msg.kind === 'tool_use' && msg.toolId && toolResultMap.has(msg.toolId)) {
const toolResult = toolResultMap.get(msg.toolId);
if (toolResult) {
msg.toolResult = { content: toolResult.content, isError: toolResult.isError };
}
}
}
return {
messages: normalized,
total,
hasMore,
offset,
limit,
tokenUsage,
};
}
}