refactor(websocket): move websocket logic to its own module

This commit is contained in:
Haileyesus
2026-04-25 15:39:22 +03:00
parent 3fd2353ffe
commit 2323a576a6
16 changed files with 1407 additions and 661 deletions

View File

@@ -0,0 +1,267 @@
# WebSocket Module
This module owns the server-side WebSocket gateway used by:
1. Chat streaming (`/ws`)
2. Interactive terminal sessions (`/shell`)
3. Plugin WebSocket passthrough (`/plugin-ws/:pluginName`)
It is intentionally structured as **small services** plus a **barrel export** in `index.ts`.
## Public API
`server/modules/websocket/index.ts` exports:
1. `createWebSocketServer(server, dependencies)`
Creates and wires the shared `ws` server.
2. `connectedClients` and `WS_OPEN_STATE`
Shared chat client registry and open-state constant used by other modules.
## Why Dependency Injection Is Used
The module receives runtime-specific functions from `server/index.js` instead of importing legacy runtime files directly.
Benefits:
1. Keeps module boundaries clean (`server/modules/*` architecture rule).
2. Makes each service easier to test in isolation.
3. Keeps WebSocket transport concerns separate from provider runtime concerns.
## Service Map
| File | Responsibility |
|---|---|
| `services/websocket-server.service.ts` | Creates `WebSocketServer`, binds `verifyClient`, routes connection by pathname |
| `services/websocket-auth.service.ts` | Authenticates upgrade requests and attaches `request.user` |
| `services/chat-websocket.service.ts` | Handles `/ws` chat protocol and provider command/session control messages |
| `services/shell-websocket.service.ts` | Handles `/shell` PTY lifecycle, reconnect buffering, auth URL detection |
| `services/plugin-websocket-proxy.service.ts` | Bridges client socket to plugin socket |
| `services/websocket-writer.service.ts` | Adapts raw WebSocket to writer interface (`send`, `setSessionId`, `getSessionId`) |
| `services/websocket-state.service.ts` | Holds shared chat client set and open-state constant |
## High-Level Architecture
```mermaid
flowchart LR
A[HTTP Server] --> B[createWebSocketServer]
B --> C[verifyWebSocketClient]
B --> D{Pathname}
D -->|/ws| E[handleChatConnection]
D -->|/shell| F[handleShellConnection]
D -->|/plugin-ws/:name| G[handlePluginWsProxy]
D -->|other| H[close()]
E --> I[connectedClients Set]
E --> J[WebSocketWriter]
F --> K[ptySessionsMap]
G --> L[Upstream Plugin ws://127.0.0.1:port/ws]
I --> M[projects.service broadcastProgress]
I --> N[sessions-watcher.service projects_updated]
```
## Connection Handshake + Routing
```mermaid
sequenceDiagram
participant Client
participant WSS as WebSocketServer
participant Auth as verifyWebSocketClient
participant Router as connection router
participant Chat as /ws handler
participant Shell as /shell handler
participant Proxy as /plugin-ws handler
Client->>WSS: Upgrade Request
WSS->>Auth: verifyClient(info)
alt Platform mode
Auth->>Auth: authenticateWebSocket(null)
Auth->>Auth: attach request.user
else OSS mode
Auth->>Auth: read token from ?token or Authorization
Auth->>Auth: authenticateWebSocket(token)
Auth->>Auth: attach request.user
end
alt Auth failed
Auth-->>WSS: false (reject handshake)
else Auth ok
Auth-->>WSS: true
WSS->>Router: on("connection", ws, request)
alt pathname == /ws
Router->>Chat: handleChatConnection(ws, request, deps.chat)
else pathname == /shell
Router->>Shell: handleShellConnection(ws, deps.shell)
else pathname startsWith /plugin-ws/
Router->>Proxy: handlePluginWsProxy(ws, pathname, getPluginPort)
else unknown
Router->>Router: ws.close()
end
end
```
## `/ws` Chat Flow
When a chat socket connects:
1. Add socket to `connectedClients`.
2. Build `WebSocketWriter` (captures `userId` from authenticated request).
3. Parse each incoming message with `parseIncomingJsonObject`.
4. Dispatch by `data.type`.
5. On close, remove socket from `connectedClients`.
### Chat Message Dispatch
```mermaid
flowchart TD
A[Incoming WS message] --> B[parseIncomingJsonObject]
B -->|invalid| C[send {type:error}]
B -->|ok| D{data.type}
D -->|claude-command| E[queryClaudeSDK]
D -->|cursor-command| F[spawnCursor]
D -->|codex-command| G[queryCodex]
D -->|gemini-command| H[spawnGemini]
D -->|cursor-resume| I[spawnCursor resume]
D -->|abort-session| J[abort by provider]
D -->|claude-permission-response| K[resolveToolApproval]
D -->|cursor-abort| L[abortCursorSession]
D -->|check-session-status| M[is*SessionActive + optional reconnectSessionWriter]
D -->|get-pending-permissions| N[getPendingApprovalsForSession]
D -->|get-active-sessions| O[getActive*Sessions]
```
### Chat Notes
1. `abort-session` returns a normalized `complete` message with `aborted: true`.
2. `check-session-status` returns `{ type: "session-status", isProcessing }`.
3. Claude status checks can reconnect output stream to the new socket via `reconnectSessionWriter`.
## `/shell` Terminal Flow
The shell handler manages persistent PTY sessions keyed by:
`<projectPath>_<sessionIdOrDefault>[_cmd_<hash>]`
This enables reconnect behavior and isolates command-specific plain-shell sessions.
### Shell Lifecycle
```mermaid
stateDiagram-v2
[*] --> WaitingInit
WaitingInit --> ValidateInit: message.type == init
ValidateInit --> ReconnectExisting: session key exists and not login reset
ValidateInit --> SpawnNewPTY: valid path + valid sessionId
ValidateInit --> EmitError: invalid payload/path/sessionId
ReconnectExisting --> Running: attach ws, replay buffer
SpawnNewPTY --> Running: pty.spawn + wire onData/onExit
Running --> Running: input -> pty.write
Running --> Running: resize -> pty.resize
Running --> Running: onData -> buffer + output + auth_url detection
Running --> Exited: onExit
Running --> Detached: ws close
Detached --> Running: reconnect before timeout
Detached --> Killed: timeout reached -> pty.kill
Exited --> [*]
Killed --> [*]
EmitError --> WaitingInit
```
### Shell Behaviors in Detail
1. `init`:
Reads `projectPath`, `sessionId`, `provider`, `hasSession`, `initialCommand`, `isPlainShell`.
2. Login reset:
For login-like commands, existing keyed PTY session is killed and recreated.
3. Validation:
Path must exist and be a directory; `sessionId` must match safe pattern.
4. Command build:
Provider-specific command construction with resume semantics.
5. PTY output buffering:
Stores up to 5000 chunks for replay on reconnect.
6. URL detection:
Strips ANSI, accumulates text buffer, extracts URLs, emits `auth_url` once per normalized URL, supports `autoOpen`.
7. Close behavior:
Socket disconnect does not instantly kill PTY; session is kept alive and terminated on timeout.
## `/plugin-ws/:pluginName` Proxy Flow
```mermaid
sequenceDiagram
participant Client
participant Proxy as handlePluginWsProxy
participant PM as getPluginPort
participant Upstream as Plugin WS
Client->>Proxy: Connect /plugin-ws/:name
Proxy->>Proxy: Validate pluginName regex
alt Invalid name
Proxy-->>Client: close(4400, "Invalid plugin name")
else Valid
Proxy->>PM: getPluginPort(name)
alt Plugin not running
Proxy-->>Client: close(4404, "Plugin not running")
else Port found
Proxy->>Upstream: new WebSocket(ws://127.0.0.1:port/ws)
Client-->>Upstream: relay messages bidirectionally
Upstream-->>Client: relay messages bidirectionally
Upstream-->>Client: close propagation
Client-->>Upstream: close propagation
Upstream-->>Client: close(4502, "Upstream error") on upstream error
end
end
```
## Shared Client Registry and Broadcasts
Only chat sockets (`/ws`) are tracked in `connectedClients`.
That shared set is consumed by:
1. `modules/projects/services/projects.service.ts`
Broadcasts `loading_progress` while project snapshots are being built.
2. `modules/providers/services/sessions-watcher.service.ts`
Broadcasts `projects_updated` when provider session artifacts change.
This design centralizes cross-module realtime fanout without requiring route-local references to WebSocket internals.
## Writer Adapter (`WebSocketWriter`)
`WebSocketWriter` normalizes chat transport behavior to match existing writer-style interfaces used elsewhere.
Methods:
1. `send(data)`
JSON-serializes and sends only if socket is open.
2. `setSessionId(sessionId)` / `getSessionId()`
Supports provider session bookkeeping and resume flows.
3. `updateWebSocket(newRawWs)`
Allows active session stream redirection on reconnect.
## Error Handling and Close Codes
Current explicit close codes in this module:
1. `4400`: Invalid plugin name
2. `4404`: Plugin not running
3. `4502`: Upstream plugin WebSocket error
Other errors:
1. Chat handler catches and emits `{ type: "error", error }`.
2. Shell handler catches and writes terminal-visible error output.
3. Unknown websocket paths are closed immediately.
## Extending This Module
To add a new websocket route:
1. Add a new handler service under `services/`.
2. Extend `WebSocketServerDependencies` in `websocket-server.service.ts` if needed.
3. Add a new pathname branch in the router.
4. Wire dependency injection from `server/index.js`.
5. Keep `index.ts` as barrel-only export surface.

