fix: add notifications for response completion for all providers

This commit is contained in:
Haileyesus
2026-03-13 18:41:32 +03:00
parent 7eb52e73ea
commit ae494ea383
7 changed files with 189 additions and 170 deletions

129
package-lock.json generated
View File

@@ -1828,9 +1828,6 @@
"cpu": [ "cpu": [
"arm" "arm"
], ],
"libc": [
"glibc"
],
"license": "LGPL-3.0-or-later", "license": "LGPL-3.0-or-later",
"optional": true, "optional": true,
"os": [ "os": [
@@ -1847,9 +1844,6 @@
"cpu": [ "cpu": [
"arm64" "arm64"
], ],
"libc": [
"glibc"
],
"license": "LGPL-3.0-or-later", "license": "LGPL-3.0-or-later",
"optional": true, "optional": true,
"os": [ "os": [
@@ -1867,9 +1861,6 @@
"ppc64" "ppc64"
], ],
"dev": true, "dev": true,
"libc": [
"glibc"
],
"license": "LGPL-3.0-or-later", "license": "LGPL-3.0-or-later",
"optional": true, "optional": true,
"os": [ "os": [
@@ -1887,9 +1878,6 @@
"s390x" "s390x"
], ],
"dev": true, "dev": true,
"libc": [
"glibc"
],
"license": "LGPL-3.0-or-later", "license": "LGPL-3.0-or-later",
"optional": true, "optional": true,
"os": [ "os": [
@@ -1906,9 +1894,6 @@
"cpu": [ "cpu": [
"x64" "x64"
], ],
"libc": [
"glibc"
],
"license": "LGPL-3.0-or-later", "license": "LGPL-3.0-or-later",
"optional": true, "optional": true,
"os": [ "os": [
@@ -1925,9 +1910,6 @@
"cpu": [ "cpu": [
"arm64" "arm64"
], ],
"libc": [
"musl"
],
"license": "LGPL-3.0-or-later", "license": "LGPL-3.0-or-later",
"optional": true, "optional": true,
"os": [ "os": [
@@ -1944,9 +1926,6 @@
"cpu": [ "cpu": [
"x64" "x64"
], ],
"libc": [
"musl"
],
"license": "LGPL-3.0-or-later", "license": "LGPL-3.0-or-later",
"optional": true, "optional": true,
"os": [ "os": [
@@ -1963,9 +1942,6 @@
"cpu": [ "cpu": [
"arm" "arm"
], ],
"libc": [
"glibc"
],
"license": "Apache-2.0", "license": "Apache-2.0",
"optional": true, "optional": true,
"os": [ "os": [
@@ -1988,9 +1964,6 @@
"cpu": [ "cpu": [
"arm64" "arm64"
], ],
"libc": [
"glibc"
],
"license": "Apache-2.0", "license": "Apache-2.0",
"optional": true, "optional": true,
"os": [ "os": [
@@ -2014,9 +1987,6 @@
"ppc64" "ppc64"
], ],
"dev": true, "dev": true,
"libc": [
"glibc"
],
"license": "Apache-2.0", "license": "Apache-2.0",
"optional": true, "optional": true,
"os": [ "os": [
@@ -2040,9 +2010,6 @@
"s390x" "s390x"
], ],
"dev": true, "dev": true,
"libc": [
"glibc"
],
"license": "Apache-2.0", "license": "Apache-2.0",
"optional": true, "optional": true,
"os": [ "os": [
@@ -2065,9 +2032,6 @@
"cpu": [ "cpu": [
"x64" "x64"
], ],
"libc": [
"glibc"
],
"license": "Apache-2.0", "license": "Apache-2.0",
"optional": true, "optional": true,
"os": [ "os": [
@@ -2090,9 +2054,6 @@
"cpu": [ "cpu": [
"arm64" "arm64"
], ],
"libc": [
"musl"
],
"license": "Apache-2.0", "license": "Apache-2.0",
"optional": true, "optional": true,
"os": [ "os": [
@@ -2115,9 +2076,6 @@
"cpu": [ "cpu": [
"x64" "x64"
], ],
"libc": [
"musl"
],
"license": "Apache-2.0", "license": "Apache-2.0",
"optional": true, "optional": true,
"os": [ "os": [
@@ -3468,9 +3426,6 @@
"arm" "arm"
], ],
"dev": true, "dev": true,
"libc": [
"glibc"
],
"license": "MIT", "license": "MIT",
"optional": true, "optional": true,
"os": [ "os": [
@@ -3485,9 +3440,6 @@
"arm" "arm"
], ],
"dev": true, "dev": true,
"libc": [
"musl"
],
"license": "MIT", "license": "MIT",
"optional": true, "optional": true,
"os": [ "os": [
@@ -3502,9 +3454,6 @@
"arm64" "arm64"
], ],
"dev": true, "dev": true,
"libc": [
"glibc"
],
"license": "MIT", "license": "MIT",
"optional": true, "optional": true,
"os": [ "os": [
@@ -3519,9 +3468,6 @@
"arm64" "arm64"
], ],
"dev": true, "dev": true,
"libc": [
"musl"
],
"license": "MIT", "license": "MIT",
"optional": true, "optional": true,
"os": [ "os": [
@@ -3536,9 +3482,6 @@
"loong64" "loong64"
], ],
"dev": true, "dev": true,
"libc": [
"glibc"
],
"license": "MIT", "license": "MIT",
"optional": true, "optional": true,
"os": [ "os": [
@@ -3553,9 +3496,6 @@
"ppc64" "ppc64"
], ],
"dev": true, "dev": true,
"libc": [
"glibc"
],
"license": "MIT", "license": "MIT",
"optional": true, "optional": true,
"os": [ "os": [
@@ -3570,9 +3510,6 @@
"riscv64" "riscv64"
], ],
"dev": true, "dev": true,
"libc": [
"glibc"
],
"license": "MIT", "license": "MIT",
"optional": true, "optional": true,
"os": [ "os": [
@@ -3587,9 +3524,6 @@
"riscv64" "riscv64"
], ],
"dev": true, "dev": true,
"libc": [
"musl"
],
"license": "MIT", "license": "MIT",
"optional": true, "optional": true,
"os": [ "os": [
@@ -3604,9 +3538,6 @@
"s390x" "s390x"
], ],
"dev": true, "dev": true,
"libc": [
"glibc"
],
"license": "MIT", "license": "MIT",
"optional": true, "optional": true,
"os": [ "os": [
@@ -3621,9 +3552,6 @@
"x64" "x64"
], ],
"dev": true, "dev": true,
"libc": [
"glibc"
],
"license": "MIT", "license": "MIT",
"optional": true, "optional": true,
"os": [ "os": [
@@ -3638,9 +3566,6 @@
"x64" "x64"
], ],
"dev": true, "dev": true,
"libc": [
"musl"
],
"license": "MIT", "license": "MIT",
"optional": true, "optional": true,
"os": [ "os": [
@@ -4384,9 +4309,6 @@
"arm64" "arm64"
], ],
"dev": true, "dev": true,
"libc": [
"glibc"
],
"license": "MIT", "license": "MIT",
"optional": true, "optional": true,
"os": [ "os": [
@@ -4401,9 +4323,6 @@
"arm64" "arm64"
], ],
"dev": true, "dev": true,
"libc": [
"musl"
],
"license": "MIT", "license": "MIT",
"optional": true, "optional": true,
"os": [ "os": [
@@ -4418,9 +4337,6 @@
"ppc64" "ppc64"
], ],
"dev": true, "dev": true,
"libc": [
"glibc"
],
"license": "MIT", "license": "MIT",
"optional": true, "optional": true,
"os": [ "os": [
@@ -4435,9 +4351,6 @@
"riscv64" "riscv64"
], ],
"dev": true, "dev": true,
"libc": [
"glibc"
],
"license": "MIT", "license": "MIT",
"optional": true, "optional": true,
"os": [ "os": [
@@ -4452,9 +4365,6 @@
"riscv64" "riscv64"
], ],
"dev": true, "dev": true,
"libc": [
"musl"
],
"license": "MIT", "license": "MIT",
"optional": true, "optional": true,
"os": [ "os": [
@@ -4469,9 +4379,6 @@
"s390x" "s390x"
], ],
"dev": true, "dev": true,
"libc": [
"glibc"
],
"license": "MIT", "license": "MIT",
"optional": true, "optional": true,
"os": [ "os": [
@@ -4486,9 +4393,6 @@
"x64" "x64"
], ],
"dev": true, "dev": true,
"libc": [
"glibc"
],
"license": "MIT", "license": "MIT",
"optional": true, "optional": true,
"os": [ "os": [
@@ -4503,9 +4407,6 @@
"x64" "x64"
], ],
"dev": true, "dev": true,
"libc": [
"musl"
],
"license": "MIT", "license": "MIT",
"optional": true, "optional": true,
"os": [ "os": [
@@ -14855,9 +14756,6 @@
"arm" "arm"
], ],
"dev": true, "dev": true,
"libc": [
"glibc"
],
"license": "LGPL-3.0-or-later", "license": "LGPL-3.0-or-later",
"optional": true, "optional": true,
"os": [ "os": [
@@ -14875,9 +14773,6 @@
"arm64" "arm64"
], ],
"dev": true, "dev": true,
"libc": [
"glibc"
],
"license": "LGPL-3.0-or-later", "license": "LGPL-3.0-or-later",
"optional": true, "optional": true,
"os": [ "os": [
@@ -14895,9 +14790,6 @@
"x64" "x64"
], ],
"dev": true, "dev": true,
"libc": [
"glibc"
],
"license": "LGPL-3.0-or-later", "license": "LGPL-3.0-or-later",
"optional": true, "optional": true,
"os": [ "os": [
@@ -14915,9 +14807,6 @@
"arm64" "arm64"
], ],
"dev": true, "dev": true,
"libc": [
"musl"
],
"license": "LGPL-3.0-or-later", "license": "LGPL-3.0-or-later",
"optional": true, "optional": true,
"os": [ "os": [
@@ -14935,9 +14824,6 @@
"x64" "x64"
], ],
"dev": true, "dev": true,
"libc": [
"musl"
],
"license": "LGPL-3.0-or-later", "license": "LGPL-3.0-or-later",
"optional": true, "optional": true,
"os": [ "os": [
@@ -14955,9 +14841,6 @@
"arm" "arm"
], ],
"dev": true, "dev": true,
"libc": [
"glibc"
],
"license": "Apache-2.0", "license": "Apache-2.0",
"optional": true, "optional": true,
"os": [ "os": [
@@ -14981,9 +14864,6 @@
"arm64" "arm64"
], ],
"dev": true, "dev": true,
"libc": [
"glibc"
],
"license": "Apache-2.0", "license": "Apache-2.0",
"optional": true, "optional": true,
"os": [ "os": [
@@ -15007,9 +14887,6 @@
"x64" "x64"
], ],
"dev": true, "dev": true,
"libc": [
"glibc"
],
"license": "Apache-2.0", "license": "Apache-2.0",
"optional": true, "optional": true,
"os": [ "os": [
@@ -15033,9 +14910,6 @@
"arm64" "arm64"
], ],
"dev": true, "dev": true,
"libc": [
"musl"
],
"license": "Apache-2.0", "license": "Apache-2.0",
"optional": true, "optional": true,
"os": [ "os": [
@@ -15059,9 +14933,6 @@
"x64" "x64"
], ],
"dev": true, "dev": true,
"libc": [
"musl"
],
"license": "Apache-2.0", "license": "Apache-2.0",
"optional": true, "optional": true,
"os": [ "os": [

View File

@@ -18,7 +18,12 @@ import { promises as fs } from 'fs';
import path from 'path'; import path from 'path';
import os from 'os'; import os from 'os';
import { CLAUDE_MODELS } from '../shared/modelConstants.js'; import { CLAUDE_MODELS } from '../shared/modelConstants.js';
import { createNotificationEvent, notifyUserIfEnabled } from './services/notification-orchestrator.js'; import {
createNotificationEvent,
notifyRunFailed,
notifyRunStopped,
notifyUserIfEnabled
} from './services/notification-orchestrator.js';
const activeSessions = new Map(); const activeSessions = new Map();
const pendingToolApprovals = new Map(); const pendingToolApprovals = new Map();
@@ -509,22 +514,6 @@ async function queryClaudeSDK(command, options = {}, ws) {
})); }));
return {}; return {};
}] }]
}],
Stop: [{
matcher: '',
hooks: [async (input) => {
const stopReason = typeof input?.stop_reason === 'string' ? input.stop_reason : 'completed';
emitNotification(createNotificationEvent({
provider: 'claude',
sessionId: capturedSessionId || sessionId || null,
kind: 'stop',
code: 'run.stopped',
meta: { stopReason },
severity: 'info',
dedupeKey: `claude:hook:stop:${capturedSessionId || sessionId || 'none'}:${stopReason}`
}));
return {};
}]
}] }]
}; };
@@ -714,6 +703,12 @@ async function queryClaudeSDK(command, options = {}, ws) {
exitCode: 0, exitCode: 0,
isNewSession: !sessionId && !!command isNewSession: !sessionId && !!command
}); });
notifyRunStopped({
userId: ws?.userId || null,
provider: 'claude',
sessionId: capturedSessionId || sessionId || null,
stopReason: 'completed'
});
console.log('claude-complete event sent'); console.log('claude-complete event sent');
} catch (error) { } catch (error) {
@@ -733,15 +728,12 @@ async function queryClaudeSDK(command, options = {}, ws) {
error: error.message, error: error.message,
sessionId: capturedSessionId || sessionId || null sessionId: capturedSessionId || sessionId || null
}); });
emitNotification(createNotificationEvent({ notifyRunFailed({
userId: ws?.userId || null,
provider: 'claude', provider: 'claude',
sessionId: capturedSessionId || sessionId || null, sessionId: capturedSessionId || sessionId || null,
kind: 'error', error
code: 'run.failed', });
meta: { error: error.message },
severity: 'error',
dedupeKey: `claude:error:${capturedSessionId || sessionId || 'none'}:${error.message}`
}));
throw error; throw error;
} }

