fix: stabilize cloud computer use mcp

This commit is contained in:
Simos Mikelatos
2026-06-19 20:47:53 +00:00
parent 077baee5f2
commit 1c05fe0905
7 changed files with 146 additions and 33 deletions

View File

@@ -245,8 +245,12 @@ function connect(url: string): void {
}
});
const scheduleReconnect = () => {
emitToParent({ type: 'disconnected', url });
const scheduleReconnect = (code?: number, reason?: Buffer) => {
const reasonText = reason?.toString() || '';
emitToParent({ type: 'disconnected', url, code, reason: reasonText });
if (code === 1008 && /computer use.*disabled/i.test(reasonText)) {
return;
}
setTimeout(open, reconnectMs);
reconnectMs = Math.min(reconnectMs * 2, RECONNECT_MAX_MS);
};

View File

@@ -470,19 +470,25 @@ async function handleMessage(message: JsonRpcRequest) {
throw new Error(`Unsupported method: ${message.method}`);
}
function writeMessage(message: Record<string, unknown>) {
type MessageFraming = 'content-length' | 'line';
function writeMessage(message: Record<string, unknown>, framing: MessageFraming) {
const payload = JSON.stringify(message);
if (framing === 'line') {
process.stdout.write(`${payload}\n`);
return;
}
process.stdout.write(`Content-Length: ${Buffer.byteLength(payload, 'utf8')}\r\n\r\n${payload}`);
}
function sendResult(id: string | number | null | undefined, result: unknown) {
function sendResult(id: string | number | null | undefined, result: unknown, framing: MessageFraming) {
if (id === undefined) {
return;
}
writeMessage({ jsonrpc: '2.0', id, result });
writeMessage({ jsonrpc: '2.0', id, result }, framing);
}
function sendError(id: string | number | null | undefined, error: unknown) {
function sendError(id: string | number | null | undefined, error: unknown, framing: MessageFraming) {
if (id === undefined) {
return;
}
@@ -493,28 +499,69 @@ function sendError(id: string | number | null | undefined, error: unknown) {
code: -32000,
message: error instanceof Error ? error.message : String(error),
},
});
}, framing);
}
let buffer = Buffer.alloc(0);
function handleRawMessage(rawMessage: string, framing: MessageFraming) {
void (async () => {
let request: JsonRpcRequest | null = null;
try {
request = JSON.parse(rawMessage) as JsonRpcRequest;
const result = await handleMessage(request);
sendResult(request.id, result, framing);
} catch (error) {
sendError(request?.id ?? null, error, framing);
}
})();
}
function findHeaderEnd(input: Buffer): { index: number; length: number } | null {
const crlf = input.indexOf('\r\n\r\n');
if (crlf !== -1) {
return { index: crlf, length: 4 };
}
const lf = input.indexOf('\n\n');
if (lf !== -1) {
return { index: lf, length: 2 };
}
return null;
}
process.stdin.on('data', (chunk) => {
buffer = Buffer.concat([buffer, chunk]);
while (true) {
const headerEnd = buffer.indexOf('\r\n\r\n');
if (headerEnd === -1) {
return;
const headerEnd = findHeaderEnd(buffer);
if (!headerEnd) {
if (/^Content-Length:/i.test(buffer.toString('utf8', 0, Math.min(buffer.length, 32)))) {
return;
}
const newline = buffer.indexOf('\n');
if (newline === -1) {
return;
}
const rawLine = buffer.slice(0, newline).toString('utf8').trim();
buffer = buffer.slice(newline + 1);
if (!rawLine) {
continue;
}
handleRawMessage(rawLine, 'line');
continue;
}
const header = buffer.slice(0, headerEnd).toString('utf8');
const header = buffer.slice(0, headerEnd.index).toString('utf8');
const lengthMatch = /Content-Length:\s*(\d+)/i.exec(header);
if (!lengthMatch) {
buffer = buffer.slice(headerEnd + 4);
buffer = buffer.slice(headerEnd.index + headerEnd.length);
continue;
}
const length = Number.parseInt(lengthMatch[1], 10);
const messageStart = headerEnd + 4;
const messageStart = headerEnd.index + headerEnd.length;
const messageEnd = messageStart + length;
if (buffer.length < messageEnd) {
return;
@@ -522,16 +569,6 @@ process.stdin.on('data', (chunk) => {
const rawMessage = buffer.slice(messageStart, messageEnd).toString('utf8');
buffer = buffer.slice(messageEnd);
void (async () => {
let request: JsonRpcRequest | null = null;
try {
request = JSON.parse(rawMessage) as JsonRpcRequest;
const result = await handleMessage(request);
sendResult(request.id, result);
} catch (error) {
sendError(request?.id ?? null, error);
}
})();
handleRawMessage(rawMessage, 'content-length');
}
});

View File

@@ -22,7 +22,7 @@ try {
}
});
} catch (e) {
console.log('No .env file found or error reading it:', e.message);
console.error('No .env file found or error reading it:', e.message);
}
// Keep the default database in a stable user-level location so rebuilding dist-server

View File

@@ -465,6 +465,7 @@ export const computerUseService = {
await this.registerAgentMcp();
} else {
await this.unregisterAgentMcp();
desktopAgentRelay.disconnectAll('Computer Use was disabled in this environment.');
stopSessions('settings:disabled', 'Computer Use was disabled in settings.');
}
return next;
@@ -909,6 +910,7 @@ export const computerUseService = {
// Drive cloud MCP exposure + session teardown off desktop-agent connectivity.
desktopAgentRelay.setHooks({
canAcceptConnection: () => getRuntime() === 'cloud' && readSettings().enabled,
onFirstConnect: () => computerUseService.onDesktopAgentConnected(),
onLastDisconnect: () => computerUseService.onDesktopAgentDisconnected(),
});

View File

@@ -18,6 +18,7 @@ type ConnectedAgent = {
};
type RelayLifecycleHooks = {
canAcceptConnection?: () => boolean;
onFirstConnect?: () => void | Promise<void>;
onLastDisconnect?: () => void | Promise<void>;
};
@@ -54,15 +55,25 @@ export const desktopAgentRelay = {
hooks = next;
},
register(ws: WebSocket, label = 'desktop-agent'): void {
register(ws: WebSocket, label = 'desktop-agent'): boolean {
if (hooks.canAcceptConnection && !hooks.canAcceptConnection()) {
console.log(`[DesktopAgent] Rejected (${label}); Computer Use is disabled.`);
try {
ws.close(1008, 'Computer Use is disabled in this environment.');
} catch {
// ignore close failures
}
return false;
}
const wasEmpty = pickAgent() === undefined;
agents.set(ws, { ws, label, registeredAt: new Date().toISOString() });
console.log(`[DesktopAgent] Registered (${label}); ${agents.size} connected.`);
ws.on('close', () => {
agents.delete(ws);
const wasRegistered = agents.delete(ws);
console.log(`[DesktopAgent] Disconnected (${label}); ${agents.size} remain.`);
if (pickAgent() === undefined) {
if (wasRegistered && pickAgent() === undefined) {
rejectAllPending('Desktop agent disconnected.');
void hooks.onLastDisconnect?.();
}
@@ -71,6 +82,24 @@ export const desktopAgentRelay = {
if (wasEmpty) {
void hooks.onFirstConnect?.();
}
return true;
},
disconnectAll(reason = 'Desktop agent disconnected.'): void {
const hadAgent = pickAgent() !== undefined;
const sockets = [...agents.keys()];
agents.clear();
for (const ws of sockets) {
try {
ws.close(1008, reason);
} catch {
// ignore close failures
}
}
rejectAllPending(reason);
if (hadAgent) {
void hooks.onLastDisconnect?.();
}
},
/** Resolves a pending relay call with the desktop agent's reply. */