diff --git a/server/modules/database/index.ts b/server/modules/database/index.ts new file mode 100644 index 00000000..db74fd5b --- /dev/null +++ b/server/modules/database/index.ts @@ -0,0 +1,11 @@ +export { apiKeysDb } from '@/modules/database/repositories/api-keys.js'; +export { appConfigDb } from '@/modules/database/repositories/app-config.js'; +export { credentialsDb } from '@/modules/database/repositories/credentials.js'; +export { githubTokensDb } from '@/modules/database/repositories/github-tokens.js'; +export { notificationPreferencesDb } from '@/modules/database/repositories/notification-preferences.js'; +export { projectsDb } from '@/modules/database/repositories/projects.db.js'; +export { pushSubscriptionsDb } from '@/modules/database/repositories/push-subscriptions.js'; +export { scanStateDb } from '@/modules/database/repositories/scan-state.db.js'; +export { sessionsDb } from '@/modules/database/repositories/sessions.db.js'; +export { userDb } from '@/modules/database/repositories/users.js'; +export { vapidKeysDb } from '@/modules/database/repositories/vapid-keys.js'; diff --git a/server/modules/providers/list/claude/claude-session-synchronizer.provider.ts b/server/modules/providers/list/claude/claude-session-synchronizer.provider.ts new file mode 100644 index 00000000..a21b1046 --- /dev/null +++ b/server/modules/providers/list/claude/claude-session-synchronizer.provider.ts @@ -0,0 +1,112 @@ +import os from 'node:os'; +import path from 'node:path'; + +import { sessionsDb } from '@/modules/database/index.js'; +import { + buildLookupMap, + extractFirstValidJsonlData, + findFilesRecursivelyCreatedAfter, + normalizeSessionName, + readFileTimestamps, +} from '@/shared/utils.js'; +import type { IProviderSessionSynchronizer } from '@/shared/interfaces.js'; + +type ParsedSession = { + sessionId: string; + projectPath: string; + sessionName?: string; +}; + +/** + * Session indexer for Claude transcript artifacts. 
+ */ +export class ClaudeSessionSynchronizer implements IProviderSessionSynchronizer { + private readonly provider = 'claude' as const; + private readonly claudeHome = path.join(os.homedir(), '.claude'); + + /** + * Scans ~/.claude/projects and upserts discovered sessions into DB. + */ + async synchronize(since?: Date): Promise<number> { + const nameMap = await buildLookupMap(path.join(this.claudeHome, 'history.jsonl'), 'sessionId', 'display'); + const files = await findFilesRecursivelyCreatedAfter( + path.join(this.claudeHome, 'projects'), + '.jsonl', + since ?? null + ); + + let processed = 0; + for (const filePath of files) { + const parsed = await this.processSessionFile(filePath, nameMap); + if (!parsed) { + continue; + } + + const timestamps = await readFileTimestamps(filePath); + sessionsDb.createSession( + parsed.sessionId, + this.provider, + parsed.projectPath, + parsed.sessionName, + timestamps.createdAt, + timestamps.updatedAt, + filePath + ); + processed += 1; + } + + return processed; + } + + /** + * Parses and upserts one Claude session JSONL file. + */ + async synchronizeFile(filePath: string): Promise<boolean> { + if (!filePath.endsWith('.jsonl')) { + return false; + } + + const nameMap = await buildLookupMap(path.join(this.claudeHome, 'history.jsonl'), 'sessionId', 'display'); + const parsed = await this.processSessionFile(filePath, nameMap); + if (!parsed) { + return false; + } + + const timestamps = await readFileTimestamps(filePath); + sessionsDb.createSession( + parsed.sessionId, + this.provider, + parsed.projectPath, + parsed.sessionName, + timestamps.createdAt, + timestamps.updatedAt, + filePath + ); + + return true; + } + + /** + * Extracts session metadata from one Claude JSONL session file. + */ + private async processSessionFile( + filePath: string, + nameMap: Map<string, string> + ): Promise<ParsedSession | null> { + return extractFirstValidJsonlData(filePath, (rawData) => { + const data = rawData as Record<string, unknown>; + const sessionId = typeof data.sessionId === 'string' ?
data.sessionId : undefined; + const projectPath = typeof data.cwd === 'string' ? data.cwd : undefined; + + if (!sessionId || !projectPath) { + return null; + } + + return { + sessionId, + projectPath, + sessionName: normalizeSessionName(nameMap.get(sessionId), 'Untitled Claude Session'), + }; + }); + } +} diff --git a/server/modules/providers/list/claude/claude.provider.ts b/server/modules/providers/list/claude/claude.provider.ts index 675d82dd..eeec1eb4 100644 --- a/server/modules/providers/list/claude/claude.provider.ts +++ b/server/modules/providers/list/claude/claude.provider.ts @@ -1,13 +1,15 @@ import { AbstractProvider } from '@/modules/providers/shared/base/abstract.provider.js'; import { ClaudeProviderAuth } from '@/modules/providers/list/claude/claude-auth.provider.js'; import { ClaudeMcpProvider } from '@/modules/providers/list/claude/claude-mcp.provider.js'; +import { ClaudeSessionSynchronizer } from '@/modules/providers/list/claude/claude-session-synchronizer.provider.js'; import { ClaudeSessionsProvider } from '@/modules/providers/list/claude/claude-sessions.provider.js'; -import type { IProviderAuth, IProviderSessions } from '@/shared/interfaces.js'; +import type { IProviderAuth, IProviderSessionSynchronizer, IProviderSessions } from '@/shared/interfaces.js'; export class ClaudeProvider extends AbstractProvider { readonly mcp = new ClaudeMcpProvider(); readonly auth: IProviderAuth = new ClaudeProviderAuth(); readonly sessions: IProviderSessions = new ClaudeSessionsProvider(); + readonly sessionSynchronizer: IProviderSessionSynchronizer = new ClaudeSessionSynchronizer(); constructor() { super('claude'); diff --git a/server/modules/providers/list/codex/codex-session-synchronizer.provider.ts b/server/modules/providers/list/codex/codex-session-synchronizer.provider.ts new file mode 100644 index 00000000..64d601af --- /dev/null +++ b/server/modules/providers/list/codex/codex-session-synchronizer.provider.ts @@ -0,0 +1,113 @@ +import os from 'node:os'; 
+import path from 'node:path'; + +import { sessionsDb } from '@/modules/database/index.js'; +import { + buildLookupMap, + extractFirstValidJsonlData, + findFilesRecursivelyCreatedAfter, + normalizeSessionName, + readFileTimestamps, +} from '@/shared/utils.js'; +import type { IProviderSessionSynchronizer } from '@/shared/interfaces.js'; + +type ParsedSession = { + sessionId: string; + projectPath: string; + sessionName?: string; +}; + +/** + * Session indexer for Codex transcript artifacts. + */ +export class CodexSessionSynchronizer implements IProviderSessionSynchronizer { + private readonly provider = 'codex' as const; + private readonly codexHome = path.join(os.homedir(), '.codex'); + + /** + * Scans ~/.codex/sessions and upserts discovered sessions into DB. + */ + async synchronize(since?: Date): Promise { + const nameMap = await buildLookupMap(path.join(this.codexHome, 'session_index.jsonl'), 'id', 'thread_name'); + const files = await findFilesRecursivelyCreatedAfter( + path.join(this.codexHome, 'sessions'), + '.jsonl', + since ?? null + ); + + let processed = 0; + for (const filePath of files) { + const parsed = await this.processSessionFile(filePath, nameMap); + if (!parsed) { + continue; + } + + const timestamps = await readFileTimestamps(filePath); + sessionsDb.createSession( + parsed.sessionId, + this.provider, + parsed.projectPath, + parsed.sessionName, + timestamps.createdAt, + timestamps.updatedAt, + filePath + ); + processed += 1; + } + + return processed; + } + + /** + * Parses and upserts one Codex session JSONL file. 
+ */ + async synchronizeFile(filePath: string): Promise { + if (!filePath.endsWith('.jsonl')) { + return false; + } + + const nameMap = await buildLookupMap(path.join(this.codexHome, 'session_index.jsonl'), 'id', 'thread_name'); + const parsed = await this.processSessionFile(filePath, nameMap); + if (!parsed) { + return false; + } + + const timestamps = await readFileTimestamps(filePath); + sessionsDb.createSession( + parsed.sessionId, + this.provider, + parsed.projectPath, + parsed.sessionName, + timestamps.createdAt, + timestamps.updatedAt, + filePath + ); + + return true; + } + + /** + * Extracts session metadata from one Codex JSONL session file. + */ + private async processSessionFile( + filePath: string, + nameMap: Map + ): Promise { + return extractFirstValidJsonlData(filePath, (rawData) => { + const data = rawData as Record; + const payload = data.payload as Record | undefined; + const sessionId = typeof payload?.id === 'string' ? payload.id : undefined; + const projectPath = typeof payload?.cwd === 'string' ? 
payload.cwd : undefined; + + if (!sessionId || !projectPath) { + return null; + } + + return { + sessionId, + projectPath, + sessionName: normalizeSessionName(nameMap.get(sessionId), 'Untitled Codex Session'), + }; + }); + } +} diff --git a/server/modules/providers/list/codex/codex.provider.ts b/server/modules/providers/list/codex/codex.provider.ts index fe1b9eb5..593297bc 100644 --- a/server/modules/providers/list/codex/codex.provider.ts +++ b/server/modules/providers/list/codex/codex.provider.ts @@ -1,13 +1,15 @@ import { AbstractProvider } from '@/modules/providers/shared/base/abstract.provider.js'; import { CodexProviderAuth } from '@/modules/providers/list/codex/codex-auth.provider.js'; import { CodexMcpProvider } from '@/modules/providers/list/codex/codex-mcp.provider.js'; +import { CodexSessionSynchronizer } from '@/modules/providers/list/codex/codex-session-synchronizer.provider.js'; import { CodexSessionsProvider } from '@/modules/providers/list/codex/codex-sessions.provider.js'; -import type { IProviderAuth, IProviderSessions } from '@/shared/interfaces.js'; +import type { IProviderAuth, IProviderSessionSynchronizer, IProviderSessions } from '@/shared/interfaces.js'; export class CodexProvider extends AbstractProvider { readonly mcp = new CodexMcpProvider(); readonly auth: IProviderAuth = new CodexProviderAuth(); readonly sessions: IProviderSessions = new CodexSessionsProvider(); + readonly sessionSynchronizer: IProviderSessionSynchronizer = new CodexSessionSynchronizer(); constructor() { super('codex'); diff --git a/server/modules/providers/list/cursor/cursor-session-synchronizer.provider.ts b/server/modules/providers/list/cursor/cursor-session-synchronizer.provider.ts new file mode 100644 index 00000000..b0e89b12 --- /dev/null +++ b/server/modules/providers/list/cursor/cursor-session-synchronizer.provider.ts @@ -0,0 +1,178 @@ +import crypto from 'node:crypto'; +import fs from 'node:fs'; +import fsp from 'node:fs/promises'; +import os from 'node:os'; 
+import path from 'node:path'; +import readline from 'node:readline'; + +import { sessionsDb } from '@/modules/database/index.js'; +import { + extractFirstValidJsonlData, + findFilesRecursivelyCreatedAfter, + normalizeSessionName, + readFileTimestamps, +} from '@/shared/utils.js'; +import type { IProviderSessionSynchronizer } from '@/shared/interfaces.js'; + +type ParsedSession = { + sessionId: string; + projectPath: string; + sessionName?: string; +}; + +/** + * Returns directory entries or an empty list when the folder is missing. + */ +async function listDirectoryEntriesSafe( + directoryPath: string +): Promise { + try { + return await fsp.readdir(directoryPath, { withFileTypes: true }); + } catch { + return []; + } +} + +/** + * Session indexer for Cursor transcript artifacts. + */ +export class CursorSessionSynchronizer implements IProviderSessionSynchronizer { + private readonly provider = 'cursor' as const; + private readonly cursorHome = path.join(os.homedir(), '.cursor'); + + /** + * Scans Cursor chats and upserts discovered sessions into DB. + */ + async synchronize(since?: Date): Promise { + const projectsDir = path.join(this.cursorHome, 'projects'); + const projectEntries = await listDirectoryEntriesSafe(projectsDir); + const seenProjectPaths = new Set(); + + let processed = 0; + for (const entry of projectEntries) { + if (!entry.isDirectory()) { + continue; + } + + const workerLogPath = path.join(projectsDir, entry.name, 'worker.log'); + const projectPath = await this.extractProjectPathFromWorkerLog(workerLogPath); + if (!projectPath || seenProjectPaths.has(projectPath)) { + continue; + } + + seenProjectPaths.add(projectPath); + const projectHash = this.md5(projectPath); + const chatsDir = path.join(this.cursorHome, 'chats', projectHash); + const files = await findFilesRecursivelyCreatedAfter(chatsDir, '.jsonl', since ?? 
null); + + for (const filePath of files) { + const parsed = await this.processSessionFile(filePath); + if (!parsed) { + continue; + } + + const timestamps = await readFileTimestamps(filePath); + sessionsDb.createSession( + parsed.sessionId, + this.provider, + parsed.projectPath, + parsed.sessionName, + timestamps.createdAt, + timestamps.updatedAt, + filePath + ); + processed += 1; + } + } + + return processed; + } + + /** + * Parses and upserts one Cursor session JSONL file. + */ + async synchronizeFile(filePath: string): Promise { + if (!filePath.endsWith('.jsonl')) { + return false; + } + + const parsed = await this.processSessionFile(filePath); + if (!parsed) { + return false; + } + + const timestamps = await readFileTimestamps(filePath); + sessionsDb.createSession( + parsed.sessionId, + this.provider, + parsed.projectPath, + parsed.sessionName, + timestamps.createdAt, + timestamps.updatedAt, + filePath + ); + + return true; + } + + /** + * Produces the same project hash Cursor uses in chat directory names. + */ + private md5(input: string): string { + return crypto.createHash('md5').update(input).digest('hex'); + } + + /** + * Extracts project path from Cursor worker.log. + */ + private async extractProjectPathFromWorkerLog(filePath: string): Promise { + try { + const fileStream = fs.createReadStream(filePath, { encoding: 'utf8' }); + const lineReader = readline.createInterface({ input: fileStream, crlfDelay: Infinity }); + + for await (const line of lineReader) { + const match = line.match(/workspacePath=(.*)$/); + const projectPath = match?.[1]?.trim(); + if (projectPath) { + lineReader.close(); + fileStream.close(); + return projectPath; + } + } + } catch { + // Missing worker logs are valid for partial or incomplete session data. + } + + return null; + } + + /** + * Extracts session metadata from one Cursor JSONL session file. 
+ */ + private async processSessionFile(filePath: string): Promise { + const sessionId = path.basename(filePath, '.jsonl'); + const grandparentDir = path.dirname(path.dirname(filePath)); + const workerLogPath = path.join(grandparentDir, 'worker.log'); + const projectPath = await this.extractProjectPathFromWorkerLog(workerLogPath); + + if (!projectPath) { + return null; + } + + return extractFirstValidJsonlData(filePath, (rawData) => { + const data = rawData as Record; + if (data.role !== 'user') { + return null; + } + + const text = typeof data.message?.content?.[0]?.text === 'string' ? data.message.content[0].text : ''; + const firstLine = text.replace(/<\/?user_query>/g, '').trim().split('\n')[0]; + + return { + sessionId, + projectPath, + sessionName: normalizeSessionName(firstLine, 'Untitled Cursor Session'), + }; + }); + } +} diff --git a/server/modules/providers/list/cursor/cursor.provider.ts b/server/modules/providers/list/cursor/cursor.provider.ts index 7e834a10..72edf80c 100644 --- a/server/modules/providers/list/cursor/cursor.provider.ts +++ b/server/modules/providers/list/cursor/cursor.provider.ts @@ -1,13 +1,15 @@ import { AbstractProvider } from '@/modules/providers/shared/base/abstract.provider.js'; import { CursorProviderAuth } from '@/modules/providers/list/cursor/cursor-auth.provider.js'; import { CursorMcpProvider } from '@/modules/providers/list/cursor/cursor-mcp.provider.js'; +import { CursorSessionSynchronizer } from '@/modules/providers/list/cursor/cursor-session-synchronizer.provider.js'; import { CursorSessionsProvider } from '@/modules/providers/list/cursor/cursor-sessions.provider.js'; -import type { IProviderAuth, IProviderSessions } from '@/shared/interfaces.js'; +import type { IProviderAuth, IProviderSessionSynchronizer, IProviderSessions } from '@/shared/interfaces.js'; export class CursorProvider extends AbstractProvider { readonly mcp = new CursorMcpProvider(); readonly auth: IProviderAuth = new CursorProviderAuth(); readonly 
sessions: IProviderSessions = new CursorSessionsProvider(); + readonly sessionSynchronizer: IProviderSessionSynchronizer = new CursorSessionSynchronizer(); constructor() { super('cursor'); diff --git a/server/modules/providers/list/gemini/gemini-session-synchronizer.provider.ts b/server/modules/providers/list/gemini/gemini-session-synchronizer.provider.ts new file mode 100644 index 00000000..2cf8ecc3 --- /dev/null +++ b/server/modules/providers/list/gemini/gemini-session-synchronizer.provider.ts @@ -0,0 +1,162 @@ +import os from 'node:os'; +import path from 'node:path'; +import { readFile } from 'node:fs/promises'; + +import { sessionsDb } from '@/modules/database/index.js'; +import { + findFilesRecursivelyCreatedAfter, + normalizeSessionName, + readFileTimestamps, +} from '@/shared/utils.js'; +import type { IProviderSessionSynchronizer } from '@/shared/interfaces.js'; + +type ParsedSession = { + sessionId: string; + projectPath: string; + sessionName?: string; +}; + +/** + * Session indexer for Gemini transcript artifacts. + */ +export class GeminiSessionSynchronizer implements IProviderSessionSynchronizer { + private readonly provider = 'gemini' as const; + private readonly geminiHome = path.join(os.homedir(), '.gemini'); + + /** + * Scans Gemini session JSON files and upserts discovered sessions into DB. + */ + async synchronize(since?: Date): Promise { + const legacySessionFiles = await findFilesRecursivelyCreatedAfter( + path.join(this.geminiHome, 'sessions'), + '.json', + since ?? null + ); + const tempFiles = await findFilesRecursivelyCreatedAfter( + path.join(this.geminiHome, 'tmp'), + '.json', + since ?? 
null + ); + const files = [...legacySessionFiles, ...tempFiles]; + + let processed = 0; + for (const filePath of files) { + if ( + filePath.startsWith(path.join(this.geminiHome, 'tmp')) + && !filePath.includes(`${path.sep}chats${path.sep}`) + ) { + continue; + } + + const parsed = await this.processSessionFile(filePath); + if (!parsed) { + continue; + } + + const timestamps = await readFileTimestamps(filePath); + sessionsDb.createSession( + parsed.sessionId, + this.provider, + parsed.projectPath, + parsed.sessionName, + timestamps.createdAt, + timestamps.updatedAt, + filePath + ); + processed += 1; + } + + return processed; + } + + /** + * Parses and upserts one Gemini session JSON artifact. + */ + async synchronizeFile(filePath: string): Promise { + if (!filePath.endsWith('.json')) { + return false; + } + + if ( + filePath.startsWith(path.join(this.geminiHome, 'tmp')) + && !filePath.includes(`${path.sep}chats${path.sep}`) + ) { + return false; + } + + const parsed = await this.processSessionFile(filePath); + if (!parsed) { + return false; + } + + const timestamps = await readFileTimestamps(filePath); + sessionsDb.createSession( + parsed.sessionId, + this.provider, + parsed.projectPath, + parsed.sessionName, + timestamps.createdAt, + timestamps.updatedAt, + filePath + ); + + return true; + } + + /** + * Extracts session metadata from one Gemini JSON artifact. + */ + private async processSessionFile(filePath: string): Promise { + try { + const content = await readFile(filePath, 'utf8'); + const data = JSON.parse(content) as Record; + + const sessionId = + typeof data.sessionId === 'string' + ? data.sessionId + : typeof data.id === 'string' + ? data.id + : undefined; + if (!sessionId) { + return null; + } + + let projectPath = typeof data.projectPath === 'string' ? 
data.projectPath : ''; + + if (!projectPath && filePath.includes(`${path.sep}chats${path.sep}`)) { + const chatsDir = path.dirname(filePath); + const workspaceDir = path.dirname(chatsDir); + const projectRootPath = path.join(workspaceDir, '.project_root'); + + try { + const rootContent = await readFile(projectRootPath, 'utf8'); + projectPath = rootContent.trim(); + } catch { + // Some Gemini artifacts do not ship a .project_root marker. + } + } + + if (!projectPath) { + return null; + } + + const messages = Array.isArray(data.messages) ? data.messages : []; + const firstMessage = messages[0] as Record | undefined; + let rawName: string | undefined; + + if (Array.isArray(firstMessage?.content) && typeof firstMessage.content[0]?.text === 'string') { + rawName = firstMessage.content[0].text; + } else if (typeof firstMessage?.content === 'string') { + rawName = firstMessage.content; + } + + return { + sessionId, + projectPath, + sessionName: normalizeSessionName(rawName, 'New Gemini Chat'), + }; + } catch { + return null; + } + } +} diff --git a/server/modules/providers/list/gemini/gemini.provider.ts b/server/modules/providers/list/gemini/gemini.provider.ts index d968b7c0..2fb8a7c2 100644 --- a/server/modules/providers/list/gemini/gemini.provider.ts +++ b/server/modules/providers/list/gemini/gemini.provider.ts @@ -1,13 +1,15 @@ import { AbstractProvider } from '@/modules/providers/shared/base/abstract.provider.js'; import { GeminiProviderAuth } from '@/modules/providers/list/gemini/gemini-auth.provider.js'; import { GeminiMcpProvider } from '@/modules/providers/list/gemini/gemini-mcp.provider.js'; +import { GeminiSessionSynchronizer } from '@/modules/providers/list/gemini/gemini-session-synchronizer.provider.js'; import { GeminiSessionsProvider } from '@/modules/providers/list/gemini/gemini-sessions.provider.js'; -import type { IProviderAuth, IProviderSessions } from '@/shared/interfaces.js'; +import type { IProviderAuth, IProviderSessionSynchronizer, IProviderSessions 
} from '@/shared/interfaces.js'; export class GeminiProvider extends AbstractProvider { readonly mcp = new GeminiMcpProvider(); readonly auth: IProviderAuth = new GeminiProviderAuth(); readonly sessions: IProviderSessions = new GeminiSessionsProvider(); + readonly sessionSynchronizer: IProviderSessionSynchronizer = new GeminiSessionSynchronizer(); constructor() { super('gemini'); diff --git a/server/modules/providers/services/session-synchronizer.service.ts b/server/modules/providers/services/session-synchronizer.service.ts new file mode 100644 index 00000000..c5afaec0 --- /dev/null +++ b/server/modules/providers/services/session-synchronizer.service.ts @@ -0,0 +1,256 @@ +import path from 'node:path'; +import fsp, { readFile } from 'node:fs/promises'; + +import { scanStateDb, sessionsDb, projectsDb } from '@/modules/database/index.js'; +import { providerRegistry } from '@/modules/providers/provider.registry.js'; +import { sessionsService } from '@/modules/providers/services/sessions.service.js'; +import type { LLMProvider, NormalizedMessage } from '@/shared/types.js'; +import { AppError } from '@/shared/utils.js'; + +type SessionSynchronizeResult = { + processedByProvider: Record; + failures: string[]; +}; + +type SessionHistoryPayload = { + sessionId: string; + provider: string; + projectPath: string | null; + filePath: string; + fileType: 'jsonl' | 'json'; + entries: unknown[]; + messages: NormalizedMessage[]; +}; + +const SESSION_ID_PATTERN = /^[a-zA-Z0-9._-]{1,120}$/; + +/** + * Restricts session ids before they are used in DB and filesystem operations. + */ +function sanitizeSessionId(sessionId: string): string { + const value = String(sessionId).trim(); + if (!SESSION_ID_PATTERN.test(value)) { + throw new AppError('Invalid session id format.', { + code: 'INVALID_SESSION_ID', + statusCode: 400, + }); + } + return value; +} + +/** + * Removes one file if it exists. 
+ */ +async function removeFileIfExists(filePath: string): Promise { + try { + await fsp.unlink(filePath); + return true; + } catch (error) { + const code = (error as NodeJS.ErrnoException).code; + if (code === 'ENOENT') { + return false; + } + throw error; + } +} + +/** + * Parses newline-delimited JSON and preserves malformed lines as raw entries. + */ +function parseJsonl(content: string): unknown[] { + const entries: unknown[] = []; + const lines = content.split(/\r?\n/); + + for (const line of lines) { + const trimmed = line.trim(); + if (!trimmed) { + continue; + } + + try { + entries.push(JSON.parse(trimmed)); + } catch { + entries.push({ raw: trimmed, parseError: true }); + } + } + + return entries; +} + +/** + * Parses JSON and normalizes object payloads into a single-element array. + */ +function parseJson(content: string): unknown[] { + try { + const parsed = JSON.parse(content) as unknown; + return Array.isArray(parsed) ? parsed : [parsed]; + } catch { + return [{ raw: content, parseError: true }]; + } +} + +/** + * Orchestrates provider-specific session indexers and indexed-session lifecycle operations. + */ +export const sessionSynchronizerService = { + /** + * Lists indexed sessions from DB, optionally scoped to one provider. + */ + listIndexedSessions(provider?: string) { + const allSessions = sessionsDb.getAllSessions(); + if (!provider) { + return allSessions; + } + + return allSessions.filter((session) => session.provider === provider); + }, + + /** + * Reads one indexed session row and enriches it with the associated project id. + */ + getIndexedSession(sessionId: string) { + const session = sessionsDb.getSessionById(sessionId); + if (!session) { + throw new AppError(`Session "${sessionId}" was not found.`, { + code: 'SESSION_NOT_FOUND', + statusCode: 404, + }); + } + + const project = session.project_path ? projectsDb.getProjectPath(session.project_path) : null; + return { + ...session, + project_id: project?.project_id ?? 
null, + }; + }, + + /** + * Runs all provider synchronizers and updates scan_state.last_scanned_at. + */ + async synchronizeSessions(): Promise { + const lastScanAt = scanStateDb.getLastScannedAt(); + const processedByProvider: Record = { + claude: 0, + codex: 0, + cursor: 0, + gemini: 0, + }; + const failures: string[] = []; + + const results = await Promise.allSettled( + providerRegistry.listProviders().map(async (provider) => ({ + provider: provider.id, + processed: await provider.sessionSynchronizer.synchronize(lastScanAt ?? undefined), + })) + ); + + for (const result of results) { + if (result.status === 'fulfilled') { + processedByProvider[result.value.provider] = result.value.processed; + continue; + } + + const reason = result.reason instanceof Error ? result.reason.message : String(result.reason); + failures.push(reason); + } + + scanStateDb.updateLastScannedAt(); + + return { + processedByProvider, + failures, + }; + }, + + /** + * Indexes one provider artifact file without running a full provider rescan. + */ + async synchronizeProviderFile( + provider: LLMProvider, + filePath: string + ): Promise<{ provider: LLMProvider; indexed: boolean }> { + const resolvedProvider = providerRegistry.resolveProvider(provider); + const indexed = await resolvedProvider.sessionSynchronizer.synchronizeFile(filePath); + return { provider, indexed }; + }, + + /** + * Updates one indexed session custom name after validating existence. + */ + updateSessionCustomName(sessionId: string, sessionCustomName: string): void { + const sessionMetadata = sessionsDb.getSessionById(sessionId); + if (!sessionMetadata) { + throw new AppError('Session not found.', { + code: 'SESSION_NOT_FOUND', + statusCode: 404, + }); + } + + sessionsDb.updateSessionCustomName(sessionId, sessionCustomName); + }, + + /** + * Deletes a session artifact path from disk (if present) and deletes DB metadata. 
+ */ + async deleteSessionArtifacts(rawSessionId: string): Promise<{ + sessionId: string; + deletedFromDisk: boolean; + deletedFromDatabase: boolean; + }> { + const sessionId = sanitizeSessionId(rawSessionId); + const existingSession = sessionsDb.getSessionById(sessionId); + const sessionFilePath = existingSession?.jsonl_path ?? null; + const deletedFromDisk = sessionFilePath ? await removeFileIfExists(sessionFilePath) : false; + + if (existingSession) { + sessionsDb.deleteSession(sessionId); + } + + return { + sessionId, + deletedFromDisk, + deletedFromDatabase: Boolean(existingSession), + }; + }, + + /** + * Reads indexed session history directly from session json path and normalizes entries. + */ + async getSessionHistory(sessionId: string): Promise { + const session = sessionsDb.getSessionById(sessionId); + if (!session) { + throw new AppError(`Session "${sessionId}" was not found.`, { + code: 'SESSION_NOT_FOUND', + statusCode: 404, + }); + } + + if (!session.jsonl_path) { + throw new AppError(`Session "${sessionId}" does not have a history file path.`, { + code: 'SESSION_HISTORY_NOT_AVAILABLE', + statusCode: 404, + }); + } + + const filePath = session.jsonl_path; + const fileContent = await readFile(filePath, 'utf8'); + const extension = path.extname(filePath).toLowerCase(); + const isGeminiJson = session.provider === 'gemini' || extension === '.json'; + const entries = isGeminiJson ? parseJson(fileContent) : parseJsonl(fileContent); + + const messages: NormalizedMessage[] = []; + for (const entry of entries) { + messages.push(...sessionsService.normalizeMessage(session.provider, entry, session.session_id)); + } + + return { + sessionId: session.session_id, + provider: session.provider, + projectPath: session.project_path, + filePath, + fileType: isGeminiJson ? 
'json' : 'jsonl', + entries, + messages, + }; + }, +}; diff --git a/server/modules/providers/services/sessions-watcher.service.ts b/server/modules/providers/services/sessions-watcher.service.ts new file mode 100644 index 00000000..b5eb5c84 --- /dev/null +++ b/server/modules/providers/services/sessions-watcher.service.ts @@ -0,0 +1,151 @@ +import os from 'node:os'; +import path from 'node:path'; +import { promises as fsPromises } from 'node:fs'; + +import chokidar, { type FSWatcher } from 'chokidar'; + +import { sessionSynchronizerService } from '@/modules/providers/services/session-synchronizer.service.js'; +import type { LLMProvider } from '@/shared/types.js'; + +type WatcherEventType = 'add' | 'change'; + +const PROVIDER_WATCH_PATHS: Array<{ provider: LLMProvider; rootPath: string }> = [ + { + provider: 'claude', + rootPath: path.join(os.homedir(), '.claude', 'projects'), + }, + { + provider: 'cursor', + rootPath: path.join(os.homedir(), '.cursor', 'chats'), + }, + { + provider: 'codex', + rootPath: path.join(os.homedir(), '.codex', 'sessions'), + }, + { + provider: 'gemini', + rootPath: path.join(os.homedir(), '.gemini', 'sessions'), + }, + { + provider: 'gemini', + rootPath: path.join(os.homedir(), '.gemini', 'tmp'), + }, +]; + +const WATCHER_IGNORED_PATTERNS = [ + '**/node_modules/**', + '**/.git/**', + '**/dist/**', + '**/build/**', + '**/*.tmp', + '**/*.swp', + '**/.DS_Store', +]; + +const watchers: FSWatcher[] = []; + +/** + * Filters watcher events to provider-specific session artifact file types. + */ +function isWatcherTargetFile(provider: LLMProvider, filePath: string): boolean { + if (provider === 'gemini') { + return filePath.endsWith('.json'); + } + + return filePath.endsWith('.jsonl'); +} + +/** + * Handles file watcher updates and triggers provider file-level synchronization. 
+ */ +async function onUpdate( + eventType: WatcherEventType, + filePath: string, + provider: LLMProvider +): Promise<void> { + if (!isWatcherTargetFile(provider, filePath)) { + return; + } + + try { + const result = await sessionSynchronizerService.synchronizeProviderFile(provider, filePath); + console.log(`Session watcher sync complete for provider "${provider}" after ${eventType}`, { + filePath, + indexed: result.indexed, + }); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + console.error(`Session watcher sync failed for provider "${provider}"`, { + eventType, + filePath, + error: message, + }); + } +} + +/** + * Starts provider filesystem watchers and performs initial DB synchronization. + */ +export async function initializeSessionsWatcher(): Promise<void> { + console.log('Setting up session watchers'); + + const initialSync = await sessionSynchronizerService.synchronizeSessions(); + console.log('Initial session synchronization complete', { + processedByProvider: initialSync.processedByProvider, + failures: initialSync.failures, + }); + + for (const { provider, rootPath } of PROVIDER_WATCH_PATHS) { + try { + await fsPromises.mkdir(rootPath, { recursive: true }); + + const watcher = chokidar.watch(rootPath, { + ignored: WATCHER_IGNORED_PATTERNS, + persistent: true, + ignoreInitial: true, + followSymlinks: false, + depth: 6, + usePolling: true, + interval: 2_000, + binaryInterval: 6_000, + }); + + watcher + .on('add', (filePath: string) => { + void onUpdate('add', filePath, provider); + }) + .on('change', (filePath: string) => { + void onUpdate('change', filePath, provider); + }) + .on('error', (error: unknown) => { + const message = error instanceof Error ? error.message : String(error); + console.error(`Session watcher error for provider "${provider}"`, { error: message }); + }); + + watchers.push(watcher); + } catch (error) { + const message = error instanceof Error ?
error.message : String(error); + console.error(`Failed to initialize session watcher for provider "${provider}"`, { + rootPath, + error: message, + }); + } + } +} + +/** + * Stops all active provider session watchers. + */ +export async function closeSessionsWatcher(): Promise { + await Promise.all( + watchers.map(async (watcher) => { + try { + await watcher.close(); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + console.error('Failed to close session watcher', { error: message }); + } + }) + ); + watchers.length = 0; +} diff --git a/server/modules/providers/shared/base/abstract.provider.ts b/server/modules/providers/shared/base/abstract.provider.ts index 4a591baf..c674364d 100644 --- a/server/modules/providers/shared/base/abstract.provider.ts +++ b/server/modules/providers/shared/base/abstract.provider.ts @@ -1,4 +1,10 @@ -import type { IProvider, IProviderAuth, IProviderMcp, IProviderSessions } from '@/shared/interfaces.js'; +import type { + IProvider, + IProviderAuth, + IProviderMcp, + IProviderSessionSynchronizer, + IProviderSessions, +} from '@/shared/interfaces.js'; import type { LLMProvider } from '@/shared/types.js'; /** @@ -13,6 +19,7 @@ export abstract class AbstractProvider implements IProvider { abstract readonly mcp: IProviderMcp; abstract readonly auth: IProviderAuth; abstract readonly sessions: IProviderSessions; + abstract readonly sessionSynchronizer: IProviderSessionSynchronizer; protected constructor(id: LLMProvider) { this.id = id; diff --git a/server/shared/interfaces.ts b/server/shared/interfaces.ts index d643d0aa..b1f436a6 100644 --- a/server/shared/interfaces.ts +++ b/server/shared/interfaces.ts @@ -21,6 +21,7 @@ export interface IProvider { readonly mcp: IProviderMcp; readonly auth: IProviderAuth; readonly sessions: IProviderSessions; + readonly sessionSynchronizer: IProviderSessionSynchronizer; } // --------------------------- @@ -67,3 +68,25 @@ export interface IProviderSessions { 
normalizeMessage(raw: unknown, sessionId: string | null): NormalizedMessage[]; fetchHistory(sessionId: string, options?: FetchHistoryOptions): Promise; } + +// --------------------------- +//----------------- PROVIDER SESSION SYNCHRONIZER INTERFACE ------------ +/** + * Session indexing contract for one provider. + * + * Implementations scan provider-specific session artifacts on disk and upsert + * normalized session metadata into the database. The service layer uses this + * interface for both full rescans and single-file incremental sync triggered + * by filesystem watcher events. + */ +export interface IProviderSessionSynchronizer { + /** + * Scans provider session artifacts and upserts discovered sessions into DB. + */ + synchronize(since?: Date): Promise; + + /** + * Parses and upserts one provider artifact file without running a full scan. + */ + synchronizeFile(filePath: string): Promise; +} diff --git a/server/shared/utils.ts b/server/shared/utils.ts index f86d4005..01459a2f 100644 --- a/server/shared/utils.ts +++ b/server/shared/utils.ts @@ -1,6 +1,8 @@ import { randomUUID } from 'node:crypto'; -import { mkdir, readFile, writeFile } from 'node:fs/promises'; +import fs from 'node:fs'; +import { mkdir, readFile, readdir, stat, writeFile } from 'node:fs/promises'; import path from 'node:path'; +import readline from 'node:readline'; import type { NextFunction, Request, RequestHandler, Response } from 'express'; @@ -215,3 +217,167 @@ export const writeJsonConfig = async (filePath: string, data: Record { + try { + const entries = await readdir(rootDir, { withFileTypes: true }); + for (const entry of entries) { + const fullPath = path.join(rootDir, entry.name); + + if (entry.isDirectory()) { + await findFilesRecursivelyCreatedAfter(fullPath, extension, lastScanAt, fileList); + continue; + } + + if (!entry.isFile() || !entry.name.endsWith(extension)) { + continue; + } + + if (!lastScanAt) { + fileList.push(fullPath); + continue; + } + + const fileStat = await 
stat(fullPath); + if (fileStat.birthtime > lastScanAt) { + fileList.push(fullPath); + } + } + } catch { + // Missing provider folders are expected in first-run or partial setups. + } + + return fileList; +} + +/** + * Reads file creation/update timestamps and maps them to DB-friendly ISO strings. + * + * Session indexers use this to persist `created_at` and `updated_at` metadata + * when upserting sessions. If the file cannot be read, an empty object is + * returned so indexing can continue for other files. + */ +export async function readFileTimestamps( + filePath: string +): Promise<{ createdAt?: string; updatedAt?: string }> { + try { + const fileStat = await stat(filePath); + return { + createdAt: fileStat.birthtime.toISOString(), + updatedAt: fileStat.mtime.toISOString(), + }; + } catch { + return {}; + } +} + +// --------------------------- +//----------------- SESSION SYNCHRONIZER JSONL PARSING HELPERS ------------ +/** + * Builds a first-seen key/value lookup map from a JSONL file. + * + * Use this for provider index files where session id -> display name metadata + * is stored line-by-line. The first value for each key wins, preserving the + * earliest known label while avoiding repeated map overwrites. + */ +export async function buildLookupMap( + filePath: string, + keyField: string, + valueField: string +): Promise> { + const lookup = new Map(); + + try { + const fileStream = fs.createReadStream(filePath); + const lineReader = readline.createInterface({ input: fileStream, crlfDelay: Infinity }); + + for await (const line of lineReader) { + const trimmed = line.trim(); + if (!trimmed) { + continue; + } + + const parsed = JSON.parse(trimmed) as Record; + const key = parsed[keyField]; + const value = parsed[valueField]; + + if (typeof key === 'string' && typeof value === 'string' && !lookup.has(key)) { + lookup.set(key, value); + } + } + } catch { + // Missing or unreadable lookup files should not block session sync. 
+ } + + return lookup; +} + +/** + * Reads a JSONL file and returns the first extracted payload that matches caller criteria. + * + * The caller supplies an `extractor` that validates provider-specific row + * shapes. This helper centralizes line-by-line parsing and lets indexers stop + * scanning as soon as one valid row is found. + */ +export async function extractFirstValidJsonlData( + filePath: string, + extractor: (parsedJson: unknown) => T | null | undefined +): Promise { + try { + const fileStream = fs.createReadStream(filePath); + const lineReader = readline.createInterface({ input: fileStream, crlfDelay: Infinity }); + + for await (const line of lineReader) { + const trimmed = line.trim(); + if (!trimmed) { + continue; + } + + const parsed = JSON.parse(trimmed); + const extracted = extractor(parsed); + if (extracted) { + lineReader.close(); + fileStream.close(); + return extracted; + } + } + } catch { + // Ignore malformed or missing artifacts so full scans keep progressing. + } + + return null; +} +