View File

@@ -1,5 +1,6 @@
import { spawn } from 'child_process'; import { spawn } from 'child_process';
import crossSpawn from 'cross-spawn'; import crossSpawn from 'cross-spawn';
import { notifyRunFailed, notifyRunStopped } from './services/notification-orchestrator.js';
// Use cross-spawn on Windows for better command execution // Use cross-spawn on Windows for better command execution
const spawnFunction = process.platform === 'win32' ? crossSpawn : spawn; const spawnFunction = process.platform === 'win32' ? crossSpawn : spawn;
@@ -81,6 +82,33 @@ async function spawnCursor(command, options = {}, ws) {
const isTrustRetry = runReason === 'trust-retry'; const isTrustRetry = runReason === 'trust-retry';
let runSawWorkspaceTrustPrompt = false; let runSawWorkspaceTrustPrompt = false;
let stdoutLineBuffer = ''; let stdoutLineBuffer = '';
let terminalNotificationSent = false;
const notifyTerminalState = ({ code = null, error = null } = {}) => {
if (terminalNotificationSent) {
return;
}
terminalNotificationSent = true;
const finalSessionId = capturedSessionId || sessionId || processKey;
if (code === 0 && !error) {
notifyRunStopped({
userId: ws?.userId || null,
provider: 'cursor',
sessionId: finalSessionId,
stopReason: 'completed'
});
return;
}
notifyRunFailed({
userId: ws?.userId || null,
provider: 'cursor',
sessionId: finalSessionId,
error: error || `Cursor CLI exited with code ${code}`
});
};
if (isTrustRetry) { if (isTrustRetry) {
console.log('Retrying Cursor CLI with --trust after workspace trust prompt'); console.log('Retrying Cursor CLI with --trust after workspace trust prompt');
@@ -255,7 +283,8 @@ async function spawnCursor(command, options = {}, ws) {
ws.send({ ws.send({
type: 'cursor-error', type: 'cursor-error',
error: stderrText, error: stderrText,
sessionId: capturedSessionId || sessionId || null sessionId: capturedSessionId || sessionId || null,
provider: 'cursor'
}); });
}); });
@@ -287,12 +316,15 @@ async function spawnCursor(command, options = {}, ws) {
type: 'claude-complete', type: 'claude-complete',
sessionId: finalSessionId, sessionId: finalSessionId,
exitCode: code, exitCode: code,
provider: 'cursor',
isNewSession: !sessionId && !!command // Flag to indicate this was a new session isNewSession: !sessionId && !!command // Flag to indicate this was a new session
}); });
if (code === 0) { if (code === 0) {
notifyTerminalState({ code });
settleOnce(() => resolve()); settleOnce(() => resolve());
} else { } else {
notifyTerminalState({ code });
settleOnce(() => reject(new Error(`Cursor CLI exited with code ${code}`))); settleOnce(() => reject(new Error(`Cursor CLI exited with code ${code}`)));
} }
}); });
@@ -308,8 +340,10 @@ async function spawnCursor(command, options = {}, ws) {
ws.send({ ws.send({
type: 'cursor-error', type: 'cursor-error',
error: error.message, error: error.message,
sessionId: capturedSessionId || sessionId || null sessionId: capturedSessionId || sessionId || null,
provider: 'cursor'
}); });
notifyTerminalState({ error });
settleOnce(() => reject(error)); settleOnce(() => reject(error));
}); });

