refactor: implement session synchronizer interfaces and logic for multiple providers

This commit is contained in:
Haileyesus
2026-04-07 14:11:27 +03:00
parent 664713776a
commit b54a2839e3
16 changed files with 81 additions and 91 deletions

View File

@@ -1,6 +1,7 @@
import type {
IProvider,
IProviderMcpRuntime,
IProviderSessionSynchronizerRuntime,
IProviderSkillsRuntime,
MutableProviderSession,
ProviderCapabilities,
@@ -23,6 +24,7 @@ export abstract class AbstractProvider implements IProvider {
readonly capabilities: ProviderCapabilities;
abstract readonly mcp: IProviderMcpRuntime;
abstract readonly skills: IProviderSkillsRuntime;
abstract readonly sessionSynchronizer: IProviderSessionSynchronizerRuntime;
protected readonly sessions = new Map<string, MutableProviderSession>();

View File

@@ -8,8 +8,8 @@ import {
findFilesRecursivelyCreatedAfter,
normalizeSessionName,
readFileTimestamps,
} from '@/modules/ai-runtime/session-indexers/session-indexer.utils.js';
import type { ISessionIndexer } from '@/modules/ai-runtime/session-indexers/session-indexer.interface.js';
} from '@/modules/ai-runtime/providers/shared/session-synchronizer/session-synchronizer.utils.js';
import type { IProviderSessionSynchronizerRuntime } from '@/modules/ai-runtime/types/index.js';
type ParsedSession = {
sessionId: string;
@@ -20,19 +20,19 @@ type ParsedSession = {
/**
* Session indexer for Claude transcript artifacts.
*/
export class ClaudeSessionIndexer implements ISessionIndexer {
readonly provider = 'claude' as const;
export class ClaudeSessionSynchronizerRuntime implements IProviderSessionSynchronizerRuntime {
private readonly provider = 'claude' as const;
private readonly claudeHome = path.join(os.homedir(), '.claude');
/**
* Scans ~/.claude projects and upserts discovered sessions into DB.
*/
async synchronize(lastScanAt: Date | null): Promise<number> {
async synchronize(since?: Date): Promise<number> {
const nameMap = await buildLookupMap(path.join(this.claudeHome, 'history.jsonl'), 'sessionId', 'display');
const files = await findFilesRecursivelyCreatedAfter(
path.join(this.claudeHome, 'projects'),
'.jsonl',
lastScanAt,
since ?? null,
);
let processed = 0;

View File

@@ -10,6 +10,7 @@ import { readFile } from 'node:fs/promises';
import { BaseSdkProvider } from '@/modules/ai-runtime/providers/base/base-sdk.provider.js';
import type {
IProviderMcpRuntime,
IProviderSessionSynchronizerRuntime,
IProviderSkillsRuntime,
ProviderModel,
ProviderSessionEvent,
@@ -18,6 +19,7 @@ import type {
} from '@/modules/ai-runtime/types/index.js';
import { ClaudeMcpRuntime } from '@/modules/ai-runtime/providers/claude/claude-mcp.runtime.js';
import { ClaudeSkillsRuntime } from '@/modules/ai-runtime/providers/claude/claude-skills.runtime.js';
import { ClaudeSessionSynchronizerRuntime } from '@/modules/ai-runtime/providers/claude/claude-session-synchronizer.runtime.js';
type ClaudeExecutionInput = StartSessionInput & {
sessionId: string;
@@ -75,6 +77,7 @@ const readString = (value: unknown): string | undefined => {
export class ClaudeProvider extends BaseSdkProvider {
readonly mcp: IProviderMcpRuntime = new ClaudeMcpRuntime();
readonly skills: IProviderSkillsRuntime = new ClaudeSkillsRuntime();
readonly sessionSynchronizer: IProviderSessionSynchronizerRuntime = new ClaudeSessionSynchronizerRuntime();
constructor() {
super('claude', {

View File

@@ -8,8 +8,8 @@ import {
findFilesRecursivelyCreatedAfter,
normalizeSessionName,
readFileTimestamps,
} from '@/modules/ai-runtime/session-indexers/session-indexer.utils.js';
import type { ISessionIndexer } from '@/modules/ai-runtime/session-indexers/session-indexer.interface.js';
} from '@/modules/ai-runtime/providers/shared/session-synchronizer/session-synchronizer.utils.js';
import type { IProviderSessionSynchronizerRuntime } from '@/modules/ai-runtime/types/index.js';
type ParsedSession = {
sessionId: string;
@@ -20,19 +20,19 @@ type ParsedSession = {
/**
* Session indexer for Codex transcript artifacts.
*/
export class CodexSessionIndexer implements ISessionIndexer {
readonly provider = 'codex' as const;
export class CodexSessionSynchronizerRuntime implements IProviderSessionSynchronizerRuntime {
private readonly provider = 'codex' as const;
private readonly codexHome = path.join(os.homedir(), '.codex');
/**
* Scans ~/.codex sessions and upserts discovered sessions into DB.
*/
async synchronize(lastScanAt: Date | null): Promise<number> {
async synchronize(since?: Date): Promise<number> {
const nameMap = await buildLookupMap(path.join(this.codexHome, 'session_index.jsonl'), 'id', 'thread_name');
const files = await findFilesRecursivelyCreatedAfter(
path.join(this.codexHome, 'sessions'),
'.jsonl',
lastScanAt,
since ?? null,
);
let processed = 0;

View File

@@ -5,6 +5,7 @@ import { readFile } from 'node:fs/promises';
import { BaseSdkProvider } from '@/modules/ai-runtime/providers/base/base-sdk.provider.js';
import type {
IProviderMcpRuntime,
IProviderSessionSynchronizerRuntime,
IProviderSkillsRuntime,
ProviderModel,
ProviderSessionEvent,
@@ -12,6 +13,7 @@ import type {
} from '@/modules/ai-runtime/types/index.js';
import { CodexMcpRuntime } from '@/modules/ai-runtime/providers/codex/codex-mcp.runtime.js';
import { CodexSkillsRuntime } from '@/modules/ai-runtime/providers/codex/codex-skills.runtime.js';
import { CodexSessionSynchronizerRuntime } from '@/modules/ai-runtime/providers/codex/codex-session-synchronizer.runtime.js';
import { AppError } from '@/shared/utils/app-error.js';
type CodexExecutionInput = StartSessionInput & {
@@ -67,6 +69,7 @@ type CodexSdkModule = {
export class CodexProvider extends BaseSdkProvider {
readonly mcp: IProviderMcpRuntime = new CodexMcpRuntime();
readonly skills: IProviderSkillsRuntime = new CodexSkillsRuntime();
readonly sessionSynchronizer: IProviderSessionSynchronizerRuntime = new CodexSessionSynchronizerRuntime();
private codexClientPromise: Promise<CodexSdkClient> | null = null;

View File

@@ -11,8 +11,8 @@ import {
listDirectoryEntriesSafe,
normalizeSessionName,
readFileTimestamps,
} from '@/modules/ai-runtime/session-indexers/session-indexer.utils.js';
import type { ISessionIndexer } from '@/modules/ai-runtime/session-indexers/session-indexer.interface.js';
} from '@/modules/ai-runtime/providers/shared/session-synchronizer/session-synchronizer.utils.js';
import type { IProviderSessionSynchronizerRuntime } from '@/modules/ai-runtime/types/index.js';
type ParsedSession = {
sessionId: string;
@@ -23,14 +23,14 @@ type ParsedSession = {
/**
* Session indexer for Cursor transcript artifacts.
*/
export class CursorSessionIndexer implements ISessionIndexer {
readonly provider = 'cursor' as const;
export class CursorSessionSynchronizerRuntime implements IProviderSessionSynchronizerRuntime {
private readonly provider = 'cursor' as const;
private readonly cursorHome = path.join(os.homedir(), '.cursor');
/**
* Scans Cursor chats and upserts discovered sessions into DB.
*/
async synchronize(lastScanAt: Date | null): Promise<number> {
async synchronize(since?: Date): Promise<number> {
const projectsDir = path.join(this.cursorHome, 'projects');
const projectEntries = await listDirectoryEntriesSafe(projectsDir);
const seenWorkspacePaths = new Set<string>();
@@ -50,7 +50,7 @@ export class CursorSessionIndexer implements ISessionIndexer {
seenWorkspacePaths.add(workspacePath);
const workspaceHash = this.md5(workspacePath);
const chatsDir = path.join(this.cursorHome, 'chats', workspaceHash);
const files = await findFilesRecursivelyCreatedAfter(chatsDir, '.jsonl', lastScanAt);
const files = await findFilesRecursivelyCreatedAfter(chatsDir, '.jsonl', since ?? null);
for (const filePath of files) {
const parsed = await this.processSessionFile(filePath);

View File

@@ -1,12 +1,14 @@
import { BaseCliProvider } from '@/modules/ai-runtime/providers/base/base-cli.provider.js';
import type {
IProviderMcpRuntime,
IProviderSessionSynchronizerRuntime,
IProviderSkillsRuntime,
ProviderModel,
StartSessionInput,
} from '@/modules/ai-runtime/types/index.js';
import { CursorMcpRuntime } from '@/modules/ai-runtime/providers/cursor/cursor-mcp.runtime.js';
import { CursorSkillsRuntime } from '@/modules/ai-runtime/providers/cursor/cursor-skills.runtime.js';
import { CursorSessionSynchronizerRuntime } from '@/modules/ai-runtime/providers/cursor/cursor-session-synchronizer.runtime.js';
type CursorExecutionInput = StartSessionInput & {
sessionId: string;
@@ -23,6 +25,7 @@ const ANSI_REGEX =
export class CursorProvider extends BaseCliProvider {
readonly mcp: IProviderMcpRuntime = new CursorMcpRuntime();
readonly skills: IProviderSkillsRuntime = new CursorSkillsRuntime();
readonly sessionSynchronizer: IProviderSessionSynchronizerRuntime = new CursorSessionSynchronizerRuntime();
constructor() {
super('cursor', {

View File

@@ -7,8 +7,8 @@ import {
findFilesRecursivelyCreatedAfter,
normalizeSessionName,
readFileTimestamps,
} from '@/modules/ai-runtime/session-indexers/session-indexer.utils.js';
import type { ISessionIndexer } from '@/modules/ai-runtime/session-indexers/session-indexer.interface.js';
} from '@/modules/ai-runtime/providers/shared/session-synchronizer/session-synchronizer.utils.js';
import type { IProviderSessionSynchronizerRuntime } from '@/modules/ai-runtime/types/index.js';
type ParsedSession = {
sessionId: string;
@@ -19,23 +19,23 @@ type ParsedSession = {
/**
* Session indexer for Gemini transcript artifacts.
*/
export class GeminiSessionIndexer implements ISessionIndexer {
readonly provider = 'gemini' as const;
export class GeminiSessionSynchronizerRuntime implements IProviderSessionSynchronizerRuntime {
private readonly provider = 'gemini' as const;
private readonly geminiHome = path.join(os.homedir(), '.gemini');
/**
* Scans Gemini session JSON files and upserts discovered sessions into DB.
*/
async synchronize(lastScanAt: Date | null): Promise<number> {
async synchronize(since?: Date): Promise<number> {
const legacySessionFiles = await findFilesRecursivelyCreatedAfter(
path.join(this.geminiHome, 'sessions'),
'.json',
lastScanAt,
since ?? null,
);
const tempFiles = await findFilesRecursivelyCreatedAfter(
path.join(this.geminiHome, 'tmp'),
'.json',
lastScanAt,
since ?? null,
);
const files = [...legacySessionFiles, ...tempFiles];

View File

@@ -1,12 +1,14 @@
import { BaseCliProvider } from '@/modules/ai-runtime/providers/base/base-cli.provider.js';
import type {
IProviderMcpRuntime,
IProviderSessionSynchronizerRuntime,
IProviderSkillsRuntime,
ProviderModel,
StartSessionInput,
} from '@/modules/ai-runtime/types/index.js';
import { GeminiMcpRuntime } from '@/modules/ai-runtime/providers/gemini/gemini-mcp.runtime.js';
import { GeminiSkillsRuntime } from '@/modules/ai-runtime/providers/gemini/gemini-skills.runtime.js';
import { GeminiSessionSynchronizerRuntime } from '@/modules/ai-runtime/providers/gemini/gemini-session-synchronizer.runtime.js';
type GeminiExecutionInput = StartSessionInput & {
sessionId: string;
@@ -31,6 +33,7 @@ const GEMINI_MODELS: ProviderModel[] = [
export class GeminiProvider extends BaseCliProvider {
readonly mcp: IProviderMcpRuntime = new GeminiMcpRuntime();
readonly skills: IProviderSkillsRuntime = new GeminiSkillsRuntime();
readonly sessionSynchronizer: IProviderSessionSynchronizerRuntime = new GeminiSessionSynchronizerRuntime();
constructor() {
super('gemini', {

View File

@@ -5,7 +5,7 @@ import { scanStateDb } from '@/shared/database/repositories/scan-state.db.js';
import { sessionsDb } from '@/shared/database/repositories/sessions.db.js';
import type { LLMProvider } from '@/shared/types/app.js';
import { AppError } from '@/shared/utils/app-error.js';
import { sessionIndexers } from '@/modules/ai-runtime/session-indexers/index.js';
import { llmProviderRegistry } from '@/modules/ai-runtime/ai-runtime.registry.js';
import { llmMessagesUnifier, type UnifiedChatMessage } from '@/modules/ai-runtime/services/messages-unifier.service.js';
type SyncResult = {
@@ -119,12 +119,11 @@ export const llmSessionsService = {
};
const failures: string[] = [];
const results = await Promise.allSettled(
sessionIndexers.map(async (indexer) => ({
provider: indexer.provider,
processed: await indexer.synchronize(lastScanAt),
})),
);
// Provider-specific session indexers used by the sync orchestrator.
const results = await Promise.allSettled(llmProviderRegistry.listProviders().map(async (provider) => ({
provider: provider.id,
processed: await provider.sessionSynchronizer.synchronize(lastScanAt ?? undefined),
})));
for (const result of results) {
if (result.status === 'fulfilled') {
@@ -151,19 +150,15 @@ export const llmSessionsService = {
provider: LLMProvider,
filePath: string,
): Promise<{ provider: LLMProvider; indexed: boolean }> {
const indexer = sessionIndexers.find((entry) => entry.provider === provider);
if (!indexer) {
const resolvedProvider = llmProviderRegistry.listProviders().find((entry) => entry.id === provider);
if (!resolvedProvider) {
throw new AppError(`No session indexer registered for provider "${provider}".`, {
code: 'SESSION_INDEXER_NOT_FOUND',
statusCode: 500,
});
}
if (!indexer.synchronizeFile) {
return { provider, indexed: false };
}
const indexed = await indexer.synchronizeFile(filePath);
const indexed = await resolvedProvider.sessionSynchronizer.synchronizeFile(filePath);
return { provider, indexed };
},

View File

@@ -1,15 +0,0 @@
import type { ISessionIndexer } from '@/modules/ai-runtime/session-indexers/session-indexer.interface.js';
import { ClaudeSessionIndexer } from '@/modules/ai-runtime/session-indexers/claude.session-indexer.js';
import { CodexSessionIndexer } from '@/modules/ai-runtime/session-indexers/codex.session-indexer.js';
import { CursorSessionIndexer } from '@/modules/ai-runtime/session-indexers/cursor.session-indexer.js';
import { GeminiSessionIndexer } from '@/modules/ai-runtime/session-indexers/gemini.session-indexer.js';
/**
* Provider-specific session indexers used by the sync orchestrator.
*/
export const sessionIndexers: ISessionIndexer[] = [
new ClaudeSessionIndexer(),
new CodexSessionIndexer(),
new CursorSessionIndexer(),
new GeminiSessionIndexer(),
];

View File

@@ -7,10 +7,9 @@ import test from 'node:test';
import { AppError } from '@/shared/utils/app-error.js';
import { scanStateDb } from '@/shared/database/repositories/scan-state.db.js';
import { sessionsDb } from '@/shared/database/repositories/sessions.db.js';
import { llmProviderRegistry } from '@/modules/ai-runtime/ai-runtime.registry.js';
import { llmSessionsService } from '@/modules/ai-runtime/services/sessions.service.js';
import { sessionIndexers } from '@/modules/ai-runtime/session-indexers/index.js';
import { conversationSearchService } from '@/modules/conversations/conversation-search.service.js';
import type { ISessionIndexer } from '@/modules/ai-runtime/session-indexers/session-indexer.interface.js';
const patchMethod = <T extends object, K extends keyof T>(target: T, key: K, replacement: T[K]) => {
const original = target[key];
@@ -20,14 +19,6 @@ const patchMethod = <T extends object, K extends keyof T>(target: T, key: K, rep
};
};
const patchIndexers = (nextIndexers: ISessionIndexer[]) => {
const originalIndexers = [...sessionIndexers];
sessionIndexers.splice(0, sessionIndexers.length, ...nextIndexers);
return () => {
sessionIndexers.splice(0, sessionIndexers.length, ...originalIndexers);
};
};
// This test covers multi-provider synchronization orchestration and failure aggregation.
test('llmSessionsService.synchronizeSessions aggregates processed counts and failures', { concurrency: false }, async () => {
let updateLastScannedAtCalls = 0;
@@ -35,20 +26,24 @@ test('llmSessionsService.synchronizeSessions aggregates processed counts and fai
const restoreUpdateScanDate = patchMethod(scanStateDb, 'updateLastScannedAt', () => {
updateLastScannedAtCalls += 1;
});
const restoreIndexers = patchIndexers([
const restoreProviders = patchMethod(llmProviderRegistry, 'listProviders', () => ([
{
provider: 'claude',
async synchronize() {
return 3;
id: 'claude',
sessionSynchronizer: {
async synchronize() {
return 3;
},
},
},
{
provider: 'codex',
async synchronize() {
throw new Error('codex index failed');
id: 'codex',
sessionSynchronizer: {
async synchronize() {
throw new Error('codex index failed');
},
},
},
]);
] as any));
try {
const result = await llmSessionsService.synchronizeSessions();
@@ -60,7 +55,7 @@ test('llmSessionsService.synchronizeSessions aggregates processed counts and fai
assert.equal(result.failures[0], 'codex index failed');
assert.equal(updateLastScannedAtCalls, 1);
} finally {
restoreIndexers();
restoreProviders();
restoreUpdateScanDate();
restoreScanDate();
}
@@ -70,19 +65,21 @@ test('llmSessionsService.synchronizeSessions aggregates processed counts and fai
test('llmSessionsService.synchronizeProviderFile delegates to provider indexer file sync', { concurrency: false }, async () => {
let synchronizeCalls = 0;
let synchronizeFilePath: string | null = null;
const restoreIndexers = patchIndexers([
const restoreProviders = patchMethod(llmProviderRegistry, 'listProviders', () => ([
{
provider: 'claude',
async synchronize() {
synchronizeCalls += 1;
return 0;
},
async synchronizeFile(filePath: string) {
synchronizeFilePath = filePath;
return true;
id: 'claude',
sessionSynchronizer: {
async synchronize() {
synchronizeCalls += 1;
return 0;
},
async synchronizeFile(filePath: string) {
synchronizeFilePath = filePath;
return true;
},
},
},
]);
] as any));
try {
const result = await llmSessionsService.synchronizeProviderFile('claude', '/tmp/claude-session.jsonl');
@@ -91,7 +88,7 @@ test('llmSessionsService.synchronizeProviderFile delegates to provider indexer f
assert.equal(synchronizeFilePath, '/tmp/claude-session.jsonl');
assert.equal(synchronizeCalls, 0);
} finally {
restoreIndexers();
restoreProviders();
}
});

View File

@@ -1,3 +1,4 @@
export * from '@/modules/ai-runtime/types/provider.types.js';
export * from '@/modules/ai-runtime/types/mcp.types.js';
export * from '@/modules/ai-runtime/types/skills.types.js';
export * from '@/modules/ai-runtime/types/session-synchronizer.types.js';

View File

@@ -1,6 +1,7 @@
import type { LLMProvider } from '@/shared/types/app.js';
import type { IProviderMcpRuntime } from '@/modules/ai-runtime/types/mcp.types.js';
import type { IProviderSkillsRuntime } from '@/modules/ai-runtime/types/skills.types.js';
import type { IProviderSessionSynchronizerRuntime } from '@/modules/ai-runtime/types/session-synchronizer.types.js';
export type ProviderExecutionFamily = 'sdk' | 'cli';
@@ -78,6 +79,7 @@ export interface IProvider {
readonly capabilities: ProviderCapabilities;
readonly mcp: IProviderMcpRuntime;
readonly skills: IProviderSkillsRuntime;
readonly sessionSynchronizer: IProviderSessionSynchronizerRuntime;
listModels(): Promise<ProviderModel[]>;

View File

@@ -1,18 +1,14 @@
import type { LLMProvider } from '@/shared/types/app.js';
/**
* Contract for provider-specific session indexing logic.
*/
export interface ISessionIndexer {
readonly provider: LLMProvider;
export interface IProviderSessionSynchronizerRuntime {
/**
* Scans provider session artifacts and upserts discovered sessions into DB.
*/
synchronize(lastScanAt: Date | null): Promise<number>;
synchronize(since?: Date): Promise<number>;
/**
* Parses and upserts one provider artifact file without running a full directory scan.
*/
synchronizeFile?(filePath: string): Promise<boolean>;
synchronizeFile(filePath: string): Promise<boolean>;
}