Compare commits

..

2 Commits

Author SHA1 Message Date
Haileyesus
123ae31020 fix(chat): sort messages appropriately 2026-06-11 21:48:46 +03:00
Haileyesus
89f05247ed fix(shell): use correct session id 2026-06-11 21:04:31 +03:00
5 changed files with 247 additions and 59 deletions

View File

@@ -112,7 +112,17 @@ const wss = createWebSocketServer(server, {
getPendingApprovalsForSession,
},
shell: {
getSessionById: (sessionId) => sessionManager.getSession(sessionId),
resolveProviderSessionId: (sessionId, provider) => {
const dbSession = sessionsDb.getSessionById(sessionId);
const legacyGeminiSession =
provider === 'gemini' ? sessionManager.getSession(sessionId) : null;
if (dbSession) {
return dbSession.provider_session_id ?? legacyGeminiSession?.cliSessionId ?? null;
}
return legacyGeminiSession?.cliSessionId;
},
stripAnsiSequences,
normalizeDetectedUrl,
extractUrlsFromText,

View File

@@ -5,7 +5,6 @@ import path from 'node:path';
import pty, { type IPty } from 'node-pty';
import { WebSocket, type RawData } from 'ws';
import { sessionsDb } from '@/modules/database/index.js';
import { parseIncomingJsonObject } from '@/shared/utils.js';
type ShellIncomingMessage = {
@@ -36,7 +35,10 @@ const PTY_SESSION_TIMEOUT = 30 * 60 * 1000;
const SHELL_URL_PARSE_BUFFER_LIMIT = 32768;
type ShellWebSocketDependencies = {
getSessionById: (sessionId: string) => { cliSessionId?: string } | null | undefined;
resolveProviderSessionId: (
sessionId: string,
provider: string,
) => string | null | undefined;
stripAnsiSequences: (content: string) => string;
normalizeDetectedUrl: (url: string) => string | null;
extractUrlsFromText: (content: string) => string[];
@@ -79,36 +81,32 @@ function parseShellMessage(rawMessage: RawData): ShellIncomingMessage | null {
const SAFE_SESSION_ID_PATTERN = /^[a-zA-Z0-9_.\-:]+$/;
/**
* Maps the app-facing session id to the provider-native id used by CLIs.
*
* Chat history and provider artifacts on disk are keyed by the provider id,
* while the shell UI sends the stable app id from the session gateway.
*/
function resolveResumeSessionId(
appSessionId: string,
provider: string,
message: ShellIncomingMessage,
dependencies: ShellWebSocketDependencies
): string | null {
try {
const sessionRow = sessionsDb.getSessionById(appSessionId);
const providerSessionId = sessionRow?.provider_session_id;
if (providerSessionId && SAFE_SESSION_ID_PATTERN.test(providerSessionId)) {
return providerSessionId;
}
): string {
const hasSession = readBoolean(message.hasSession);
const sessionId = readString(message.sessionId);
const provider = readString(message.provider, 'claude');
if (provider === 'gemini') {
const geminiSession = dependencies.getSessionById(appSessionId);
const cliSessionId = geminiSession?.cliSessionId;
if (cliSessionId && SAFE_SESSION_ID_PATTERN.test(cliSessionId)) {
return cliSessionId;
}
}
} catch (error) {
console.error(`Failed to resolve resume session id for ${provider}:`, error);
if (!hasSession || !sessionId) {
return '';
}
return null;
let resumeSessionId: string | null | undefined;
try {
resumeSessionId = dependencies.resolveProviderSessionId(sessionId, provider);
} catch (error) {
console.error('Failed to resolve provider session ID:', error);
resumeSessionId = undefined;
}
const resolvedSessionId = resumeSessionId === undefined ? sessionId : resumeSessionId;
if (!resolvedSessionId || !SAFE_SESSION_ID_PATTERN.test(resolvedSessionId)) {
return '';
}
return resolvedSessionId;
}
/**
@@ -119,9 +117,9 @@ function buildShellCommand(
dependencies: ShellWebSocketDependencies
): string {
const hasSession = readBoolean(message.hasSession);
const sessionId = readString(message.sessionId);
const initialCommand = readString(message.initialCommand);
const provider = readString(message.provider, 'claude');
const resumeSessionId = resolveResumeSessionId(message, dependencies);
const isPlainShell =
readBoolean(message.isPlainShell) ||
(!!initialCommand && !hasSession) ||
@@ -131,47 +129,44 @@ function buildShellCommand(
return initialCommand;
}
const resumeId =
hasSession && sessionId ? resolveResumeSessionId(sessionId, provider, dependencies) : null;
if (provider === 'cursor') {
if (resumeId) {
return `cursor-agent --resume="${resumeId}"`;
if (resumeSessionId) {
return `cursor-agent --resume="${resumeSessionId}"`;
}
return 'cursor-agent';
}
if (provider === 'codex') {
if (resumeId) {
if (resumeSessionId) {
if (os.platform() === 'win32') {
return `codex resume "${resumeId}"; if ($LASTEXITCODE -ne 0) { codex }`;
return `codex resume "${resumeSessionId}"; if ($LASTEXITCODE -ne 0) { codex }`;
}
return `codex resume "${resumeId}" || codex`;
return `codex resume "${resumeSessionId}" || codex`;
}
return 'codex';
}
if (provider === 'gemini') {
const command = initialCommand || 'gemini';
if (resumeId) {
return `${command} --resume "${resumeId}"`;
if (resumeSessionId) {
return `${command} --resume "${resumeSessionId}"`;
}
return command;
}
if (provider === 'opencode') {
if (resumeId) {
return `opencode --session "${resumeId}"`;
if (resumeSessionId) {
return `opencode --session "${resumeSessionId}"`;
}
return initialCommand || 'opencode';
}
const command = initialCommand || 'claude';
if (resumeId) {
if (resumeSessionId) {
if (os.platform() === 'win32') {
return `claude --resume "${resumeId}"; if ($LASTEXITCODE -ne 0) { claude }`;
return `claude --resume "${resumeSessionId}"; if ($LASTEXITCODE -ne 0) { claude }`;
}
return `claude --resume "${resumeId}" || claude`;
return `claude --resume "${resumeSessionId}" || claude`;
}
return command;
}
@@ -276,12 +271,14 @@ export function handleShellConnection(
return;
}
if (sessionId && !SAFE_SESSION_ID_PATTERN.test(sessionId)) {
const safeSessionIdPattern = /^[a-zA-Z0-9_.\-:]+$/;
if (sessionId && !safeSessionIdPattern.test(sessionId)) {
ws.send(JSON.stringify({ type: 'error', message: 'Invalid session ID' }));
return;
}
const shellCommand = buildShellCommand(data, dependencies);
const resumeSessionId = resolveResumeSessionId(data, dependencies);
const shell = os.platform() === 'win32' ? 'powershell.exe' : 'bash';
const shellArgs =
os.platform() === 'win32' ? ['-Command', shellCommand] : ['-c', shellCommand];
@@ -427,8 +424,8 @@ export function handleShellConnection(
: provider === 'opencode'
? 'OpenCode'
: 'Claude';
welcomeMsg = hasSession
? `\x1b[36mResuming ${providerName} session ${sessionId} in: ${projectPath}\x1b[0m\r\n`
welcomeMsg = hasSession && resumeSessionId
? `\x1b[36mResuming ${providerName} session ${resumeSessionId} in: ${projectPath}\x1b[0m\r\n`
: `\x1b[36mStarting new ${providerName} session in: ${projectPath}\x1b[0m\r\n`;
}

View File

@@ -325,6 +325,7 @@ function ChatInterface({
onWheel={handleScroll}
onTouchMove={handleScroll}
isLoadingSessionMessages={isLoadingSessionMessages}
isProcessing={isProcessing}
chatMessages={chatMessages}
selectedSession={selectedSession}
currentSessionId={currentSessionId}

View File

@@ -19,6 +19,8 @@ interface ChatMessagesPaneProps {
onWheel: () => void;
onTouchMove: () => void;
isLoadingSessionMessages: boolean;
/** True while the viewed session has an active provider run in flight. */
isProcessing?: boolean;
chatMessages: ChatMessage[];
selectedSession: ProjectSession | null;
currentSessionId: string | null;
@@ -68,6 +70,7 @@ export default function ChatMessagesPane({
onWheel,
onTouchMove,
isLoadingSessionMessages,
isProcessing = false,
chatMessages,
selectedSession,
currentSessionId,
@@ -147,7 +150,7 @@ export default function ChatMessagesPane({
onTouchMove={onTouchMove}
className="relative flex-1 space-y-3 overflow-y-auto overflow-x-hidden px-0 py-3 sm:space-y-4 sm:p-4"
>
{isLoadingSessionMessages && chatMessages.length === 0 ? (
{(isLoadingSessionMessages || isProcessing) && chatMessages.length === 0 ? (
<div className="mt-8 text-center text-gray-500 dark:text-gray-400">
<div className="flex items-center justify-center space-x-2">
<div className="h-4 w-4 animate-spin rounded-full border-b-2 border-gray-400" />

View File

@@ -166,6 +166,108 @@ function hasServerEchoForLocalUser(
});
}
function compareMessagesChronologically(a: NormalizedMessage, b: NormalizedMessage): number {
const timeA = readMessageTime(a) ?? 0;
const timeB = readMessageTime(b) ?? 0;
if (timeA !== timeB) {
return timeA - timeB;
}
return 0;
}
/**
* Count how many user turns precede `message` in a chronologically merged view
* of server + realtime rows. Used to match a realtime row to the correct turn
* on disk when several turns share identical assistant text.
*/
function getUserTurnOrdinalBefore(
message: NormalizedMessage,
serverMessages: NormalizedMessage[],
realtimeMessages: NormalizedMessage[],
): number {
const messageTime = readMessageTime(message);
let userCount = 0;
for (const candidate of [...serverMessages, ...realtimeMessages].sort(compareMessagesChronologically)) {
if (candidate.id === message.id) {
break;
}
const candidateTime = readMessageTime(candidate);
if (
messageTime !== null
&& candidateTime !== null
&& candidateTime > messageTime
) {
break;
}
if (candidate.kind === 'text' && candidate.role === 'user') {
userCount++;
}
}
return Math.max(0, userCount - 1);
}
function findServerTurnRangeByOrdinal(
serverMessages: NormalizedMessage[],
turnOrdinal: number,
): { start: number; end: number } | null {
let userCount = -1;
let start = -1;
for (let index = 0; index < serverMessages.length; index++) {
const message = serverMessages[index];
if (message.kind === 'text' && message.role === 'user') {
userCount++;
if (userCount === turnOrdinal) {
start = index;
break;
}
}
}
if (start < 0) {
return null;
}
let end = serverMessages.length;
for (let index = start + 1; index < serverMessages.length; index++) {
if (serverMessages[index].kind === 'text' && serverMessages[index].role === 'user') {
end = index;
break;
}
}
return { start, end };
}
function isAssistantTextEchoedInSameTurnOnServer(
message: NormalizedMessage,
serverMessages: NormalizedMessage[],
realtimeMessages: NormalizedMessage[],
): boolean {
const assistantText = (message.content || '').trim();
if (!assistantText) {
return false;
}
const turnOrdinal = getUserTurnOrdinalBefore(message, serverMessages, realtimeMessages);
const turnRange = findServerTurnRangeByOrdinal(serverMessages, turnOrdinal);
if (!turnRange) {
return false;
}
return serverMessages
.slice(turnRange.start + 1, turnRange.end)
.some((serverMessage) =>
serverMessage.kind === 'text'
&& serverMessage.role === 'assistant'
&& (serverMessage.content || '').trim() === assistantText,
);
}
/**
* After `finalizeStreaming`, the client holds a synthetic assistant `text` row
* while the sessions API soon returns the same reply with a different id.
@@ -203,22 +305,92 @@ function dedupeAdjacentAssistantEchoes(merged: NormalizedMessage[]): NormalizedM
return out;
}
/**
* After a server refresh, drop only the realtime rows the persisted transcript
* already owns. Anything not yet on disk (common right after `complete`, while
* JSONL indexing lags) stays in `realtimeMessages` so the chat pane never
* flashes the empty "Continue your conversation" state.
*/
function pruneRealtimeSupersededByServer(
serverMessages: NormalizedMessage[],
realtimeMessages: NormalizedMessage[],
): NormalizedMessage[] {
if (realtimeMessages.length === 0) {
return realtimeMessages;
}
const serverIds = new Set(serverMessages.map((message) => message.id));
return realtimeMessages.filter((message) => {
if (serverIds.has(message.id)) {
return false;
}
if (message.id.startsWith('local_') && hasServerEchoForLocalUser(message, serverMessages)) {
return false;
}
if (message.kind === 'stream_delta' || message.id === `__streaming_${message.sessionId}`) {
if (isAssistantTextEchoedInSameTurnOnServer(message, serverMessages, realtimeMessages)) {
return false;
}
return true;
}
if (message.kind === 'text' && message.role === 'assistant') {
if (isAssistantTextEchoedInSameTurnOnServer(message, serverMessages, realtimeMessages)) {
return false;
}
return true;
}
if (message.kind === 'text' && message.role === 'user') {
return !hasServerEchoForLocalUser(message, serverMessages);
}
if (message.kind === 'tool_use' && message.toolId) {
if (serverMessages.some((serverMessage) => serverMessage.kind === 'tool_use' && serverMessage.toolId === message.toolId)) {
return false;
}
}
return true;
});
}
function computeMerged(server: NormalizedMessage[], realtime: NormalizedMessage[]): NormalizedMessage[] {
if (realtime.length === 0) return server;
if (server.length === 0) return dedupeAdjacentAssistantEchoes(realtime);
const serverIds = new Set(server.map(m => m.id));
const extra = realtime.filter((m) => {
if (serverIds.has(m.id)) return false;
if (realtime.length === 0) {
return dedupeAdjacentAssistantEchoes(server);
}
if (server.length === 0) {
return dedupeAdjacentAssistantEchoes(realtime);
}
const serverIds = new Set(server.map((message) => message.id));
const extra = realtime.filter((message) => {
if (serverIds.has(message.id)) {
return false;
}
// Optimistic user rows use `local_*` ids; once the same text exists on the
// server-backed copy from the same send window, drop the realtime echo to
// avoid duplicate bubbles without hiding repeated prompts from history.
if (m.id.startsWith('local_')) {
if (hasServerEchoForLocalUser(m, server)) return false;
if (message.id.startsWith('local_')) {
if (hasServerEchoForLocalUser(message, server)) {
return false;
}
}
return true;
});
if (extra.length === 0) return server;
return dedupeAdjacentAssistantEchoes([...server, ...extra]);
if (extra.length === 0) {
return dedupeAdjacentAssistantEchoes(server);
}
// Interleave by timestamp so live rows stay with their turn instead of
// piling up at the bottom after every refresh.
return dedupeAdjacentAssistantEchoes(
[...server, ...extra].sort(compareMessagesChronologically),
);
}
/**
@@ -439,8 +611,13 @@ export function useSessionStore() {
slot.total = data.total ?? slot.serverMessages.length;
slot.hasMore = Boolean(data.hasMore);
slot.fetchedAt = Date.now();
// drop realtime messages that the server has caught up with to prevent unbounded growth.
slot.realtimeMessages = [];
// Only drop realtime rows the server transcript now owns. A blind clear
// here caused the chat pane to flash "Continue your conversation" after
// `complete` while JSONL / provider_session_id indexing was still behind.
slot.realtimeMessages = pruneRealtimeSupersededByServer(
slot.serverMessages,
slot.realtimeMessages,
);
recomputeMergedIfNeeded(slot);
notify(sessionId);
} catch (error) {