View File

@@ -0,0 +1,2 @@
export { WS_OPEN_STATE, connectedClients } from './services/websocket-state.service.js';
export { createWebSocketServer } from './services/websocket-server.service.js';

View File

@@ -0,0 +1,271 @@
import type { WebSocket } from 'ws';
import { connectedClients } from '@/modules/websocket/services/websocket-state.service.js';
import { WebSocketWriter } from '@/modules/websocket/services/websocket-writer.service.js';
import type {
AnyRecord,
AuthenticatedWebSocketRequest,
LLMProvider,
} from '@/shared/types.js';
import { createNormalizedMessage, parseIncomingJsonObject } from '@/shared/utils.js';
type ChatIncomingMessage = AnyRecord & {
type?: string;
command?: string;
options?: AnyRecord;
provider?: string;
sessionId?: string;
requestId?: string;
allow?: unknown;
updatedInput?: unknown;
message?: unknown;
rememberEntry?: unknown;
};
const DEFAULT_PROVIDER: LLMProvider = 'claude';
type ChatWebSocketDependencies = {
queryClaudeSDK: (command: string, options: unknown, writer: WebSocketWriter) => Promise<unknown>;
spawnCursor: (command: string, options: unknown, writer: WebSocketWriter) => Promise<unknown>;
queryCodex: (command: string, options: unknown, writer: WebSocketWriter) => Promise<unknown>;
spawnGemini: (command: string, options: unknown, writer: WebSocketWriter) => Promise<unknown>;
abortClaudeSDKSession: (sessionId: string) => Promise<boolean>;
abortCursorSession: (sessionId: string) => boolean;
abortCodexSession: (sessionId: string) => boolean;
abortGeminiSession: (sessionId: string) => boolean;
resolveToolApproval: (
requestId: string,
payload: {
allow: boolean;
updatedInput?: unknown;
message?: string;
rememberEntry?: unknown;
}
) => void;
isClaudeSDKSessionActive: (sessionId: string) => boolean;
isCursorSessionActive: (sessionId: string) => boolean;
isCodexSessionActive: (sessionId: string) => boolean;
isGeminiSessionActive: (sessionId: string) => boolean;
reconnectSessionWriter: (sessionId: string, ws: WebSocket) => boolean;
getPendingApprovalsForSession: (sessionId: string) => unknown[];
getActiveClaudeSDKSessions: () => unknown;
getActiveCursorSessions: () => unknown;
getActiveCodexSessions: () => unknown;
getActiveGeminiSessions: () => unknown;
};
/**
* Normalizes potentially invalid provider names coming from websocket payloads.
*/
function readProvider(value: unknown): LLMProvider {
if (value === 'claude' || value === 'cursor' || value === 'codex' || value === 'gemini') {
return value;
}
return DEFAULT_PROVIDER;
}
/**
* Extracts the authenticated request user id in the formats currently produced
* by platform and OSS auth code paths.
*/
function readRequestUserId(
request: AuthenticatedWebSocketRequest | undefined
): string | number | null {
const user = request?.user;
if (!user) {
return null;
}
if (typeof user.id === 'string' || typeof user.id === 'number') {
return user.id;
}
if (typeof user.userId === 'string' || typeof user.userId === 'number') {
return user.userId;
}
return null;
}
/**
* Handles authenticated chat websocket messages used by the main chat panel.
*/
export function handleChatConnection(
ws: WebSocket,
request: AuthenticatedWebSocketRequest,
dependencies: ChatWebSocketDependencies
): void {
console.log('[INFO] Chat WebSocket connected');
connectedClients.add(ws);
const writer = new WebSocketWriter(ws, readRequestUserId(request));
ws.on('message', async (rawMessage) => {
try {
const parsed = parseIncomingJsonObject(rawMessage);
if (!parsed) {
throw new Error('Invalid websocket payload');
}
const data = parsed as ChatIncomingMessage;
const messageType = data.type;
if (!messageType) {
throw new Error('Message type is required');
}
if (messageType === 'claude-command') {
await dependencies.queryClaudeSDK(data.command ?? '', data.options, writer);
return;
}
if (messageType === 'cursor-command') {
await dependencies.spawnCursor(data.command ?? '', data.options, writer);
return;
}
if (messageType === 'codex-command') {
await dependencies.queryCodex(data.command ?? '', data.options, writer);
return;
}
if (messageType === 'gemini-command') {
await dependencies.spawnGemini(data.command ?? '', data.options, writer);
return;
}
if (messageType === 'cursor-resume') {
await dependencies.spawnCursor(
'',
{
sessionId: data.sessionId,
resume: true,
cwd: data.options?.cwd,
},
writer
);
return;
}
if (messageType === 'abort-session') {
const provider = readProvider(data.provider);
const sessionId = typeof data.sessionId === 'string' ? data.sessionId : '';
let success = false;
if (provider === 'cursor') {
success = dependencies.abortCursorSession(sessionId);
} else if (provider === 'codex') {
success = dependencies.abortCodexSession(sessionId);
} else if (provider === 'gemini') {
success = dependencies.abortGeminiSession(sessionId);
} else {
success = await dependencies.abortClaudeSDKSession(sessionId);
}
writer.send(
createNormalizedMessage({
kind: 'complete',
exitCode: success ? 0 : 1,
aborted: true,
success,
sessionId,
provider,
})
);
return;
}
if (messageType === 'claude-permission-response') {
if (typeof data.requestId === 'string' && data.requestId.length > 0) {
dependencies.resolveToolApproval(data.requestId, {
allow: Boolean(data.allow),
updatedInput: data.updatedInput,
message: typeof data.message === 'string' ? data.message : undefined,
rememberEntry: data.rememberEntry,
});
}
return;
}
if (messageType === 'cursor-abort') {
const sessionId = typeof data.sessionId === 'string' ? data.sessionId : '';
const success = dependencies.abortCursorSession(sessionId);
writer.send(
createNormalizedMessage({
kind: 'complete',
exitCode: success ? 0 : 1,
aborted: true,
success,
sessionId,
provider: 'cursor',
})
);
return;
}
if (messageType === 'check-session-status') {
const provider = readProvider(data.provider);
const sessionId = typeof data.sessionId === 'string' ? data.sessionId : '';
let isActive = false;
if (provider === 'cursor') {
isActive = dependencies.isCursorSessionActive(sessionId);
} else if (provider === 'codex') {
isActive = dependencies.isCodexSessionActive(sessionId);
} else if (provider === 'gemini') {
isActive = dependencies.isGeminiSessionActive(sessionId);
} else {
isActive = dependencies.isClaudeSDKSessionActive(sessionId);
if (isActive) {
dependencies.reconnectSessionWriter(sessionId, ws);
}
}
writer.send({
type: 'session-status',
sessionId,
provider,
isProcessing: isActive,
});
return;
}
if (messageType === 'get-pending-permissions') {
const sessionId = typeof data.sessionId === 'string' ? data.sessionId : '';
if (sessionId && dependencies.isClaudeSDKSessionActive(sessionId)) {
const pending = dependencies.getPendingApprovalsForSession(sessionId);
writer.send({
type: 'pending-permissions-response',
sessionId,
data: pending,
});
}
return;
}
if (messageType === 'get-active-sessions') {
writer.send({
type: 'active-sessions',
sessions: {
claude: dependencies.getActiveClaudeSDKSessions(),
cursor: dependencies.getActiveCursorSessions(),
codex: dependencies.getActiveCodexSessions(),
gemini: dependencies.getActiveGeminiSessions(),
},
});
}
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
console.error('[ERROR] Chat WebSocket error:', message);
writer.send({
type: 'error',
error: message,
});
}
});
ws.on('close', () => {
console.log('[INFO] Chat client disconnected');
connectedClients.delete(ws);
});
}

