From 979cae71ab0169d558f191ebff33844a95498292 Mon Sep 17 00:00:00 2001 From: Waleed Latif Date: Fri, 27 Feb 2026 17:09:42 -0800 Subject: [PATCH 1/7] fix(sse): fix memory leaks in SSE stream cleanup and add memory telemetry --- apps/sim/app/api/a2a/serve/[agentId]/route.ts | 29 +++++++++--- apps/sim/app/api/mcp/events/route.ts | 45 +++++++++++++----- .../app/api/workflows/[id]/execute/route.ts | 16 ++++++- apps/sim/instrumentation-node.ts | 3 ++ .../lib/copilot/orchestrator/stream-buffer.ts | 9 ++++ apps/sim/lib/execution/event-buffer.ts | 10 ++++ apps/sim/lib/execution/isolated-vm.ts | 8 +++- apps/sim/lib/monitoring/memory-telemetry.ts | 47 +++++++++++++++++++ apps/sim/lib/monitoring/sse-connections.ts | 27 +++++++++++ 9 files changed, 174 insertions(+), 20 deletions(-) create mode 100644 apps/sim/lib/monitoring/memory-telemetry.ts create mode 100644 apps/sim/lib/monitoring/sse-connections.ts diff --git a/apps/sim/app/api/a2a/serve/[agentId]/route.ts b/apps/sim/app/api/a2a/serve/[agentId]/route.ts index d10ef0eea2..97feb1c64c 100644 --- a/apps/sim/app/api/a2a/serve/[agentId]/route.ts +++ b/apps/sim/app/api/a2a/serve/[agentId]/route.ts @@ -19,6 +19,7 @@ import { validateUrlWithDNS } from '@/lib/core/security/input-validation.server' import { SSE_HEADERS } from '@/lib/core/utils/sse' import { getBaseUrl } from '@/lib/core/utils/urls' import { markExecutionCancelled } from '@/lib/execution/cancellation' +import { decrementSSEConnections, incrementSSEConnections } from '@/lib/monitoring/sse-connections' import { checkWorkspaceAccess } from '@/lib/workspaces/permissions/utils' import { getWorkspaceBilledAccountUserId } from '@/lib/workspaces/utils' import { @@ -1016,16 +1017,22 @@ async function handleTaskResubscribe( let pollTimeoutId: ReturnType | null = null const abortSignal = request.signal - abortSignal.addEventListener('abort', () => { - isCancelled = true - if (pollTimeoutId) { - clearTimeout(pollTimeoutId) - pollTimeoutId = null - } - }) + abortSignal.addEventListener( + 'abort', + () => { + isCancelled = true + if (pollTimeoutId) { + clearTimeout(pollTimeoutId) + pollTimeoutId = null + } + }, + { once: true } + ) + let sseDecremented = false const stream = new ReadableStream({ async start(controller) { + incrementSSEConnections('a2a-resubscribe') const sendEvent = (event: string, data: unknown): boolean => { if (isCancelled || abortSignal.aborted) return false try { @@ -1047,6 +1054,10 @@ async function handleTaskResubscribe( clearTimeout(pollTimeoutId) pollTimeoutId = null } + if (!sseDecremented) { + sseDecremented = true + decrementSSEConnections('a2a-resubscribe') + } } if ( @@ -1165,6 +1176,10 @@ async function handleTaskResubscribe( clearTimeout(pollTimeoutId) pollTimeoutId = null } + if (!sseDecremented) { + sseDecremented = true + decrementSSEConnections('a2a-resubscribe') + } }, }) diff --git a/apps/sim/app/api/mcp/events/route.ts b/apps/sim/app/api/mcp/events/route.ts index 6df91db5c0..7def26b345 100644 --- a/apps/sim/app/api/mcp/events/route.ts +++ b/apps/sim/app/api/mcp/events/route.ts @@ -14,6 +14,7 @@ import { getSession } from '@/lib/auth' import { SSE_HEADERS } from '@/lib/core/utils/sse' import { mcpConnectionManager } from '@/lib/mcp/connection-manager' import { mcpPubSub } from '@/lib/mcp/pubsub' +import { decrementSSEConnections, incrementSSEConnections } from '@/lib/monitoring/sse-connections' import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils' const logger = createLogger('McpEventsSSE') @@ -41,10 +42,24 @@ export async function GET(request: NextRequest) { const encoder = new TextEncoder() const unsubscribers: Array<() => void> = [] + let cleaned = false + + const cleanup = () => { + if (cleaned) return + cleaned = true + for (const unsub of unsubscribers) { + unsub() + } + decrementSSEConnections('mcp-events') + logger.info(`SSE connection closed for workspace ${workspaceId}`) + } const stream = new ReadableStream({ start(controller) { + incrementSSEConnections('mcp-events') + const send = (eventName: string, data: Record) => { + if (cleaned) return try { controller.enqueue( encoder.encode(`event: ${eventName}\ndata: ${JSON.stringify(data)}\n\n`) @@ -82,6 +97,10 @@ export async function GET(request: NextRequest) { // Heartbeat to keep the connection alive const heartbeat = setInterval(() => { + if (cleaned) { + clearInterval(heartbeat) + return + } try { controller.enqueue(encoder.encode(': heartbeat\n\n')) } catch { @@ -91,20 +110,24 @@ export async function GET(request: NextRequest) { unsubscribers.push(() => clearInterval(heartbeat)) // Cleanup when client disconnects - request.signal.addEventListener('abort', () => { - for (const unsub of unsubscribers) { - unsub() - } - try { - controller.close() - } catch { - // Already closed - } - logger.info(`SSE connection closed for workspace ${workspaceId}`) - }) + request.signal.addEventListener( + 'abort', + () => { + cleanup() + try { + controller.close() + } catch { + // Already closed + } + }, + { once: true } + ) logger.info(`SSE connection opened for workspace ${workspaceId}`) }, + cancel() { + cleanup() + }, }) return new Response(stream, { headers: SSE_HEADERS }) diff --git a/apps/sim/app/api/workflows/[id]/execute/route.ts b/apps/sim/app/api/workflows/[id]/execute/route.ts index b393ae492a..9e005b0068 100644 --- a/apps/sim/app/api/workflows/[id]/execute/route.ts +++ b/apps/sim/app/api/workflows/[id]/execute/route.ts @@ -22,6 +22,7 @@ import { createExecutionEventWriter, setExecutionMeta } from '@/lib/execution/ev import { processInputFileFields } from '@/lib/execution/files' import { preprocessExecution } from '@/lib/execution/preprocessing' import { LoggingSession } from '@/lib/logs/execution/logging-session' +import { decrementSSEConnections, incrementSSEConnections } from '@/lib/monitoring/sse-connections' import { cleanupExecutionBase64Cache, hydrateUserFilesWithBase64, @@ -763,6 +764,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: const encoder = new TextEncoder() const timeoutController = createTimeoutAbortController(preprocessResult.executionTimeout?.sync) let isStreamClosed = false + let sseDecremented = false const eventWriter = createExecutionEventWriter(executionId) setExecutionMeta(executionId, { @@ -773,6 +775,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: const stream = new ReadableStream({ async start(controller) { + incrementSSEConnections('workflow-execute') let finalMetaStatus: 'complete' | 'error' | 'cancelled' | null = null const sendEvent = (event: ExecutionEvent) => { @@ -1147,6 +1150,10 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: if (executionId) { await cleanupExecutionBase64Cache(executionId) } + if (!sseDecremented) { + sseDecremented = true + decrementSSEConnections('workflow-execute') + } if (!isStreamClosed) { try { controller.enqueue(encoder.encode('data: [DONE]\n\n')) @@ -1155,9 +1162,16 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: } } }, - cancel() { + async cancel() { isStreamClosed = true logger.info(`[${requestId}] Client disconnected from SSE stream`) + timeoutController.abort() + timeoutController.cleanup() + await eventWriter.close().catch(() => {}) + if (!sseDecremented) { + sseDecremented = true + decrementSSEConnections('workflow-execute') + } }, }) diff --git a/apps/sim/instrumentation-node.ts b/apps/sim/instrumentation-node.ts index c1d1f4bad8..4100a796c7 100644 --- a/apps/sim/instrumentation-node.ts +++ b/apps/sim/instrumentation-node.ts @@ -160,4 +160,7 @@ async function initializeOpenTelemetry() { export async function register() { await initializeOpenTelemetry() + + const { startMemoryTelemetry } = await import('./lib/monitoring/memory-telemetry') + startMemoryTelemetry() } diff --git a/apps/sim/lib/copilot/orchestrator/stream-buffer.ts b/apps/sim/lib/copilot/orchestrator/stream-buffer.ts index bc0524c4af..81e1cbc350 100644 --- a/apps/sim/lib/copilot/orchestrator/stream-buffer.ts +++ b/apps/sim/lib/copilot/orchestrator/stream-buffer.ts @@ -237,6 +237,15 @@ export function createStreamEventWriter(streamId: string): StreamEventWriter { error: error instanceof Error ? error.message : String(error), }) pending = batch.concat(pending) + if (pending.length > config.eventLimit) { + const dropped = pending.length - config.eventLimit + pending = pending.slice(-config.eventLimit) + logger.warn('Dropped oldest pending stream events due to sustained Redis failure', { + streamId, + dropped, + remaining: pending.length, + }) + } } } diff --git a/apps/sim/lib/execution/event-buffer.ts b/apps/sim/lib/execution/event-buffer.ts index 4473a922f4..81373ccf42 100644 --- a/apps/sim/lib/execution/event-buffer.ts +++ b/apps/sim/lib/execution/event-buffer.ts @@ -10,6 +10,7 @@ const EVENT_LIMIT = 1000 const RESERVE_BATCH = 100 const FLUSH_INTERVAL_MS = 15 const FLUSH_MAX_BATCH = 200 +const MAX_PENDING_EVENTS = 1000 function getEventsKey(executionId: string) { return `${REDIS_PREFIX}${executionId}:events` @@ -184,6 +185,15 @@ export function createExecutionEventWriter(executionId: string): ExecutionEventW stack: error instanceof Error ? error.stack : undefined, }) pending = batch.concat(pending) + if (pending.length > MAX_PENDING_EVENTS) { + const dropped = pending.length - MAX_PENDING_EVENTS + pending = pending.slice(-MAX_PENDING_EVENTS) + logger.warn('Dropped oldest pending events due to sustained Redis failure', { + executionId, + dropped, + remaining: pending.length, + }) + } } } diff --git a/apps/sim/lib/execution/isolated-vm.ts b/apps/sim/lib/execution/isolated-vm.ts index 3da81142f5..9deffbe83c 100644 --- a/apps/sim/lib/execution/isolated-vm.ts +++ b/apps/sim/lib/execution/isolated-vm.ts @@ -716,9 +716,15 @@ function spawnWorker(): Promise { proc.on('message', (message: unknown) => handleWorkerMessage(workerId, message)) + const MAX_STDERR_SIZE = 64 * 1024 let stderrData = '' proc.stderr?.on('data', (data: Buffer) => { - stderrData += data.toString() + if (stderrData.length < MAX_STDERR_SIZE) { + stderrData += data.toString() + if (stderrData.length > MAX_STDERR_SIZE) { + stderrData = stderrData.slice(0, MAX_STDERR_SIZE) + } + } }) const startTimeout = setTimeout(() => { diff --git a/apps/sim/lib/monitoring/memory-telemetry.ts b/apps/sim/lib/monitoring/memory-telemetry.ts new file mode 100644 index 0000000000..523a20b1eb --- /dev/null +++ b/apps/sim/lib/monitoring/memory-telemetry.ts @@ -0,0 +1,47 @@ +/** + * Periodic memory telemetry for diagnosing heap growth in production. + * Logs process.memoryUsage(), V8 heap stats, and active SSE connection + * counts every 60s, enabling correlation between connection leaks and + * memory spikes. + */ + +import v8 from 'node:v8' +import { createLogger } from '@sim/logger' +import { + getActiveSSEConnectionCount, + getActiveSSEConnectionsByRoute, +} from '@/lib/monitoring/sse-connections' + +const logger = createLogger('MemoryTelemetry') + +const MB = 1024 * 1024 + +let started = false + +export function startMemoryTelemetry(intervalMs = 60_000) { + if (started) return + started = true + + const timer = setInterval(() => { + const mem = process.memoryUsage() + const heap = v8.getHeapStatistics() + + logger.info('Memory snapshot', { + heapUsedMB: Math.round(mem.heapUsed / MB), + heapTotalMB: Math.round(mem.heapTotal / MB), + rssMB: Math.round(mem.rss / MB), + externalMB: Math.round(mem.external / MB), + arrayBuffersMB: Math.round(mem.arrayBuffers / MB), + heapSizeLimitMB: Math.round(heap.heap_size_limit / MB), + nativeContexts: heap.number_of_native_contexts, + activeHandles: + (process as unknown as Record unknown[]>)._getActiveHandles?.().length ?? -1, + activeRequests: + (process as unknown as Record unknown[]>)._getActiveRequests?.().length ?? -1, + uptimeMin: Math.round(process.uptime() / 60), + activeSSEConnections: getActiveSSEConnectionCount(), + sseByRoute: getActiveSSEConnectionsByRoute(), + }) + }, intervalMs) + timer.unref() +} diff --git a/apps/sim/lib/monitoring/sse-connections.ts b/apps/sim/lib/monitoring/sse-connections.ts new file mode 100644 index 0000000000..b6394ddff6 --- /dev/null +++ b/apps/sim/lib/monitoring/sse-connections.ts @@ -0,0 +1,27 @@ +/** + * Tracks active SSE connections by route for memory leak diagnostics. + * Logged alongside periodic memory telemetry to correlate connection + * counts with heap growth. + */ + +const connections = new Map() + +export function incrementSSEConnections(route: string) { + connections.set(route, (connections.get(route) ?? 0) + 1) +} + +export function decrementSSEConnections(route: string) { + const count = (connections.get(route) ?? 0) - 1 + if (count <= 0) connections.delete(route) + else connections.set(route, count) +} + +export function getActiveSSEConnectionCount(): number { + let total = 0 + for (const count of connections.values()) total += count + return total +} + +export function getActiveSSEConnectionsByRoute(): Record { + return Object.fromEntries(connections) +} From 47074656ec257da8b2ecef232a94e7cf6a97a07a Mon Sep 17 00:00:00 2001 From: Waleed Latif Date: Fri, 27 Feb 2026 17:28:56 -0800 Subject: [PATCH 2/7] improvement(monitoring): add SSE metering to wand, execution-stream, and a2a-message endpoints --- apps/sim/app/api/a2a/serve/[agentId]/route.ts | 17 ++++++++++++++++- apps/sim/app/api/wand/route.ts | 14 ++++++++++++++ .../executions/[executionId]/stream/route.ts | 12 ++++++++++++ 3 files changed, 42 insertions(+), 1 deletion(-) diff --git a/apps/sim/app/api/a2a/serve/[agentId]/route.ts b/apps/sim/app/api/a2a/serve/[agentId]/route.ts index 97feb1c64c..f73e8a49bd 100644 --- a/apps/sim/app/api/a2a/serve/[agentId]/route.ts +++ b/apps/sim/app/api/a2a/serve/[agentId]/route.ts @@ -19,7 +19,10 @@ import { validateUrlWithDNS } from '@/lib/core/security/input-validation.server' import { SSE_HEADERS } from '@/lib/core/utils/sse' import { getBaseUrl } from '@/lib/core/utils/urls' import { markExecutionCancelled } from '@/lib/execution/cancellation' -import { decrementSSEConnections, incrementSSEConnections } from '@/lib/monitoring/sse-connections' +import { + decrementSSEConnections, + incrementSSEConnections, +} from '@/lib/monitoring/sse-connections' import { checkWorkspaceAccess } from '@/lib/workspaces/permissions/utils' import { getWorkspaceBilledAccountUserId } from '@/lib/workspaces/utils' import { @@ -631,9 +634,11 @@ async function handleMessageStream( } const encoder = new TextEncoder() + let messageStreamDecremented = false const stream = new ReadableStream({ async start(controller) { + incrementSSEConnections('a2a-message') const sendEvent = (event: string, data: unknown) => { try { const jsonRpcResponse = { @@ -842,9 +847,19 @@ async function handleMessageStream( }) } finally { await releaseLock(lockKey, lockValue) + if (!messageStreamDecremented) { + messageStreamDecremented = true + decrementSSEConnections('a2a-message') + } controller.close() } }, + cancel() { + if (!messageStreamDecremented) { + messageStreamDecremented = true + decrementSSEConnections('a2a-message') + } + }, }) return new NextResponse(stream, { diff --git a/apps/sim/app/api/wand/route.ts b/apps/sim/app/api/wand/route.ts index f868364ae6..6f1e2e23d0 100644 --- a/apps/sim/app/api/wand/route.ts +++ b/apps/sim/app/api/wand/route.ts @@ -10,6 +10,7 @@ import { checkAndBillOverageThreshold } from '@/lib/billing/threshold-billing' import { env } from '@/lib/core/config/env' import { getCostMultiplier, isBillingEnabled } from '@/lib/core/config/feature-flags' import { generateRequestId } from '@/lib/core/utils/request' +import { decrementSSEConnections, incrementSSEConnections } from '@/lib/monitoring/sse-connections' import { enrichTableSchema } from '@/lib/table/llm/wand' import { verifyWorkspaceMembership } from '@/app/api/workflows/utils' import { extractResponseText, parseResponsesUsage } from '@/providers/openai/utils' @@ -330,10 +331,13 @@ export async function POST(req: NextRequest) { const encoder = new TextEncoder() const decoder = new TextDecoder() + let wandStreamClosed = false const readable = new ReadableStream({ async start(controller) { + incrementSSEConnections('wand') const reader = response.body?.getReader() if (!reader) { + decrementSSEConnections('wand') controller.close() return } @@ -478,6 +482,16 @@ export async function POST(req: NextRequest) { controller.close() } finally { reader.releaseLock() + if (!wandStreamClosed) { + wandStreamClosed = true + decrementSSEConnections('wand') + } + } + }, + cancel() { + if (!wandStreamClosed) { + wandStreamClosed = true + decrementSSEConnections('wand') } }, }) diff --git a/apps/sim/app/api/workflows/[id]/executions/[executionId]/stream/route.ts b/apps/sim/app/api/workflows/[id]/executions/[executionId]/stream/route.ts index 1f77ff391d..88e3c87447 100644 --- a/apps/sim/app/api/workflows/[id]/executions/[executionId]/stream/route.ts +++ b/apps/sim/app/api/workflows/[id]/executions/[executionId]/stream/route.ts @@ -7,6 +7,7 @@ import { getExecutionMeta, readExecutionEvents, } from '@/lib/execution/event-buffer' +import { decrementSSEConnections, incrementSSEConnections } from '@/lib/monitoring/sse-connections' import { formatSSEEvent } from '@/lib/workflows/executor/execution-events' import { authorizeWorkflowByWorkspacePermission } from '@/lib/workflows/utils' @@ -73,8 +74,10 @@ export async function GET( let closed = false + let sseDecremented = false const stream = new ReadableStream({ async start(controller) { + incrementSSEConnections('execution-stream-reconnect') let lastEventId = fromEventId const pollDeadline = Date.now() + MAX_POLL_DURATION_MS @@ -142,11 +145,20 @@ export async function GET( controller.close() } catch {} } + } finally { + if (!sseDecremented) { + sseDecremented = true + decrementSSEConnections('execution-stream-reconnect') + } } }, cancel() { closed = true logger.info('Client disconnected from reconnection stream', { executionId }) + if (!sseDecremented) { + sseDecremented = true + decrementSSEConnections('execution-stream-reconnect') + } }, }) From 1f0fa8a71e60b3666df5f1408030543d72d35fcf Mon Sep 17 00:00:00 2001 From: Waleed Latif Date: Fri, 27 Feb 2026 17:31:02 -0800 Subject: [PATCH 3/7] fix(workflow-execute): remove abort from cancel() to preserve run-on-leave behavior --- apps/sim/app/api/workflows/[id]/execute/route.ts | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/apps/sim/app/api/workflows/[id]/execute/route.ts b/apps/sim/app/api/workflows/[id]/execute/route.ts index 9e005b0068..b2cb3c1f8c 100644 --- a/apps/sim/app/api/workflows/[id]/execute/route.ts +++ b/apps/sim/app/api/workflows/[id]/execute/route.ts @@ -1162,12 +1162,9 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: } } }, - async cancel() { + cancel() { isStreamClosed = true logger.info(`[${requestId}] Client disconnected from SSE stream`) - timeoutController.abort() - timeoutController.cleanup() - await eventWriter.close().catch(() => {}) if (!sseDecremented) { sseDecremented = true decrementSSEConnections('workflow-execute') From b18ac99655b574c50f5bbd8e3410970235a19d39 Mon Sep 17 00:00:00 2001 From: Waleed Latif Date: Fri, 27 Feb 2026 17:32:37 -0800 Subject: [PATCH 4/7] improvement(monitoring): use stable process.getActiveResourcesInfo() API --- apps/sim/lib/monitoring/memory-telemetry.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/apps/sim/lib/monitoring/memory-telemetry.ts b/apps/sim/lib/monitoring/memory-telemetry.ts index 523a20b1eb..a730e0b976 100644 --- a/apps/sim/lib/monitoring/memory-telemetry.ts +++ b/apps/sim/lib/monitoring/memory-telemetry.ts @@ -34,10 +34,10 @@ export function startMemoryTelemetry(intervalMs = 60_000) { arrayBuffersMB: Math.round(mem.arrayBuffers / MB), heapSizeLimitMB: Math.round(heap.heap_size_limit / MB), nativeContexts: heap.number_of_native_contexts, - activeHandles: - (process as unknown as Record unknown[]>)._getActiveHandles?.().length ?? -1, - activeRequests: - (process as unknown as Record unknown[]>)._getActiveRequests?.().length ?? -1, + activeResources: + typeof process.getActiveResourcesInfo === 'function' + ? process.getActiveResourcesInfo().length + : -1, uptimeMin: Math.round(process.uptime() / 60), activeSSEConnections: getActiveSSEConnectionCount(), sseByRoute: getActiveSSEConnectionsByRoute(), From 6db5d21a9304b1f0d30f5ecaffe0eda0ac727712 Mon Sep 17 00:00:00 2001 From: Waleed Latif Date: Fri, 27 Feb 2026 17:34:47 -0800 Subject: [PATCH 5/7] refactor(a2a): hoist resubscribe cleanup to eliminate duplication between start() and cancel() --- apps/sim/app/api/a2a/serve/[agentId]/route.ts | 34 +++++++------------ 1 file changed, 13 insertions(+), 21 deletions(-) diff --git a/apps/sim/app/api/a2a/serve/[agentId]/route.ts b/apps/sim/app/api/a2a/serve/[agentId]/route.ts index f73e8a49bd..0ce03d90dc 100644 --- a/apps/sim/app/api/a2a/serve/[agentId]/route.ts +++ b/apps/sim/app/api/a2a/serve/[agentId]/route.ts @@ -1045,6 +1045,18 @@ async function handleTaskResubscribe( ) let sseDecremented = false + const cleanup = () => { + isCancelled = true + if (pollTimeoutId) { + clearTimeout(pollTimeoutId) + pollTimeoutId = null + } + if (!sseDecremented) { + sseDecremented = true + decrementSSEConnections('a2a-resubscribe') + } + } + const stream = new ReadableStream({ async start(controller) { incrementSSEConnections('a2a-resubscribe') @@ -1063,18 +1075,6 @@ async function handleTaskResubscribe( } } - const cleanup = () => { - isCancelled = true - if (pollTimeoutId) { - clearTimeout(pollTimeoutId) - pollTimeoutId = null - } - if (!sseDecremented) { - sseDecremented = true - decrementSSEConnections('a2a-resubscribe') - } - } - if ( !sendEvent('status', { kind: 'status', @@ -1186,15 +1186,7 @@ async function handleTaskResubscribe( poll() }, cancel() { - isCancelled = true - if (pollTimeoutId) { - clearTimeout(pollTimeoutId) - pollTimeoutId = null - } - if (!sseDecremented) { - sseDecremented = true - decrementSSEConnections('a2a-resubscribe') - } + cleanup() }, }) From 96fea6871637bc0f8c3e596c67953318c2356ea1 Mon Sep 17 00:00:00 2001 From: Waleed Latif Date: Fri, 27 Feb 2026 17:35:24 -0800 Subject: [PATCH 6/7] style(a2a): format import line Co-Authored-By: Claude Opus 4.6 --- apps/sim/app/api/a2a/serve/[agentId]/route.ts | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/apps/sim/app/api/a2a/serve/[agentId]/route.ts b/apps/sim/app/api/a2a/serve/[agentId]/route.ts index 0ce03d90dc..f865e2c396 100644 --- a/apps/sim/app/api/a2a/serve/[agentId]/route.ts +++ b/apps/sim/app/api/a2a/serve/[agentId]/route.ts @@ -19,10 +19,7 @@ import { validateUrlWithDNS } from '@/lib/core/security/input-validation.server' import { SSE_HEADERS } from '@/lib/core/utils/sse' import { getBaseUrl } from '@/lib/core/utils/urls' import { markExecutionCancelled } from '@/lib/execution/cancellation' -import { - decrementSSEConnections, - incrementSSEConnections, -} from '@/lib/monitoring/sse-connections' +import { decrementSSEConnections, incrementSSEConnections } from '@/lib/monitoring/sse-connections' import { checkWorkspaceAccess } from '@/lib/workspaces/permissions/utils' import { getWorkspaceBilledAccountUserId } from '@/lib/workspaces/utils' import { From a3a7e1724f0a3d4d78ac1589ce08e845fd5a0773 Mon Sep 17 00:00:00 2001 From: Waleed Latif Date: Sat, 28 Feb 2026 10:13:15 -0800 Subject: [PATCH 7/7] fix(wand): set guard flag on early-return decrement for consistency --- apps/sim/app/api/wand/route.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/apps/sim/app/api/wand/route.ts b/apps/sim/app/api/wand/route.ts index 6f1e2e23d0..abebcc1894 100644 --- a/apps/sim/app/api/wand/route.ts +++ b/apps/sim/app/api/wand/route.ts @@ -337,6 +337,7 @@ export async function POST(req: NextRequest) { incrementSSEConnections('wand') const reader = response.body?.getReader() if (!reader) { + wandStreamClosed = true decrementSSEConnections('wand') controller.close() return