From 68123dcc33e77ed903d4064925eb5ba29c6a3ceb Mon Sep 17 00:00:00 2001 From: Haileyesus <118998054+blackmammoth@users.noreply.github.com> Date: Sat, 25 Apr 2026 21:32:24 +0300 Subject: [PATCH] feat: optimistic update for session watcher --- .../projects-with-sessions-fetch.service.ts | 12 +- .../services/sessions-watcher.service.ts | 151 +++++++++++++++--- src/types/app.ts | 6 + 3 files changed, 149 insertions(+), 20 deletions(-) diff --git a/server/modules/projects/services/projects-with-sessions-fetch.service.ts b/server/modules/projects/services/projects-with-sessions-fetch.service.ts index 15360d56..81689be0 100644 --- a/server/modules/projects/services/projects-with-sessions-fetch.service.ts +++ b/server/modules/projects/services/projects-with-sessions-fetch.service.ts @@ -45,6 +45,10 @@ type ProgressUpdate = { currentProject?: string; }; +type GetProjectsWithSessionsOptions = { + skipSynchronization?: boolean; +}; + const __dirname = getModuleDir(import.meta.url); const APP_ROOT = findAppRoot(__dirname); const PROJECTS_DUMP_DIR = path.join(APP_ROOT, '.tmp', 'project-dumps'); @@ -195,8 +199,12 @@ function broadcastProgress(progress: ProgressUpdate) { /** * Reads all projects from DB and returns provider-bucketed session summaries. */ -export async function getProjectsWithSessions(): Promise { - await sessionSynchronizerService.synchronizeSessions(); +export async function getProjectsWithSessions( + options: GetProjectsWithSessionsOptions = {} +): Promise { + if (!options.skipSynchronization) { + await sessionSynchronizerService.synchronizeSessions(); + } const projectRows = projectsDb.getProjectPaths() as Array<{ project_id: string; diff --git a/server/modules/providers/services/sessions-watcher.service.ts b/server/modules/providers/services/sessions-watcher.service.ts index ab465f97..44505e82 100644 --- a/server/modules/providers/services/sessions-watcher.service.ts +++ b/server/modules/providers/services/sessions-watcher.service.ts @@ -44,8 +44,23 @@ const WATCHER_IGNORED_PATTERNS = [ '**/.DS_Store', ]; +const PROJECTS_UPDATE_DEBOUNCE_MS = 500; +const PROJECTS_UPDATE_MAX_WAIT_MS = 2_000; + const watchers: FSWatcher[] = []; +type PendingWatcherUpdate = { + providers: Set; + changeTypes: Set; + updatedSessionIds: Set; +}; + +let pendingWatcherUpdate: PendingWatcherUpdate | null = null; +let pendingWatcherUpdateStartedAt: number | null = null; +let pendingWatcherFlushTimer: ReturnType | null = null; +let watcherRefreshInFlight = false; +let watcherRescheduleAfterRefresh = false; + /** * Filters watcher events to provider-specific session artifact file types. */ @@ -57,6 +72,110 @@ function isWatcherTargetFile(provider: LLMProvider, filePath: string): boolean { return filePath.endsWith('.jsonl'); } +function clearPendingWatcherFlushTimer(): void { + if (pendingWatcherFlushTimer) { + clearTimeout(pendingWatcherFlushTimer); + pendingWatcherFlushTimer = null; + } +} + +function schedulePendingWatcherFlush(): void { + if (!pendingWatcherUpdate) { + return; + } + + const now = Date.now(); + if (pendingWatcherUpdateStartedAt === null) { + pendingWatcherUpdateStartedAt = now; + } + + const elapsed = now - pendingWatcherUpdateStartedAt; + const remainingMaxWait = Math.max(0, PROJECTS_UPDATE_MAX_WAIT_MS - elapsed); + const delay = Math.min(PROJECTS_UPDATE_DEBOUNCE_MS, remainingMaxWait); + + clearPendingWatcherFlushTimer(); + pendingWatcherFlushTimer = setTimeout(() => { + void flushPendingWatcherUpdate(); + }, delay); +} + +function queuePendingWatcherUpdate( + eventType: WatcherEventType, + provider: LLMProvider, + updatedSessionId: string | null +): void { + if (!pendingWatcherUpdate) { + pendingWatcherUpdate = { + providers: new Set(), + changeTypes: new Set(), + updatedSessionIds: new Set(), + }; + } + + pendingWatcherUpdate.providers.add(provider); + pendingWatcherUpdate.changeTypes.add(eventType); + if (updatedSessionId) { + pendingWatcherUpdate.updatedSessionIds.add(updatedSessionId); + } + + schedulePendingWatcherFlush(); +} + +async function flushPendingWatcherUpdate(): Promise { + clearPendingWatcherFlushTimer(); + + if (!pendingWatcherUpdate) { + return; + } + + if (watcherRefreshInFlight) { + watcherRescheduleAfterRefresh = true; + return; + } + + const queuedUpdate = pendingWatcherUpdate; + pendingWatcherUpdate = null; + pendingWatcherUpdateStartedAt = null; + watcherRefreshInFlight = true; + + try { + const updatedProjects = await getProjectsWithSessions({ skipSynchronization: true }); + const changeTypes = Array.from(queuedUpdate.changeTypes); + const watchProviders = Array.from(queuedUpdate.providers); + const updatedSessionIds = Array.from(queuedUpdate.updatedSessionIds); + + // Backward-compatible fields stay populated with the first queued values. + const updateMessage = JSON.stringify({ + type: 'projects_updated', + projects: updatedProjects, + timestamp: new Date().toISOString(), + changeType: changeTypes[0] ?? 'change', + updatedSessionId: updatedSessionIds[0] ?? undefined, + watchProvider: watchProviders[0] ?? undefined, + changeTypes, + updatedSessionIds, + watchProviders, + batched: true, + }); + + connectedClients.forEach(client => { + if (client.readyState === WS_OPEN_STATE) { + client.send(updateMessage); + } + }); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + console.error('Session watcher refresh failed while broadcasting projects_updated', { error: message }); + } finally { + watcherRefreshInFlight = false; + + if (pendingWatcherUpdate || watcherRescheduleAfterRefresh) { + watcherRescheduleAfterRefresh = false; + schedulePendingWatcherFlush(); + } + } +} + /** * Handles file watcher updates and triggers provider file-level synchronization. */ @@ -71,25 +190,15 @@ async function onUpdate( try { const result = await sessionSynchronizerService.synchronizeProviderFile(provider, filePath); + if (!result.indexed) { + return; + } - // Get updated projects list - const updatedProjects = await getProjectsWithSessions(); - - // Notify all connected clients about the project changes - const updateMessage = JSON.stringify({ - type: 'projects_updated', - projects: updatedProjects, - timestamp: new Date().toISOString(), - changeType: eventType, - updatedSessionId: result.sessionId ?? undefined, - watchProvider: provider - }); - - connectedClients.forEach(client => { - if (client.readyState === WS_OPEN_STATE) { - client.send(updateMessage); - } + console.log(`Session synchronization triggered by ${eventType} event for provider "${provider}"`, { + filePath, + sessionId: result.sessionId, }); + queuePendingWatcherUpdate(eventType, provider, result.sessionId); } catch (error) { const message = error instanceof Error ? error.message : String(error); console.error(`Session watcher sync failed for provider "${provider}"`, { @@ -123,7 +232,7 @@ export async function initializeSessionsWatcher(): Promise { followSymlinks: false, depth: 6, usePolling: true, - interval: 2_000, + interval: 6_000, binaryInterval: 6_000, }); @@ -154,6 +263,8 @@ export async function initializeSessionsWatcher(): Promise { * Stops all active provider session watchers. */ export async function closeSessionsWatcher(): Promise { + clearPendingWatcherFlushTimer(); + await Promise.all( watchers.map(async (watcher) => { try { @@ -165,4 +276,8 @@ export async function closeSessionsWatcher(): Promise { }) ); watchers.length = 0; + pendingWatcherUpdate = null; + pendingWatcherUpdateStartedAt = null; + watcherRefreshInFlight = false; + watcherRescheduleAfterRefresh = false; } diff --git a/src/types/app.ts b/src/types/app.ts index 477b403d..364807f7 100644 --- a/src/types/app.ts +++ b/src/types/app.ts @@ -64,6 +64,12 @@ export interface ProjectsUpdatedMessage { type: 'projects_updated'; projects: Project[]; updatedSessionId?: string; + updatedSessionIds?: string[]; + watchProvider?: LLMProvider; + watchProviders?: LLMProvider[]; + changeType?: 'add' | 'change'; + changeTypes?: Array<'add' | 'change'>; + batched?: boolean; [key: string]: unknown; }