View File

@@ -0,0 +1,65 @@
import { WebSocket } from 'ws';
/**
* Proxies an authenticated client websocket to a plugin websocket endpoint.
*/
export function handlePluginWsProxy(
clientWs: WebSocket,
pathname: string,
getPluginPort: (pluginName: string) => number | null
): void {
const pluginName = pathname.replace('/plugin-ws/', '');
if (!pluginName || /[^a-zA-Z0-9_-]/.test(pluginName)) {
clientWs.close(4400, 'Invalid plugin name');
return;
}
const port = getPluginPort(pluginName);
if (!port) {
clientWs.close(4404, 'Plugin not running');
return;
}
const upstream = new WebSocket(`ws://127.0.0.1:${port}/ws`);
upstream.on('open', () => {
console.log(`[Plugins] WS proxy connected to "${pluginName}" on port ${port}`);
});
upstream.on('message', (data) => {
if (clientWs.readyState === WebSocket.OPEN) {
clientWs.send(data);
}
});
clientWs.on('message', (data) => {
if (upstream.readyState === WebSocket.OPEN) {
upstream.send(data);
}
});
upstream.on('close', () => {
if (clientWs.readyState === WebSocket.OPEN) {
clientWs.close();
}
});
clientWs.on('close', () => {
if (upstream.readyState === WebSocket.OPEN) {
upstream.close();
}
});
upstream.on('error', (error) => {
console.error(`[Plugins] WS proxy error for "${pluginName}":`, error.message);
if (clientWs.readyState === WebSocket.OPEN) {
clientWs.close(4502, 'Upstream error');
}
});
clientWs.on('error', () => {
if (upstream.readyState === WebSocket.OPEN) {
upstream.close();
}
});
}

