mirror of
https://github.com/siteboon/claudecodeui.git
synced 2026-05-25 02:35:54 +00:00
Merge branch 'main' into docs/add-read-me-for-provider-adding
This commit is contained in:
4
server/modules/providers/index.ts
Normal file
4
server/modules/providers/index.ts
Normal file
@@ -0,0 +1,4 @@
|
||||
export { sessionSynchronizerService } from './services/session-synchronizer.service.js';
|
||||
|
||||
export { initializeSessionsWatcher } from './services/sessions-watcher.service.js';
|
||||
export { closeSessionsWatcher } from './services/sessions-watcher.service.js';
|
||||
@@ -0,0 +1,110 @@
|
||||
import os from 'node:os';
|
||||
import path from 'node:path';
|
||||
|
||||
import { sessionsDb } from '@/modules/database/index.js';
|
||||
import {
|
||||
buildLookupMap,
|
||||
extractFirstValidJsonlData,
|
||||
findFilesRecursivelyCreatedAfter,
|
||||
normalizeSessionName,
|
||||
readFileTimestamps,
|
||||
} from '@/shared/utils.js';
|
||||
import type { IProviderSessionSynchronizer } from '@/shared/interfaces.js';
|
||||
|
||||
type ParsedSession = {
|
||||
sessionId: string;
|
||||
projectPath: string;
|
||||
sessionName?: string;
|
||||
};
|
||||
|
||||
/**
|
||||
* Session indexer for Claude transcript artifacts.
|
||||
*/
|
||||
export class ClaudeSessionSynchronizer implements IProviderSessionSynchronizer {
|
||||
private readonly provider = 'claude' as const;
|
||||
private readonly claudeHome = path.join(os.homedir(), '.claude');
|
||||
|
||||
/**
|
||||
* Scans ~/.claude/projects and upserts discovered sessions into DB.
|
||||
*/
|
||||
async synchronize(since?: Date): Promise<number> {
|
||||
const nameMap = await buildLookupMap(path.join(this.claudeHome, 'history.jsonl'), 'sessionId', 'display');
|
||||
const files = await findFilesRecursivelyCreatedAfter(
|
||||
path.join(this.claudeHome, 'projects'),
|
||||
'.jsonl',
|
||||
since ?? null
|
||||
);
|
||||
|
||||
let processed = 0;
|
||||
for (const filePath of files) {
|
||||
const parsed = await this.processSessionFile(filePath, nameMap);
|
||||
if (!parsed) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const timestamps = await readFileTimestamps(filePath);
|
||||
sessionsDb.createSession(
|
||||
parsed.sessionId,
|
||||
this.provider,
|
||||
parsed.projectPath,
|
||||
parsed.sessionName,
|
||||
timestamps.createdAt,
|
||||
timestamps.updatedAt,
|
||||
filePath
|
||||
);
|
||||
processed += 1;
|
||||
}
|
||||
|
||||
return processed;
|
||||
}
|
||||
|
||||
/**
|
||||
* Parses and upserts one Claude session JSONL file.
|
||||
*/
|
||||
async synchronizeFile(filePath: string): Promise<string | null> {
|
||||
if (!filePath.endsWith('.jsonl')) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const nameMap = await buildLookupMap(path.join(this.claudeHome, 'history.jsonl'), 'sessionId', 'display');
|
||||
const parsed = await this.processSessionFile(filePath, nameMap);
|
||||
if (!parsed) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const timestamps = await readFileTimestamps(filePath);
|
||||
return sessionsDb.createSession(
|
||||
parsed.sessionId,
|
||||
this.provider,
|
||||
parsed.projectPath,
|
||||
parsed.sessionName,
|
||||
timestamps.createdAt,
|
||||
timestamps.updatedAt,
|
||||
filePath
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Extracts session metadata from one Claude JSONL session file.
|
||||
*/
|
||||
private async processSessionFile(
|
||||
filePath: string,
|
||||
nameMap: Map<string, string>
|
||||
): Promise<ParsedSession | null> {
|
||||
return extractFirstValidJsonlData(filePath, (rawData) => {
|
||||
const data = rawData as Record<string, unknown>;
|
||||
const sessionId = typeof data.sessionId === 'string' ? data.sessionId : undefined;
|
||||
const projectPath = typeof data.cwd === 'string' ? data.cwd : undefined;
|
||||
|
||||
if (!sessionId || !projectPath) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return {
|
||||
sessionId,
|
||||
projectPath,
|
||||
sessionName: normalizeSessionName(nameMap.get(sessionId), 'Untitled Claude Session'),
|
||||
};
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -1,7 +1,12 @@
|
||||
import { getSessionMessages } from '@/projects.js';
|
||||
import fs from 'node:fs';
|
||||
import fsp from 'node:fs/promises';
|
||||
import path from 'node:path';
|
||||
import readline from 'node:readline';
|
||||
|
||||
import type { IProviderSessions } from '@/shared/interfaces.js';
|
||||
import type { AnyRecord, FetchHistoryOptions, FetchHistoryResult, NormalizedMessage } from '@/shared/types.js';
|
||||
import { createNormalizedMessage, generateMessageId, readObjectRecord } from '@/shared/utils.js';
|
||||
import { sessionsDb } from '@/modules/database/index.js';
|
||||
|
||||
const PROVIDER = 'claude';
|
||||
|
||||
@@ -15,17 +20,184 @@ type ClaudeToolResult = {
|
||||
type ClaudeHistoryResult =
|
||||
| AnyRecord[]
|
||||
| {
|
||||
messages?: AnyRecord[];
|
||||
total?: number;
|
||||
hasMore?: boolean;
|
||||
};
|
||||
messages?: AnyRecord[];
|
||||
total?: number;
|
||||
hasMore?: boolean;
|
||||
};
|
||||
|
||||
const loadClaudeSessionMessages = getSessionMessages as unknown as (
|
||||
projectName: string,
|
||||
type ClaudeHistoryMessagesResult =
|
||||
| AnyRecord[]
|
||||
| {
|
||||
messages: AnyRecord[];
|
||||
total: number;
|
||||
hasMore: boolean;
|
||||
offset?: number;
|
||||
limit?: number | null;
|
||||
};
|
||||
|
||||
async function parseAgentTools(filePath: string): Promise<AnyRecord[]> {
|
||||
const tools: AnyRecord[] = [];
|
||||
|
||||
try {
|
||||
const fileStream = fs.createReadStream(filePath);
|
||||
const rl = readline.createInterface({
|
||||
input: fileStream,
|
||||
crlfDelay: Infinity,
|
||||
});
|
||||
|
||||
for await (const line of rl) {
|
||||
if (!line.trim()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
try {
|
||||
const entry = JSON.parse(line) as AnyRecord;
|
||||
|
||||
if (entry.message?.role === 'assistant' && Array.isArray(entry.message?.content)) {
|
||||
for (const part of entry.message.content as AnyRecord[]) {
|
||||
if (part.type === 'tool_use') {
|
||||
tools.push({
|
||||
toolId: part.id,
|
||||
toolName: part.name,
|
||||
toolInput: part.input,
|
||||
timestamp: entry.timestamp,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (entry.message?.role === 'user' && Array.isArray(entry.message?.content)) {
|
||||
for (const part of entry.message.content as AnyRecord[]) {
|
||||
if (part.type !== 'tool_result') {
|
||||
continue;
|
||||
}
|
||||
|
||||
const tool = tools.find((candidate) => candidate.toolId === part.tool_use_id);
|
||||
if (!tool) {
|
||||
continue;
|
||||
}
|
||||
|
||||
tool.toolResult = {
|
||||
content: typeof part.content === 'string'
|
||||
? part.content
|
||||
: Array.isArray(part.content)
|
||||
? part.content
|
||||
.map((contentPart: AnyRecord) => contentPart?.text || '')
|
||||
.join('\n')
|
||||
: JSON.stringify(part.content),
|
||||
isError: Boolean(part.is_error),
|
||||
};
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
// Skip malformed lines that can happen during concurrent writes.
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
const message = error instanceof Error ? error.message : String(error);
|
||||
console.warn(`Error parsing agent file ${filePath}:`, message);
|
||||
}
|
||||
|
||||
return tools;
|
||||
}
|
||||
|
||||
async function getSessionMessages(
|
||||
sessionId: string,
|
||||
limit: number | null,
|
||||
offset: number,
|
||||
) => Promise<ClaudeHistoryResult>;
|
||||
): Promise<ClaudeHistoryMessagesResult> {
|
||||
try {
|
||||
const jsonLPath = sessionsDb.getSessionById(sessionId)?.jsonl_path;
|
||||
|
||||
if (!jsonLPath) {
|
||||
return { messages: [], total: 0, hasMore: false };
|
||||
}
|
||||
|
||||
const projectDir = path.dirname(jsonLPath);
|
||||
const files = await fsp.readdir(projectDir);
|
||||
const agentFiles = files.filter((file) => file.endsWith('.jsonl') && file.startsWith('agent-'));
|
||||
|
||||
const messages: AnyRecord[] = [];
|
||||
const agentToolsCache = new Map<string, AnyRecord[]>();
|
||||
|
||||
const fileStream = fs.createReadStream(jsonLPath);
|
||||
const rl = readline.createInterface({
|
||||
input: fileStream,
|
||||
crlfDelay: Infinity,
|
||||
});
|
||||
|
||||
for await (const line of rl) {
|
||||
if (!line.trim()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
try {
|
||||
const entry = JSON.parse(line) as AnyRecord;
|
||||
if (entry.sessionId === sessionId) {
|
||||
messages.push(entry);
|
||||
}
|
||||
} catch {
|
||||
// Skip malformed JSONL lines that can happen during concurrent writes.
|
||||
}
|
||||
}
|
||||
|
||||
const agentIds = new Set<string>();
|
||||
for (const message of messages) {
|
||||
const agentId = message.toolUseResult?.agentId;
|
||||
if (agentId) {
|
||||
agentIds.add(String(agentId));
|
||||
}
|
||||
}
|
||||
|
||||
for (const agentId of agentIds) {
|
||||
const agentFileName = `agent-${agentId}.jsonl`;
|
||||
if (!agentFiles.includes(agentFileName)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const agentFilePath = path.join(projectDir, agentFileName);
|
||||
const tools = await parseAgentTools(agentFilePath);
|
||||
agentToolsCache.set(agentId, tools);
|
||||
}
|
||||
|
||||
for (const message of messages) {
|
||||
const agentId = message.toolUseResult?.agentId;
|
||||
if (!agentId) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const agentTools = agentToolsCache.get(String(agentId));
|
||||
if (agentTools && agentTools.length > 0) {
|
||||
message.subagentTools = agentTools;
|
||||
}
|
||||
}
|
||||
|
||||
const sortedMessages = messages.sort(
|
||||
(a, b) => new Date(a.timestamp || 0).getTime() - new Date(b.timestamp || 0).getTime(),
|
||||
);
|
||||
const total = sortedMessages.length;
|
||||
|
||||
if (limit === null) {
|
||||
return sortedMessages;
|
||||
}
|
||||
|
||||
const startIndex = Math.max(0, total - offset - limit);
|
||||
const endIndex = total - offset;
|
||||
const paginatedMessages = sortedMessages.slice(startIndex, endIndex);
|
||||
const hasMore = startIndex > 0;
|
||||
|
||||
return {
|
||||
messages: paginatedMessages,
|
||||
total,
|
||||
hasMore,
|
||||
offset,
|
||||
limit,
|
||||
};
|
||||
} catch (error) {
|
||||
console.error(`Error reading messages for session ${sessionId}:`, error);
|
||||
return limit === null ? [] : { messages: [], total: 0, hasMore: false };
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Claude writes internal command and system reminder entries into history.
|
||||
@@ -238,14 +410,11 @@ export class ClaudeSessionsProvider implements IProviderSessions {
|
||||
sessionId: string,
|
||||
options: FetchHistoryOptions = {},
|
||||
): Promise<FetchHistoryResult> {
|
||||
const { projectName, limit = null, offset = 0 } = options;
|
||||
if (!projectName) {
|
||||
return { messages: [], total: 0, hasMore: false, offset: 0, limit: null };
|
||||
}
|
||||
const { limit = null, offset = 0 } = options;
|
||||
|
||||
let result: ClaudeHistoryResult;
|
||||
try {
|
||||
result = await loadClaudeSessionMessages(projectName, sessionId, limit, offset);
|
||||
result = await getSessionMessages(sessionId, limit, offset);
|
||||
} catch (error) {
|
||||
const message = error instanceof Error ? error.message : String(error);
|
||||
console.warn(`[ClaudeProvider] Failed to load session ${sessionId}:`, message);
|
||||
|
||||
@@ -1,13 +1,15 @@
|
||||
import { AbstractProvider } from '@/modules/providers/shared/base/abstract.provider.js';
|
||||
import { ClaudeProviderAuth } from '@/modules/providers/list/claude/claude-auth.provider.js';
|
||||
import { ClaudeMcpProvider } from '@/modules/providers/list/claude/claude-mcp.provider.js';
|
||||
import { ClaudeSessionSynchronizer } from '@/modules/providers/list/claude/claude-session-synchronizer.provider.js';
|
||||
import { ClaudeSessionsProvider } from '@/modules/providers/list/claude/claude-sessions.provider.js';
|
||||
import type { IProviderAuth, IProviderSessions } from '@/shared/interfaces.js';
|
||||
import type { IProviderAuth, IProviderSessionSynchronizer, IProviderSessions } from '@/shared/interfaces.js';
|
||||
|
||||
export class ClaudeProvider extends AbstractProvider {
|
||||
readonly mcp = new ClaudeMcpProvider();
|
||||
readonly auth: IProviderAuth = new ClaudeProviderAuth();
|
||||
readonly sessions: IProviderSessions = new ClaudeSessionsProvider();
|
||||
readonly sessionSynchronizer: IProviderSessionSynchronizer = new ClaudeSessionSynchronizer();
|
||||
|
||||
constructor() {
|
||||
super('claude');
|
||||
|
||||
@@ -0,0 +1,119 @@
|
||||
import os from 'node:os';
|
||||
import path from 'node:path';
|
||||
|
||||
import { sessionsDb } from '@/modules/database/index.js';
|
||||
import {
|
||||
buildLookupMap,
|
||||
extractFirstValidJsonlData,
|
||||
findFilesRecursivelyCreatedAfter,
|
||||
normalizeSessionName,
|
||||
readFileTimestamps,
|
||||
} from '@/shared/utils.js';
|
||||
import type { IProviderSessionSynchronizer } from '@/shared/interfaces.js';
|
||||
|
||||
type ParsedSession = {
|
||||
sessionId: string;
|
||||
projectPath: string;
|
||||
sessionName?: string;
|
||||
};
|
||||
|
||||
/**
|
||||
* Session indexer for Codex transcript artifacts.
|
||||
*/
|
||||
export class CodexSessionSynchronizer implements IProviderSessionSynchronizer {
|
||||
private readonly provider = 'codex' as const;
|
||||
private readonly codexHome = path.join(os.homedir(), '.codex');
|
||||
|
||||
/**
|
||||
* Scans ~/.codex/sessions and upserts discovered sessions into DB.
|
||||
*/
|
||||
async synchronize(since?: Date): Promise<number> {
|
||||
const nameMap = await buildLookupMap(path.join(this.codexHome, 'session_index.jsonl'), 'id', 'thread_name');
|
||||
const files = await findFilesRecursivelyCreatedAfter(
|
||||
path.join(this.codexHome, 'sessions'),
|
||||
'.jsonl',
|
||||
since ?? null
|
||||
);
|
||||
|
||||
let processed = 0;
|
||||
for (const filePath of files) {
|
||||
const parsed = await this.processSessionFile(filePath, nameMap);
|
||||
if (!parsed) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const existingSession = sessionsDb.getSessionById(parsed.sessionId);
|
||||
if (existingSession) {
|
||||
// If session name is untitled and we now have a name, update it
|
||||
if (existingSession.custom_name === 'Untitled Codex Session' && parsed.sessionName && parsed.sessionName !== 'Untitled Codex Session') {
|
||||
sessionsDb.updateSessionCustomName(parsed.sessionId, parsed.sessionName);
|
||||
}
|
||||
}
|
||||
|
||||
const timestamps = await readFileTimestamps(filePath);
|
||||
sessionsDb.createSession(
|
||||
parsed.sessionId,
|
||||
this.provider,
|
||||
parsed.projectPath,
|
||||
parsed.sessionName,
|
||||
timestamps.createdAt,
|
||||
timestamps.updatedAt,
|
||||
filePath
|
||||
);
|
||||
processed += 1;
|
||||
}
|
||||
|
||||
return processed;
|
||||
}
|
||||
|
||||
/**
|
||||
* Parses and upserts one Codex session JSONL file.
|
||||
*/
|
||||
async synchronizeFile(filePath: string): Promise<string | null> {
|
||||
if (!filePath.endsWith('.jsonl')) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const nameMap = await buildLookupMap(path.join(this.codexHome, 'session_index.jsonl'), 'id', 'thread_name');
|
||||
const parsed = await this.processSessionFile(filePath, nameMap);
|
||||
if (!parsed) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const timestamps = await readFileTimestamps(filePath);
|
||||
return sessionsDb.createSession(
|
||||
parsed.sessionId,
|
||||
this.provider,
|
||||
parsed.projectPath,
|
||||
parsed.sessionName,
|
||||
timestamps.createdAt,
|
||||
timestamps.updatedAt,
|
||||
filePath
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Extracts session metadata from one Codex JSONL session file.
|
||||
*/
|
||||
private async processSessionFile(
|
||||
filePath: string,
|
||||
nameMap: Map<string, string>
|
||||
): Promise<ParsedSession | null> {
|
||||
return extractFirstValidJsonlData(filePath, (rawData) => {
|
||||
const data = rawData as Record<string, unknown>;
|
||||
const payload = data.payload as Record<string, unknown> | undefined;
|
||||
const sessionId = typeof payload?.id === 'string' ? payload.id : undefined;
|
||||
const projectPath = typeof payload?.cwd === 'string' ? payload.cwd : undefined;
|
||||
|
||||
if (!sessionId || !projectPath) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return {
|
||||
sessionId,
|
||||
projectPath,
|
||||
sessionName: normalizeSessionName(nameMap.get(sessionId), 'Untitled Codex Session'),
|
||||
};
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -1,4 +1,7 @@
|
||||
import { getCodexSessionMessages } from '@/projects.js';
|
||||
import fsSync from 'node:fs';
|
||||
import readline from 'node:readline';
|
||||
|
||||
import { sessionsDb } from '@/modules/database/index.js';
|
||||
import type { IProviderSessions } from '@/shared/interfaces.js';
|
||||
import type { AnyRecord, FetchHistoryOptions, FetchHistoryResult, NormalizedMessage } from '@/shared/types.js';
|
||||
import { createNormalizedMessage, generateMessageId, readObjectRecord } from '@/shared/utils.js';
|
||||
@@ -11,14 +14,250 @@ type CodexHistoryResult =
|
||||
messages?: AnyRecord[];
|
||||
total?: number;
|
||||
hasMore?: boolean;
|
||||
offset?: number;
|
||||
limit?: number | null;
|
||||
tokenUsage?: unknown;
|
||||
};
|
||||
|
||||
const loadCodexSessionMessages = getCodexSessionMessages as unknown as (
|
||||
function isVisibleCodexUserMessage(payload: AnyRecord | null | undefined): boolean {
|
||||
if (!payload || payload.type !== 'user_message') {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (payload.kind && payload.kind !== 'plain') {
|
||||
return false;
|
||||
}
|
||||
|
||||
return typeof payload.message === 'string' && payload.message.trim().length > 0;
|
||||
}
|
||||
|
||||
function extractCodexTextContent(content: unknown): string {
|
||||
if (!Array.isArray(content)) {
|
||||
return typeof content === 'string' ? content : '';
|
||||
}
|
||||
|
||||
return content
|
||||
.map((item) => {
|
||||
if (!item || typeof item !== 'object') {
|
||||
return '';
|
||||
}
|
||||
|
||||
const record = item as AnyRecord;
|
||||
if (
|
||||
(record.type === 'input_text' || record.type === 'output_text' || record.type === 'text')
|
||||
&& typeof record.text === 'string'
|
||||
) {
|
||||
return record.text;
|
||||
}
|
||||
|
||||
return '';
|
||||
})
|
||||
.filter(Boolean)
|
||||
.join('\n');
|
||||
}
|
||||
|
||||
async function getCodexSessionMessages(
|
||||
sessionId: string,
|
||||
limit: number | null,
|
||||
offset: number,
|
||||
) => Promise<CodexHistoryResult>;
|
||||
limit: number | null = null,
|
||||
offset = 0,
|
||||
): Promise<CodexHistoryResult> {
|
||||
try {
|
||||
const sessionFilePath = sessionsDb.getSessionById(sessionId)?.jsonl_path;
|
||||
|
||||
if (!sessionFilePath) {
|
||||
console.warn(`Codex session file not found for session ${sessionId}`);
|
||||
return { messages: [], total: 0, hasMore: false };
|
||||
}
|
||||
|
||||
const messages: AnyRecord[] = [];
|
||||
let tokenUsage: AnyRecord | null = null;
|
||||
const fileStream = fsSync.createReadStream(sessionFilePath);
|
||||
const rl = readline.createInterface({
|
||||
input: fileStream,
|
||||
crlfDelay: Infinity,
|
||||
});
|
||||
|
||||
for await (const line of rl) {
|
||||
if (!line.trim()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
try {
|
||||
const entry = JSON.parse(line) as AnyRecord;
|
||||
|
||||
if (entry.type === 'event_msg' && entry.payload?.type === 'token_count' && entry.payload?.info) {
|
||||
const info = entry.payload.info as AnyRecord;
|
||||
if (info.total_token_usage) {
|
||||
const usage = info.total_token_usage as AnyRecord;
|
||||
tokenUsage = {
|
||||
used: usage.total_tokens || 0,
|
||||
total: info.model_context_window || 200000,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
if (entry.type === 'event_msg' && isVisibleCodexUserMessage(entry.payload as AnyRecord)) {
|
||||
messages.push({
|
||||
type: 'user',
|
||||
timestamp: entry.timestamp,
|
||||
message: {
|
||||
role: 'user',
|
||||
content: entry.payload.message,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
if (
|
||||
entry.type === 'response_item' &&
|
||||
entry.payload?.type === 'message' &&
|
||||
entry.payload.role === 'assistant'
|
||||
) {
|
||||
const textContent = extractCodexTextContent(entry.payload.content);
|
||||
if (textContent.trim()) {
|
||||
messages.push({
|
||||
type: 'assistant',
|
||||
timestamp: entry.timestamp,
|
||||
message: {
|
||||
role: 'assistant',
|
||||
content: textContent,
|
||||
},
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
if (entry.type === 'response_item' && entry.payload?.type === 'reasoning') {
|
||||
const summaryText = Array.isArray(entry.payload.summary)
|
||||
? entry.payload.summary
|
||||
.map((item: AnyRecord) => item?.text)
|
||||
.filter(Boolean)
|
||||
.join('\n')
|
||||
: '';
|
||||
|
||||
if (summaryText.trim()) {
|
||||
messages.push({
|
||||
type: 'thinking',
|
||||
timestamp: entry.timestamp,
|
||||
message: {
|
||||
role: 'assistant',
|
||||
content: summaryText,
|
||||
},
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
if (entry.type === 'response_item' && entry.payload?.type === 'function_call') {
|
||||
let toolName = entry.payload.name;
|
||||
let toolInput = entry.payload.arguments;
|
||||
|
||||
if (toolName === 'shell_command') {
|
||||
toolName = 'Bash';
|
||||
try {
|
||||
const args = JSON.parse(entry.payload.arguments) as AnyRecord;
|
||||
toolInput = JSON.stringify({ command: args.command });
|
||||
} catch {
|
||||
// Keep original arguments when parsing fails.
|
||||
}
|
||||
}
|
||||
|
||||
messages.push({
|
||||
type: 'tool_use',
|
||||
timestamp: entry.timestamp,
|
||||
toolName,
|
||||
toolInput,
|
||||
toolCallId: entry.payload.call_id,
|
||||
});
|
||||
}
|
||||
|
||||
if (entry.type === 'response_item' && entry.payload?.type === 'function_call_output') {
|
||||
messages.push({
|
||||
type: 'tool_result',
|
||||
timestamp: entry.timestamp,
|
||||
toolCallId: entry.payload.call_id,
|
||||
output: entry.payload.output,
|
||||
});
|
||||
}
|
||||
|
||||
if (entry.type === 'response_item' && entry.payload?.type === 'custom_tool_call') {
|
||||
const toolName = entry.payload.name || 'custom_tool';
|
||||
const input = entry.payload.input || '';
|
||||
|
||||
if (toolName === 'apply_patch') {
|
||||
const fileMatch = String(input).match(/\*\*\* Update File: (.+)/);
|
||||
const filePath = fileMatch ? fileMatch[1].trim() : 'unknown';
|
||||
const lines = String(input).split('\n');
|
||||
const oldLines: string[] = [];
|
||||
const newLines: string[] = [];
|
||||
|
||||
for (const lineContent of lines) {
|
||||
if (lineContent.startsWith('-') && !lineContent.startsWith('---')) {
|
||||
oldLines.push(lineContent.slice(1));
|
||||
} else if (lineContent.startsWith('+') && !lineContent.startsWith('+++')) {
|
||||
newLines.push(lineContent.slice(1));
|
||||
}
|
||||
}
|
||||
|
||||
messages.push({
|
||||
type: 'tool_use',
|
||||
timestamp: entry.timestamp,
|
||||
toolName: 'Edit',
|
||||
toolInput: JSON.stringify({
|
||||
file_path: filePath,
|
||||
old_string: oldLines.join('\n'),
|
||||
new_string: newLines.join('\n'),
|
||||
}),
|
||||
toolCallId: entry.payload.call_id,
|
||||
});
|
||||
} else {
|
||||
messages.push({
|
||||
type: 'tool_use',
|
||||
timestamp: entry.timestamp,
|
||||
toolName,
|
||||
toolInput: input,
|
||||
toolCallId: entry.payload.call_id,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
if (entry.type === 'response_item' && entry.payload?.type === 'custom_tool_call_output') {
|
||||
messages.push({
|
||||
type: 'tool_result',
|
||||
timestamp: entry.timestamp,
|
||||
toolCallId: entry.payload.call_id,
|
||||
output: entry.payload.output || '',
|
||||
});
|
||||
}
|
||||
} catch {
|
||||
// Skip malformed lines.
|
||||
}
|
||||
}
|
||||
|
||||
messages.sort(
|
||||
(a, b) => new Date(a.timestamp || 0).getTime() - new Date(b.timestamp || 0).getTime(),
|
||||
);
|
||||
const total = messages.length;
|
||||
|
||||
if (limit !== null) {
|
||||
const startIndex = Math.max(0, total - offset - limit);
|
||||
const endIndex = total - offset;
|
||||
const paginatedMessages = messages.slice(startIndex, endIndex);
|
||||
const hasMore = startIndex > 0;
|
||||
|
||||
return {
|
||||
messages: paginatedMessages,
|
||||
total,
|
||||
hasMore,
|
||||
offset,
|
||||
limit,
|
||||
tokenUsage,
|
||||
};
|
||||
}
|
||||
|
||||
return { messages, tokenUsage };
|
||||
} catch (error) {
|
||||
console.error(`Error reading Codex session messages for ${sessionId}:`, error);
|
||||
return { messages: [], total: 0, hasMore: false };
|
||||
}
|
||||
}
|
||||
|
||||
export class CodexSessionsProvider implements IProviderSessions {
|
||||
/**
|
||||
@@ -31,6 +270,23 @@ export class CodexSessionsProvider implements IProviderSessions {
|
||||
const ts = raw.timestamp || new Date().toISOString();
|
||||
const baseId = raw.uuid || generateMessageId('codex');
|
||||
|
||||
if (raw.type === 'thinking' || raw.isReasoning) {
|
||||
const thinkingContent = typeof raw.message?.content === 'string'
|
||||
? raw.message.content
|
||||
: '';
|
||||
if (!thinkingContent.trim()) {
|
||||
return [];
|
||||
}
|
||||
return [createNormalizedMessage({
|
||||
id: baseId,
|
||||
sessionId,
|
||||
timestamp: ts,
|
||||
provider: PROVIDER,
|
||||
kind: 'thinking',
|
||||
content: thinkingContent,
|
||||
})];
|
||||
}
|
||||
|
||||
if (raw.message?.role === 'user') {
|
||||
const content = typeof raw.message.content === 'string'
|
||||
? raw.message.content
|
||||
@@ -77,17 +333,6 @@ export class CodexSessionsProvider implements IProviderSessions {
|
||||
})];
|
||||
}
|
||||
|
||||
if (raw.type === 'thinking' || raw.isReasoning) {
|
||||
return [createNormalizedMessage({
|
||||
id: baseId,
|
||||
sessionId,
|
||||
timestamp: ts,
|
||||
provider: PROVIDER,
|
||||
kind: 'thinking',
|
||||
content: raw.message?.content || '',
|
||||
})];
|
||||
}
|
||||
|
||||
if (raw.type === 'tool_use' || raw.toolName) {
|
||||
return [createNormalizedMessage({
|
||||
id: baseId,
|
||||
@@ -275,7 +520,7 @@ export class CodexSessionsProvider implements IProviderSessions {
|
||||
|
||||
let result: CodexHistoryResult;
|
||||
try {
|
||||
result = await loadCodexSessionMessages(sessionId, limit, offset);
|
||||
result = await getCodexSessionMessages(sessionId, limit, offset);
|
||||
} catch (error) {
|
||||
const message = error instanceof Error ? error.message : String(error);
|
||||
console.warn(`[CodexProvider] Failed to load session ${sessionId}:`, message);
|
||||
|
||||
@@ -1,13 +1,15 @@
|
||||
import { AbstractProvider } from '@/modules/providers/shared/base/abstract.provider.js';
|
||||
import { CodexProviderAuth } from '@/modules/providers/list/codex/codex-auth.provider.js';
|
||||
import { CodexMcpProvider } from '@/modules/providers/list/codex/codex-mcp.provider.js';
|
||||
import { CodexSessionSynchronizer } from '@/modules/providers/list/codex/codex-session-synchronizer.provider.js';
|
||||
import { CodexSessionsProvider } from '@/modules/providers/list/codex/codex-sessions.provider.js';
|
||||
import type { IProviderAuth, IProviderSessions } from '@/shared/interfaces.js';
|
||||
import type { IProviderAuth, IProviderSessionSynchronizer, IProviderSessions } from '@/shared/interfaces.js';
|
||||
|
||||
export class CodexProvider extends AbstractProvider {
|
||||
readonly mcp = new CodexMcpProvider();
|
||||
readonly auth: IProviderAuth = new CodexProviderAuth();
|
||||
readonly sessions: IProviderSessions = new CodexSessionsProvider();
|
||||
readonly sessionSynchronizer: IProviderSessionSynchronizer = new CodexSessionSynchronizer();
|
||||
|
||||
constructor() {
|
||||
super('codex');
|
||||
|
||||
@@ -0,0 +1,176 @@
|
||||
import crypto from 'node:crypto';
|
||||
import fs from 'node:fs';
|
||||
import fsp from 'node:fs/promises';
|
||||
import os from 'node:os';
|
||||
import path from 'node:path';
|
||||
import readline from 'node:readline';
|
||||
|
||||
import { sessionsDb } from '@/modules/database/index.js';
|
||||
import {
|
||||
extractFirstValidJsonlData,
|
||||
findFilesRecursivelyCreatedAfter,
|
||||
normalizeSessionName,
|
||||
readFileTimestamps,
|
||||
} from '@/shared/utils.js';
|
||||
import type { IProviderSessionSynchronizer } from '@/shared/interfaces.js';
|
||||
|
||||
type ParsedSession = {
|
||||
sessionId: string;
|
||||
projectPath: string;
|
||||
sessionName?: string;
|
||||
};
|
||||
|
||||
/**
|
||||
* Returns directory entries or an empty list when the folder is missing.
|
||||
*/
|
||||
async function listDirectoryEntriesSafe(
|
||||
directoryPath: string
|
||||
): Promise<import('node:fs').Dirent[]> {
|
||||
try {
|
||||
return await fsp.readdir(directoryPath, { withFileTypes: true });
|
||||
} catch {
|
||||
return [];
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Session indexer for Cursor transcript artifacts.
|
||||
*/
|
||||
export class CursorSessionSynchronizer implements IProviderSessionSynchronizer {
|
||||
private readonly provider = 'cursor' as const;
|
||||
private readonly cursorHome = path.join(os.homedir(), '.cursor');
|
||||
|
||||
/**
|
||||
* Scans Cursor chats and upserts discovered sessions into DB.
|
||||
*/
|
||||
async synchronize(since?: Date): Promise<number> {
|
||||
const projectsDir = path.join(this.cursorHome, 'projects');
|
||||
const projectEntries = await listDirectoryEntriesSafe(projectsDir);
|
||||
const seenProjectPaths = new Set<string>();
|
||||
|
||||
let processed = 0;
|
||||
for (const entry of projectEntries) {
|
||||
if (!entry.isDirectory()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const workerLogPath = path.join(projectsDir, entry.name, 'worker.log');
|
||||
const projectPath = await this.extractProjectPathFromWorkerLog(workerLogPath);
|
||||
if (!projectPath || seenProjectPaths.has(projectPath)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
seenProjectPaths.add(projectPath);
|
||||
const projectHash = this.md5(projectPath);
|
||||
const chatsDir = path.join(this.cursorHome, 'chats', projectHash);
|
||||
const files = await findFilesRecursivelyCreatedAfter(chatsDir, '.jsonl', since ?? null);
|
||||
|
||||
for (const filePath of files) {
|
||||
const parsed = await this.processSessionFile(filePath);
|
||||
if (!parsed) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const timestamps = await readFileTimestamps(filePath);
|
||||
sessionsDb.createSession(
|
||||
parsed.sessionId,
|
||||
this.provider,
|
||||
parsed.projectPath,
|
||||
parsed.sessionName,
|
||||
timestamps.createdAt,
|
||||
timestamps.updatedAt,
|
||||
filePath
|
||||
);
|
||||
processed += 1;
|
||||
}
|
||||
}
|
||||
|
||||
return processed;
|
||||
}
|
||||
|
||||
/**
|
||||
* Parses and upserts one Cursor session JSONL file.
|
||||
*/
|
||||
async synchronizeFile(filePath: string): Promise<string | null> {
|
||||
if (!filePath.endsWith('.jsonl')) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const parsed = await this.processSessionFile(filePath);
|
||||
if (!parsed) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const timestamps = await readFileTimestamps(filePath);
|
||||
return sessionsDb.createSession(
|
||||
parsed.sessionId,
|
||||
this.provider,
|
||||
parsed.projectPath,
|
||||
parsed.sessionName,
|
||||
timestamps.createdAt,
|
||||
timestamps.updatedAt,
|
||||
filePath
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Produces the same project hash Cursor uses in chat directory names.
|
||||
*/
|
||||
private md5(input: string): string {
|
||||
return crypto.createHash('md5').update(input).digest('hex');
|
||||
}
|
||||
|
||||
/**
|
||||
* Extracts project path from Cursor worker.log.
|
||||
*/
|
||||
private async extractProjectPathFromWorkerLog(filePath: string): Promise<string | null> {
|
||||
try {
|
||||
const fileStream = fs.createReadStream(filePath, { encoding: 'utf8' });
|
||||
const lineReader = readline.createInterface({ input: fileStream, crlfDelay: Infinity });
|
||||
|
||||
for await (const line of lineReader) {
|
||||
const match = line.match(/workspacePath=(.*)$/);
|
||||
const projectPath = match?.[1]?.trim();
|
||||
if (projectPath) {
|
||||
lineReader.close();
|
||||
fileStream.close();
|
||||
return projectPath;
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
// Missing worker logs are valid for partial or incomplete session data.
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Extracts session metadata from one Cursor JSONL session file.
|
||||
*/
|
||||
private async processSessionFile(filePath: string): Promise<ParsedSession | null> {
|
||||
const sessionId = path.basename(filePath, '.jsonl');
|
||||
const grandparentDir = path.dirname(path.dirname(filePath));
|
||||
const workerLogPath = path.join(grandparentDir, 'worker.log');
|
||||
const projectPath = await this.extractProjectPathFromWorkerLog(workerLogPath);
|
||||
|
||||
if (!projectPath) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return extractFirstValidJsonlData(filePath, (rawData) => {
|
||||
const data = rawData as Record<string, any>;
|
||||
if (data.role !== 'user') {
|
||||
return null;
|
||||
}
|
||||
|
||||
const text = typeof data.message?.content?.[0]?.text === 'string' ? data.message.content[0].text : '';
|
||||
const firstLine = text.replace(/<\/?user_query>/g, '').trim().split('\n')[0];
|
||||
|
||||
return {
|
||||
sessionId,
|
||||
projectPath,
|
||||
sessionName: normalizeSessionName(firstLine, 'Untitled Cursor Session'),
|
||||
};
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -1,13 +1,15 @@
|
||||
import { AbstractProvider } from '@/modules/providers/shared/base/abstract.provider.js';
|
||||
import { CursorProviderAuth } from '@/modules/providers/list/cursor/cursor-auth.provider.js';
|
||||
import { CursorMcpProvider } from '@/modules/providers/list/cursor/cursor-mcp.provider.js';
|
||||
import { CursorSessionSynchronizer } from '@/modules/providers/list/cursor/cursor-session-synchronizer.provider.js';
|
||||
import { CursorSessionsProvider } from '@/modules/providers/list/cursor/cursor-sessions.provider.js';
|
||||
import type { IProviderAuth, IProviderSessions } from '@/shared/interfaces.js';
|
||||
import type { IProviderAuth, IProviderSessionSynchronizer, IProviderSessions } from '@/shared/interfaces.js';
|
||||
|
||||
export class CursorProvider extends AbstractProvider {
|
||||
readonly mcp = new CursorMcpProvider();
|
||||
readonly auth: IProviderAuth = new CursorProviderAuth();
|
||||
readonly sessions: IProviderSessions = new CursorSessionsProvider();
|
||||
readonly sessionSynchronizer: IProviderSessionSynchronizer = new CursorSessionSynchronizer();
|
||||
|
||||
constructor() {
|
||||
super('cursor');
|
||||
|
||||
@@ -0,0 +1,401 @@
|
||||
import crypto from 'node:crypto';
|
||||
import os from 'node:os';
|
||||
import path from 'node:path';
|
||||
import { readFile } from 'node:fs/promises';
|
||||
|
||||
import { projectsDb, sessionsDb } from '@/modules/database/index.js';
|
||||
import {
|
||||
findFilesRecursivelyCreatedAfter,
|
||||
normalizeProjectPath,
|
||||
normalizeSessionName,
|
||||
readFileTimestamps,
|
||||
} from '@/shared/utils.js';
|
||||
import type { IProviderSessionSynchronizer } from '@/shared/interfaces.js';
|
||||
import type { AnyRecord } from '@/shared/types.js';
|
||||
|
||||
type ParsedSession = {
|
||||
sessionId: string;
|
||||
projectPath: string;
|
||||
sessionName?: string;
|
||||
};
|
||||
|
||||
type GeminiJsonlMetadata = {
|
||||
sessionId: string;
|
||||
projectPath?: string;
|
||||
projectHash?: string;
|
||||
firstUserMessage?: string;
|
||||
};
|
||||
|
||||
/**
|
||||
* Session indexer for Gemini transcript artifacts.
|
||||
*/
|
||||
export class GeminiSessionSynchronizer implements IProviderSessionSynchronizer {
|
||||
private readonly provider = 'gemini' as const;
|
||||
private readonly geminiHome = path.join(os.homedir(), '.gemini');
|
||||
|
||||
/**
|
||||
* Scans Gemini legacy JSON and new JSONL artifacts and upserts sessions into DB.
|
||||
*/
|
||||
async synchronize(since?: Date): Promise<number> {
|
||||
const projectHashLookup = this.buildProjectHashLookup();
|
||||
|
||||
const legacySessionFiles = await findFilesRecursivelyCreatedAfter(
|
||||
path.join(this.geminiHome, 'sessions'),
|
||||
'.json',
|
||||
since ?? null
|
||||
);
|
||||
const legacyTempFiles = await findFilesRecursivelyCreatedAfter(
|
||||
path.join(this.geminiHome, 'tmp'),
|
||||
'.json',
|
||||
since ?? null
|
||||
);
|
||||
const jsonlSessionFiles = await findFilesRecursivelyCreatedAfter(
|
||||
path.join(this.geminiHome, 'sessions'),
|
||||
'.jsonl',
|
||||
since ?? null
|
||||
);
|
||||
const jsonlTempFiles = await findFilesRecursivelyCreatedAfter(
|
||||
path.join(this.geminiHome, 'tmp'),
|
||||
'.jsonl',
|
||||
since ?? null
|
||||
);
|
||||
|
||||
// Process legacy JSON first, then JSONL. If both exist for a session id,
|
||||
// the JSONL artifact becomes the canonical jsonl_path via upsert.
|
||||
const files = [
|
||||
...legacySessionFiles,
|
||||
...legacyTempFiles,
|
||||
...jsonlSessionFiles,
|
||||
...jsonlTempFiles,
|
||||
];
|
||||
|
||||
let processed = 0;
|
||||
for (const filePath of files) {
|
||||
if (this.shouldSkipTempArtifact(filePath)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const parsed = filePath.endsWith('.jsonl')
|
||||
? await this.processJsonlSessionFile(filePath, projectHashLookup)
|
||||
: await this.processLegacySessionFile(filePath);
|
||||
if (!parsed) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const timestamps = await readFileTimestamps(filePath);
|
||||
sessionsDb.createSession(
|
||||
parsed.sessionId,
|
||||
this.provider,
|
||||
parsed.projectPath,
|
||||
parsed.sessionName,
|
||||
timestamps.createdAt,
|
||||
timestamps.updatedAt,
|
||||
filePath
|
||||
);
|
||||
processed += 1;
|
||||
}
|
||||
|
||||
return processed;
|
||||
}
|
||||
|
||||
/**
|
||||
* Parses and upserts one Gemini legacy JSON or JSONL artifact.
|
||||
*/
|
||||
async synchronizeFile(filePath: string): Promise<string | null> {
|
||||
if (!filePath.endsWith('.json') && !filePath.endsWith('.jsonl')) {
|
||||
return null;
|
||||
}
|
||||
|
||||
if (this.shouldSkipTempArtifact(filePath)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const parsed = filePath.endsWith('.jsonl')
|
||||
? await this.processJsonlSessionFile(filePath, this.buildProjectHashLookup())
|
||||
: await this.processLegacySessionFile(filePath);
|
||||
if (!parsed) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const timestamps = await readFileTimestamps(filePath);
|
||||
return sessionsDb.createSession(
|
||||
parsed.sessionId,
|
||||
this.provider,
|
||||
parsed.projectPath,
|
||||
parsed.sessionName,
|
||||
timestamps.createdAt,
|
||||
timestamps.updatedAt,
|
||||
filePath
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Extracts session metadata from one Gemini legacy JSON artifact.
|
||||
*/
|
||||
private async processLegacySessionFile(filePath: string): Promise<ParsedSession | null> {
|
||||
try {
|
||||
const content = await readFile(filePath, 'utf8');
|
||||
const data = JSON.parse(content) as AnyRecord;
|
||||
|
||||
const sessionId =
|
||||
typeof data.sessionId === 'string'
|
||||
? data.sessionId
|
||||
: typeof data.id === 'string'
|
||||
? data.id
|
||||
: undefined;
|
||||
if (!sessionId) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const workspaceProjectPath = await this.resolveProjectPathFromChatWorkspace(filePath);
|
||||
const projectPath = typeof data.projectPath === 'string' && data.projectPath.trim().length > 0
|
||||
? data.projectPath
|
||||
: workspaceProjectPath;
|
||||
if (!projectPath) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const messages = Array.isArray(data.messages) ? data.messages : [];
|
||||
const firstMessage = messages[0] as AnyRecord | undefined;
|
||||
let rawName: string | undefined;
|
||||
|
||||
if (Array.isArray(firstMessage?.content) && typeof firstMessage.content[0]?.text === 'string') {
|
||||
rawName = firstMessage.content[0].text;
|
||||
} else if (typeof firstMessage?.content === 'string') {
|
||||
rawName = firstMessage.content;
|
||||
}
|
||||
|
||||
return {
|
||||
sessionId,
|
||||
projectPath,
|
||||
sessionName: normalizeSessionName(rawName, 'New Gemini Chat'),
|
||||
};
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Extracts session metadata from one Gemini JSONL artifact.
|
||||
*/
|
||||
private async processJsonlSessionFile(
|
||||
filePath: string,
|
||||
projectHashLookup: Map<string, string>
|
||||
): Promise<ParsedSession | null> {
|
||||
const metadata = await this.extractJsonlMetadata(filePath);
|
||||
if (!metadata) {
|
||||
return null;
|
||||
}
|
||||
|
||||
let projectPath = typeof metadata.projectPath === 'string' ? metadata.projectPath.trim() : '';
|
||||
if (!projectPath) {
|
||||
const workspaceProjectPath = await this.resolveProjectPathFromChatWorkspace(filePath);
|
||||
if (workspaceProjectPath) {
|
||||
projectPath = workspaceProjectPath;
|
||||
}
|
||||
}
|
||||
if (!projectPath && typeof metadata.projectHash === 'string') {
|
||||
projectPath = projectHashLookup.get(metadata.projectHash.trim().toLowerCase()) ?? '';
|
||||
}
|
||||
if (!projectPath) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// Once we resolve a project hash/path pair, keep it in-memory for this sync run.
|
||||
if (typeof metadata.projectHash === 'string' && metadata.projectHash.trim()) {
|
||||
projectHashLookup.set(metadata.projectHash.trim().toLowerCase(), projectPath);
|
||||
}
|
||||
|
||||
return {
|
||||
sessionId: metadata.sessionId,
|
||||
projectPath,
|
||||
sessionName: normalizeSessionName(metadata.firstUserMessage, 'New Gemini Chat'),
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Reads first useful metadata from Gemini JSONL files.
|
||||
*/
|
||||
private async extractJsonlMetadata(filePath: string): Promise<GeminiJsonlMetadata | null> {
|
||||
try {
|
||||
const content = await readFile(filePath, 'utf8');
|
||||
const lines = content.split('\n');
|
||||
|
||||
let sessionId: string | undefined;
|
||||
let projectPath: string | undefined;
|
||||
let projectHash: string | undefined;
|
||||
let firstUserMessage: string | undefined;
|
||||
|
||||
for (const line of lines) {
|
||||
const trimmed = line.trim();
|
||||
if (!trimmed) {
|
||||
continue;
|
||||
}
|
||||
|
||||
let parsed: AnyRecord;
|
||||
try {
|
||||
parsed = JSON.parse(trimmed) as AnyRecord;
|
||||
} catch {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!sessionId && typeof parsed.sessionId === 'string') {
|
||||
sessionId = parsed.sessionId;
|
||||
}
|
||||
if (!projectPath && typeof parsed.projectPath === 'string') {
|
||||
projectPath = parsed.projectPath;
|
||||
}
|
||||
if (!projectHash && typeof parsed.projectHash === 'string') {
|
||||
projectHash = parsed.projectHash;
|
||||
}
|
||||
|
||||
if (!firstUserMessage && parsed.type === 'user') {
|
||||
firstUserMessage = this.extractGeminiTextContent(parsed.content);
|
||||
}
|
||||
|
||||
if (sessionId && (projectPath || projectHash) && firstUserMessage) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (!sessionId) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return {
|
||||
sessionId,
|
||||
projectPath,
|
||||
projectHash,
|
||||
firstUserMessage,
|
||||
};
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Tries to resolve project root from Gemini tmp chat workspaces.
|
||||
*/
|
||||
private async resolveProjectPathFromChatWorkspace(filePath: string): Promise<string> {
|
||||
if (!filePath.includes(`${path.sep}chats${path.sep}`)) {
|
||||
return '';
|
||||
}
|
||||
|
||||
const chatsDir = path.dirname(filePath);
|
||||
const workspaceDir = path.dirname(chatsDir);
|
||||
const projectRootPath = path.join(workspaceDir, '.project_root');
|
||||
|
||||
try {
|
||||
const rootContent = await readFile(projectRootPath, 'utf8');
|
||||
return rootContent.trim();
|
||||
} catch {
|
||||
return '';
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Builds a hash->path lookup for Gemini JSONL metadata that stores projectHash.
|
||||
*/
|
||||
private buildProjectHashLookup(): Map<string, string> {
|
||||
const lookup = new Map<string, string>();
|
||||
const knownPaths = new Set<string>();
|
||||
|
||||
for (const project of projectsDb.getProjectPaths()) {
|
||||
if (typeof project.project_path === 'string' && project.project_path.trim()) {
|
||||
knownPaths.add(project.project_path.trim());
|
||||
}
|
||||
}
|
||||
|
||||
for (const session of sessionsDb.getAllSessions()) {
|
||||
if (session.provider === this.provider && typeof session.project_path === 'string' && session.project_path.trim()) {
|
||||
knownPaths.add(session.project_path.trim());
|
||||
}
|
||||
}
|
||||
|
||||
for (const knownPath of knownPaths) {
|
||||
this.addProjectHashCandidates(lookup, knownPath);
|
||||
}
|
||||
|
||||
return lookup;
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds likely Gemini hash variants for one project path.
|
||||
*/
|
||||
private addProjectHashCandidates(lookup: Map<string, string>, projectPath: string): void {
|
||||
const trimmed = projectPath.trim();
|
||||
if (!trimmed) {
|
||||
return;
|
||||
}
|
||||
|
||||
const normalized = normalizeProjectPath(trimmed);
|
||||
const resolved = path.resolve(trimmed);
|
||||
const resolvedNormalized = normalizeProjectPath(resolved);
|
||||
|
||||
const candidates = new Set<string>([
|
||||
trimmed,
|
||||
normalized,
|
||||
resolved,
|
||||
resolvedNormalized,
|
||||
]);
|
||||
|
||||
if (process.platform === 'win32') {
|
||||
for (const candidate of [...candidates]) {
|
||||
candidates.add(candidate.toLowerCase());
|
||||
}
|
||||
}
|
||||
|
||||
for (const candidate of candidates) {
|
||||
if (!candidate) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const hash = this.sha256(candidate);
|
||||
if (!lookup.has(hash)) {
|
||||
lookup.set(hash, trimmed);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns first user text from Gemini content payload shapes.
|
||||
*/
|
||||
private extractGeminiTextContent(content: unknown): string | undefined {
|
||||
if (typeof content === 'string' && content.trim().length > 0) {
|
||||
return content;
|
||||
}
|
||||
|
||||
if (!Array.isArray(content)) {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
for (const part of content) {
|
||||
if (typeof part === 'string' && part.trim().length > 0) {
|
||||
return part;
|
||||
}
|
||||
|
||||
if (part && typeof part === 'object' && typeof (part as AnyRecord).text === 'string') {
|
||||
const text = (part as AnyRecord).text;
|
||||
if (text.trim().length > 0) {
|
||||
return text;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return undefined;
|
||||
}
|
||||
|
||||
/**
|
||||
* Keeps tmp scanning scoped to chat artifacts only.
|
||||
*/
|
||||
private shouldSkipTempArtifact(filePath: string): boolean {
|
||||
return (
|
||||
filePath.startsWith(path.join(this.geminiHome, 'tmp'))
|
||||
&& !filePath.includes(`${path.sep}chats${path.sep}`)
|
||||
);
|
||||
}
|
||||
|
||||
private sha256(value: string): string {
|
||||
return crypto.createHash('sha256').update(value).digest('hex');
|
||||
}
|
||||
}
|
||||
@@ -1,11 +1,249 @@
|
||||
import sessionManager from '@/sessionManager.js';
|
||||
import { getGeminiCliSessionMessages } from '@/projects.js';
|
||||
import fsSync from 'node:fs';
|
||||
import fs from 'node:fs/promises';
|
||||
import readline from 'node:readline';
|
||||
|
||||
import { sessionsDb } from '@/modules/database/index.js';
|
||||
import type { IProviderSessions } from '@/shared/interfaces.js';
|
||||
import type { AnyRecord, FetchHistoryOptions, FetchHistoryResult, NormalizedMessage } from '@/shared/types.js';
|
||||
import { createNormalizedMessage, generateMessageId, readObjectRecord } from '@/shared/utils.js';
|
||||
|
||||
const PROVIDER = 'gemini';
|
||||
|
||||
type GeminiHistoryResult = {
|
||||
messages: AnyRecord[];
|
||||
tokenUsage?: unknown;
|
||||
};
|
||||
|
||||
function mapGeminiRole(value: unknown): 'user' | 'assistant' | null {
|
||||
if (value === 'user') {
|
||||
return 'user';
|
||||
}
|
||||
|
||||
if (value === 'gemini' || value === 'assistant') {
|
||||
return 'assistant';
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
function extractGeminiTextContent(content: unknown): string {
|
||||
if (typeof content === 'string') {
|
||||
return content;
|
||||
}
|
||||
|
||||
if (!Array.isArray(content)) {
|
||||
return '';
|
||||
}
|
||||
|
||||
return content
|
||||
.map((part) => {
|
||||
if (typeof part === 'string') {
|
||||
return part;
|
||||
}
|
||||
if (!part || typeof part !== 'object') {
|
||||
return '';
|
||||
}
|
||||
|
||||
const record = part as AnyRecord;
|
||||
if (typeof record.text === 'string') {
|
||||
return record.text;
|
||||
}
|
||||
|
||||
return '';
|
||||
})
|
||||
.filter(Boolean)
|
||||
.join('\n');
|
||||
}
|
||||
|
||||
function extractGeminiThoughts(thoughts: unknown): string {
|
||||
if (!Array.isArray(thoughts)) {
|
||||
return '';
|
||||
}
|
||||
|
||||
return thoughts
|
||||
.map((item) => {
|
||||
if (!item || typeof item !== 'object') {
|
||||
return '';
|
||||
}
|
||||
|
||||
const record = item as AnyRecord;
|
||||
const subject = typeof record.subject === 'string' ? record.subject.trim() : '';
|
||||
const description = typeof record.description === 'string' ? record.description.trim() : '';
|
||||
|
||||
if (subject && description) {
|
||||
return `${subject}: ${description}`;
|
||||
}
|
||||
|
||||
return description || subject;
|
||||
})
|
||||
.filter(Boolean)
|
||||
.join('\n');
|
||||
}
|
||||
|
||||
function buildGeminiTokenUsage(tokens: unknown): AnyRecord | undefined {
|
||||
if (!tokens || typeof tokens !== 'object') {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
const record = tokens as AnyRecord;
|
||||
const input = Number(record.input || 0);
|
||||
const output = Number(record.output || 0);
|
||||
const cached = Number(record.cached || 0);
|
||||
const thoughts = Number(record.thoughts || 0);
|
||||
const tool = Number(record.tool || 0);
|
||||
|
||||
const totalFromFields = input + output + cached + thoughts + tool;
|
||||
const total = Number(record.total || totalFromFields || 0);
|
||||
|
||||
return {
|
||||
used: total,
|
||||
total: total,
|
||||
breakdown: {
|
||||
input,
|
||||
output,
|
||||
cached,
|
||||
thoughts,
|
||||
tool,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
async function getGeminiLegacySessionMessages(sessionFilePath: string): Promise<GeminiHistoryResult> {
|
||||
try {
|
||||
const data = await fs.readFile(sessionFilePath, 'utf8');
|
||||
const session = JSON.parse(data) as AnyRecord;
|
||||
const sourceMessages = Array.isArray(session.messages) ? session.messages : [];
|
||||
|
||||
const messages: AnyRecord[] = [];
|
||||
for (const msg of sourceMessages) {
|
||||
const role = mapGeminiRole(msg.type ?? msg.role);
|
||||
if (!role) {
|
||||
continue;
|
||||
}
|
||||
|
||||
messages.push({
|
||||
type: 'message',
|
||||
uuid: typeof msg.id === 'string' ? msg.id : undefined,
|
||||
message: { role, content: msg.content },
|
||||
timestamp: msg.timestamp || null,
|
||||
});
|
||||
}
|
||||
|
||||
return { messages };
|
||||
} catch {
|
||||
return { messages: [] };
|
||||
}
|
||||
}
|
||||
|
||||
async function getGeminiJsonlSessionMessages(sessionFilePath: string): Promise<GeminiHistoryResult> {
|
||||
const messages: AnyRecord[] = [];
|
||||
let tokenUsage: AnyRecord | undefined;
|
||||
|
||||
try {
|
||||
const fileStream = fsSync.createReadStream(sessionFilePath);
|
||||
const lineReader = readline.createInterface({
|
||||
input: fileStream,
|
||||
crlfDelay: Infinity,
|
||||
});
|
||||
|
||||
for await (const line of lineReader) {
|
||||
const trimmed = line.trim();
|
||||
if (!trimmed) {
|
||||
continue;
|
||||
}
|
||||
|
||||
let entry: AnyRecord;
|
||||
try {
|
||||
entry = JSON.parse(trimmed) as AnyRecord;
|
||||
} catch {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Metadata/update lines (e.g. {$set:{lastUpdated:...}}) do not represent chat messages.
|
||||
if (entry.$set) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const role = mapGeminiRole(entry.type);
|
||||
if (role) {
|
||||
const textContent = extractGeminiTextContent(entry.content);
|
||||
if (textContent.trim()) {
|
||||
messages.push({
|
||||
type: 'message',
|
||||
uuid: typeof entry.id === 'string' ? entry.id : undefined,
|
||||
message: { role, content: textContent },
|
||||
timestamp: entry.timestamp || null,
|
||||
});
|
||||
}
|
||||
|
||||
const thinkingContent = extractGeminiThoughts(entry.thoughts);
|
||||
if (thinkingContent.trim()) {
|
||||
messages.push({
|
||||
type: 'thinking',
|
||||
uuid: typeof entry.id === 'string' ? `${entry.id}_thinking` : undefined,
|
||||
message: { role: 'assistant', content: thinkingContent },
|
||||
timestamp: entry.timestamp || null,
|
||||
isReasoning: true,
|
||||
});
|
||||
}
|
||||
|
||||
if (role === 'assistant') {
|
||||
const usage = buildGeminiTokenUsage(entry.tokens);
|
||||
if (usage) {
|
||||
tokenUsage = usage;
|
||||
}
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
if (entry.type === 'tool_use') {
|
||||
messages.push({
|
||||
type: 'tool_use',
|
||||
uuid: typeof entry.id === 'string' ? entry.id : undefined,
|
||||
timestamp: entry.timestamp || null,
|
||||
toolName: entry.tool_name || entry.name || 'Tool',
|
||||
toolInput: entry.parameters ?? entry.input ?? entry.arguments ?? '',
|
||||
toolCallId: entry.tool_id || entry.toolCallId || entry.id,
|
||||
});
|
||||
continue;
|
||||
}
|
||||
|
||||
if (entry.type === 'tool_result') {
|
||||
messages.push({
|
||||
type: 'tool_result',
|
||||
uuid: typeof entry.id === 'string' ? entry.id : undefined,
|
||||
timestamp: entry.timestamp || null,
|
||||
toolCallId: entry.tool_id || entry.toolCallId || entry.id || '',
|
||||
output: entry.output ?? entry.result ?? '',
|
||||
isError: Boolean(entry.error) || entry.status === 'error',
|
||||
});
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
return { messages: [] };
|
||||
}
|
||||
|
||||
messages.sort(
|
||||
(a, b) => new Date(a.timestamp || 0).getTime() - new Date(b.timestamp || 0).getTime(),
|
||||
);
|
||||
|
||||
return { messages, tokenUsage };
|
||||
}
|
||||
|
||||
async function getGeminiCliSessionMessages(sessionId: string): Promise<GeminiHistoryResult> {
|
||||
const sessionFilePath = sessionsDb.getSessionById(sessionId)?.jsonl_path;
|
||||
if (!sessionFilePath) {
|
||||
return { messages: [] };
|
||||
}
|
||||
|
||||
if (sessionFilePath.endsWith('.jsonl')) {
|
||||
return getGeminiJsonlSessionMessages(sessionFilePath);
|
||||
}
|
||||
|
||||
return getGeminiLegacySessionMessages(sessionFilePath);
|
||||
}
|
||||
|
||||
export class GeminiSessionsProvider implements IProviderSessions {
|
||||
/**
|
||||
* Normalizes live Gemini stream-json events into the shared message shape.
|
||||
@@ -108,8 +346,7 @@ export class GeminiSessionsProvider implements IProviderSessions {
|
||||
}
|
||||
|
||||
/**
|
||||
* Loads Gemini history from the in-memory session manager first, then falls
|
||||
* back to Gemini CLI session files on disk.
|
||||
* Loads Gemini history from Gemini CLI session files on disk.
|
||||
*/
|
||||
async fetchHistory(
|
||||
sessionId: string,
|
||||
@@ -117,28 +354,73 @@ export class GeminiSessionsProvider implements IProviderSessions {
|
||||
): Promise<FetchHistoryResult> {
|
||||
const { limit = null, offset = 0 } = options;
|
||||
|
||||
let rawMessages: AnyRecord[];
|
||||
let result: GeminiHistoryResult;
|
||||
try {
|
||||
rawMessages = sessionManager.getSessionMessages(sessionId) as AnyRecord[];
|
||||
|
||||
if (rawMessages.length === 0) {
|
||||
rawMessages = await getGeminiCliSessionMessages(sessionId) as AnyRecord[];
|
||||
}
|
||||
result = await getGeminiCliSessionMessages(sessionId);
|
||||
} catch (error) {
|
||||
const message = error instanceof Error ? error.message : String(error);
|
||||
console.warn(`[GeminiProvider] Failed to load session ${sessionId}:`, message);
|
||||
return { messages: [], total: 0, hasMore: false, offset: 0, limit: null };
|
||||
}
|
||||
|
||||
const rawMessages = result.messages;
|
||||
const normalized: NormalizedMessage[] = [];
|
||||
|
||||
for (let i = 0; i < rawMessages.length; i++) {
|
||||
const raw = rawMessages[i];
|
||||
const ts = raw.timestamp || new Date().toISOString();
|
||||
const baseId = raw.uuid || generateMessageId('gemini');
|
||||
|
||||
if (raw.type === 'thinking' || raw.isReasoning) {
|
||||
const thinkingContent = typeof raw.message?.content === 'string'
|
||||
? raw.message.content
|
||||
: typeof raw.content === 'string'
|
||||
? raw.content
|
||||
: '';
|
||||
|
||||
if (thinkingContent.trim()) {
|
||||
normalized.push(createNormalizedMessage({
|
||||
id: baseId,
|
||||
sessionId,
|
||||
timestamp: ts,
|
||||
provider: PROVIDER,
|
||||
kind: 'thinking',
|
||||
content: thinkingContent,
|
||||
}));
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
if (raw.type === 'tool_use' || raw.toolName) {
|
||||
normalized.push(createNormalizedMessage({
|
||||
id: baseId,
|
||||
sessionId,
|
||||
timestamp: ts,
|
||||
provider: PROVIDER,
|
||||
kind: 'tool_use',
|
||||
toolName: raw.toolName || 'Tool',
|
||||
toolInput: raw.toolInput,
|
||||
toolId: raw.toolCallId || baseId,
|
||||
}));
|
||||
continue;
|
||||
}
|
||||
|
||||
if (raw.type === 'tool_result') {
|
||||
normalized.push(createNormalizedMessage({
|
||||
id: baseId,
|
||||
sessionId,
|
||||
timestamp: ts,
|
||||
provider: PROVIDER,
|
||||
kind: 'tool_result',
|
||||
toolId: raw.toolCallId || '',
|
||||
content: raw.output === undefined ? '' : String(raw.output),
|
||||
isError: Boolean(raw.isError),
|
||||
}));
|
||||
continue;
|
||||
}
|
||||
|
||||
const role = raw.message?.role || raw.role;
|
||||
const content = raw.message?.content || raw.content;
|
||||
|
||||
if (!role || !content) {
|
||||
continue;
|
||||
}
|
||||
@@ -147,8 +429,26 @@ export class GeminiSessionsProvider implements IProviderSessions {
|
||||
|
||||
if (Array.isArray(content)) {
|
||||
for (let partIdx = 0; partIdx < content.length; partIdx++) {
|
||||
const part = content[partIdx];
|
||||
if (part.type === 'text' && part.text) {
|
||||
const part = content[partIdx] as AnyRecord | string;
|
||||
|
||||
if (typeof part === 'string' && part.trim()) {
|
||||
normalized.push(createNormalizedMessage({
|
||||
id: `${baseId}_${partIdx}`,
|
||||
sessionId,
|
||||
timestamp: ts,
|
||||
provider: PROVIDER,
|
||||
kind: 'text',
|
||||
role: normalizedRole,
|
||||
content: part,
|
||||
}));
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!part || typeof part !== 'object') {
|
||||
continue;
|
||||
}
|
||||
|
||||
if ((part.type === 'text' || !part.type) && typeof part.text === 'string' && part.text.trim()) {
|
||||
normalized.push(createNormalizedMessage({
|
||||
id: `${baseId}_${partIdx}`,
|
||||
sessionId,
|
||||
@@ -192,6 +492,19 @@ export class GeminiSessionsProvider implements IProviderSessions {
|
||||
role: normalizedRole,
|
||||
content,
|
||||
}));
|
||||
} else {
|
||||
const textContent = extractGeminiTextContent(content);
|
||||
if (textContent.trim()) {
|
||||
normalized.push(createNormalizedMessage({
|
||||
id: baseId,
|
||||
sessionId,
|
||||
timestamp: ts,
|
||||
provider: PROVIDER,
|
||||
kind: 'text',
|
||||
role: normalizedRole,
|
||||
content: textContent,
|
||||
}));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -222,6 +535,7 @@ export class GeminiSessionsProvider implements IProviderSessions {
|
||||
hasMore: pageLimit === null ? false : start + pageLimit < normalized.length,
|
||||
offset: start,
|
||||
limit: pageLimit,
|
||||
tokenUsage: result.tokenUsage,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,13 +1,15 @@
|
||||
import { AbstractProvider } from '@/modules/providers/shared/base/abstract.provider.js';
|
||||
import { GeminiProviderAuth } from '@/modules/providers/list/gemini/gemini-auth.provider.js';
|
||||
import { GeminiMcpProvider } from '@/modules/providers/list/gemini/gemini-mcp.provider.js';
|
||||
import { GeminiSessionSynchronizer } from '@/modules/providers/list/gemini/gemini-session-synchronizer.provider.js';
|
||||
import { GeminiSessionsProvider } from '@/modules/providers/list/gemini/gemini-sessions.provider.js';
|
||||
import type { IProviderAuth, IProviderSessions } from '@/shared/interfaces.js';
|
||||
import type { IProviderAuth, IProviderSessionSynchronizer, IProviderSessions } from '@/shared/interfaces.js';
|
||||
|
||||
export class GeminiProvider extends AbstractProvider {
|
||||
readonly mcp = new GeminiMcpProvider();
|
||||
readonly auth: IProviderAuth = new GeminiProviderAuth();
|
||||
readonly sessions: IProviderSessions = new GeminiSessionsProvider();
|
||||
readonly sessionSynchronizer: IProviderSessionSynchronizer = new GeminiSessionSynchronizer();
|
||||
|
||||
constructor() {
|
||||
super('gemini');
|
||||
|
||||
@@ -2,6 +2,8 @@ import express, { type Request, type Response } from 'express';
|
||||
|
||||
import { providerAuthService } from '@/modules/providers/services/provider-auth.service.js';
|
||||
import { providerMcpService } from '@/modules/providers/services/mcp.service.js';
|
||||
import { sessionConversationsSearchService } from '@/modules/providers/services/session-conversations-search.service.js';
|
||||
import { sessionsService } from '@/modules/providers/services/sessions.service.js';
|
||||
import type { LLMProvider, McpScope, McpTransport, UpsertProviderMcpServerInput } from '@/shared/types.js';
|
||||
import { AppError, asyncHandler, createApiSuccessResponse } from '@/shared/utils.js';
|
||||
|
||||
@@ -25,6 +27,20 @@ const readPathParam = (value: unknown, name: string): string => {
|
||||
const normalizeProviderParam = (value: unknown): string =>
|
||||
readPathParam(value, 'provider').trim().toLowerCase();
|
||||
|
||||
const SESSION_ID_PATTERN = /^[a-zA-Z0-9._-]{1,120}$/;
|
||||
|
||||
const parseSessionId = (value: unknown): string => {
|
||||
const sessionId = readPathParam(value, 'sessionId').trim();
|
||||
if (!SESSION_ID_PATTERN.test(sessionId)) {
|
||||
throw new AppError('Invalid sessionId.', {
|
||||
code: 'INVALID_SESSION_ID',
|
||||
statusCode: 400,
|
||||
});
|
||||
}
|
||||
|
||||
return sessionId;
|
||||
};
|
||||
|
||||
const readOptionalQueryString = (value: unknown): string | undefined => {
|
||||
if (typeof value !== 'string') {
|
||||
return undefined;
|
||||
@@ -34,6 +50,29 @@ const readOptionalQueryString = (value: unknown): string | undefined => {
|
||||
return normalized.length > 0 ? normalized : undefined;
|
||||
};
|
||||
|
||||
const parseOptionalBooleanQuery = (value: unknown, name: string): boolean | undefined => {
|
||||
if (value === undefined) {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
const normalized = readOptionalQueryString(value);
|
||||
if (!normalized) {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
if (normalized === 'true') {
|
||||
return true;
|
||||
}
|
||||
if (normalized === 'false') {
|
||||
return false;
|
||||
}
|
||||
|
||||
throw new AppError(`${name} must be "true" or "false".`, {
|
||||
code: 'INVALID_QUERY_PARAMETER',
|
||||
statusCode: 400,
|
||||
});
|
||||
};
|
||||
|
||||
const parseMcpScope = (value: unknown): McpScope | undefined => {
|
||||
if (value === undefined) {
|
||||
return undefined;
|
||||
@@ -103,19 +142,19 @@ const parseMcpUpsertPayload = (payload: unknown): UpsertProviderMcpServerInput =
|
||||
args: Array.isArray(body.args) ? body.args.filter((entry): entry is string => typeof entry === 'string') : undefined,
|
||||
env: typeof body.env === 'object' && body.env !== null
|
||||
? Object.fromEntries(
|
||||
Object.entries(body.env as Record<string, unknown>).filter(
|
||||
(entry): entry is [string, string] => typeof entry[1] === 'string',
|
||||
),
|
||||
)
|
||||
Object.entries(body.env as Record<string, unknown>).filter(
|
||||
(entry): entry is [string, string] => typeof entry[1] === 'string',
|
||||
),
|
||||
)
|
||||
: undefined,
|
||||
cwd: readOptionalQueryString(body.cwd),
|
||||
url: readOptionalQueryString(body.url),
|
||||
headers: typeof body.headers === 'object' && body.headers !== null
|
||||
? Object.fromEntries(
|
||||
Object.entries(body.headers as Record<string, unknown>).filter(
|
||||
(entry): entry is [string, string] => typeof entry[1] === 'string',
|
||||
),
|
||||
)
|
||||
Object.entries(body.headers as Record<string, unknown>).filter(
|
||||
(entry): entry is [string, string] => typeof entry[1] === 'string',
|
||||
),
|
||||
)
|
||||
: undefined,
|
||||
envVars: Array.isArray(body.envVars)
|
||||
? body.envVars.filter((entry): entry is string => typeof entry === 'string')
|
||||
@@ -123,10 +162,10 @@ const parseMcpUpsertPayload = (payload: unknown): UpsertProviderMcpServerInput =
|
||||
bearerTokenEnvVar: readOptionalQueryString(body.bearerTokenEnvVar),
|
||||
envHttpHeaders: typeof body.envHttpHeaders === 'object' && body.envHttpHeaders !== null
|
||||
? Object.fromEntries(
|
||||
Object.entries(body.envHttpHeaders as Record<string, unknown>).filter(
|
||||
(entry): entry is [string, string] => typeof entry[1] === 'string',
|
||||
),
|
||||
)
|
||||
Object.entries(body.envHttpHeaders as Record<string, unknown>).filter(
|
||||
(entry): entry is [string, string] => typeof entry[1] === 'string',
|
||||
),
|
||||
)
|
||||
: undefined,
|
||||
};
|
||||
};
|
||||
@@ -143,6 +182,62 @@ const parseProvider = (value: unknown): LLMProvider => {
|
||||
});
|
||||
};
|
||||
|
||||
const parseSessionRenameSummary = (payload: unknown): string => {
|
||||
if (!payload || typeof payload !== 'object') {
|
||||
throw new AppError('Request body must be an object.', {
|
||||
code: 'INVALID_REQUEST_BODY',
|
||||
statusCode: 400,
|
||||
});
|
||||
}
|
||||
|
||||
const body = payload as Record<string, unknown>;
|
||||
const summary = typeof body.summary === 'string' ? body.summary.trim() : '';
|
||||
if (!summary) {
|
||||
throw new AppError('Summary is required.', {
|
||||
code: 'INVALID_SESSION_SUMMARY',
|
||||
statusCode: 400,
|
||||
});
|
||||
}
|
||||
|
||||
if (summary.length > 500) {
|
||||
throw new AppError('Summary must not exceed 500 characters.', {
|
||||
code: 'INVALID_SESSION_SUMMARY',
|
||||
statusCode: 400,
|
||||
});
|
||||
}
|
||||
|
||||
return summary;
|
||||
};
|
||||
|
||||
const parseSessionSearchQuery = (value: unknown): string => {
|
||||
const query = readOptionalQueryString(value) ?? '';
|
||||
if (query.length < 2) {
|
||||
throw new AppError('Query must be at least 2 characters', {
|
||||
code: 'INVALID_SEARCH_QUERY',
|
||||
statusCode: 400,
|
||||
});
|
||||
}
|
||||
|
||||
return query;
|
||||
};
|
||||
|
||||
const parseSessionSearchLimit = (value: unknown): number => {
|
||||
const raw = readOptionalQueryString(value);
|
||||
if (!raw) {
|
||||
return 50;
|
||||
}
|
||||
|
||||
const parsed = Number.parseInt(raw, 10);
|
||||
if (Number.isNaN(parsed)) {
|
||||
throw new AppError('limit must be a valid integer.', {
|
||||
code: 'INVALID_QUERY_PARAMETER',
|
||||
statusCode: 400,
|
||||
});
|
||||
}
|
||||
|
||||
return Math.max(1, Math.min(parsed, 100));
|
||||
};
|
||||
|
||||
router.get(
|
||||
'/:provider/auth/status',
|
||||
asyncHandler(async (req: Request, res: Response) => {
|
||||
@@ -152,6 +247,7 @@ router.get(
|
||||
}),
|
||||
);
|
||||
|
||||
// ----------------- MCP routes -----------------
|
||||
router.get(
|
||||
'/:provider/mcp/servers',
|
||||
asyncHandler(async (req: Request, res: Response) => {
|
||||
@@ -214,4 +310,116 @@ router.post(
|
||||
}),
|
||||
);
|
||||
|
||||
// ----------------- Session routes -----------------
|
||||
router.delete(
|
||||
'/sessions/:sessionId',
|
||||
asyncHandler(async (req: Request, res: Response) => {
|
||||
const sessionId = parseSessionId(req.params.sessionId);
|
||||
const deletedFromDisk = parseOptionalBooleanQuery(req.query.deletedFromDisk, 'deletedFromDisk') ?? false;
|
||||
const result = await sessionsService.deleteSessionById(sessionId, deletedFromDisk);
|
||||
res.json(createApiSuccessResponse(result));
|
||||
}),
|
||||
);
|
||||
|
||||
router.put(
|
||||
'/sessions/:sessionId',
|
||||
asyncHandler(async (req: Request, res: Response) => {
|
||||
const sessionId = parseSessionId(req.params.sessionId);
|
||||
const summary = parseSessionRenameSummary(req.body);
|
||||
const result = sessionsService.renameSessionById(sessionId, summary);
|
||||
res.json(createApiSuccessResponse(result));
|
||||
}),
|
||||
);
|
||||
|
||||
router.get(
|
||||
'/sessions/:sessionId/messages',
|
||||
asyncHandler(async (req: Request, res: Response) => {
|
||||
const sessionId = parseSessionId(req.params.sessionId);
|
||||
const limitRaw = readOptionalQueryString(req.query.limit);
|
||||
const offsetRaw = readOptionalQueryString(req.query.offset);
|
||||
|
||||
let limit: number | null = null;
|
||||
if (limitRaw !== undefined) {
|
||||
const parsedLimit = Number.parseInt(limitRaw, 10);
|
||||
if (Number.isNaN(parsedLimit) || parsedLimit < 0) {
|
||||
throw new AppError('limit must be a non-negative integer.', {
|
||||
code: 'INVALID_QUERY_PARAMETER',
|
||||
statusCode: 400,
|
||||
});
|
||||
}
|
||||
limit = parsedLimit;
|
||||
}
|
||||
|
||||
let offset = 0;
|
||||
if (offsetRaw !== undefined) {
|
||||
const parsedOffset = Number.parseInt(offsetRaw, 10);
|
||||
if (Number.isNaN(parsedOffset) || parsedOffset < 0) {
|
||||
throw new AppError('offset must be a non-negative integer.', {
|
||||
code: 'INVALID_QUERY_PARAMETER',
|
||||
statusCode: 400,
|
||||
});
|
||||
}
|
||||
offset = parsedOffset;
|
||||
}
|
||||
|
||||
const result = await sessionsService.fetchHistory(sessionId, {
|
||||
limit,
|
||||
offset,
|
||||
});
|
||||
res.json(result);
|
||||
}),
|
||||
);
|
||||
|
||||
router.get('/search/sessions', asyncHandler(async (req: Request, res: Response) => {
|
||||
const query = parseSessionSearchQuery(req.query.q);
|
||||
const limit = parseSessionSearchLimit(req.query.limit);
|
||||
|
||||
res.writeHead(200, {
|
||||
'Content-Type': 'text/event-stream',
|
||||
'Cache-Control': 'no-cache',
|
||||
Connection: 'keep-alive',
|
||||
'X-Accel-Buffering': 'no',
|
||||
});
|
||||
|
||||
let closed = false;
|
||||
const abortController = new AbortController();
|
||||
req.on('close', () => {
|
||||
closed = true;
|
||||
abortController.abort();
|
||||
});
|
||||
|
||||
try {
|
||||
await sessionConversationsSearchService.search({
|
||||
query,
|
||||
limit,
|
||||
signal: abortController.signal,
|
||||
onProgress: ({ projectResult, totalMatches, scannedProjects, totalProjects }) => {
|
||||
if (closed) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (projectResult) {
|
||||
res.write(`event: result\ndata: ${JSON.stringify({ projectResult, totalMatches, scannedProjects, totalProjects })}\n\n`);
|
||||
return;
|
||||
}
|
||||
|
||||
res.write(`event: progress\ndata: ${JSON.stringify({ totalMatches, scannedProjects, totalProjects })}\n\n`);
|
||||
},
|
||||
});
|
||||
|
||||
if (!closed) {
|
||||
res.write('event: done\ndata: {}\n\n');
|
||||
}
|
||||
} catch (error) {
|
||||
console.error('Error searching conversations:', error);
|
||||
if (!closed) {
|
||||
res.write(`event: error\ndata: ${JSON.stringify({ error: 'Search failed' })}\n\n`);
|
||||
}
|
||||
} finally {
|
||||
if (!closed) {
|
||||
res.end();
|
||||
}
|
||||
}
|
||||
}));
|
||||
|
||||
export default router;
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,74 @@
|
||||
import { scanStateDb } from '@/modules/database/index.js';
|
||||
import { providerRegistry } from '@/modules/providers/provider.registry.js';
|
||||
import type { LLMProvider } from '@/shared/types.js';
|
||||
|
||||
type SessionSynchronizeResult = {
|
||||
processedByProvider: Record<LLMProvider, number>;
|
||||
failures: string[];
|
||||
};
|
||||
|
||||
/**
|
||||
* Orchestrates provider-specific session indexers and indexed-session lifecycle operations.
|
||||
*/
|
||||
export const sessionSynchronizerService = {
|
||||
/**
|
||||
* Runs all provider synchronizers and updates scan_state.last_scanned_at.
|
||||
*/
|
||||
async synchronizeSessions(): Promise<SessionSynchronizeResult> {
|
||||
const lastScanAt = scanStateDb.getLastScannedAt();
|
||||
const scanBoundary = new Date();
|
||||
const processedByProvider: Record<LLMProvider, number> = {
|
||||
claude: 0,
|
||||
codex: 0,
|
||||
cursor: 0,
|
||||
gemini: 0,
|
||||
};
|
||||
const failures: string[] = [];
|
||||
|
||||
const results = await Promise.allSettled(
|
||||
providerRegistry.listProviders().map(async (provider) => ({
|
||||
provider: provider.id,
|
||||
processed: await provider.sessionSynchronizer.synchronize(lastScanAt ?? undefined),
|
||||
}))
|
||||
);
|
||||
|
||||
for (const result of results) {
|
||||
if (result.status === 'fulfilled') {
|
||||
processedByProvider[result.value.provider] = result.value.processed;
|
||||
continue;
|
||||
}
|
||||
|
||||
const reason = result.reason instanceof Error ? result.reason.message : String(result.reason);
|
||||
failures.push(reason);
|
||||
}
|
||||
|
||||
if (failures.length === 0) {
|
||||
scanStateDb.updateLastScannedAt(scanBoundary);
|
||||
} else {
|
||||
console.warn(
|
||||
`[Sessions] Skipping scan_state cursor advance because ${failures.length} provider sync(s) failed.`,
|
||||
);
|
||||
}
|
||||
|
||||
return {
|
||||
processedByProvider,
|
||||
failures,
|
||||
};
|
||||
},
|
||||
|
||||
/**
|
||||
* Indexes one provider artifact file without running a full provider rescan.
|
||||
*/
|
||||
async synchronizeProviderFile(
|
||||
provider: LLMProvider,
|
||||
filePath: string
|
||||
): Promise<{ provider: LLMProvider; indexed: boolean; sessionId: string | null }> {
|
||||
const resolvedProvider = providerRegistry.resolveProvider(provider);
|
||||
const sessionId = await resolvedProvider.sessionSynchronizer.synchronizeFile(filePath);
|
||||
return {
|
||||
provider,
|
||||
indexed: Boolean(sessionId),
|
||||
sessionId,
|
||||
};
|
||||
},
|
||||
};
|
||||
283
server/modules/providers/services/sessions-watcher.service.ts
Normal file
283
server/modules/providers/services/sessions-watcher.service.ts
Normal file
@@ -0,0 +1,283 @@
|
||||
import os from 'node:os';
|
||||
import path from 'node:path';
|
||||
import { promises as fsPromises } from 'node:fs';
|
||||
|
||||
import chokidar, { type FSWatcher } from 'chokidar';
|
||||
|
||||
import { sessionSynchronizerService } from '@/modules/providers/services/session-synchronizer.service.js';
|
||||
import { WS_OPEN_STATE, connectedClients } from '@/modules/websocket/index.js';
|
||||
import type { LLMProvider } from '@/shared/types.js';
|
||||
import { getProjectsWithSessions } from '@/modules/projects/index.js';
|
||||
|
||||
type WatcherEventType = 'add' | 'change';
|
||||
|
||||
const PROVIDER_WATCH_PATHS: Array<{ provider: LLMProvider; rootPath: string }> = [
|
||||
{
|
||||
provider: 'claude',
|
||||
rootPath: path.join(os.homedir(), '.claude', 'projects'),
|
||||
},
|
||||
{
|
||||
provider: 'cursor',
|
||||
rootPath: path.join(os.homedir(), '.cursor', 'chats'),
|
||||
},
|
||||
{
|
||||
provider: 'codex',
|
||||
rootPath: path.join(os.homedir(), '.codex', 'sessions'),
|
||||
},
|
||||
{
|
||||
provider: 'gemini',
|
||||
rootPath: path.join(os.homedir(), '.gemini', 'sessions'),
|
||||
},
|
||||
{
|
||||
provider: 'gemini',
|
||||
rootPath: path.join(os.homedir(), '.gemini', 'tmp'),
|
||||
},
|
||||
];
|
||||
|
||||
const WATCHER_IGNORED_PATTERNS = [
|
||||
'**/node_modules/**',
|
||||
'**/.git/**',
|
||||
'**/dist/**',
|
||||
'**/build/**',
|
||||
'**/*.tmp',
|
||||
'**/*.swp',
|
||||
'**/.DS_Store',
|
||||
];
|
||||
|
||||
const PROJECTS_UPDATE_DEBOUNCE_MS = 500;
|
||||
const PROJECTS_UPDATE_MAX_WAIT_MS = 2_000;
|
||||
|
||||
const watchers: FSWatcher[] = [];
|
||||
|
||||
type PendingWatcherUpdate = {
|
||||
providers: Set<LLMProvider>;
|
||||
changeTypes: Set<WatcherEventType>;
|
||||
updatedSessionIds: Set<string>;
|
||||
};
|
||||
|
||||
let pendingWatcherUpdate: PendingWatcherUpdate | null = null;
|
||||
let pendingWatcherUpdateStartedAt: number | null = null;
|
||||
let pendingWatcherFlushTimer: ReturnType<typeof setTimeout> | null = null;
|
||||
let watcherRefreshInFlight = false;
|
||||
let watcherRescheduleAfterRefresh = false;
|
||||
|
||||
/**
|
||||
* Filters watcher events to provider-specific session artifact file types.
|
||||
*/
|
||||
function isWatcherTargetFile(provider: LLMProvider, filePath: string): boolean {
|
||||
if (provider === 'gemini') {
|
||||
return filePath.endsWith('.json') || filePath.endsWith('.jsonl');
|
||||
}
|
||||
|
||||
return filePath.endsWith('.jsonl');
|
||||
}
|
||||
|
||||
function clearPendingWatcherFlushTimer(): void {
|
||||
if (pendingWatcherFlushTimer) {
|
||||
clearTimeout(pendingWatcherFlushTimer);
|
||||
pendingWatcherFlushTimer = null;
|
||||
}
|
||||
}
|
||||
|
||||
function schedulePendingWatcherFlush(): void {
|
||||
if (!pendingWatcherUpdate) {
|
||||
return;
|
||||
}
|
||||
|
||||
const now = Date.now();
|
||||
if (pendingWatcherUpdateStartedAt === null) {
|
||||
pendingWatcherUpdateStartedAt = now;
|
||||
}
|
||||
|
||||
const elapsed = now - pendingWatcherUpdateStartedAt;
|
||||
const remainingMaxWait = Math.max(0, PROJECTS_UPDATE_MAX_WAIT_MS - elapsed);
|
||||
const delay = Math.min(PROJECTS_UPDATE_DEBOUNCE_MS, remainingMaxWait);
|
||||
|
||||
clearPendingWatcherFlushTimer();
|
||||
pendingWatcherFlushTimer = setTimeout(() => {
|
||||
void flushPendingWatcherUpdate();
|
||||
}, delay);
|
||||
}
|
||||
|
||||
function queuePendingWatcherUpdate(
|
||||
eventType: WatcherEventType,
|
||||
provider: LLMProvider,
|
||||
updatedSessionId: string | null
|
||||
): void {
|
||||
if (!pendingWatcherUpdate) {
|
||||
pendingWatcherUpdate = {
|
||||
providers: new Set<LLMProvider>(),
|
||||
changeTypes: new Set<WatcherEventType>(),
|
||||
updatedSessionIds: new Set<string>(),
|
||||
};
|
||||
}
|
||||
|
||||
pendingWatcherUpdate.providers.add(provider);
|
||||
pendingWatcherUpdate.changeTypes.add(eventType);
|
||||
if (updatedSessionId) {
|
||||
pendingWatcherUpdate.updatedSessionIds.add(updatedSessionId);
|
||||
}
|
||||
|
||||
schedulePendingWatcherFlush();
|
||||
}
|
||||
|
||||
async function flushPendingWatcherUpdate(): Promise<void> {
|
||||
clearPendingWatcherFlushTimer();
|
||||
|
||||
if (!pendingWatcherUpdate) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (watcherRefreshInFlight) {
|
||||
watcherRescheduleAfterRefresh = true;
|
||||
return;
|
||||
}
|
||||
|
||||
const queuedUpdate = pendingWatcherUpdate;
|
||||
pendingWatcherUpdate = null;
|
||||
pendingWatcherUpdateStartedAt = null;
|
||||
watcherRefreshInFlight = true;
|
||||
|
||||
try {
|
||||
const updatedProjects = await getProjectsWithSessions({ skipSynchronization: true });
|
||||
const changeTypes = Array.from(queuedUpdate.changeTypes);
|
||||
const watchProviders = Array.from(queuedUpdate.providers);
|
||||
const updatedSessionIds = Array.from(queuedUpdate.updatedSessionIds);
|
||||
|
||||
// Backward-compatible fields stay populated with the first queued values.
|
||||
const updateMessage = JSON.stringify({
|
||||
type: 'projects_updated',
|
||||
projects: updatedProjects,
|
||||
timestamp: new Date().toISOString(),
|
||||
changeType: changeTypes[0] ?? 'change',
|
||||
updatedSessionId: updatedSessionIds[0] ?? undefined,
|
||||
watchProvider: watchProviders[0] ?? undefined,
|
||||
changeTypes,
|
||||
updatedSessionIds,
|
||||
watchProviders,
|
||||
batched: true,
|
||||
});
|
||||
|
||||
connectedClients.forEach(client => {
|
||||
if (client.readyState === WS_OPEN_STATE) {
|
||||
client.send(updateMessage);
|
||||
}
|
||||
});
|
||||
} catch (error) {
|
||||
const message = error instanceof Error ? error.message : String(error);
|
||||
console.error('Session watcher refresh failed while broadcasting projects_updated', { error: message });
|
||||
} finally {
|
||||
watcherRefreshInFlight = false;
|
||||
|
||||
if (pendingWatcherUpdate || watcherRescheduleAfterRefresh) {
|
||||
watcherRescheduleAfterRefresh = false;
|
||||
schedulePendingWatcherFlush();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handles file watcher updates and triggers provider file-level synchronization.
|
||||
*/
|
||||
async function onUpdate(
|
||||
eventType: WatcherEventType,
|
||||
filePath: string,
|
||||
provider: LLMProvider
|
||||
): Promise<void> {
|
||||
if (!isWatcherTargetFile(provider, filePath)) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
const result = await sessionSynchronizerService.synchronizeProviderFile(provider, filePath);
|
||||
if (!result.indexed) {
|
||||
return;
|
||||
}
|
||||
|
||||
console.log(`Session synchronization triggered by ${eventType} event for provider "${provider}"`, {
|
||||
filePath,
|
||||
sessionId: result.sessionId,
|
||||
});
|
||||
queuePendingWatcherUpdate(eventType, provider, result.sessionId);
|
||||
} catch (error) {
|
||||
const message = error instanceof Error ? error.message : String(error);
|
||||
console.error(`Session watcher sync failed for provider "${provider}"`, {
|
||||
eventType,
|
||||
filePath,
|
||||
error: message,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Starts provider filesystem watchers and performs initial DB synchronization.
|
||||
*/
|
||||
export async function initializeSessionsWatcher(): Promise<void> {
|
||||
console.log('Setting up session watchers');
|
||||
|
||||
const initialSync = await sessionSynchronizerService.synchronizeSessions();
|
||||
console.log('Initial session synchronization complete', {
|
||||
processedByProvider: initialSync.processedByProvider,
|
||||
failures: initialSync.failures,
|
||||
});
|
||||
|
||||
for (const { provider, rootPath } of PROVIDER_WATCH_PATHS) {
|
||||
try {
|
||||
await fsPromises.mkdir(rootPath, { recursive: true });
|
||||
|
||||
const watcher = chokidar.watch(rootPath, {
|
||||
ignored: WATCHER_IGNORED_PATTERNS,
|
||||
persistent: true,
|
||||
ignoreInitial: true,
|
||||
followSymlinks: false,
|
||||
depth: 6,
|
||||
usePolling: true,
|
||||
interval: 6_000,
|
||||
binaryInterval: 6_000,
|
||||
});
|
||||
|
||||
watcher
|
||||
.on('add', (filePath: string) => {
|
||||
void onUpdate('add', filePath, provider);
|
||||
})
|
||||
.on('change', (filePath: string) => {
|
||||
void onUpdate('change', filePath, provider);
|
||||
})
|
||||
.on('error', (error: unknown) => {
|
||||
const message = error instanceof Error ? error.message : String(error);
|
||||
console.error(`Session watcher error for provider "${provider}"`, { error: message });
|
||||
});
|
||||
|
||||
watchers.push(watcher);
|
||||
} catch (error) {
|
||||
const message = error instanceof Error ? error.message : String(error);
|
||||
console.error(`Failed to initialize session watcher for provider "${provider}"`, {
|
||||
rootPath,
|
||||
error: message,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Stops all active provider session watchers.
|
||||
*/
|
||||
export async function closeSessionsWatcher(): Promise<void> {
|
||||
clearPendingWatcherFlushTimer();
|
||||
|
||||
await Promise.all(
|
||||
watchers.map(async (watcher) => {
|
||||
try {
|
||||
await watcher.close();
|
||||
} catch (error) {
|
||||
const message = error instanceof Error ? error.message : String(error);
|
||||
console.error('Failed to close session watcher', { error: message });
|
||||
}
|
||||
})
|
||||
);
|
||||
watchers.length = 0;
|
||||
pendingWatcherUpdate = null;
|
||||
pendingWatcherUpdateStartedAt = null;
|
||||
watcherRefreshInFlight = false;
|
||||
watcherRescheduleAfterRefresh = false;
|
||||
}
|
||||
@@ -1,3 +1,6 @@
|
||||
import fsp from 'node:fs/promises';
|
||||
|
||||
import { sessionsDb } from '@/modules/database/index.js';
|
||||
import { providerRegistry } from '@/modules/providers/provider.registry.js';
|
||||
import type {
|
||||
FetchHistoryOptions,
|
||||
@@ -5,6 +8,23 @@ import type {
|
||||
LLMProvider,
|
||||
NormalizedMessage,
|
||||
} from '@/shared/types.js';
|
||||
import { AppError } from '@/shared/utils.js';
|
||||
|
||||
/**
|
||||
* Removes one file if it exists.
|
||||
*/
|
||||
async function removeFileIfExists(filePath: string): Promise<boolean> {
|
||||
try {
|
||||
await fsp.unlink(filePath);
|
||||
return true;
|
||||
} catch (error) {
|
||||
const code = (error as NodeJS.ErrnoException).code;
|
||||
if (code === 'ENOENT') {
|
||||
return false;
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Application service for provider-backed session message operations.
|
||||
@@ -33,13 +53,78 @@ export const sessionsService = {
|
||||
},
|
||||
|
||||
/**
|
||||
* Fetches normalized persisted session history for one provider/session pair.
|
||||
* Fetches persisted history by session id.
|
||||
*
|
||||
* Provider and provider-specific lookup hints are resolved from the indexed
|
||||
* session metadata in the database.
|
||||
*/
|
||||
fetchHistory(
|
||||
providerName: string,
|
||||
sessionId: string,
|
||||
options?: FetchHistoryOptions,
|
||||
options: Pick<FetchHistoryOptions, 'limit' | 'offset'> = {},
|
||||
): Promise<FetchHistoryResult> {
|
||||
return providerRegistry.resolveProvider(providerName).sessions.fetchHistory(sessionId, options);
|
||||
const session = sessionsDb.getSessionById(sessionId);
|
||||
if (!session) {
|
||||
throw new AppError(`Session "${sessionId}" was not found.`, {
|
||||
code: 'SESSION_NOT_FOUND',
|
||||
statusCode: 404,
|
||||
});
|
||||
}
|
||||
|
||||
const provider = session.provider as LLMProvider;
|
||||
return providerRegistry.resolveProvider(provider).sessions.fetchHistory(sessionId, {
|
||||
limit: options.limit ?? null,
|
||||
offset: options.offset ?? 0,
|
||||
projectPath: session.project_path ?? '',
|
||||
});
|
||||
},
|
||||
|
||||
/**
|
||||
* Deletes one persisted session row by id.
|
||||
*
|
||||
* When `deletedFromDisk` is true and a session `jsonl_path` exists, the path
|
||||
* is deleted from disk before the DB row is removed.
|
||||
*/
|
||||
async deleteSessionById(
|
||||
sessionId: string,
|
||||
deletedFromDisk = false,
|
||||
): Promise<{ sessionId: string; deletedFromDisk: boolean }> {
|
||||
const session = sessionsDb.getSessionById(sessionId);
|
||||
if (!session) {
|
||||
throw new AppError(`Session "${sessionId}" was not found.`, {
|
||||
code: 'SESSION_NOT_FOUND',
|
||||
statusCode: 404,
|
||||
});
|
||||
}
|
||||
|
||||
let removedFromDisk = false;
|
||||
if (deletedFromDisk && session.jsonl_path) {
|
||||
removedFromDisk = await removeFileIfExists(session.jsonl_path);
|
||||
}
|
||||
|
||||
const deleted = sessionsDb.deleteSessionById(sessionId);
|
||||
if (!deleted) {
|
||||
throw new AppError(`Session "${sessionId}" was not found.`, {
|
||||
code: 'SESSION_NOT_FOUND',
|
||||
statusCode: 404,
|
||||
});
|
||||
}
|
||||
|
||||
return { sessionId, deletedFromDisk: removedFromDisk };
|
||||
},
|
||||
|
||||
/**
|
||||
* Renames one session by id without requiring the caller to pass provider.
|
||||
*/
|
||||
renameSessionById(sessionId: string, summary: string): { sessionId: string; summary: string } {
|
||||
const session = sessionsDb.getSessionById(sessionId);
|
||||
if (!session) {
|
||||
throw new AppError(`Session "${sessionId}" was not found.`, {
|
||||
code: 'SESSION_NOT_FOUND',
|
||||
statusCode: 404,
|
||||
});
|
||||
}
|
||||
|
||||
sessionsDb.updateSessionCustomName(sessionId, summary);
|
||||
return { sessionId, summary };
|
||||
},
|
||||
};
|
||||
|
||||
@@ -1,4 +1,10 @@
|
||||
import type { IProvider, IProviderAuth, IProviderMcp, IProviderSessions } from '@/shared/interfaces.js';
|
||||
import type {
|
||||
IProvider,
|
||||
IProviderAuth,
|
||||
IProviderMcp,
|
||||
IProviderSessionSynchronizer,
|
||||
IProviderSessions,
|
||||
} from '@/shared/interfaces.js';
|
||||
import type { LLMProvider } from '@/shared/types.js';
|
||||
|
||||
/**
|
||||
@@ -13,6 +19,7 @@ export abstract class AbstractProvider implements IProvider {
|
||||
abstract readonly mcp: IProviderMcp;
|
||||
abstract readonly auth: IProviderAuth;
|
||||
abstract readonly sessions: IProviderSessions;
|
||||
abstract readonly sessionSynchronizer: IProviderSessionSynchronizer;
|
||||
|
||||
protected constructor(id: LLMProvider) {
|
||||
this.id = id;
|
||||
|
||||
Reference in New Issue
Block a user