feat: add chat unifier setup

This commit is contained in:
Haileyesus
2026-04-06 20:12:07 +03:00
parent bdab5a806f
commit 28a523b7a3
10 changed files with 4196 additions and 16 deletions

View File

@@ -0,0 +1,337 @@
import assert from 'node:assert/strict';
import test from 'node:test';
import { llmMessagesUnifier } from './messages-unifier.service.js';
/**
* This test covers helper-3 Claude normalization: user/assistant/thinking/tool-use/tool-result/error.
*/
test('llmMessagesUnifier normalizes claude message categories', () => {
const sessionId = 'claude-session-1';
const thinking = llmMessagesUnifier.normalizeUnknown('claude', sessionId, {
type: 'assistant',
timestamp: '2026-04-06T10:00:00.000Z',
message: {
content: [
{ type: 'thinking', thinking: '' },
{ type: 'text', text: 'Assistant response' },
{ type: 'tool_use', id: 'toolu_1', name: 'Read', input: { file_path: 'a.txt' } },
],
},
});
assert.equal(thinking[0]?.type, 'thinking_message');
assert.equal(thinking[0]?.text, 'Thinking');
assert.equal(thinking[1]?.type, 'assistant_message');
assert.equal(thinking[2]?.type, 'tool_use_request');
const user = llmMessagesUnifier.normalizeUnknown('claude', sessionId, {
type: 'user',
message: {
content: [
{ type: 'text', text: 'hello there' },
{
type: 'image',
source: {
type: 'base64',
media_type: 'image/png',
data: 'image-b64',
},
},
],
},
});
assert.equal(user[0]?.type, 'user_message');
assert.equal(user[0]?.text, 'hello there');
assert.deepEqual(user[0]?.images, ['image-b64']);
const toolResult = llmMessagesUnifier.normalizeUnknown('claude', sessionId, {
type: 'user',
toolUseResult: { success: false, reason: 'denied' },
});
assert.equal(toolResult[0]?.type, 'tool_call_error');
const toolResultSuccess = llmMessagesUnifier.normalizeUnknown('claude', sessionId, {
type: 'user',
toolUseResult: { type: 'create', filePath: 'hello.py' },
});
assert.equal(toolResultSuccess[0]?.type, 'tool_call_success');
const todo = llmMessagesUnifier.normalizeUnknown('claude', sessionId, {
type: 'assistant',
message: {
content: [
{
type: 'tool_use',
id: 'toolu_todo',
name: 'TaskUpdate',
input: { taskId: '1', status: 'in_progress' },
},
],
},
});
assert.equal(todo[0]?.type, 'todo_task_list');
assert.equal(todo[0]?.has_progress_indicator, true);
const assistantError = llmMessagesUnifier.normalizeUnknown('claude', sessionId, {
type: 'assistant',
error: 'rate_limit',
message: { content: [] },
});
assert.equal(assistantError[0]?.type, 'assistant_error_message');
});
/**
* This test covers helper-3 Codex normalization: user_message, reasoning fallback, tool request/success/error, todo plan updates.
*/
test('llmMessagesUnifier normalizes codex message categories', () => {
const sessionId = 'codex-session-1';
const user = llmMessagesUnifier.normalizeUnknown('codex', sessionId, {
type: 'event_msg',
payload: {
type: 'user_message',
message: 'run command',
local_images: ['a.png'],
images: ['b.png'],
},
});
assert.equal(user[0]?.type, 'user_message');
assert.deepEqual(user[0]?.images, ['a.png', 'b.png']);
const reasoning = llmMessagesUnifier.normalizeUnknown('codex', sessionId, {
type: 'response_item',
payload: {
type: 'reasoning',
summary: [],
},
});
assert.equal(reasoning[0]?.type, 'thinking_message');
assert.equal(reasoning[0]?.text, 'Reasoning');
const toolRequest = llmMessagesUnifier.normalizeUnknown('codex', sessionId, {
type: 'response_item',
payload: {
type: 'function_call',
name: 'shell_command',
arguments: '{"command":"echo hi"}',
call_id: 'call_1',
},
});
assert.equal(toolRequest[0]?.type, 'tool_use_request');
const assistantMessage = llmMessagesUnifier.normalizeUnknown('codex', sessionId, {
type: 'response_item',
payload: {
type: 'message',
role: 'assistant',
content: [{ type: 'output_text', text: 'Command finished' }],
},
});
assert.equal(assistantMessage[0]?.type, 'assistant_message');
assert.equal(assistantMessage[0]?.text, 'Command finished');
const todo = llmMessagesUnifier.normalizeUnknown('codex', sessionId, {
type: 'response_item',
payload: {
type: 'function_call',
name: 'update_plan',
arguments: '{"plan":[{"step":"A","status":"in_progress"}]}',
call_id: 'call_2',
},
});
assert.equal(todo[0]?.type, 'todo_task_list');
assert.equal(todo[0]?.has_progress_indicator, true);
const toolError = llmMessagesUnifier.normalizeUnknown('codex', sessionId, {
type: 'event_msg',
payload: {
type: 'exec_command_end',
status: 'failed',
call_id: 'call_3',
},
});
assert.equal(toolError[0]?.type, 'tool_call_error');
const toolSuccess = llmMessagesUnifier.normalizeUnknown('codex', sessionId, {
type: 'response_item',
payload: {
type: 'function_call_output',
call_id: 'call_4',
output: 'Exit code: 0\nWall time: 0.1 seconds',
},
});
assert.equal(toolSuccess[0]?.type, 'tool_call_success');
const interruptedTurn = llmMessagesUnifier.normalizeUnknown('codex', sessionId, {
type: 'response_item',
payload: {
type: 'message',
role: 'user',
content: [{ type: 'input_text', text: '<turn_aborted>\nInterrupted\n</turn_aborted>' }],
},
});
assert.equal(interruptedTurn[0]?.type, 'session_interrupted');
const payloadError = llmMessagesUnifier.normalizeUnknown('codex', sessionId, {
type: 'response_item',
payload: {
type: 'error',
message: 'codex payload error',
},
});
assert.equal(payloadError[0]?.type, 'assistant_error_message');
const streamError = llmMessagesUnifier.normalizeUnknown('codex', sessionId, {
type: 'error',
message: 'codex stream error',
});
assert.equal(streamError[0]?.type, 'assistant_error_message');
});
/**
* This test covers helper-3 Gemini normalization from JSON history: user/assistant/thought/tool-call success and error.
*/
test('llmMessagesUnifier normalizes gemini history categories', () => {
const sessionId = 'gemini-session-1';
const messages = llmMessagesUnifier.normalizeUnknown('gemini', sessionId, {
sessionId,
messages: [
{
type: 'user',
timestamp: '2026-04-01T10:00:00.000Z',
content: [{ text: 'create files' }],
},
{
type: 'gemini',
timestamp: '2026-04-01T10:00:01.000Z',
content: 'I will do it',
thoughts: [{ subject: 'Planning', description: 'Thinking path' }],
toolCalls: [
{ id: 't1', name: 'write_file', displayName: 'Write File', status: 'success' },
{ id: 't2', name: 'write_file', status: 'error' },
],
},
],
});
assert.ok(messages.some((message) => message.type === 'user_message'));
assert.ok(messages.some((message) => message.type === 'assistant_message'));
assert.ok(messages.some((message) => message.type === 'thinking_message'));
assert.ok(messages.some((message) => message.type === 'tool_call_success'));
assert.ok(messages.some((message) => message.type === 'tool_call_error'));
const assistantIndex = messages.findIndex((message) => message.type === 'assistant_message');
const thinkingIndex = messages.findIndex((message) => message.type === 'thinking_message');
assert.ok(assistantIndex >= 0);
assert.ok(thinkingIndex > assistantIndex);
const successfulTool = messages.find((message) => message.type === 'tool_call_success');
assert.equal(successfulTool?.toolName, 'Write File');
});
/**
* This test covers helper-3 Cursor normalization: strip user_query tags and parse CreatePlan as todo with no progress indicator.
*/
test('llmMessagesUnifier normalizes cursor categories and strips user_query tags', () => {
const sessionId = 'cursor-session-1';
const user = llmMessagesUnifier.normalizeUnknown('cursor', sessionId, {
role: 'user',
message: {
content: [{ type: 'text', text: '<user_query>\nhello world\n</user_query>' }],
},
});
assert.equal(user[0]?.type, 'user_message');
assert.equal(user[0]?.text, 'hello world');
const assistant = llmMessagesUnifier.normalizeUnknown('cursor', sessionId, {
role: 'assistant',
message: {
content: [
{ type: 'text', text: 'Starting work' },
{
type: 'tool_use',
name: 'CreatePlan',
input: {
todos: [{ id: '1', content: 'Do it' }],
},
},
{
type: 'tool_use',
name: 'ApplyPatch',
input: {
patch: '*** Begin Patch',
},
},
],
},
});
assert.ok(assistant.some((message) => message.type === 'assistant_message'));
const todoMessage = assistant.find((message) => message.type === 'todo_task_list');
assert.equal(todoMessage?.has_progress_indicator, false);
assert.ok(assistant.some((message) => message.type === 'tool_call_success'));
});
/**
* This test covers shared session status normalization: started/completed/interrupted payloads.
*/
test('llmMessagesUnifier normalizes shared session status events', () => {
const sessionId = 'shared-session-1';
const started = llmMessagesUnifier.normalizeUnknown('codex', sessionId, {
sessionId,
sessionStatus: 'STARTED',
});
assert.equal(started[0]?.type, 'session_started');
const completed = llmMessagesUnifier.normalizeUnknown('gemini', sessionId, {
sessionId,
sessionStatus: 'COMPLETED',
});
assert.equal(completed[0]?.type, 'session_completed');
const interrupted = llmMessagesUnifier.normalizeUnknown('claude', sessionId, {
sessionId,
sessionStatus: 'SESSION_ABORTED',
});
assert.equal(interrupted[0]?.type, 'session_interrupted');
});
/**
* This test covers helper-3 notification flow: Claude permission callbacks should surface as tool_use_request.
*/
test('llmMessagesUnifier normalizes pre-unified tool_use_request payloads', () => {
const sessionId = 'permission-session-1';
const messages = llmMessagesUnifier.normalizeUnknown('claude', sessionId, {
type: 'tool_use_request',
toolName: 'Read',
input: { filePath: 'notes.txt' },
toolUseID: 'toolu_123',
title: 'Claude wants to read notes.txt',
});
assert.equal(messages[0]?.type, 'tool_use_request');
assert.equal(messages[0]?.toolName, 'Read');
assert.equal(messages[0]?.toolCallId, 'toolu_123');
});
/**
* This test covers helper-3 runtime-event fallback behavior for non-JSON stdout/stderr stream messages.
*/
test('llmMessagesUnifier normalizes fallback session events with channel-aware error typing', () => {
const messages = llmMessagesUnifier.normalizeSessionEvents('gemini', 'runtime-session-1', [
{
timestamp: '2026-04-06T12:00:00.000Z',
channel: 'stdout',
message: 'Process started',
},
{
timestamp: '2026-04-06T12:00:01.000Z',
channel: 'error',
message: 'Process failed',
},
]);
assert.equal(messages[0]?.type, 'assistant_message');
assert.equal(messages[1]?.type, 'assistant_error_message');
});