View File

@@ -0,0 +1,453 @@
import fs from 'node:fs';
import os from 'node:os';
import path from 'node:path';
import pty, { type IPty } from 'node-pty';
import { WebSocket, type RawData } from 'ws';
import { parseIncomingJsonObject } from '@/shared/utils.js';
type ShellIncomingMessage = {
type?: string;
data?: string;
cols?: number;
rows?: number;
projectPath?: string;
sessionId?: string;
hasSession?: boolean;
provider?: string;
initialCommand?: string;
isPlainShell?: boolean;
};
type PtySessionEntry = {
pty: IPty;
ws: WebSocket | null;
buffer: string[];
timeoutId: NodeJS.Timeout | null;
projectPath: string;
sessionId: string | null;
};
const ptySessionsMap = new Map<string, PtySessionEntry>();
const PTY_SESSION_TIMEOUT = 30 * 60 * 1000;
const SHELL_URL_PARSE_BUFFER_LIMIT = 32768;
type ShellWebSocketDependencies = {
getSessionById: (sessionId: string) => { cliSessionId?: string } | null | undefined;
stripAnsiSequences: (content: string) => string;
normalizeDetectedUrl: (url: string) => string | null;
extractUrlsFromText: (content: string) => string[];
shouldAutoOpenUrlFromOutput: (content: string) => boolean;
};
/**
* Reads a string field from untyped payloads and falls back when absent.
*/
function readString(value: unknown, fallback = ''): string {
return typeof value === 'string' ? value : fallback;
}
/**
* Reads a boolean field from untyped payloads and falls back when absent.
*/
function readBoolean(value: unknown, fallback = false): boolean {
return typeof value === 'boolean' ? value : fallback;
}
/**
* Reads a finite number field from untyped payloads and falls back when absent.
*/
function readNumber(value: unknown, fallback: number): number {
return typeof value === 'number' && Number.isFinite(value) ? value : fallback;
}
/**
* Parses incoming websocket shell messages and keeps processing safe when
* malformed payloads are received.
*/
function parseShellMessage(rawMessage: RawData): ShellIncomingMessage | null {
const payload = parseIncomingJsonObject(rawMessage);
if (!payload) {
return null;
}
return payload as ShellIncomingMessage;
}
/**
* Resolves provider command line for plain shell and agent-backed shell modes.
*/
function buildShellCommand(
message: ShellIncomingMessage,
dependencies: ShellWebSocketDependencies
): string {
const hasSession = readBoolean(message.hasSession);
const sessionId = readString(message.sessionId);
const initialCommand = readString(message.initialCommand);
const provider = readString(message.provider, 'claude');
const safeSessionIdPattern = /^[a-zA-Z0-9_.\-:]+$/;
const isPlainShell =
readBoolean(message.isPlainShell) ||
(!!initialCommand && !hasSession) ||
provider === 'plain-shell';
if (isPlainShell) {
return initialCommand;
}
if (provider === 'cursor') {
if (hasSession && sessionId) {
return `cursor-agent --resume="${sessionId}"`;
}
return 'cursor-agent';
}
if (provider === 'codex') {
if (hasSession && sessionId) {
if (os.platform() === 'win32') {
return `codex resume "${sessionId}"; if ($LASTEXITCODE -ne 0) { codex }`;
}
return `codex resume "${sessionId}" || codex`;
}
return 'codex';
}
if (provider === 'gemini') {
const command = initialCommand || 'gemini';
let resumeId = sessionId;
if (hasSession && sessionId) {
try {
const existingSession = dependencies.getSessionById(sessionId);
if (existingSession && existingSession.cliSessionId) {
resumeId = existingSession.cliSessionId;
if (!safeSessionIdPattern.test(resumeId)) {
resumeId = '';
}
}
} catch (error) {
console.error('Failed to get Gemini CLI session ID:', error);
}
}
if (hasSession && resumeId) {
return `${command} --resume "${resumeId}"`;
}
return command;
}
const command = initialCommand || 'claude';
if (hasSession && sessionId) {
if (os.platform() === 'win32') {
return `claude --resume "${sessionId}"; if ($LASTEXITCODE -ne 0) { claude }`;
}
return `claude --resume "${sessionId}" || claude`;
}
return command;
}
/**
* Handles websocket connections used by the standalone shell terminal UI.
*/
export function handleShellConnection(
ws: WebSocket,
dependencies: ShellWebSocketDependencies
): void {
console.log('[INFO] Shell websocket connected');
let shellProcess: IPty | null = null;
let ptySessionKey: string | null = null;
let urlDetectionBuffer = '';
const announcedAuthUrls = new Set<string>();
ws.on('message', async (rawMessage) => {
try {
const data = parseShellMessage(rawMessage);
if (!data?.type) {
throw new Error('Invalid websocket payload');
}
if (data.type === 'init') {
const projectPath = readString(data.projectPath, process.cwd());
const sessionId = readString(data.sessionId) || null;
const hasSession = readBoolean(data.hasSession);
const provider = readString(data.provider, 'claude');
const initialCommand = readString(data.initialCommand);
const isPlainShell =
readBoolean(data.isPlainShell) ||
(!!initialCommand && !hasSession) ||
provider === 'plain-shell';
urlDetectionBuffer = '';
announcedAuthUrls.clear();
const isLoginCommand =
!!initialCommand &&
(initialCommand.includes('setup-token') ||
initialCommand.includes('cursor-agent login') ||
initialCommand.includes('auth login'));
const commandSuffix =
isPlainShell && initialCommand
? `_cmd_${Buffer.from(initialCommand).toString('base64').slice(0, 16)}`
: '';
ptySessionKey = `${projectPath}_${sessionId ?? 'default'}${commandSuffix}`;
if (isLoginCommand) {
const oldSession = ptySessionsMap.get(ptySessionKey);
if (oldSession) {
if (oldSession.timeoutId) {
clearTimeout(oldSession.timeoutId);
}
oldSession.pty.kill();
ptySessionsMap.delete(ptySessionKey);
}
}
const existingSession = isLoginCommand ? null : ptySessionsMap.get(ptySessionKey);
if (existingSession) {
shellProcess = existingSession.pty;
if (existingSession.timeoutId) {
clearTimeout(existingSession.timeoutId);
}
ws.send(
JSON.stringify({
type: 'output',
data: '\x1b[36m[Reconnected to existing session]\x1b[0m\r\n',
})
);
if (existingSession.buffer.length > 0) {
existingSession.buffer.forEach((bufferedData) => {
ws.send(
JSON.stringify({
type: 'output',
data: bufferedData,
})
);
});
}
existingSession.ws = ws;
return;
}
const resolvedProjectPath = path.resolve(projectPath);
try {
const stats = fs.statSync(resolvedProjectPath);
if (!stats.isDirectory()) {
throw new Error('Not a directory');
}
} catch {
ws.send(JSON.stringify({ type: 'error', message: 'Invalid project path' }));
return;
}
const safeSessionIdPattern = /^[a-zA-Z0-9_.\-:]+$/;
if (sessionId && !safeSessionIdPattern.test(sessionId)) {
ws.send(JSON.stringify({ type: 'error', message: 'Invalid session ID' }));
return;
}
const shellCommand = buildShellCommand(data, dependencies);
const shell = os.platform() === 'win32' ? 'powershell.exe' : 'bash';
const shellArgs =
os.platform() === 'win32' ? ['-Command', shellCommand] : ['-c', shellCommand];
const termCols = readNumber(data.cols, 80);
const termRows = readNumber(data.rows, 24);
shellProcess = pty.spawn(shell, shellArgs, {
name: 'xterm-256color',
cols: termCols,
rows: termRows,
cwd: resolvedProjectPath,
env: {
...process.env,
TERM: 'xterm-256color',
COLORTERM: 'truecolor',
FORCE_COLOR: '3',
},
});
ptySessionsMap.set(ptySessionKey, {
pty: shellProcess,
ws,
buffer: [],
timeoutId: null,
projectPath,
sessionId,
});
shellProcess.onData((chunk) => {
if (!ptySessionKey) {
return;
}
const session = ptySessionsMap.get(ptySessionKey);
if (!session) {
return;
}
if (session.buffer.length < 5000) {
session.buffer.push(chunk);
} else {
session.buffer.shift();
session.buffer.push(chunk);
}
if (session.ws && session.ws.readyState === WebSocket.OPEN) {
let outputData = chunk;
const cleanChunk = dependencies.stripAnsiSequences(chunk);
urlDetectionBuffer = `${urlDetectionBuffer}${cleanChunk}`.slice(-SHELL_URL_PARSE_BUFFER_LIMIT);
outputData = outputData.replace(
/OPEN_URL:\s*(https?:\/\/[^\s\x1b\x07]+)/g,
'[INFO] Opening in browser: $1'
);
const emitAuthUrl = (detectedUrl: string, autoOpen = false) => {
const normalizedUrl = dependencies.normalizeDetectedUrl(detectedUrl);
if (!normalizedUrl) {
return;
}
const isNewUrl = !announcedAuthUrls.has(normalizedUrl);
if (isNewUrl) {
announcedAuthUrls.add(normalizedUrl);
session.ws?.send(
JSON.stringify({
type: 'auth_url',
url: normalizedUrl,
autoOpen,
})
);
}
};
const normalizedDetectedUrls = dependencies.extractUrlsFromText(urlDetectionBuffer)
.map((url) => dependencies.normalizeDetectedUrl(url))
.filter((url): url is string => Boolean(url));
const dedupedDetectedUrls = Array.from(new Set(normalizedDetectedUrls)).filter(
(url, _, urls) =>
!urls.some((otherUrl) => otherUrl !== url && otherUrl.startsWith(url))
);
dedupedDetectedUrls.forEach((url) => emitAuthUrl(url, false));
if (
dependencies.shouldAutoOpenUrlFromOutput(cleanChunk) &&
dedupedDetectedUrls.length > 0
) {
const bestUrl = dedupedDetectedUrls.reduce((longest, current) =>
current.length > longest.length ? current : longest
);
emitAuthUrl(bestUrl, true);
}
session.ws.send(
JSON.stringify({
type: 'output',
data: outputData,
})
);
}
});
shellProcess.onExit((exitCode) => {
if (!ptySessionKey) {
return;
}
const session = ptySessionsMap.get(ptySessionKey);
if (session && session.ws && session.ws.readyState === WebSocket.OPEN) {
session.ws.send(
JSON.stringify({
type: 'output',
data: `\r\n\x1b[33mProcess exited with code ${exitCode.exitCode}${
exitCode.signal != null ? ` (${exitCode.signal})` : ''
}\x1b[0m\r\n`,
})
);
}
if (session?.timeoutId) {
clearTimeout(session.timeoutId);
}
ptySessionsMap.delete(ptySessionKey);
shellProcess = null;
});
let welcomeMsg = `\x1b[36mStarting terminal in: ${projectPath}\x1b[0m\r\n`;
if (!isPlainShell) {
const providerName =
provider === 'cursor'
? 'Cursor'
: provider === 'codex'
? 'Codex'
: provider === 'gemini'
? 'Gemini'
: 'Claude';
welcomeMsg = hasSession
? `\x1b[36mResuming ${providerName} session ${sessionId} in: ${projectPath}\x1b[0m\r\n`
: `\x1b[36mStarting new ${providerName} session in: ${projectPath}\x1b[0m\r\n`;
}
ws.send(
JSON.stringify({
type: 'output',
data: welcomeMsg,
})
);
return;
}
if (data.type === 'input') {
if (shellProcess) {
shellProcess.write(readString(data.data));
}
return;
}
if (data.type === 'resize') {
if (shellProcess) {
shellProcess.resize(readNumber(data.cols, 80), readNumber(data.rows, 24));
}
}
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
console.error('[ERROR] Shell WebSocket error:', message);
if (ws.readyState === WebSocket.OPEN) {
ws.send(
JSON.stringify({
type: 'output',
data: `\r\n\x1b[31mError: ${message}\x1b[0m\r\n`,
})
);
}
}
});
ws.on('close', () => {
if (!ptySessionKey) {
return;
}
const session = ptySessionsMap.get(ptySessionKey);
if (!session) {
return;
}
session.ws = null;
session.timeoutId = setTimeout(() => {
session.pty.kill();
ptySessionsMap.delete(ptySessionKey as string);
}, PTY_SESSION_TIMEOUT);
});
ws.on('error', (error) => {
console.error('[ERROR] Shell WebSocket error:', error);
});
}

