fix: on file update, no directory rescan is needed

This commit is contained in:
Haileyesus
2026-04-06 22:18:08 +03:00
parent 28aa5a3902
commit f576b8e6d2
17 changed files with 372 additions and 126 deletions

View File

@@ -0,0 +1,190 @@
# LLM Module Structure (Refactor Runtime)
This document describes the current backend structure under `server/src/modules/llm`, how execution/session state works, and how the provider abstraction is designed.
## High-Level Layout
```text
server/src/modules/llm/
llm.routes.ts
llm.registry.ts
providers/
provider.interface.ts
abstract.provider.ts
base-sdk.provider.ts
base-cli.provider.ts
claude.provider.ts
codex.provider.ts
cursor.provider.ts
gemini.provider.ts
services/
llm.service.ts
sessions.service.ts
sessions-watcher.service.ts
messages-unifier.service.ts
assets.service.ts
mcp.service.ts
skills.service.ts
session-indexers/
session-indexer.interface.ts
session-indexer.utils.ts
claude.session-indexer.ts
codex.session-indexer.ts
cursor.session-indexer.ts
gemini.session-indexer.ts
index.ts
tests/
llm-unifier.providers.test.ts
llm-unifier.sessions.test.ts
llm-unifier.images.test.ts
llm-unifier.mcp.test.ts
llm-unifier.skills.test.ts
llm-unifier.messages.test.ts
```
## Responsibilities By File Group
- `llm.routes.ts`
- HTTP API for provider runtime sessions (start/resume/stop/model/thinking), normalized session/history messages, assets upload, MCP config/probe, skills listing, indexed session CRUD/sync.
- `llm.registry.ts`
- Singleton provider registry. Instantiates one provider class per provider id.
- `providers/*`
- Runtime execution and live event collection.
- SDK family (`BaseSdkProvider`) for Claude/Codex.
- CLI family (`BaseCliProvider`) for Cursor/Gemini.
- `services/llm.service.ts`
- Input validation + capability gating + facade over provider registry.
- `services/sessions.service.ts`
- DB-backed indexed sessions and history file parsing.
- Returns normalized message history via `messages-unifier.service.ts`.
- `services/sessions-watcher.service.ts`
- `chokidar` watchers for provider artifact folders.
- On filesystem update, triggers `synchronizeProviderFile(provider, filePath)`.
- `services/messages-unifier.service.ts`
- Provider-specific raw event/history -> unified message contract for frontend.
- `services/assets.service.ts`
- Stores uploaded images in `.cloudcli/assets`.
- `services/mcp.service.ts`
- Unified MCP CRUD/probe across provider-native config formats/scopes/transports.
- `services/skills.service.ts`
- Provider-specific skill directory discovery and metadata extraction.
- `session-indexers/*`
- Scans provider artifacts from disk and upserts indexed sessions into `sessions` DB table.
## Runtime Flow (Provider Sessions)
1. `POST /api/llm/providers/:provider/sessions/start` hits `llm.routes.ts`.
2. Route calls `llmService.startSession(...)`.
3. `llm.service.ts` validates payload and capability constraints.
4. `llm.registry.ts` resolves provider instance.
5. Provider (`BaseSdkProvider` or `BaseCliProvider`) creates an in-memory session record and starts execution.
6. Stream/process output is appended as in-memory `ProviderSessionEvent[]`.
7. Route can either:
- return `202` immediately with snapshot, or
- await completion via `waitForSession`.
8. Snapshots are enriched with unified `messages` via `llmMessagesUnifier.normalizeSessionEvents(...)`.
## Indexed History Flow (Disk/DB)
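1. Manual sync (and the watcher's initial sync at startup) scans provider folders; on later filesystem updates the watcher indexes only the changed file via `synchronizeProviderFile(provider, filePath)`.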
2. Provider-specific indexer extracts minimal metadata and upserts `sessionsDb`.
3. History endpoints (`/sessions/:sessionId/history`, `/sessions/:sessionId/messages`) read transcript path from DB.
4. JSON/JSONL is parsed and transformed via `llmMessagesUnifier.normalizeHistoryEntries(...)`.
## Interface + Abstract + Base-Class Design
### `IProvider` (interface)
`providers/provider.interface.ts`
- Consumer contract used by registry/service layer.
- Exposes:
- `launchSession`, `resumeSession`, `stopSession`, `waitForSession`
- `setSessionModel`, `setSessionThinkingMode`
- `getSession`, `listSessions`
- `listModels`
- Exposes `capabilities` so callers can gate unsupported features before calling provider-specific logic.
### `AbstractProvider` (abstract class)
`providers/abstract.provider.ts`
- Shared lifecycle state and rules:
- `sessions: Map<string, MutableProviderSession>`
- `sessionPreferences: Map<string, { model?, thinkingMode? }>`
- Implements:
- in-memory session reads (`getSession`, `listSessions`, `waitForSession`)
- stop handling + session status events
- model/thinking updates with capability checks
- event ring-buffer logic (`MAX_EVENT_BUFFER_SIZE`)
- Leaves provider execution specifics abstract (`listModels`, `launchSession`, `resumeSession`).
### `BaseSdkProvider` and `BaseCliProvider`
- `BaseSdkProvider`
- shared async iterable stream consumption.
- handles completion/error transitions and completion system event emission.
- `BaseCliProvider`
- shared child-process spawn + stdout/stderr line accumulation + JSON line parsing.
- graceful stop (`SIGTERM` then `SIGKILL`) and completion/error transitions.
### Concrete provider classes
- `ClaudeProvider` (SDK)
- uses `@anthropic-ai/claude-agent-sdk`.
- supports runtime permission requests and emits permission events.
- image payload support via base64 content blocks.
- `CodexProvider` (SDK)
- dynamic import of `@openai/codex-sdk`.
- supports text + `local_image` prompt items.
- `CursorProvider` (CLI)
- `cursor-agent` invocation builder + model list parsing.
- `GeminiProvider` (CLI)
- `gemini` invocation builder + curated model catalog.
## In-Memory Session Setup: How It Works
The in-memory part is inside `AbstractProvider` + base classes:
- Session record is created at launch/resume in memory (`Map`).
- Events are appended in real-time while stream/process runs.
- Snapshot endpoints read this map directly (`/providers/:provider/sessions...`).
- Stop/wait/model/thinking controls operate on this same in-memory handle.
- Completed sessions currently remain in the map (event history is bounded per session, but map entries are never evicted).
Key characteristics:
- Process-local only (not shared across instances).
- Lost on server restart.
- Good for immediate live control and progress.
- Not the source of truth for historical transcripts (disk/DB is).
## Is In-Memory Session State Necessary, Or Useless?
Short answer: **not useless**, but **not sufficient as a durable architecture**.
### Why it is necessary in the current design
- You need live handles for:
- `stopSession` (abort process/stream now),
- `waitForSession`,
- real-time event buffering for immediate API responses.
- These are runtime concerns and cannot be satisfied by session-index DB rows alone.
### Where it is weak
- No eviction/pruning for completed session map entries.
- No persistence across restart.
- No cross-instance coordination (if horizontally scaled, only the owning instance can control that session).
### Practical conclusion
- Keep in-memory runtime state for **active execution control**.
- Treat DB/indexed history as the durable read model.
- If you need reliability across restarts/instances, move execution ownership to a durable worker/orchestrator and store live session metadata in a shared store.
## Suggested Hardening (Incremental)
1. Add session map eviction policy (TTL/LRU for completed/failed/stopped sessions).
2. Add ownership metadata (`instanceId`) if multiple backend instances will run.
3. Add explicit `activeSessions` metric endpoint.
4. Optionally persist minimal runtime state (status transitions + timestamps) to DB for auditability.

View File

@@ -6,7 +6,7 @@ import os from 'os';
import TOML from '@iarna/toml';
import { getCodexSessions } from '../../../projects.js';
import { sessionsDb } from '@/shared/database/repositories/sessions.db.js';
import { llmSessionsService } from '@/modules/llm/sessions.service.js';
import { llmSessionsService } from '@/modules/llm/services/sessions.service.js';
const router = express.Router();

View File

@@ -1,6 +1,6 @@
import express from 'express';
import sessionManager from '../../../sessionManager.js';
import { llmSessionsService } from '@/modules/llm/sessions.service.js';
import { llmSessionsService } from '@/modules/llm/services/sessions.service.js';
const router = express.Router();

View File

@@ -53,11 +53,15 @@ async function onUpdate(
filePath: string,
provider: LLMProvider,
): Promise<void> {
if (!isWatcherTargetFile(provider, filePath)) {
return;
}
try {
const result = await llmSessionsService.synchronizeProvider(provider, { fullRescan: true });
const result = await llmSessionsService.synchronizeProviderFile(provider, filePath);
logger.info(`LLM watcher sync complete for provider "${provider}" after ${eventType}`, {
filePath,
processed: result.processed,
indexed: result.indexed,
});
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
@@ -69,6 +73,17 @@ async function onUpdate(
}
}
/**
 * Decides whether a watcher event path is a transcript artifact for the given
 * provider: Gemini paths must end in `.json`, every other provider's in `.jsonl`.
 *
 * @param provider - Provider whose watcher raised the event.
 * @param filePath - Path reported by the watcher.
 * @returns `true` when the path carries the suffix expected for the provider.
 */
function isWatcherTargetFile(provider: LLMProvider, filePath: string): boolean {
  const expectedSuffix = provider === 'gemini' ? '.json' : '.jsonl';
  return filePath.endsWith(expectedSuffix);
}
/**
* Initializes LLM session watchers and performs an initial index sync.
*/

View File

@@ -145,12 +145,12 @@ export const llmSessionsService = {
},
/**
* Runs one provider indexer and updates `scan_state.last_scanned_at`.
* Indexes one provider artifact file without running a full provider rescan.
*/
async synchronizeProvider(
async synchronizeProviderFile(
provider: LLMProvider,
options: { fullRescan?: boolean } = {},
): Promise<{ provider: LLMProvider; processed: number }> {
filePath: string,
): Promise<{ provider: LLMProvider; indexed: boolean }> {
const indexer = sessionIndexers.find((entry) => entry.provider === provider);
if (!indexer) {
throw new AppError(`No session indexer registered for provider "${provider}".`, {
@@ -159,11 +159,12 @@ export const llmSessionsService = {
});
}
const lastScanAt = options.fullRescan ? null : scanStateDb.getLastScannedAt();
const processed = await indexer.synchronize(lastScanAt);
scanStateDb.updateLastScannedAt();
if (!indexer.synchronizeFile) {
return { provider, indexed: false };
}
return { provider, processed };
const indexed = await indexer.synchronizeFile(filePath);
return { provider, indexed };
},
updateSessionCustomName(sessionId: string, sessionCustomName: string): void {

View File

@@ -22,15 +22,15 @@ type ParsedSession = {
*/
export class ClaudeSessionIndexer implements ISessionIndexer {
readonly provider = 'claude' as const;
private readonly claudeHome = path.join(os.homedir(), '.claude');
/**
* Scans ~/.claude projects and upserts discovered sessions into DB.
*/
async synchronize(lastScanAt: Date | null): Promise<number> {
const claudeHome = path.join(os.homedir(), '.claude');
const nameMap = await buildLookupMap(path.join(claudeHome, 'history.jsonl'), 'sessionId', 'display');
const nameMap = await buildLookupMap(path.join(this.claudeHome, 'history.jsonl'), 'sessionId', 'display');
const files = await findFilesRecursivelyCreatedAfter(
path.join(claudeHome, 'projects'),
path.join(this.claudeHome, 'projects'),
'.jsonl',
lastScanAt,
);
@@ -58,6 +58,34 @@ export class ClaudeSessionIndexer implements ISessionIndexer {
return processed;
}
/**
 * Indexes a single Claude transcript file instead of rescanning the projects tree.
 *
 * @param filePath - Path of the session artifact to index.
 * @returns `true` once the session has been written through
 * `sessionsDb.createSession`; `false` when the path does not end in `.jsonl`
 * or `processSessionFile` yields nothing for it.
 */
async synchronizeFile(filePath: string): Promise<boolean> {
  const isTranscript = filePath.endsWith('.jsonl');
  if (!isTranscript) {
    return false;
  }

  // Display names come from history.jsonl, keyed by session id.
  const historyPath = path.join(this.claudeHome, 'history.jsonl');
  const displayNames = await buildLookupMap(historyPath, 'sessionId', 'display');

  const session = await this.processSessionFile(filePath, displayNames);
  if (!session) {
    return false;
  }

  const { createdAt, updatedAt } = await readFileTimestamps(filePath);
  sessionsDb.createSession(
    session.sessionId,
    this.provider,
    session.workspacePath,
    session.sessionName,
    createdAt,
    updatedAt,
    filePath,
  );

  return true;
}
/**
* Extracts session metadata from one Claude JSONL session file.
*/

View File

@@ -22,15 +22,15 @@ type ParsedSession = {
*/
export class CodexSessionIndexer implements ISessionIndexer {
readonly provider = 'codex' as const;
private readonly codexHome = path.join(os.homedir(), '.codex');
/**
* Scans ~/.codex sessions and upserts discovered sessions into DB.
*/
async synchronize(lastScanAt: Date | null): Promise<number> {
const codexHome = path.join(os.homedir(), '.codex');
const nameMap = await buildLookupMap(path.join(codexHome, 'session_index.jsonl'), 'id', 'thread_name');
const nameMap = await buildLookupMap(path.join(this.codexHome, 'session_index.jsonl'), 'id', 'thread_name');
const files = await findFilesRecursivelyCreatedAfter(
path.join(codexHome, 'sessions'),
path.join(this.codexHome, 'sessions'),
'.jsonl',
lastScanAt,
);
@@ -58,6 +58,34 @@ export class CodexSessionIndexer implements ISessionIndexer {
return processed;
}
/**
 * Indexes a single Codex transcript file instead of rescanning the sessions tree.
 *
 * @param filePath - Path of the session artifact to index.
 * @returns `true` once the session has been written through
 * `sessionsDb.createSession`; `false` when the path does not end in `.jsonl`
 * or `processSessionFile` yields nothing for it.
 */
async synchronizeFile(filePath: string): Promise<boolean> {
  const isTranscript = filePath.endsWith('.jsonl');
  if (!isTranscript) {
    return false;
  }

  // Thread names come from session_index.jsonl, keyed by session id.
  const indexPath = path.join(this.codexHome, 'session_index.jsonl');
  const threadNames = await buildLookupMap(indexPath, 'id', 'thread_name');

  const session = await this.processSessionFile(filePath, threadNames);
  if (!session) {
    return false;
  }

  const { createdAt, updatedAt } = await readFileTimestamps(filePath);
  sessionsDb.createSession(
    session.sessionId,
    this.provider,
    session.workspacePath,
    session.sessionName,
    createdAt,
    updatedAt,
    filePath,
  );

  return true;
}
/**
* Extracts session metadata from one Codex JSONL session file.
*/

View File

@@ -25,13 +25,13 @@ type ParsedSession = {
*/
export class CursorSessionIndexer implements ISessionIndexer {
readonly provider = 'cursor' as const;
private readonly cursorHome = path.join(os.homedir(), '.cursor');
/**
* Scans Cursor chats and upserts discovered sessions into DB.
*/
async synchronize(lastScanAt: Date | null): Promise<number> {
const cursorHome = path.join(os.homedir(), '.cursor');
const projectsDir = path.join(cursorHome, 'projects');
const projectsDir = path.join(this.cursorHome, 'projects');
const projectEntries = await listDirectoryEntriesSafe(projectsDir);
const seenWorkspacePaths = new Set<string>();
@@ -49,7 +49,7 @@ export class CursorSessionIndexer implements ISessionIndexer {
seenWorkspacePaths.add(workspacePath);
const workspaceHash = this.md5(workspacePath);
const chatsDir = path.join(cursorHome, 'chats', workspaceHash);
const chatsDir = path.join(this.cursorHome, 'chats', workspaceHash);
const files = await findFilesRecursivelyCreatedAfter(chatsDir, '.jsonl', lastScanAt);
for (const filePath of files) {
@@ -75,6 +75,33 @@ export class CursorSessionIndexer implements ISessionIndexer {
return processed;
}
/**
 * Indexes a single Cursor transcript file instead of rescanning the chats tree.
 *
 * @param filePath - Path of the session artifact to index.
 * @returns `true` once the session has been written through
 * `sessionsDb.createSession`; `false` when the path does not end in `.jsonl`
 * or `processSessionFile` yields nothing for it.
 */
async synchronizeFile(filePath: string): Promise<boolean> {
  const isTranscript = filePath.endsWith('.jsonl');
  if (!isTranscript) {
    return false;
  }

  const session = await this.processSessionFile(filePath);
  if (!session) {
    return false;
  }

  const { createdAt, updatedAt } = await readFileTimestamps(filePath);
  sessionsDb.createSession(
    session.sessionId,
    this.provider,
    session.workspacePath,
    session.sessionName,
    createdAt,
    updatedAt,
    filePath,
  );

  return true;
}
/**
* Produces the same workspace hash Cursor uses in chat directory names.
*/

View File

@@ -21,19 +21,19 @@ type ParsedSession = {
*/
export class GeminiSessionIndexer implements ISessionIndexer {
readonly provider = 'gemini' as const;
private readonly geminiHome = path.join(os.homedir(), '.gemini');
/**
* Scans Gemini session JSON files and upserts discovered sessions into DB.
*/
async synchronize(lastScanAt: Date | null): Promise<number> {
const geminiHome = path.join(os.homedir(), '.gemini');
const legacySessionFiles = await findFilesRecursivelyCreatedAfter(
path.join(geminiHome, 'sessions'),
path.join(this.geminiHome, 'sessions'),
'.json',
lastScanAt,
);
const tempFiles = await findFilesRecursivelyCreatedAfter(
path.join(geminiHome, 'tmp'),
path.join(this.geminiHome, 'tmp'),
'.json',
lastScanAt,
);
@@ -42,7 +42,7 @@ export class GeminiSessionIndexer implements ISessionIndexer {
let processed = 0;
for (const filePath of files) {
if (
filePath.startsWith(path.join(geminiHome, 'tmp')) &&
filePath.startsWith(path.join(this.geminiHome, 'tmp')) &&
!filePath.includes(`${path.sep}chats${path.sep}`)
) {
continue;
@@ -69,6 +69,40 @@ export class GeminiSessionIndexer implements ISessionIndexer {
return processed;
}
/**
 * Indexes a single Gemini session JSON file instead of rescanning the
 * sessions/tmp trees.
 *
 * @param filePath - Path of the session artifact to index.
 * @returns `true` once the session has been written through
 * `sessionsDb.createSession`; `false` when the path does not end in `.json`,
 * when it sits under the Gemini `tmp` directory outside a `chats` folder, or
 * when `processSessionFile` yields nothing for it.
 */
async synchronizeFile(filePath: string): Promise<boolean> {
  const isJsonArtifact = filePath.endsWith('.json');
  if (!isJsonArtifact) {
    return false;
  }

  // Under tmp/, only files that live inside a chats/ directory are indexed.
  const tmpRoot = path.join(this.geminiHome, 'tmp');
  const chatsSegment = `${path.sep}chats${path.sep}`;
  const isUnderTmp = filePath.startsWith(tmpRoot);
  const isChatFile = filePath.includes(chatsSegment);
  if (isUnderTmp && !isChatFile) {
    return false;
  }

  const session = await this.processSessionFile(filePath);
  if (!session) {
    return false;
  }

  const { createdAt, updatedAt } = await readFileTimestamps(filePath);
  sessionsDb.createSession(
    session.sessionId,
    this.provider,
    session.workspacePath,
    session.sessionName,
    createdAt,
    updatedAt,
    filePath,
  );

  return true;
}
/**
* Extracts session metadata from one Gemini JSON artifact.
*/

View File

@@ -10,4 +10,9 @@ export interface ISessionIndexer {
* Scans provider session artifacts and upserts discovered sessions into DB.
*/
synchronize(lastScanAt: Date | null): Promise<number>;
/**
* Parses and upserts one provider artifact file without running a full directory scan.
*/
synchronizeFile?(filePath: string): Promise<boolean>;
}

View File

@@ -74,6 +74,7 @@ export async function findFilesRecursivelyCreatedAfter(
fileList: string[] = [],
): Promise<string[]> {
try {
// List rootDir's direct entries (with file-type info) before walking them.
const entries = await fsp.readdir(rootDir, { withFileTypes: true });
for (const entry of entries) {
const fullPath = path.join(rootDir, entry.name);

View File

@@ -66,36 +66,32 @@ test('llmSessionsService.synchronizeSessions aggregates processed counts and fai
}
});
// This test covers provider-specific sync behavior for both incremental and full-rescan modes.
test('llmSessionsService.synchronizeProvider honors fullRescan option', { concurrency: false }, async () => {
const observedScanDates: Array<Date | null> = [];
const restoreScanDate = patchMethod(scanStateDb, 'getLastScannedAt', () => new Date('2026-04-02T00:00:00.000Z'));
const restoreUpdateScanDate = patchMethod(scanStateDb, 'updateLastScannedAt', () => {});
// This test covers single-file indexing delegation used by the watcher (no full provider rescan).
test('llmSessionsService.synchronizeProviderFile delegates to provider indexer file sync', { concurrency: false }, async () => {
let synchronizeCalls = 0;
let synchronizeFilePath: string | null = null;
const restoreIndexers = patchIndexers([
{
provider: 'cursor',
async synchronize(lastScanAt) {
observedScanDates.push(lastScanAt);
return 7;
provider: 'claude',
async synchronize() {
synchronizeCalls += 1;
return 0;
},
async synchronizeFile(filePath: string) {
synchronizeFilePath = filePath;
return true;
},
},
]);
try {
const incremental = await llmSessionsService.synchronizeProvider('cursor');
const fullRescan = await llmSessionsService.synchronizeProvider('cursor', { fullRescan: true });
assert.equal(incremental.provider, 'cursor');
assert.equal(incremental.processed, 7);
assert.equal(fullRescan.provider, 'cursor');
assert.equal(fullRescan.processed, 7);
assert.equal(observedScanDates.length, 2);
assert.ok(observedScanDates[0] instanceof Date);
assert.equal(observedScanDates[1], null);
const result = await llmSessionsService.synchronizeProviderFile('claude', '/tmp/claude-session.jsonl');
assert.equal(result.provider, 'claude');
assert.equal(result.indexed, true);
assert.equal(synchronizeFilePath, '/tmp/claude-session.jsonl');
assert.equal(synchronizeCalls, 0);
} finally {
restoreIndexers();
restoreUpdateScanDate();
restoreScanDate();
}
});

View File

@@ -11,7 +11,7 @@ import {
} from '../../../projects.js';
import { sessionsDb } from '@/shared/database/repositories/sessions.db.js';
import { workspaceOriginalPathsDb } from '@/shared/database/repositories/workspace-original-paths.db.js';
import { llmSessionsService } from '@/modules/llm/sessions.service.js';
import { llmSessionsService } from '@/modules/llm/services/sessions.service.js';
import { authenticateToken } from '../auth/auth.middleware.js';
import { getWorkspaceNameFromPath, WORKSPACES_ROOT, validateWorkspacePath } from './projects.utils.js';

View File

@@ -4,13 +4,6 @@ import type {
ApiSuccessShape,
} from '@/shared/types/http.js';
/**
 * Builds the `meta` object attached to API responses.
 *
 * @param requestId - Identifier of the request being answered, if known.
 * @param startedAt - Timestamp at which the request started, if known.
 * @returns An `ApiMeta` carrying both values as given (possibly `undefined`).
 */
export function createApiMeta(requestId?: string, startedAt?: string): ApiMeta {
  const meta: ApiMeta = { requestId: requestId, startedAt: startedAt };
  return meta;
}
export function createApiSuccessResponse<TData>(
data: TData,
meta?: ApiMeta

View File

@@ -1,30 +0,0 @@
import type { NextFunction, Request, Response } from 'express';
import { createApiErrorResponse, createApiMeta } from '@/shared/http/api-response.js';
import { getRequestContext } from '@/shared/http/request-context.js';
import { AppError } from '@/shared/utils/app-error.js';
import { logger } from '@/shared/utils/logger.js';
/**
 * Express error middleware that turns a thrown error into the API error
 * payload and logs it.
 *
 * Errors that are not already an `AppError` are wrapped in a new `AppError`
 * built from their message, so `code`/`statusCode`/`details` are always read
 * from an `AppError`.
 *
 * @param error - The error passed down the middleware chain.
 * @param req - Current request; used to look up the request context for `meta`.
 * @param res - Response the error payload is written to.
 * @param next - Called only when the response has already started, to hand the
 * error to Express's default error handler.
 */
export function errorHandler(
  error: Error,
  req: Request,
  res: Response,
  next: NextFunction
): void {
  const appError = error instanceof AppError ? error : new AppError(error.message);
  const context = getRequestContext(req);

  logger.error(appError.message, {
    code: appError.code,
    statusCode: appError.statusCode,
    requestId: context?.requestId,
  });

  // Once headers are sent a new status/body cannot be written; Express's
  // documented contract is to delegate to the default handler in that case.
  if (res.headersSent) {
    next(error);
    return;
  }

  const payload = createApiErrorResponse(
    appError.code,
    appError.message,
    createApiMeta(context?.requestId, context?.startedAt),
    appError.details
  );

  res.status(appError.statusCode).json(payload);
}

View File

@@ -1,15 +0,0 @@
import type { Request, Response } from 'express';
import { createApiErrorResponse, createApiMeta } from '@/shared/http/api-response.js';
import { getRequestContext } from '@/shared/http/request-context.js';
/**
 * Fallback handler for unmatched routes: replies with HTTP 404 and a
 * `NOT_FOUND` API error payload naming the requested URL.
 *
 * @param req - Current request; supplies `originalUrl` and the request context.
 * @param res - Response the 404 payload is written to.
 */
export function notFoundHandler(req: Request, res: Response): void {
  const requestContext = getRequestContext(req);
  const message = `Route not found: ${req.originalUrl}`;
  const meta = createApiMeta(requestContext?.requestId, requestContext?.startedAt);
  const body = createApiErrorResponse('NOT_FOUND', message, meta);

  res.status(404).json(body);
}

View File

@@ -1,27 +0,0 @@
import { randomUUID } from 'crypto';
import type { NextFunction, Request, Response } from 'express';
import type { RequestContext } from '@/shared/types/http.js';
/** Express `Request` widened with the optional `context` that `requestContextMiddleware` attaches. */
type RequestWithContext = Request & {
context?: RequestContext;
};
/**
 * Reads the per-request context stored on the request object.
 *
 * @param req - Request to inspect.
 * @returns The stored `context`, or `undefined` when none has been attached.
 */
export function getRequestContext(req: Request): RequestContext | undefined {
  const { context } = req as RequestWithContext;
  return context;
}
/**
 * Tags every request with a context (fresh UUID + ISO start timestamp) for
 * tracking purposes. The context is stored on the request and the id is
 * mirrored to `res.locals.requestId` before control passes to `next`.
 *
 * @param req - Request that receives the `context` property.
 * @param res - Response whose `locals.requestId` is set.
 * @param next - Continues the middleware chain.
 */
export function requestContextMiddleware(req: Request, res: Response, next: NextFunction): void {
  const context: RequestContext = {
    requestId: randomUUID(),
    startedAt: new Date().toISOString(),
  };

  (req as RequestWithContext).context = context;
  (res.locals as Record<string, unknown>).requestId = context.requestId;

  next();
}