View File

@@ -9,6 +9,7 @@ import os from 'os';
import { getSessions, getSessionMessages } from './projects.js'; import { getSessions, getSessionMessages } from './projects.js';
import sessionManager from './sessionManager.js'; import sessionManager from './sessionManager.js';
import GeminiResponseHandler from './gemini-response-handler.js'; import GeminiResponseHandler from './gemini-response-handler.js';
import { notifyRunFailed, notifyRunStopped } from './services/notification-orchestrator.js';
let activeGeminiProcesses = new Map(); // Track active processes by session ID let activeGeminiProcesses = new Map(); // Track active processes by session ID
@@ -172,6 +173,34 @@ async function spawnGemini(command, options = {}, ws) {
stdio: ['pipe', 'pipe', 'pipe'], stdio: ['pipe', 'pipe', 'pipe'],
env: { ...process.env } // Inherit all environment variables env: { ...process.env } // Inherit all environment variables
}); });
let terminalNotificationSent = false;
let terminalFailureReason = null;
const notifyTerminalState = ({ code = null, error = null } = {}) => {
if (terminalNotificationSent) {
return;
}
terminalNotificationSent = true;
const finalSessionId = capturedSessionId || sessionId || processKey;
if (code === 0 && !error) {
notifyRunStopped({
userId: ws?.userId || null,
provider: 'gemini',
sessionId: finalSessionId,
stopReason: 'completed'
});
return;
}
notifyRunFailed({
userId: ws?.userId || null,
provider: 'gemini',
sessionId: finalSessionId,
error: error || terminalFailureReason || `Gemini CLI exited with code ${code}`
});
};
// Attach temp file info to process for cleanup later // Attach temp file info to process for cleanup later
geminiProcess.tempImagePaths = tempImagePaths; geminiProcess.tempImagePaths = tempImagePaths;
@@ -196,10 +225,12 @@ async function spawnGemini(command, options = {}, ws) {
if (timeout) clearTimeout(timeout); if (timeout) clearTimeout(timeout);
timeout = setTimeout(() => { timeout = setTimeout(() => {
const socketSessionId = typeof ws.getSessionId === 'function' ? ws.getSessionId() : (capturedSessionId || sessionId || processKey); const socketSessionId = typeof ws.getSessionId === 'function' ? ws.getSessionId() : (capturedSessionId || sessionId || processKey);
terminalFailureReason = `Gemini CLI timeout - no response received for ${timeoutMs / 1000} seconds`;
ws.send({ ws.send({
type: 'gemini-error', type: 'gemini-error',
sessionId: socketSessionId, sessionId: socketSessionId,
error: `Gemini CLI timeout - no response received for ${timeoutMs / 1000} seconds` error: terminalFailureReason,
provider: 'gemini'
}); });
try { try {
geminiProcess.kill('SIGTERM'); geminiProcess.kill('SIGTERM');
@@ -340,7 +371,8 @@ async function spawnGemini(command, options = {}, ws) {
ws.send({ ws.send({
type: 'gemini-error', type: 'gemini-error',
sessionId: socketSessionId, sessionId: socketSessionId,
error: errorMsg error: errorMsg,
provider: 'gemini'
}); });
}); });
@@ -367,6 +399,7 @@ async function spawnGemini(command, options = {}, ws) {
type: 'claude-complete', // Use claude-complete for compatibility with UI type: 'claude-complete', // Use claude-complete for compatibility with UI
sessionId: finalSessionId, sessionId: finalSessionId,
exitCode: code, exitCode: code,
provider: 'gemini',
isNewSession: !sessionId && !!command // Flag to indicate this was a new session isNewSession: !sessionId && !!command // Flag to indicate this was a new session
}); });
@@ -381,8 +414,13 @@ async function spawnGemini(command, options = {}, ws) {
} }
if (code === 0) { if (code === 0) {
notifyTerminalState({ code });
resolve(); resolve();
} else { } else {
notifyTerminalState({
code,
error: code === null ? 'Gemini CLI process was terminated or timed out' : null
});
reject(new Error(code === null ? 'Gemini CLI process was terminated or timed out' : `Gemini CLI exited with code ${code}`)); reject(new Error(code === null ? 'Gemini CLI process was terminated or timed out' : `Gemini CLI exited with code ${code}`));
} }
}); });
@@ -397,8 +435,10 @@ async function spawnGemini(command, options = {}, ws) {
ws.send({ ws.send({
type: 'gemini-error', type: 'gemini-error',
sessionId: errorSessionId, sessionId: errorSessionId,
error: error.message error: error.message,
provider: 'gemini'
}); });
notifyTerminalState({ error });
reject(error); reject(error);
}); });