View File

@@ -0,0 +1,54 @@
import type { VerifyClientCallbackSync } from 'ws';
import type { AuthenticatedWebSocketRequest } from '@/shared/types.js';
type WebSocketAuthDependencies = {
isPlatform: boolean;
authenticateWebSocket: (token: string | null) => {
id?: string | number;
userId?: string | number;
username?: string;
[key: string]: unknown;
} | null;
};
/**
* Authenticates websocket upgrade requests before the `connection` handler runs.
*/
export function verifyWebSocketClient(
info: Parameters<VerifyClientCallbackSync<AuthenticatedWebSocketRequest>>[0],
dependencies: WebSocketAuthDependencies
): boolean {
const request = info.req as AuthenticatedWebSocketRequest;
console.log('WebSocket connection attempt to:', request.url);
// Platform mode: use the first DB user and skip token checks.
if (dependencies.isPlatform) {
const user = dependencies.authenticateWebSocket(null);
if (!user) {
console.log('[WARN] Platform mode: No user found in database');
return false;
}
request.user = user;
console.log('[OK] Platform mode WebSocket authenticated for user:', user.username);
return true;
}
// OSS mode: read JWT from query string first, then Authorization header.
const upgradeUrl = new URL(request.url ?? '/', 'http://localhost');
const token =
upgradeUrl.searchParams.get('token') ??
request.headers.authorization?.split(' ')[1] ??
null;
const user = dependencies.authenticateWebSocket(token);
if (!user) {
console.log('[WARN] WebSocket authentication failed');
return false;
}
request.user = user;
console.log('[OK] WebSocket authenticated for user:', user.username);
return true;
}

