feat: optimistic update for session watcher

This commit is contained in:
Haileyesus
2026-04-25 21:32:24 +03:00
parent edc7d6d184
commit 68123dcc33
3 changed files with 149 additions and 20 deletions

View File

@@ -45,6 +45,10 @@ type ProgressUpdate = {
currentProject?: string; currentProject?: string;
}; };
type GetProjectsWithSessionsOptions = {
skipSynchronization?: boolean;
};
const __dirname = getModuleDir(import.meta.url); const __dirname = getModuleDir(import.meta.url);
const APP_ROOT = findAppRoot(__dirname); const APP_ROOT = findAppRoot(__dirname);
const PROJECTS_DUMP_DIR = path.join(APP_ROOT, '.tmp', 'project-dumps'); const PROJECTS_DUMP_DIR = path.join(APP_ROOT, '.tmp', 'project-dumps');
@@ -195,8 +199,12 @@ function broadcastProgress(progress: ProgressUpdate) {
/** /**
* Reads all projects from DB and returns provider-bucketed session summaries. * Reads all projects from DB and returns provider-bucketed session summaries.
*/ */
export async function getProjectsWithSessions(): Promise<ProjectListItem[]> { export async function getProjectsWithSessions(
await sessionSynchronizerService.synchronizeSessions(); options: GetProjectsWithSessionsOptions = {}
): Promise<ProjectListItem[]> {
if (!options.skipSynchronization) {
await sessionSynchronizerService.synchronizeSessions();
}
const projectRows = projectsDb.getProjectPaths() as Array<{ const projectRows = projectsDb.getProjectPaths() as Array<{
project_id: string; project_id: string;

View File

@@ -44,8 +44,23 @@ const WATCHER_IGNORED_PATTERNS = [
'**/.DS_Store', '**/.DS_Store',
]; ];
const PROJECTS_UPDATE_DEBOUNCE_MS = 500;
const PROJECTS_UPDATE_MAX_WAIT_MS = 2_000;
const watchers: FSWatcher[] = []; const watchers: FSWatcher[] = [];
type PendingWatcherUpdate = {
providers: Set<LLMProvider>;
changeTypes: Set<WatcherEventType>;
updatedSessionIds: Set<string>;
};
let pendingWatcherUpdate: PendingWatcherUpdate | null = null;
let pendingWatcherUpdateStartedAt: number | null = null;
let pendingWatcherFlushTimer: ReturnType<typeof setTimeout> | null = null;
let watcherRefreshInFlight = false;
let watcherRescheduleAfterRefresh = false;
/** /**
* Filters watcher events to provider-specific session artifact file types. * Filters watcher events to provider-specific session artifact file types.
*/ */
@@ -57,6 +72,110 @@ function isWatcherTargetFile(provider: LLMProvider, filePath: string): boolean {
return filePath.endsWith('.jsonl'); return filePath.endsWith('.jsonl');
} }
function clearPendingWatcherFlushTimer(): void {
if (pendingWatcherFlushTimer) {
clearTimeout(pendingWatcherFlushTimer);
pendingWatcherFlushTimer = null;
}
}
function schedulePendingWatcherFlush(): void {
if (!pendingWatcherUpdate) {
return;
}
const now = Date.now();
if (pendingWatcherUpdateStartedAt === null) {
pendingWatcherUpdateStartedAt = now;
}
const elapsed = now - pendingWatcherUpdateStartedAt;
const remainingMaxWait = Math.max(0, PROJECTS_UPDATE_MAX_WAIT_MS - elapsed);
const delay = Math.min(PROJECTS_UPDATE_DEBOUNCE_MS, remainingMaxWait);
clearPendingWatcherFlushTimer();
pendingWatcherFlushTimer = setTimeout(() => {
void flushPendingWatcherUpdate();
}, delay);
}
function queuePendingWatcherUpdate(
eventType: WatcherEventType,
provider: LLMProvider,
updatedSessionId: string | null
): void {
if (!pendingWatcherUpdate) {
pendingWatcherUpdate = {
providers: new Set<LLMProvider>(),
changeTypes: new Set<WatcherEventType>(),
updatedSessionIds: new Set<string>(),
};
}
pendingWatcherUpdate.providers.add(provider);
pendingWatcherUpdate.changeTypes.add(eventType);
if (updatedSessionId) {
pendingWatcherUpdate.updatedSessionIds.add(updatedSessionId);
}
schedulePendingWatcherFlush();
}
async function flushPendingWatcherUpdate(): Promise<void> {
clearPendingWatcherFlushTimer();
if (!pendingWatcherUpdate) {
return;
}
if (watcherRefreshInFlight) {
watcherRescheduleAfterRefresh = true;
return;
}
const queuedUpdate = pendingWatcherUpdate;
pendingWatcherUpdate = null;
pendingWatcherUpdateStartedAt = null;
watcherRefreshInFlight = true;
try {
const updatedProjects = await getProjectsWithSessions({ skipSynchronization: true });
const changeTypes = Array.from(queuedUpdate.changeTypes);
const watchProviders = Array.from(queuedUpdate.providers);
const updatedSessionIds = Array.from(queuedUpdate.updatedSessionIds);
// Backward-compatible fields stay populated with the first queued values.
const updateMessage = JSON.stringify({
type: 'projects_updated',
projects: updatedProjects,
timestamp: new Date().toISOString(),
changeType: changeTypes[0] ?? 'change',
updatedSessionId: updatedSessionIds[0] ?? undefined,
watchProvider: watchProviders[0] ?? undefined,
changeTypes,
updatedSessionIds,
watchProviders,
batched: true,
});
connectedClients.forEach(client => {
if (client.readyState === WS_OPEN_STATE) {
client.send(updateMessage);
}
});
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
console.error('Session watcher refresh failed while broadcasting projects_updated', { error: message });
} finally {
watcherRefreshInFlight = false;
if (pendingWatcherUpdate || watcherRescheduleAfterRefresh) {
watcherRescheduleAfterRefresh = false;
schedulePendingWatcherFlush();
}
}
}
/** /**
* Handles file watcher updates and triggers provider file-level synchronization. * Handles file watcher updates and triggers provider file-level synchronization.
*/ */
@@ -71,25 +190,15 @@ async function onUpdate(
try { try {
const result = await sessionSynchronizerService.synchronizeProviderFile(provider, filePath); const result = await sessionSynchronizerService.synchronizeProviderFile(provider, filePath);
if (!result.indexed) {
return;
}
// Get updated projects list console.log(`Session synchronization triggered by ${eventType} event for provider "${provider}"`, {
const updatedProjects = await getProjectsWithSessions(); filePath,
sessionId: result.sessionId,
// Notify all connected clients about the project changes
const updateMessage = JSON.stringify({
type: 'projects_updated',
projects: updatedProjects,
timestamp: new Date().toISOString(),
changeType: eventType,
updatedSessionId: result.sessionId ?? undefined,
watchProvider: provider
});
connectedClients.forEach(client => {
if (client.readyState === WS_OPEN_STATE) {
client.send(updateMessage);
}
}); });
queuePendingWatcherUpdate(eventType, provider, result.sessionId);
} catch (error) { } catch (error) {
const message = error instanceof Error ? error.message : String(error); const message = error instanceof Error ? error.message : String(error);
console.error(`Session watcher sync failed for provider "${provider}"`, { console.error(`Session watcher sync failed for provider "${provider}"`, {
@@ -123,7 +232,7 @@ export async function initializeSessionsWatcher(): Promise<void> {
followSymlinks: false, followSymlinks: false,
depth: 6, depth: 6,
usePolling: true, usePolling: true,
interval: 2_000, interval: 6_000,
binaryInterval: 6_000, binaryInterval: 6_000,
}); });
@@ -154,6 +263,8 @@ export async function initializeSessionsWatcher(): Promise<void> {
* Stops all active provider session watchers. * Stops all active provider session watchers.
*/ */
export async function closeSessionsWatcher(): Promise<void> { export async function closeSessionsWatcher(): Promise<void> {
clearPendingWatcherFlushTimer();
await Promise.all( await Promise.all(
watchers.map(async (watcher) => { watchers.map(async (watcher) => {
try { try {
@@ -165,4 +276,8 @@ export async function closeSessionsWatcher(): Promise<void> {
}) })
); );
watchers.length = 0; watchers.length = 0;
pendingWatcherUpdate = null;
pendingWatcherUpdateStartedAt = null;
watcherRefreshInFlight = false;
watcherRescheduleAfterRefresh = false;
} }

View File

@@ -64,6 +64,12 @@ export interface ProjectsUpdatedMessage {
type: 'projects_updated'; type: 'projects_updated';
projects: Project[]; projects: Project[];
updatedSessionId?: string; updatedSessionId?: string;
updatedSessionIds?: string[];
watchProvider?: LLMProvider;
watchProviders?: LLMProvider[];
changeType?: 'add' | 'change';
changeTypes?: Array<'add' | 'change'>;
batched?: boolean;
[key: string]: unknown; [key: string]: unknown;
} }