View File

@@ -14,6 +14,7 @@
*/ */
import { Codex } from '@openai/codex-sdk'; import { Codex } from '@openai/codex-sdk';
import { notifyRunFailed, notifyRunStopped } from './services/notification-orchestrator.js';
// Track active sessions // Track active sessions
const activeCodexSessions = new Map(); const activeCodexSessions = new Map();
@@ -203,6 +204,7 @@ export async function queryCodex(command, options = {}, ws) {
let codex; let codex;
let thread; let thread;
let currentSessionId = sessionId; let currentSessionId = sessionId;
let terminalFailure = null;
const abortController = new AbortController(); const abortController = new AbortController();
try { try {
@@ -268,6 +270,16 @@ export async function queryCodex(command, options = {}, ws) {
sessionId: currentSessionId sessionId: currentSessionId
}); });
if (event.type === 'turn.failed' && !terminalFailure) {
terminalFailure = event.error || new Error('Turn failed');
notifyRunFailed({
userId: ws?.userId || null,
provider: 'codex',
sessionId: currentSessionId,
error: terminalFailure
});
}
// Extract and send token usage if available (normalized to match Claude format) // Extract and send token usage if available (normalized to match Claude format)
if (event.type === 'turn.completed' && event.usage) { if (event.type === 'turn.completed' && event.usage) {
const totalTokens = (event.usage.input_tokens || 0) + (event.usage.output_tokens || 0); const totalTokens = (event.usage.input_tokens || 0) + (event.usage.output_tokens || 0);
@@ -283,11 +295,20 @@ export async function queryCodex(command, options = {}, ws) {
} }
// Send completion event // Send completion event
sendMessage(ws, { if (!terminalFailure) {
type: 'codex-complete', sendMessage(ws, {
sessionId: currentSessionId, type: 'codex-complete',
actualSessionId: thread.id sessionId: currentSessionId,
}); actualSessionId: thread.id,
provider: 'codex'
});
notifyRunStopped({
userId: ws?.userId || null,
provider: 'codex',
sessionId: currentSessionId,
stopReason: 'completed'
});
}
} catch (error) { } catch (error) {
const session = currentSessionId ? activeCodexSessions.get(currentSessionId) : null; const session = currentSessionId ? activeCodexSessions.get(currentSessionId) : null;
@@ -301,8 +322,17 @@ export async function queryCodex(command, options = {}, ws) {
sendMessage(ws, { sendMessage(ws, {
type: 'codex-error', type: 'codex-error',
error: error.message, error: error.message,
sessionId: currentSessionId sessionId: currentSessionId,
provider: 'codex'
}); });
if (!terminalFailure) {
notifyRunFailed({
userId: ws?.userId || null,
provider: 'codex',
sessionId: currentSessionId,
error
});
}
} }
} finally { } finally {

View File

@@ -450,9 +450,10 @@ async function cleanupProject(projectPath, sessionId = null) {
* SSE Stream Writer - Adapts SDK/CLI output to Server-Sent Events * SSE Stream Writer - Adapts SDK/CLI output to Server-Sent Events
*/ */
class SSEStreamWriter { class SSEStreamWriter {
constructor(res) { constructor(res, userId = null) {
this.res = res; this.res = res;
this.sessionId = null; this.sessionId = null;
this.userId = userId;
this.isSSEStreamWriter = true; // Marker for transport detection this.isSSEStreamWriter = true; // Marker for transport detection
} }
@@ -485,9 +486,10 @@ class SSEStreamWriter {
* Non-streaming response collector * Non-streaming response collector
*/ */
class ResponseCollector { class ResponseCollector {
constructor() { constructor(userId = null) {
this.messages = []; this.messages = [];
this.sessionId = null; this.sessionId = null;
this.userId = userId;
} }
send(data) { send(data) {
@@ -920,7 +922,7 @@ router.post('/', validateExternalApiKey, async (req, res) => {
res.setHeader('Connection', 'keep-alive'); res.setHeader('Connection', 'keep-alive');
res.setHeader('X-Accel-Buffering', 'no'); // Disable nginx buffering res.setHeader('X-Accel-Buffering', 'no'); // Disable nginx buffering
writer = new SSEStreamWriter(res); writer = new SSEStreamWriter(res, req.user.id);
// Send initial status // Send initial status
writer.send({ writer.send({
@@ -930,7 +932,7 @@ router.post('/', validateExternalApiKey, async (req, res) => {
}); });
} else { } else {
// Non-streaming mode: collect messages // Non-streaming mode: collect messages
writer = new ResponseCollector(); writer = new ResponseCollector(req.user.id);
// Collect initial status message // Collect initial status message
writer.send({ writer.send({
@@ -1219,7 +1221,7 @@ router.post('/', validateExternalApiKey, async (req, res) => {
res.setHeader('Cache-Control', 'no-cache'); res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive'); res.setHeader('Connection', 'keep-alive');
res.setHeader('X-Accel-Buffering', 'no'); res.setHeader('X-Accel-Buffering', 'no');
writer = new SSEStreamWriter(res); writer = new SSEStreamWriter(res, req.user.id);
} }
if (!res.writableEnded) { if (!res.writableEnded) {

View File

@@ -60,6 +60,22 @@ function createNotificationEvent({
}; };
} }
function normalizeErrorMessage(error) {
if (typeof error === 'string') {
return error;
}
if (error && typeof error.message === 'string') {
return error.message;
}
if (error == null) {
return 'Unknown error';
}
return String(error);
}
function buildPushBody(event) { function buildPushBody(event) {
const CODE_MAP = { const CODE_MAP = {
'permission.required': event.meta?.toolName 'permission.required': event.meta?.toolName
@@ -131,7 +147,41 @@ function notifyUserIfEnabled({ userId, event }) {
}); });
} }
function notifyRunStopped({ userId, provider, sessionId = null, stopReason = 'completed' }) {
notifyUserIfEnabled({
userId,
event: createNotificationEvent({
provider,
sessionId,
kind: 'stop',
code: 'run.stopped',
meta: { stopReason },
severity: 'info',
dedupeKey: `${provider}:run:stop:${sessionId || 'none'}:${stopReason}`
})
});
}
function notifyRunFailed({ userId, provider, sessionId = null, error }) {
const errorMessage = normalizeErrorMessage(error);
notifyUserIfEnabled({
userId,
event: createNotificationEvent({
provider,
sessionId,
kind: 'error',
code: 'run.failed',
meta: { error: errorMessage },
severity: 'error',
dedupeKey: `${provider}:run:error:${sessionId || 'none'}:${errorMessage}`
})
});
}
export { export {
createNotificationEvent, createNotificationEvent,
notifyUserIfEnabled notifyUserIfEnabled,
notifyRunStopped,
notifyRunFailed
}; };