import os from 'node:os'; import path from 'node:path'; import { promises as fsPromises } from 'node:fs'; import chokidar, { type FSWatcher } from 'chokidar'; import { sessionSynchronizerService } from '@/modules/providers/services/session-synchronizer.service.js'; import { WS_OPEN_STATE, connectedClients } from '@/modules/websocket/index.js'; import type { LLMProvider } from '@/shared/types.js'; import { getProjectsWithSessions } from '@/modules/projects/index.js'; type WatcherEventType = 'add' | 'change'; const PROVIDER_WATCH_PATHS: Array<{ provider: LLMProvider; rootPath: string }> = [ { provider: 'claude', rootPath: path.join(os.homedir(), '.claude', 'projects'), }, { provider: 'cursor', rootPath: path.join(os.homedir(), '.cursor', 'chats'), }, { provider: 'codex', rootPath: path.join(os.homedir(), '.codex', 'sessions'), }, { provider: 'gemini', rootPath: path.join(os.homedir(), '.gemini', 'sessions'), }, { provider: 'gemini', rootPath: path.join(os.homedir(), '.gemini', 'tmp'), }, ]; const WATCHER_IGNORED_PATTERNS = [ '**/node_modules/**', '**/.git/**', '**/dist/**', '**/build/**', '**/*.tmp', '**/*.swp', '**/.DS_Store', ]; const PROJECTS_UPDATE_DEBOUNCE_MS = 500; const PROJECTS_UPDATE_MAX_WAIT_MS = 2_000; const watchers: FSWatcher[] = []; type PendingWatcherUpdate = { providers: Set; changeTypes: Set; updatedSessionIds: Set; }; let pendingWatcherUpdate: PendingWatcherUpdate | null = null; let pendingWatcherUpdateStartedAt: number | null = null; let pendingWatcherFlushTimer: ReturnType | null = null; let watcherRefreshInFlight = false; let watcherRescheduleAfterRefresh = false; /** * Filters watcher events to provider-specific session artifact file types. */ function isWatcherTargetFile(provider: LLMProvider, filePath: string): boolean { if (provider === 'gemini') { return filePath.endsWith('.json') || filePath.endsWith('.jsonl'); } return filePath.endsWith('.jsonl'); } function clearPendingWatcherFlushTimer(): void { if (pendingWatcherFlushTimer) { clearTimeout(pendingWatcherFlushTimer); pendingWatcherFlushTimer = null; } } function schedulePendingWatcherFlush(): void { if (!pendingWatcherUpdate) { return; } const now = Date.now(); if (pendingWatcherUpdateStartedAt === null) { pendingWatcherUpdateStartedAt = now; } const elapsed = now - pendingWatcherUpdateStartedAt; const remainingMaxWait = Math.max(0, PROJECTS_UPDATE_MAX_WAIT_MS - elapsed); const delay = Math.min(PROJECTS_UPDATE_DEBOUNCE_MS, remainingMaxWait); clearPendingWatcherFlushTimer(); pendingWatcherFlushTimer = setTimeout(() => { void flushPendingWatcherUpdate(); }, delay); } function queuePendingWatcherUpdate( eventType: WatcherEventType, provider: LLMProvider, updatedSessionId: string | null ): void { if (!pendingWatcherUpdate) { pendingWatcherUpdate = { providers: new Set(), changeTypes: new Set(), updatedSessionIds: new Set(), }; } pendingWatcherUpdate.providers.add(provider); pendingWatcherUpdate.changeTypes.add(eventType); if (updatedSessionId) { pendingWatcherUpdate.updatedSessionIds.add(updatedSessionId); } schedulePendingWatcherFlush(); } async function flushPendingWatcherUpdate(): Promise { clearPendingWatcherFlushTimer(); if (!pendingWatcherUpdate) { return; } if (watcherRefreshInFlight) { watcherRescheduleAfterRefresh = true; return; } const queuedUpdate = pendingWatcherUpdate; pendingWatcherUpdate = null; pendingWatcherUpdateStartedAt = null; watcherRefreshInFlight = true; try { const updatedProjects = await getProjectsWithSessions({ skipSynchronization: true }); const changeTypes = Array.from(queuedUpdate.changeTypes); const watchProviders = Array.from(queuedUpdate.providers); const updatedSessionIds = Array.from(queuedUpdate.updatedSessionIds); // Backward-compatible fields stay populated with the first queued values. const updateMessage = JSON.stringify({ type: 'projects_updated', projects: updatedProjects, timestamp: new Date().toISOString(), changeType: changeTypes[0] ?? 'change', updatedSessionId: updatedSessionIds[0] ?? undefined, watchProvider: watchProviders[0] ?? undefined, changeTypes, updatedSessionIds, watchProviders, batched: true, }); connectedClients.forEach(client => { if (client.readyState === WS_OPEN_STATE) { client.send(updateMessage); } }); } catch (error) { const message = error instanceof Error ? error.message : String(error); console.error('Session watcher refresh failed while broadcasting projects_updated', { error: message }); } finally { watcherRefreshInFlight = false; if (pendingWatcherUpdate || watcherRescheduleAfterRefresh) { watcherRescheduleAfterRefresh = false; schedulePendingWatcherFlush(); } } } /** * Handles file watcher updates and triggers provider file-level synchronization. */ async function onUpdate( eventType: WatcherEventType, filePath: string, provider: LLMProvider ): Promise { if (!isWatcherTargetFile(provider, filePath)) { return; } try { const result = await sessionSynchronizerService.synchronizeProviderFile(provider, filePath); if (!result.indexed) { return; } console.log(`Session synchronization triggered by ${eventType} event for provider "${provider}"`, { filePath, sessionId: result.sessionId, }); queuePendingWatcherUpdate(eventType, provider, result.sessionId); } catch (error) { const message = error instanceof Error ? error.message : String(error); console.error(`Session watcher sync failed for provider "${provider}"`, { eventType, filePath, error: message, }); } } /** * Starts provider filesystem watchers and performs initial DB synchronization. */ export async function initializeSessionsWatcher(): Promise { console.log('Setting up session watchers'); const initialSync = await sessionSynchronizerService.synchronizeSessions(); console.log('Initial session synchronization complete', { processedByProvider: initialSync.processedByProvider, failures: initialSync.failures, }); for (const { provider, rootPath } of PROVIDER_WATCH_PATHS) { try { await fsPromises.mkdir(rootPath, { recursive: true }); const watcher = chokidar.watch(rootPath, { ignored: WATCHER_IGNORED_PATTERNS, persistent: true, ignoreInitial: true, followSymlinks: false, depth: 6, usePolling: true, interval: 6_000, binaryInterval: 6_000, }); watcher .on('add', (filePath: string) => { void onUpdate('add', filePath, provider); }) .on('change', (filePath: string) => { void onUpdate('change', filePath, provider); }) .on('error', (error: unknown) => { const message = error instanceof Error ? error.message : String(error); console.error(`Session watcher error for provider "${provider}"`, { error: message }); }); watchers.push(watcher); } catch (error) { const message = error instanceof Error ? error.message : String(error); console.error(`Failed to initialize session watcher for provider "${provider}"`, { rootPath, error: message, }); } } } /** * Stops all active provider session watchers. */ export async function closeSessionsWatcher(): Promise { clearPendingWatcherFlushTimer(); await Promise.all( watchers.map(async (watcher) => { try { await watcher.close(); } catch (error) { const message = error instanceof Error ? error.message : String(error); console.error('Failed to close session watcher', { error: message }); } }) ); watchers.length = 0; pendingWatcherUpdate = null; pendingWatcherUpdateStartedAt = null; watcherRefreshInFlight = false; watcherRescheduleAfterRefresh = false; }