View File

@@ -0,0 +1,58 @@
import type { Server as HttpServer } from 'node:http';
import { WebSocketServer, type VerifyClientCallbackSync } from 'ws';
import { handleChatConnection } from '@/modules/websocket/services/chat-websocket.service.js';
import { verifyWebSocketClient } from '@/modules/websocket/services/websocket-auth.service.js';
import { handlePluginWsProxy } from '@/modules/websocket/services/plugin-websocket-proxy.service.js';
import { handleShellConnection } from '@/modules/websocket/services/shell-websocket.service.js';
import type { AuthenticatedWebSocketRequest } from '@/shared/types.js';
type WebSocketServerDependencies = {
verifyClient: Parameters<typeof verifyWebSocketClient>[1];
chat: Parameters<typeof handleChatConnection>[2];
shell: Parameters<typeof handleShellConnection>[1];
getPluginPort: Parameters<typeof handlePluginWsProxy>[2];
};
/**
* Creates and wires the server-wide websocket gateway used for chat, shell, and
* plugin proxy routes.
*/
export function createWebSocketServer(
server: HttpServer,
dependencies: WebSocketServerDependencies
): WebSocketServer {
const wss = new WebSocketServer({
server,
verifyClient: ((
info: Parameters<VerifyClientCallbackSync<AuthenticatedWebSocketRequest>>[0]
) => verifyWebSocketClient(info, dependencies.verifyClient)),
});
wss.on('connection', (ws, request) => {
const incomingRequest = request as AuthenticatedWebSocketRequest;
const url = incomingRequest.url ?? '/';
const pathname = new URL(url, 'http://localhost').pathname;
if (pathname === '/shell') {
handleShellConnection(ws, dependencies.shell);
return;
}
if (pathname === '/ws') {
handleChatConnection(ws, incomingRequest, dependencies.chat);
return;
}
if (pathname.startsWith('/plugin-ws/')) {
handlePluginWsProxy(ws, pathname, dependencies.getPluginPort);
return;
}
console.log('[WARN] Unknown WebSocket path:', pathname);
ws.close();
});
return wss;
}