View File

@@ -11,6 +11,7 @@ import { llmAssetsService } from '@/modules/llm/assets.service.js';
import type { McpScope, McpTransport, UpsertMcpServerInput } from '@/modules/llm/mcp.service.js';
import { llmMcpService } from '@/modules/llm/mcp.service.js';
import { llmSkillsService } from '@/modules/llm/skills.service.js';
import { llmMessagesUnifier } from '@/modules/llm/messages-unifier.service.js';
import type { LLMProvider } from '@/shared/types/app.js';
import { logger } from '@/shared/utils/logger.js';
@@ -215,6 +216,25 @@ const parseProvider = (value: unknown): LLMProvider => {
});
};
/**
* Enriches provider session snapshots with normalized message types for frontend rendering.
*/
const formatSessionSnapshot = (
provider: LLMProvider,
snapshot: {
sessionId: string;
events: Array<{
timestamp: string;
channel: 'sdk' | 'stdout' | 'stderr' | 'json' | 'system' | 'error';
message?: string;
data?: unknown;
}>;
},
) => ({
...snapshot,
messages: llmMessagesUnifier.normalizeSessionEvents(provider, snapshot.sessionId, snapshot.events),
});
router.get(
'/providers',
asyncHandler(async (_req: Request, res: Response) => {
@@ -235,7 +255,7 @@ router.get(
'/providers/:provider/sessions',
asyncHandler(async (req: Request, res: Response) => {
const provider = parseProvider(req.params.provider);
const sessions = llmService.listSessions(provider);
const sessions = llmService.listSessions(provider).map((session) => formatSessionSnapshot(provider, session));
res.json(createApiSuccessResponse({ provider, sessions }));
}),
);
@@ -253,7 +273,7 @@ router.get(
});
}
res.json(createApiSuccessResponse({ provider, session }));
res.json(createApiSuccessResponse({ provider, session: formatSessionSnapshot(provider, session) }));
}),
);
@@ -265,17 +285,19 @@ router.post(
const waitForCompletion = parseWaitForCompletion(req);
if (!waitForCompletion) {
const formattedSnapshot = formatSessionSnapshot(provider, snapshot);
res.status(202).json(
createApiSuccessResponse({
provider,
session: snapshot,
session: formattedSnapshot,
}),
);
return;
}
const completedSnapshot = await llmService.waitForSession(provider, snapshot.sessionId);
res.json(createApiSuccessResponse({ provider, session: completedSnapshot ?? snapshot }));
const finalSnapshot = completedSnapshot ?? snapshot;
res.json(createApiSuccessResponse({ provider, session: formatSessionSnapshot(provider, finalSnapshot) }));
}),
);
@@ -289,12 +311,13 @@ router.post(
const waitForCompletion = parseWaitForCompletion(req);
if (!waitForCompletion) {
res.status(202).json(createApiSuccessResponse({ provider, session: snapshot }));
res.status(202).json(createApiSuccessResponse({ provider, session: formatSessionSnapshot(provider, snapshot) }));
return;
}
const completedSnapshot = await llmService.waitForSession(provider, sessionId);
res.json(createApiSuccessResponse({ provider, session: completedSnapshot ?? snapshot }));
const finalSnapshot = completedSnapshot ?? snapshot;
res.json(createApiSuccessResponse({ provider, session: formatSessionSnapshot(provider, finalSnapshot) }));
}),
);
@@ -532,6 +555,19 @@ router.get(
}),
);
router.get(
'/sessions/:sessionId/messages',
asyncHandler(async (req: Request, res: Response) => {
const sessionId = readPathParam(req.params.sessionId, 'sessionId');
const history = await llmSessionsService.getSessionHistory(sessionId);
res.json(createApiSuccessResponse({
sessionId,
provider: history.provider,
messages: history.messages,
}));
}),
);
router.get(
'/sessions/:sessionId/history',
asyncHandler(async (req: Request, res: Response) => {

View File

@@ -0,0 +1,908 @@
import type { ProviderSessionEvent } from '@/modules/llm/providers/provider.interface.js';
import type { LLMProvider } from '@/shared/types/app.js';
export type UnifiedMessageType =
| 'user_message'
| 'thinking_message'
| 'assistant_message'
| 'assistant_error_message'
| 'tool_use_request'
| 'tool_call_success'
| 'tool_call_error'
| 'todo_task_list'
| 'session_started'
| 'session_completed'
| 'session_interrupted';
export type UnifiedSessionStatus = 'STARTED' | 'COMPLETED' | 'SESSION_ABORTED';
export type UnifiedChatMessage = {
timestamp: string;
provider: LLMProvider;
sessionId: string;
type: UnifiedMessageType;
text?: string;
images?: string[];
toolName?: string;
toolCallId?: string;
status?: 'success' | 'error';
has_progress_indicator?: boolean;
sessionStatus?: UnifiedSessionStatus;
data?: unknown;
raw?: unknown;
};
type MessageContext = {
provider: LLMProvider;
sessionId: string;
timestamp?: string;
};
/**
* Unifies provider-specific history/event payloads into one frontend-safe message contract.
*/
export const llmMessagesUnifier = {
/**
* Converts in-memory provider session events to unified chat messages.
*/
normalizeSessionEvents(
provider: LLMProvider,
sessionId: string,
events: ProviderSessionEvent[],
): UnifiedChatMessage[] {
const messages: UnifiedChatMessage[] = [];
for (const event of events) {
const normalized = this.normalizeUnknown(provider, sessionId, event.data ?? event.message ?? event, event.timestamp);
if (normalized.length === 0 && event.message) {
messages.push(createMessage({
provider,
sessionId,
timestamp: event.timestamp,
type: event.channel === 'error' ? 'assistant_error_message' : 'assistant_message',
text: event.message,
raw: event,
}));
continue;
}
messages.push(...normalized);
}
return messages;
},
/**
* Converts DB history payload entries to unified chat messages.
*/
normalizeHistoryEntries(
provider: LLMProvider,
sessionId: string,
entries: unknown[],
): UnifiedChatMessage[] {
const messages: UnifiedChatMessage[] = [];
for (const entry of entries) {
messages.push(...this.normalizeUnknown(provider, sessionId, entry));
}
return messages;
},
/**
* Converts one raw provider payload to zero-or-more normalized messages.
*/
normalizeUnknown(
provider: LLMProvider,
sessionId: string,
raw: unknown,
timestamp?: string,
): UnifiedChatMessage[] {
const context: MessageContext = { provider, sessionId, timestamp };
if (!raw || typeof raw !== 'object') {
return [];
}
const preUnified = normalizePreUnifiedPayload(raw as Record<string, unknown>, context);
if (preUnified) {
return preUnified;
}
if (provider === 'claude') {
return normalizeClaudePayload(raw as Record<string, unknown>, context);
}
if (provider === 'codex') {
return normalizeCodexPayload(raw as Record<string, unknown>, context);
}
if (provider === 'gemini') {
return normalizeGeminiPayload(raw as Record<string, unknown>, context);
}
return normalizeCursorPayload(raw as Record<string, unknown>, context);
},
};
/**
* Maps already-unified custom payloads (for example permission callbacks) without provider parsing.
*/
function normalizePreUnifiedPayload(
raw: Record<string, unknown>,
context: MessageContext,
): UnifiedChatMessage[] | null {
const type = readString(raw.type);
if (!type) {
return null;
}
if (
type !== 'user_message' &&
type !== 'thinking_message' &&
type !== 'assistant_message' &&
type !== 'assistant_error_message' &&
type !== 'tool_use_request' &&
type !== 'tool_call_success' &&
type !== 'tool_call_error' &&
type !== 'todo_task_list' &&
type !== 'session_started' &&
type !== 'session_completed' &&
type !== 'session_interrupted'
) {
return null;
}
const statusValue = readString(raw.status);
const status =
statusValue === 'success' || statusValue === 'error'
? statusValue
: undefined;
const sessionStatus = readString(raw.sessionStatus);
const normalizedSessionStatus =
sessionStatus === 'STARTED' || sessionStatus === 'COMPLETED' || sessionStatus === 'SESSION_ABORTED'
? sessionStatus
: undefined;
const hasProgressIndicator =
readBoolean(raw.has_progress_indicator) ?? readBoolean(raw.hasProgressIndicator);
return [
createMessage({
...context,
timestamp: readString(raw.timestamp) ?? context.timestamp,
type,
text: readString(raw.text) ?? readString(raw.message),
images: readStringArray(raw.images),
toolName: readString(raw.toolName) ?? readString(raw.name),
toolCallId: readString(raw.toolCallId) ?? readString(raw.toolUseID) ?? readString(raw.call_id),
status,
has_progress_indicator: hasProgressIndicator,
sessionStatus: normalizedSessionStatus,
data: raw.data ?? raw.input ?? raw.payload,
raw,
}),
];
}
/**
* Normalizes Claude payloads from both SDK stream and disk history.
*/
function normalizeClaudePayload(
raw: Record<string, unknown>,
context: MessageContext,
): UnifiedChatMessage[] {
const sessionStatusMessage = normalizeSessionStatus(raw, context);
if (sessionStatusMessage) {
return [sessionStatusMessage];
}
const type = readString(raw.type);
const timestamp = readString(raw.timestamp) ?? context.timestamp;
if (type === 'assistant') {
const messages: UnifiedChatMessage[] = [];
if (readString(raw.error)) {
messages.push(createMessage({
...context,
timestamp,
type: 'assistant_error_message',
text: readString(raw.error),
raw,
}));
}
const messageRecord = readRecord(raw.message);
const contentBlocks = readArray(messageRecord?.content);
for (const contentBlock of contentBlocks) {
const block = readRecord(contentBlock);
if (!block) {
continue;
}
const blockType = readString(block.type);
if (blockType === 'thinking') {
const thinkingText = readString(block.thinking) ?? 'Thinking';
messages.push(createMessage({
...context,
timestamp,
type: 'thinking_message',
text: thinkingText.length ? thinkingText : 'Thinking',
raw: block,
}));
continue;
}
if (blockType === 'text') {
const text = readString(block.text);
if (text) {
messages.push(createMessage({
...context,
timestamp,
type: 'assistant_message',
text,
raw: block,
}));
}
continue;
}
if (blockType === 'tool_use') {
const toolName = readString(block.name);
const toolInput = readRecord(block.input) ?? block.input;
if (toolName === 'TaskCreate' || toolName === 'TaskUpdate') {
messages.push(createMessage({
...context,
timestamp,
type: 'todo_task_list',
toolName,
has_progress_indicator: true,
data: toolInput,
raw: block,
}));
continue;
}
messages.push(createMessage({
...context,
timestamp,
type: 'tool_use_request',
toolName,
toolCallId: readString(block.id),
data: toolInput,
raw: block,
}));
}
}
return messages;
}
if (type === 'user') {
// Tool results are emitted as user messages in Claude JSONL and should be treated as assistant tool results.
if (raw.toolUseResult !== undefined) {
const toolUseResult = readRecord(raw.toolUseResult) ?? raw.toolUseResult;
const successValue = readBoolean((toolUseResult as Record<string, unknown>)?.success);
const status: 'success' | 'error' = successValue === false ? 'error' : 'success';
return [
createMessage({
...context,
timestamp,
type: status === 'success' ? 'tool_call_success' : 'tool_call_error',
status,
data: toolUseResult,
raw,
}),
];
}
const messageRecord = readRecord(raw.message);
const content = readArray(messageRecord?.content);
const textParts: string[] = [];
const images: string[] = [];
for (const contentBlock of content) {
const block = readRecord(contentBlock);
if (!block) {
continue;
}
if (readString(block.type) === 'text') {
const text = readString(block.text);
if (text) {
textParts.push(text);
}
}
if (readString(block.type) === 'image') {
const source = readRecord(block.source);
const data = readString(source?.data);
if (data) {
images.push(data);
}
}
}
if (!textParts.length && !images.length) {
return [];
}
return [
createMessage({
...context,
timestamp,
type: 'user_message',
text: textParts.join('\n'),
images: images.length ? images : undefined,
raw,
}),
];
}
return [];
}
/**
* Normalizes Codex payloads from SDK stream/history JSONL.
*/
function normalizeCodexPayload(
raw: Record<string, unknown>,
context: MessageContext,
): UnifiedChatMessage[] {
const sessionStatusMessage = normalizeSessionStatus(raw, context);
if (sessionStatusMessage) {
return [sessionStatusMessage];
}
const timestamp = readString(raw.timestamp) ?? context.timestamp;
const type = readString(raw.type);
if (type === 'error') {
return [
createMessage({
...context,
timestamp,
type: 'assistant_error_message',
text: readString(raw.message) ?? 'Codex stream error',
raw,
}),
];
}
if (type === 'event_msg') {
const payload = readRecord(raw.payload);
const payloadType = readString(payload?.type);
if (payloadType === 'user_message') {
const text = readString(payload?.message);
const localImages = readStringArray(payload?.local_images);
const remoteImages = readStringArray(payload?.images);
return [
createMessage({
...context,
timestamp,
type: 'user_message',
text,
images: [...localImages, ...remoteImages],
raw,
}),
];
}
if (payloadType === 'exec_command_end') {
const status = readString(payload?.status) === 'failed' ? 'error' : 'success';
return [
createMessage({
...context,
timestamp,
type: status === 'success' ? 'tool_call_success' : 'tool_call_error',
status,
toolName: 'shell_command',
toolCallId: readString(payload?.call_id),
data: payload,
raw,
}),
];
}
}
if (type === 'response_item') {
const payload = readRecord(raw.payload);
const payloadType = readString(payload?.type);
if (payloadType === 'reasoning') {
const summary = readArray(payload?.summary);
const summaryText = summary
.map((entry) => {
if (typeof entry === 'string') {
return entry;
}
const record = readRecord(entry);
return readString(record?.text) ?? readString(record?.summary) ?? '';
})
.filter((entry) => entry.length > 0)
.join('\n');
return [
createMessage({
...context,
timestamp,
type: 'thinking_message',
text: summaryText || 'Reasoning',
data: payload,
raw,
}),
];
}
if (payloadType === 'function_call') {
const toolName = readString(payload?.name);
const toolCallId = readString(payload?.call_id);
const argsText = readString(payload?.arguments);
const parsedArgs = parseJsonSafely(argsText) ?? argsText;
if (toolName === 'update_plan') {
return [
createMessage({
...context,
timestamp,
type: 'todo_task_list',
toolName,
toolCallId,
has_progress_indicator: true,
data: parsedArgs,
raw,
}),
];
}
return [
createMessage({
...context,
timestamp,
type: 'tool_use_request',
toolName,
toolCallId,
data: parsedArgs,
raw,
}),
];
}
if (payloadType === 'function_call_output') {
const output = readString(payload?.output) ?? '';
const status: 'success' | 'error' = /exit code:\s*0/i.test(output) ? 'success' : 'error';
return [
createMessage({
...context,
timestamp,
type: status === 'success' ? 'tool_call_success' : 'tool_call_error',
status,
toolCallId: readString(payload?.call_id),
text: output,
data: payload,
raw,
}),
];
}
if (payloadType === 'message') {
const role = readString(payload?.role);
const content = readArray(payload?.content);
const text = content
.map((entry) => {
const block = readRecord(entry);
return readString(block?.text) ?? '';
})
.filter(Boolean)
.join('\n');
if (role === 'user' && text.includes('<turn_aborted>')) {
return [
createMessage({
...context,
timestamp,
type: 'session_interrupted',
sessionStatus: 'SESSION_ABORTED',
text,
raw,
}),
];
}
return [
createMessage({
...context,
timestamp,
type: role === 'user' ? 'user_message' : 'assistant_message',
text,
data: payload,
raw,
}),
];
}
if (payloadType === 'error') {
return [
createMessage({
...context,
timestamp,
type: 'assistant_error_message',
text: readString(payload?.message) ?? 'Codex error',
data: payload,
raw,
}),
];
}
}
// SDK thread item-based events
const item = readRecord(raw.item);
if (!item) {
return [];
}
const itemType = readString(item.type);
if (itemType === 'reasoning') {
const text = readString(item.summary) ?? 'Reasoning';
return [createMessage({ ...context, timestamp, type: 'thinking_message', text, raw })];
}
if (itemType === 'error') {
return [
createMessage({
...context,
timestamp,
type: 'assistant_error_message',
text: readString(item.message) ?? 'Codex item error',
raw,
}),
];
}
if (itemType === 'todo_list') {
return [
createMessage({
...context,
timestamp,
type: 'todo_task_list',
has_progress_indicator: true,
data: item,
raw,
}),
];
}
if (itemType === 'agent_message') {
return [
createMessage({
...context,
timestamp,
type: 'assistant_message',
text: readString(item.message) ?? '',
raw,
}),
];
}
return [];
}
/**
* Normalizes Gemini payloads from JSON history files and runtime stream chunks.
*/
function normalizeGeminiPayload(
raw: Record<string, unknown>,
context: MessageContext,
): UnifiedChatMessage[] {
const sessionStatusMessage = normalizeSessionStatus(raw, context);
if (sessionStatusMessage) {
return [sessionStatusMessage];
}
if (Array.isArray(raw.messages)) {
const messages: UnifiedChatMessage[] = [];
for (const message of raw.messages) {
const parsedMessage = readRecord(message);
if (!parsedMessage) {
continue;
}
messages.push(...normalizeGeminiPayload(parsedMessage, context));
}
return messages;
}
const timestamp = readString(raw.timestamp) ?? context.timestamp;
const type = readString(raw.type);
const unified: UnifiedChatMessage[] = [];
if (type === 'user') {
const text = readArray(raw.content)
.map((entry) => readString(readRecord(entry)?.text) ?? '')
.filter(Boolean)
.join('\n');
unified.push(createMessage({
...context,
timestamp,
type: 'user_message',
text,
raw,
}));
}
if (type === 'gemini') {
const assistantText = readString(raw.content) ?? '';
if (assistantText.length) {
unified.push(createMessage({
...context,
timestamp,
type: 'assistant_message',
text: assistantText,
raw,
}));
}
}
const thoughts = readArray(raw.thoughts);
for (const thought of thoughts) {
const thoughtRecord = readRecord(thought);
if (!thoughtRecord) {
continue;
}
const text = readString(thoughtRecord.description) ?? readString(thoughtRecord.subject) ?? 'Thinking';
unified.push(createMessage({
...context,
timestamp: readString(thoughtRecord.timestamp) ?? timestamp,
type: 'thinking_message',
text,
raw: thoughtRecord,
}));
}
const toolCalls = readArray(raw.toolCalls);
for (const toolCall of toolCalls) {
const toolRecord = readRecord(toolCall);
if (!toolRecord) {
continue;
}
const status = readString(toolRecord.status) === 'error' ? 'error' : 'success';
unified.push(createMessage({
...context,
timestamp: readString(toolRecord.timestamp) ?? timestamp,
type: status === 'success' ? 'tool_call_success' : 'tool_call_error',
status,
toolName: readString(toolRecord.displayName) ?? readString(toolRecord.name),
toolCallId: readString(toolRecord.id),
data: {
args: toolRecord.args,
result: toolRecord.result,
resultDisplay: toolRecord.resultDisplay,
},
raw: toolRecord,
}));
}
return unified;
}
/**
* Normalizes Cursor payloads from JSONL entries.
*/
function normalizeCursorPayload(
raw: Record<string, unknown>,
context: MessageContext,
): UnifiedChatMessage[] {
const sessionStatusMessage = normalizeSessionStatus(raw, context);
if (sessionStatusMessage) {
return [sessionStatusMessage];
}
const role = readString(raw.role);
const timestamp = readString(raw.timestamp) ?? context.timestamp;
const message = readRecord(raw.message);
const content = readArray(message?.content);
const normalized: UnifiedChatMessage[] = [];
if (role === 'user') {
const text = content
.map((entry) => readString(readRecord(entry)?.text) ?? '')
.filter(Boolean)
.join('\n');
const strippedText = stripCursorUserQueryTags(text);
if (!strippedText) {
return [];
}
return [
createMessage({
...context,
timestamp,
type: 'user_message',
text: strippedText,
raw,
}),
];
}
if (role !== 'assistant') {
return [];
}
for (const entry of content) {
const block = readRecord(entry);
if (!block) {
continue;
}
const blockType = readString(block.type);
if (blockType === 'text') {
const text = readString(block.text);
if (!text) {
continue;
}
normalized.push(createMessage({
...context,
timestamp,
type: 'assistant_message',
text,
raw: block,
}));
continue;
}
if (blockType === 'tool_use') {
const toolName = readString(block.name);
const input = block.input;
if (toolName === 'CreatePlan') {
normalized.push(createMessage({
...context,
timestamp,
type: 'todo_task_list',
toolName,
has_progress_indicator: false,
data: input,
raw: block,
}));
continue;
}
normalized.push(createMessage({
...context,
timestamp,
type: 'tool_call_success',
status: 'success',
toolName,
data: input,
raw: block,
}));
}
}
return normalized;
}
/**
* Maps shared session status payloads into unified session event message types.
*/
function normalizeSessionStatus(
raw: Record<string, unknown>,
context: MessageContext,
): UnifiedChatMessage | null {
const sessionStatus = readString(raw.sessionStatus);
if (!sessionStatus) {
return null;
}
if (sessionStatus === 'STARTED') {
return createMessage({
...context,
timestamp: readString(raw.timestamp) ?? context.timestamp,
type: 'session_started',
sessionStatus: 'STARTED',
raw,
});
}
if (sessionStatus === 'COMPLETED') {
return createMessage({
...context,
timestamp: readString(raw.timestamp) ?? context.timestamp,
type: 'session_completed',
sessionStatus: 'COMPLETED',
raw,
});
}
if (sessionStatus === 'SESSION_ABORTED') {
return createMessage({
...context,
timestamp: readString(raw.timestamp) ?? context.timestamp,
type: 'session_interrupted',
sessionStatus: 'SESSION_ABORTED',
raw,
});
}
return null;
}
/**
* Strips cursor `<user_query>...</user_query>` wrappers from user messages.
*/
function stripCursorUserQueryTags(value: string): string {
return value
.replace(/<user_query>/gi, '')
.replace(/<\/user_query>/gi, '')
.trim();
}
/**
* Creates one normalized message with defaults.
*/
function createMessage(input: Omit<UnifiedChatMessage, 'timestamp'> & { timestamp?: string }): UnifiedChatMessage {
return {
...input,
timestamp: input.timestamp ?? new Date().toISOString(),
};
}
/**
* Safe object record cast.
*/
function readRecord(value: unknown): Record<string, unknown> | null {
if (!value || typeof value !== 'object' || Array.isArray(value)) {
return null;
}
return value as Record<string, unknown>;
}
/**
* Safe array cast.
*/
function readArray(value: unknown): unknown[] {
return Array.isArray(value) ? value : [];
}
/**
* Safe string parser.
*/
function readString(value: unknown): string | undefined {
if (typeof value !== 'string') {
return undefined;
}
const normalized = value.trim();
return normalized.length > 0 ? normalized : undefined;
}
/**
* Safe boolean parser.
*/
function readBoolean(value: unknown): boolean | undefined {
return typeof value === 'boolean' ? value : undefined;
}
/**
* Safe string-array parser.
*/
function readStringArray(value: unknown): string[] {
if (!Array.isArray(value)) {
return [];
}
return value.filter((entry): entry is string => typeof entry === 'string');
}
/**
* Best-effort JSON parse helper.
*/
function parseJsonSafely(value?: string): unknown {
if (!value) {
return null;
}
try {
return JSON.parse(value);
} catch {
return null;
}
}

View File

@@ -93,6 +93,10 @@ export abstract class AbstractProvider implements IProvider {
timestamp: new Date().toISOString(),
channel: 'system',
message: 'Session stop requested.',
data: {
sessionId,
sessionStatus: 'SESSION_ABORTED',
},
});
}
@@ -220,6 +224,16 @@ export abstract class AbstractProvider implements IProvider {
thinkingMode: input.thinkingMode,
});
this.appendEvent(session, {
timestamp: session.startedAt,
channel: 'system',
message: 'Session started.',
data: {
sessionId,
sessionStatus: 'STARTED',
},
});
return session;
}

