refactor: use updated session watcher

In addition, for projects_updated websocket response, send the sessionId instead
This commit is contained in:
Haileyesus
2026-04-24 21:56:19 +03:00
parent b3445508e9
commit 3fd2353ffe
12 changed files with 97 additions and 209 deletions

View File

@@ -36,7 +36,6 @@ import {
deleteProjectById,
getProjectTaskMasterById,
getProjectPathById,
clearProjectDirectoryCache,
searchConversations,
} from './projects.js';
import { queryClaudeSDK, abortClaudeSDKSession, isClaudeSDKSessionActive, getActiveClaudeSDKSessions, resolveToolApproval, getPendingApprovalsForSession, reconnectSessionWriter } from './claude-sdk.js';
@@ -68,149 +67,7 @@ import { getConnectableHost } from '../shared/networkHosts.js';
const VALID_PROVIDERS = ['claude', 'codex', 'cursor', 'gemini'];
// File system watchers for provider project/session folders
const PROVIDER_WATCH_PATHS = [
{ provider: 'claude', rootPath: path.join(os.homedir(), '.claude', 'projects') },
{ provider: 'cursor', rootPath: path.join(os.homedir(), '.cursor', 'chats') },
{ provider: 'codex', rootPath: path.join(os.homedir(), '.codex', 'sessions') },
{ provider: 'gemini', rootPath: path.join(os.homedir(), '.gemini', 'projects') },
{ provider: 'gemini_sessions', rootPath: path.join(os.homedir(), '.gemini', 'sessions') }
];
const WATCHER_IGNORED_PATTERNS = [
'**/node_modules/**',
'**/.git/**',
'**/dist/**',
'**/build/**',
'**/*.tmp',
'**/*.swp',
'**/.DS_Store'
];
const WATCHER_DEBOUNCE_MS = 300;
let projectsWatchers = [];
let projectsWatcherDebounceTimer = null;
const connectedClients = new Set();
let isGetProjectsRunning = false; // Flag to prevent reentrant calls
// Broadcast progress to all connected WebSocket clients
function broadcastProgress(progress) {
const message = JSON.stringify({
type: 'loading_progress',
...progress
});
connectedClients.forEach(client => {
if (client.readyState === WebSocket.OPEN) {
client.send(message);
}
});
}
// Setup file system watchers for Claude, Cursor, and Codex project/session folders
async function setupProjectsWatcher() {
const chokidar = (await import('chokidar')).default;
if (projectsWatcherDebounceTimer) {
clearTimeout(projectsWatcherDebounceTimer);
projectsWatcherDebounceTimer = null;
}
await Promise.all(
projectsWatchers.map(async (watcher) => {
try {
await watcher.close();
} catch (error) {
console.error('[WARN] Failed to close watcher:', error);
}
})
);
projectsWatchers = [];
const debouncedUpdate = (eventType, filePath, provider, rootPath) => {
if (projectsWatcherDebounceTimer) {
clearTimeout(projectsWatcherDebounceTimer);
}
projectsWatcherDebounceTimer = setTimeout(async () => {
// Prevent reentrant calls
if (isGetProjectsRunning) {
return;
}
try {
isGetProjectsRunning = true;
// Clear project directory cache when files change
clearProjectDirectoryCache();
// Get updated projects list
const updatedProjects = await getProjectsWithSessions(broadcastProgress);
// Notify all connected clients about the project changes
const updateMessage = JSON.stringify({
type: 'projects_updated',
projects: updatedProjects,
timestamp: new Date().toISOString(),
changeType: eventType,
changedFile: path.relative(rootPath, filePath),
watchProvider: provider
});
connectedClients.forEach(client => {
if (client.readyState === WebSocket.OPEN) {
client.send(updateMessage);
}
});
} catch (error) {
console.error('[ERROR] Error handling project changes:', error);
} finally {
isGetProjectsRunning = false;
}
}, WATCHER_DEBOUNCE_MS);
};
for (const { provider, rootPath } of PROVIDER_WATCH_PATHS) {
try {
// chokidar v4 emits ENOENT via the "error" event for missing roots and will not auto-recover.
// Ensure provider folders exist before creating the watcher so watching stays active.
await fsPromises.mkdir(rootPath, { recursive: true });
// Initialize chokidar watcher with optimized settings
const watcher = chokidar.watch(rootPath, {
ignored: WATCHER_IGNORED_PATTERNS,
persistent: true,
ignoreInitial: true, // Don't fire events for existing files on startup
followSymlinks: false,
depth: 10, // Reasonable depth limit
awaitWriteFinish: {
stabilityThreshold: 100, // Wait 100ms for file to stabilize
pollInterval: 50
}
});
// Set up event listeners
watcher
.on('add', (filePath) => debouncedUpdate('add', filePath, provider, rootPath))
.on('change', (filePath) => debouncedUpdate('change', filePath, provider, rootPath))
.on('unlink', (filePath) => debouncedUpdate('unlink', filePath, provider, rootPath))
.on('addDir', (dirPath) => debouncedUpdate('addDir', dirPath, provider, rootPath))
.on('unlinkDir', (dirPath) => debouncedUpdate('unlinkDir', dirPath, provider, rootPath))
.on('error', (error) => {
console.error(`[ERROR] ${provider} watcher error:`, error);
})
.on('ready', () => {
});
projectsWatchers.push(watcher);
} catch (error) {
console.error(`[ERROR] Failed to setup ${provider} watcher for ${rootPath}:`, error);
}
}
if (projectsWatchers.length === 0) {
console.error('[ERROR] Failed to setup any provider watchers');
}
}
export const connectedClients = new Set();
const app = express();
const server = http.createServer(app);
@@ -219,6 +76,7 @@ const ptySessionsMap = new Map();
const PTY_SESSION_TIMEOUT = 30 * 60 * 1000;
const SHELL_URL_PARSE_BUFFER_LIMIT = 32768;
import { stripAnsiSequences, normalizeDetectedUrl, extractUrlsFromText, shouldAutoOpenUrlFromOutput } from './utils/url-detection.js';
import { closeSessionsWatcher, initializeSessionsWatcher } from '@/modules/providers/index.js';
// Single WebSocket server that handles both paths
const wss = new WebSocketServer({
@@ -431,7 +289,7 @@ app.post('/api/system/update', authenticateToken, async (req, res) => {
app.get('/api/projects', authenticateToken, async (req, res) => {
try {
const projects = await getProjectsWithSessions(broadcastProgress);
const projects = await getProjectsWithSessions();
res.json(projects);
} catch (error) {
res.status(500).json({ error: error.message });
@@ -2373,7 +2231,7 @@ async function startServer() {
console.log('');
// Start watching the projects folder for changes
await setupProjectsWatcher();
await initializeSessionsWatcher();
// await getProjectsWithSessions(); // TODO: REMOVE THIS
// Start server-side plugin processes for enabled plugins
@@ -2382,6 +2240,7 @@ async function startServer() {
});
});
await closeSessionsWatcher();
// Clean up plugin processes on shutdown
const shutdownPlugins = async () => {
await stopAllPlugins();

View File

@@ -76,7 +76,7 @@ export const sessionsDb = {
createdAt?: string,
updatedAt?: string,
jsonlPath?: string | null
): void {
): string {
const db = getConnection();
const createdAtValue = normalizeTimestamp(createdAt);
const updatedAtValue = normalizeTimestamp(updatedAt);
@@ -103,6 +103,8 @@ export const sessionsDb = {
createdAtValue,
updatedAtValue
);
return sessionId;
},
updateSessionCustomName(sessionId: string, customName: string): void {

View File

@@ -4,6 +4,7 @@ import path from 'node:path';
import { projectsDb, sessionsDb } from '@/modules/database/index.js';
import { sessionSynchronizerService } from '@/modules/providers/index.js';
import { findAppRoot, getModuleDir } from '@/utils/runtime-paths.js';
import { connectedClients } from '@/index.js';
type SessionSummary = {
id: string;
@@ -35,9 +36,12 @@ export type ProjectsSnapshot = {
projects: ProjectListItem[];
};
type ProgressCallback =
| ((progress: { phase: 'loading' | 'complete'; current: number; total: number; currentProject?: string }) => void)
| null;
type ProgressUpdate = {
phase: 'loading' | 'complete';
current: number;
total: number;
currentProject?: string;
};
const __dirname = getModuleDir(import.meta.url);
const APP_ROOT = findAppRoot(__dirname);
@@ -172,10 +176,24 @@ export async function writeSnapshot(projects: ProjectListItem[]): Promise<void>
}
}
// Broadcast progress to all connected WebSocket clients
function broadcastProgress(progress: ProgressUpdate) {
const message = JSON.stringify({
type: 'loading_progress',
...progress,
});
connectedClients.forEach((client: any) => {
if (client.readyState === WebSocket.OPEN) {
client.send(message);
}
});
}
/**
* Reads all projects from DB and returns provider-bucketed session summaries.
*/
export async function getProjectsWithSessions(progressCallback: ProgressCallback = null): Promise<ProjectListItem[]> {
export async function getProjectsWithSessions(): Promise<ProjectListItem[]> {
await sessionSynchronizerService.synchronizeSessions();
const projectRows = projectsDb.getProjectPaths() as Array<{
@@ -193,14 +211,12 @@ export async function getProjectsWithSessions(progressCallback: ProgressCallback
const projectId = row.project_id;
const projectPath = row.project_path;
if (progressCallback) {
progressCallback({
phase: 'loading',
current: processedProjects,
total: totalProjects,
currentProject: projectPath,
});
}
broadcastProgress({
phase: 'loading',
current: processedProjects,
total: totalProjects,
currentProject: projectPath,
});
const displayName =
row.custom_project_name && row.custom_project_name.trim().length > 0
@@ -227,13 +243,11 @@ export async function getProjectsWithSessions(progressCallback: ProgressCallback
});
}
if (progressCallback) {
progressCallback({
phase: 'complete',
current: totalProjects,
total: totalProjects,
});
}
broadcastProgress({
phase: 'complete',
current: totalProjects,
total: totalProjects,
});
await writeSnapshot(projects);
return projects;

View File

@@ -61,19 +61,19 @@ export class ClaudeSessionSynchronizer implements IProviderSessionSynchronizer {
/**
* Parses and upserts one Claude session JSONL file.
*/
async synchronizeFile(filePath: string): Promise<boolean> {
async synchronizeFile(filePath: string): Promise<string | null> {
if (!filePath.endsWith('.jsonl')) {
return false;
return null;
}
const nameMap = await buildLookupMap(path.join(this.claudeHome, 'history.jsonl'), 'sessionId', 'display');
const parsed = await this.processSessionFile(filePath, nameMap);
if (!parsed) {
return false;
return null;
}
const timestamps = await readFileTimestamps(filePath);
sessionsDb.createSession(
return sessionsDb.createSession(
parsed.sessionId,
this.provider,
parsed.projectPath,
@@ -82,8 +82,6 @@ export class ClaudeSessionSynchronizer implements IProviderSessionSynchronizer {
timestamps.updatedAt,
filePath
);
return true;
}
/**

View File

@@ -61,19 +61,19 @@ export class CodexSessionSynchronizer implements IProviderSessionSynchronizer {
/**
* Parses and upserts one Codex session JSONL file.
*/
async synchronizeFile(filePath: string): Promise<boolean> {
async synchronizeFile(filePath: string): Promise<string | null> {
if (!filePath.endsWith('.jsonl')) {
return false;
return null;
}
const nameMap = await buildLookupMap(path.join(this.codexHome, 'session_index.jsonl'), 'id', 'thread_name');
const parsed = await this.processSessionFile(filePath, nameMap);
if (!parsed) {
return false;
return null;
}
const timestamps = await readFileTimestamps(filePath);
sessionsDb.createSession(
return sessionsDb.createSession(
parsed.sessionId,
this.provider,
parsed.projectPath,
@@ -82,8 +82,6 @@ export class CodexSessionSynchronizer implements IProviderSessionSynchronizer {
timestamps.updatedAt,
filePath
);
return true;
}
/**

View File

@@ -91,18 +91,18 @@ export class CursorSessionSynchronizer implements IProviderSessionSynchronizer {
/**
* Parses and upserts one Cursor session JSONL file.
*/
async synchronizeFile(filePath: string): Promise<boolean> {
async synchronizeFile(filePath: string): Promise<string | null> {
if (!filePath.endsWith('.jsonl')) {
return false;
return null;
}
const parsed = await this.processSessionFile(filePath);
if (!parsed) {
return false;
return null;
}
const timestamps = await readFileTimestamps(filePath);
sessionsDb.createSession(
return sessionsDb.createSession(
parsed.sessionId,
this.provider,
parsed.projectPath,
@@ -111,8 +111,6 @@ export class CursorSessionSynchronizer implements IProviderSessionSynchronizer {
timestamps.updatedAt,
filePath
);
return true;
}
/**

View File

@@ -72,25 +72,25 @@ export class GeminiSessionSynchronizer implements IProviderSessionSynchronizer {
/**
* Parses and upserts one Gemini session JSON artifact.
*/
async synchronizeFile(filePath: string): Promise<boolean> {
async synchronizeFile(filePath: string): Promise<string | null> {
if (!filePath.endsWith('.json')) {
return false;
return null;
}
if (
filePath.startsWith(path.join(this.geminiHome, 'tmp'))
&& !filePath.includes(`${path.sep}chats${path.sep}`)
) {
return false;
return null;
}
const parsed = await this.processSessionFile(filePath);
if (!parsed) {
return false;
return null;
}
const timestamps = await readFileTimestamps(filePath);
sessionsDb.createSession(
return sessionsDb.createSession(
parsed.sessionId,
this.provider,
parsed.projectPath,
@@ -99,8 +99,6 @@ export class GeminiSessionSynchronizer implements IProviderSessionSynchronizer {
timestamps.updatedAt,
filePath
);
return true;
}
/**

View File

@@ -168,10 +168,14 @@ export const sessionSynchronizerService = {
async synchronizeProviderFile(
provider: LLMProvider,
filePath: string
): Promise<{ provider: LLMProvider; indexed: boolean }> {
): Promise<{ provider: LLMProvider; indexed: boolean; sessionId: string | null }> {
const resolvedProvider = providerRegistry.resolveProvider(provider);
const indexed = await resolvedProvider.sessionSynchronizer.synchronizeFile(filePath);
return { provider, indexed };
const sessionId = await resolvedProvider.sessionSynchronizer.synchronizeFile(filePath);
return {
provider,
indexed: Boolean(sessionId),
sessionId,
};
},
/**

View File

@@ -6,6 +6,8 @@ import chokidar, { type FSWatcher } from 'chokidar';
import { sessionSynchronizerService } from '@/modules/providers/services/session-synchronizer.service.js';
import type { LLMProvider } from '@/shared/types.js';
import { getProjectsWithSessions } from '@/modules/projects/index.js';
import { connectedClients } from '@/index.js';
type WatcherEventType = 'add' | 'change';
@@ -43,6 +45,7 @@ const WATCHER_IGNORED_PATTERNS = [
];
const watchers: FSWatcher[] = [];
const WS_OPEN_STATE = 1;
/**
* Filters watcher events to provider-specific session artifact file types.
@@ -69,9 +72,31 @@ async function onUpdate(
try {
const result = await sessionSynchronizerService.synchronizeProviderFile(provider, filePath);
// Get updated projects list
const updatedProjects = await getProjectsWithSessions();
// Notify all connected clients about the project changes
const updateMessage = JSON.stringify({
type: 'projects_updated',
projects: updatedProjects,
timestamp: new Date().toISOString(),
changeType: eventType,
updatedSessionId: result.sessionId ?? undefined,
watchProvider: provider
});
connectedClients.forEach(client => {
if (client.readyState === WS_OPEN_STATE) {
client.send(updateMessage);
}
});
console.log(`Session watcher sync complete for provider "${provider}" after ${eventType}`, {
filePath,
indexed: result.indexed,
sessionId: result.sessionId,
});
} catch (error) {
const message = error instanceof Error ? error.message : String(error);

View File

@@ -88,5 +88,5 @@ export interface IProviderSessionSynchronizer {
/**
* Parses and upserts one provider artifact file without running a full scan.
*/
synchronizeFile(filePath: string): Promise<boolean>;
synchronizeFile(filePath: string): Promise<string | null>;
}