View File

@@ -0,0 +1,16 @@
import type { RealtimeClientConnection } from '@/shared/types.js';
/**
* Numeric readyState for an open WebSocket connection.
*
* We keep this in module state so services that broadcast updates do not need
* to import `ws` directly just to compare open/closed state.
*/
export const WS_OPEN_STATE = 1;
/**
* Shared registry of active chat WebSocket connections.
*
* Project/session services publish realtime updates by iterating this set.
*/
export const connectedClients = new Set<RealtimeClientConnection>();

View File

@@ -0,0 +1,38 @@
import { WS_OPEN_STATE } from '@/modules/websocket/services/websocket-state.service.js';
import type { RealtimeClientConnection } from '@/shared/types.js';
/**
* Thin transport adapter that gives WebSocket connections the same interface as
* SSE writers used by API routes (`send`, `setSessionId`, `getSessionId`).
*/
export class WebSocketWriter {
ws: RealtimeClientConnection;
sessionId: string | null;
userId: string | number | null;
isWebSocketWriter: boolean;
constructor(ws: RealtimeClientConnection, userId: string | number | null = null) {
this.ws = ws;
this.sessionId = null;
this.userId = userId;
this.isWebSocketWriter = true;
}
send(data: unknown): void {
if (this.ws.readyState === WS_OPEN_STATE) {
this.ws.send(JSON.stringify(data));
}
}
updateWebSocket(newRawWs: RealtimeClientConnection): void {
this.ws = newRawWs;
}
setSessionId(sessionId: string): void {
this.sessionId = sessionId;
}
getSessionId(): string | null {
return this.sessionId;
}
}