View File

@@ -240,6 +240,15 @@ export abstract class BaseCliProvider extends AbstractProvider {
if (code === 0) {
this.updateSessionStatus(session, 'completed');
this.appendEvent(session, {
timestamp: new Date().toISOString(),
channel: 'system',
message: 'Session completed.',
data: {
sessionId: session.sessionId,
sessionStatus: 'COMPLETED',
},
});
return;
}

View File

@@ -13,6 +13,7 @@ import type { LLMProvider } from '@/shared/types/app.js';
type CreateSdkExecutionInput = StartSessionInput & {
sessionId: string;
isResume: boolean;
emitEvent?: (event: ProviderSessionEvent) => void;
};
type SdkExecution = {
@@ -86,6 +87,9 @@ export abstract class BaseSdkProvider extends AbstractProvider {
...input,
model: effectiveModel,
thinkingMode: effectiveThinking,
emitEvent: (event) => {
this.appendEvent(session, event);
},
});
} catch (error) {
const message = error instanceof Error ? error.message : 'Failed to start SDK session';
@@ -123,6 +127,15 @@ export abstract class BaseSdkProvider extends AbstractProvider {
if (session.status === 'running') {
this.updateSessionStatus(session, 'completed');
this.appendEvent(session, {
timestamp: new Date().toISOString(),
channel: 'system',
message: 'Session completed.',
data: {
sessionId: session.sessionId,
sessionStatus: 'COMPLETED',
},
});
}
} catch (error) {
const message = error instanceof Error ? error.message : 'Unknown SDK execution failure';

View File

@@ -18,6 +18,7 @@ import type {
type ClaudeExecutionInput = StartSessionInput & {
sessionId: string;
isResume: boolean;
emitEvent?: (event: ProviderSessionEvent) => void;
};
const CLAUDE_THINKING_LEVELS = new Set(['low', 'medium', 'high', 'max']);
@@ -52,6 +53,18 @@ type ClaudeUserPromptMessage = {
timestamp: string;
};
/**
* Safely reads one optional string value from unknown data.
*/
const readString = (value: unknown): string | undefined => {
if (typeof value !== 'string') {
return undefined;
}
const normalized = value.trim();
return normalized.length ? normalized : undefined;
};
/**
* Claude SDK provider implementation.
*/
@@ -97,7 +110,7 @@ export class ClaudeProvider extends BaseSdkProvider {
cwd: input.workspacePath,
model: input.model,
effort: this.resolveClaudeEffort(input.thinkingMode),
canUseTool: this.resolvePermissionHandler(input.runtimePermissionMode),
canUseTool: this.resolvePermissionHandler(input.runtimePermissionMode, input.emitEvent),
};
if (input.isResume) {
@@ -232,20 +245,59 @@ export class ClaudeProvider extends BaseSdkProvider {
/**
* Builds a runtime permission callback when explicit allow/deny is requested.
*/
private resolvePermissionHandler(mode?: RuntimePermissionMode): CanUseTool | undefined {
private resolvePermissionHandler(
mode?: RuntimePermissionMode,
emitEvent?: (event: ProviderSessionEvent) => void,
): CanUseTool | undefined {
if (!mode || mode === 'ask') {
return undefined;
}
if (mode === 'allow') {
return async () => ({ behavior: 'allow' });
return async (toolName, input, options) => {
const optionsRecord = options as Record<string, unknown>;
emitEvent?.({
timestamp: new Date().toISOString(),
channel: 'system',
message: `Tool permission requested for "${toolName}".`,
data: {
type: 'tool_use_request',
toolName,
input,
toolUseID: options.toolUseID,
title: readString(optionsRecord.title),
displayName: readString(optionsRecord.displayName),
description: readString(optionsRecord.description),
blockedPath: options.blockedPath,
},
});
return { behavior: 'allow' };
};
}
return async () => ({
behavior: 'deny',
message: 'Permission denied by runtime permission mode.',
interrupt: false,
});
return async (toolName, input, options) => {
const optionsRecord = options as Record<string, unknown>;
emitEvent?.({
timestamp: new Date().toISOString(),
channel: 'system',
message: `Tool permission denied for "${toolName}".`,
data: {
type: 'tool_use_request',
toolName,
input,
toolUseID: options.toolUseID,
title: readString(optionsRecord.title),
displayName: readString(optionsRecord.displayName),
description: readString(optionsRecord.description),
blockedPath: options.blockedPath,
},
});
return {
behavior: 'deny',
message: 'Permission denied by runtime permission mode.',
interrupt: false,
};
};
}
/**

View File

@@ -6,6 +6,7 @@ import { sessionsDb } from '@/shared/database/repositories/sessions.db.js';
import type { LLMProvider } from '@/shared/types/app.js';
import { AppError } from '@/shared/utils/app-error.js';
import { sessionIndexers } from '@/modules/llm/session-indexers/index.js';
import { llmMessagesUnifier, type UnifiedChatMessage } from '@/modules/llm/messages-unifier.service.js';
type SyncResult = {
processedByProvider: Record<LLMProvider, number>;
@@ -19,6 +20,7 @@ type SessionHistoryPayload = {
filePath: string;
fileType: 'jsonl' | 'json';
entries: unknown[];
messages: UnifiedChatMessage[];
};
const SESSION_ID_PATTERN = /^[a-zA-Z0-9._-]{1,120}$/;
@@ -223,6 +225,7 @@ export const llmSessionsService = {
const fileContent = await readFile(filePath, 'utf8');
const extension = path.extname(filePath).toLowerCase();
const isGeminiJson = session.provider === 'gemini' || extension === '.json';
const entries = isGeminiJson ? parseJson(fileContent) : parseJsonl(fileContent);
return {
sessionId: session.session_id,
@@ -230,7 +233,12 @@ export const llmSessionsService = {
workspacePath: session.workspace_path,
filePath,
fileType: isGeminiJson ? 'json' : 'jsonl',
entries: isGeminiJson ? parseJson(fileContent) : parseJsonl(fileContent),
entries,
messages: llmMessagesUnifier.normalizeHistoryEntries(
session.provider as LLMProvider,
session.session_id,
entries,
),
};
},
};