From 47cc540d5cb97212b89f771f230b4e8fcd98d31e Mon Sep 17 00:00:00 2001 From: Siddharth Ganesan Date: Mon, 16 Mar 2026 23:16:53 -0700 Subject: [PATCH 1/6] Fix --- apps/sim/app/api/mothership/chat/route.ts | 10 ++++- apps/sim/lib/copilot/chat-streaming.ts | 42 +++++++++++++++++++ .../sse/handlers/tool-execution.ts | 22 +++++++++- 3 files changed, 71 insertions(+), 3 deletions(-) diff --git a/apps/sim/app/api/mothership/chat/route.ts b/apps/sim/app/api/mothership/chat/route.ts index eee1f4c2e0c..18b66b5577c 100644 --- a/apps/sim/app/api/mothership/chat/route.ts +++ b/apps/sim/app/api/mothership/chat/route.ts @@ -7,7 +7,11 @@ import { z } from 'zod' import { getSession } from '@/lib/auth' import { resolveOrCreateChat } from '@/lib/copilot/chat-lifecycle' import { buildCopilotRequestPayload } from '@/lib/copilot/chat-payload' -import { createSSEStream, SSE_RESPONSE_HEADERS } from '@/lib/copilot/chat-streaming' +import { + createSSEStream, + SSE_RESPONSE_HEADERS, + waitForPendingChatStream, +} from '@/lib/copilot/chat-streaming' import type { OrchestratorResult } from '@/lib/copilot/orchestrator/types' import { processContextsServer, resolveActiveResourceContext } from '@/lib/copilot/process-contents' import { createRequestTracker, createUnauthorizedResponse } from '@/lib/copilot/request-helpers' @@ -244,6 +248,10 @@ export async function POST(req: NextRequest) { { selectedModel: '' } ) + if (actualChatId) { + await waitForPendingChatStream(actualChatId) + } + const stream = createSSEStream({ requestPayload, userId: authenticatedUserId, diff --git a/apps/sim/lib/copilot/chat-streaming.ts b/apps/sim/lib/copilot/chat-streaming.ts index ea266eb9338..05d642d0f4d 100644 --- a/apps/sim/lib/copilot/chat-streaming.ts +++ b/apps/sim/lib/copilot/chat-streaming.ts @@ -20,6 +20,40 @@ const logger = createLogger('CopilotChatStreaming') // reach them. Keyed by streamId, cleaned up when the stream completes. const activeStreams = new Map() +// Tracks in-flight streams by chatId so that a subsequent request for the +// same chat can wait until the previous stream (and its onComplete / Go-side +// persistence) has fully settled before forwarding to Go. +const pendingChatStreams = new Map; resolve: () => void }>() + +function registerPendingChatStream(chatId: string): void { + let resolve: () => void + const promise = new Promise((r) => { + resolve = r + }) + pendingChatStreams.set(chatId, { promise, resolve: resolve! }) +} + +function resolvePendingChatStream(chatId: string): void { + const entry = pendingChatStreams.get(chatId) + if (entry) { + entry.resolve() + pendingChatStreams.delete(chatId) + } +} + +/** + * Wait for any in-flight stream on `chatId` to finish before proceeding. + * Returns immediately if no stream is active. Gives up after `timeoutMs`. + */ +export async function waitForPendingChatStream( + chatId: string, + timeoutMs = 5_000 +): Promise { + const entry = pendingChatStreams.get(chatId) + if (!entry) return + await Promise.race([entry.promise, new Promise((r) => setTimeout(r, timeoutMs))]) +} + export function abortActiveStream(streamId: string): boolean { const controller = activeStreams.get(streamId) if (!controller) return false @@ -112,6 +146,10 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS const abortController = new AbortController() activeStreams.set(streamId, abortController) + if (chatId) { + registerPendingChatStream(chatId) + } + return new ReadableStream({ async start(controller) { const encoder = new TextEncoder() @@ -210,6 +248,9 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS }) } finally { activeStreams.delete(streamId) + if (chatId) { + resolvePendingChatStream(chatId) + } try { controller.close() } catch { @@ -219,6 +260,7 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS }, cancel() { clientDisconnected = true + abortController.abort() if (eventWriter) { eventWriter.flush().catch(() => {}) } diff --git a/apps/sim/lib/copilot/orchestrator/sse/handlers/tool-execution.ts b/apps/sim/lib/copilot/orchestrator/sse/handlers/tool-execution.ts index 7fe32e635e9..44157f42a6a 100644 --- a/apps/sim/lib/copilot/orchestrator/sse/handlers/tool-execution.ts +++ b/apps/sim/lib/copilot/orchestrator/sse/handlers/tool-execution.ts @@ -610,6 +610,24 @@ export async function executeToolAndReport( } } +function abortAwareSleep(ms: number, abortSignal?: AbortSignal): Promise { + return new Promise((resolve) => { + if (abortSignal?.aborted) { + resolve() + return + } + const timer = setTimeout(resolve, ms) + abortSignal?.addEventListener( + 'abort', + () => { + clearTimeout(timer) + resolve() + }, + { once: true } + ) + }) +} + export async function waitForToolDecision( toolCallId: string, timeoutMs: number, @@ -624,7 +642,7 @@ export async function waitForToolDecision( if (decision?.status) { return decision } - await new Promise((resolve) => setTimeout(resolve, interval)) + await abortAwareSleep(interval, abortSignal) interval = Math.min(interval * TOOL_DECISION_POLL_BACKOFF, maxInterval) } return null @@ -663,7 +681,7 @@ export async function waitForToolCompletion( ) { return decision } - await new Promise((resolve) => setTimeout(resolve, interval)) + await abortAwareSleep(interval, abortSignal) interval = Math.min(interval * TOOL_DECISION_POLL_BACKOFF, maxInterval) } return null From fcf6e60bf8bdffcb1b5d4c50193b4b4c2ab41011 Mon Sep 17 00:00:00 2001 From: Siddharth Ganesan Date: Mon, 16 Mar 2026 23:24:46 -0700 Subject: [PATCH 2/6] Fix --- apps/sim/lib/copilot/chat-streaming.ts | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/apps/sim/lib/copilot/chat-streaming.ts b/apps/sim/lib/copilot/chat-streaming.ts index 05d642d0f4d..3282f8298a1 100644 --- a/apps/sim/lib/copilot/chat-streaming.ts +++ b/apps/sim/lib/copilot/chat-streaming.ts @@ -21,16 +21,19 @@ const logger = createLogger('CopilotChatStreaming') const activeStreams = new Map() // Tracks in-flight streams by chatId so that a subsequent request for the -// same chat can wait until the previous stream (and its onComplete / Go-side -// persistence) has fully settled before forwarding to Go. -const pendingChatStreams = new Map; resolve: () => void }>() +// same chat can force-abort the previous stream and wait for it to settle +// before forwarding to Go. +const pendingChatStreams = new Map< + string, + { promise: Promise; resolve: () => void; streamId: string } +>() -function registerPendingChatStream(chatId: string): void { +function registerPendingChatStream(chatId: string, streamId: string): void { let resolve: () => void const promise = new Promise((r) => { resolve = r }) - pendingChatStreams.set(chatId, { promise, resolve: resolve! }) + pendingChatStreams.set(chatId, { promise, resolve: resolve!, streamId }) } function resolvePendingChatStream(chatId: string): void { @@ -42,8 +45,9 @@ function resolvePendingChatStream(chatId: string): void { } /** - * Wait for any in-flight stream on `chatId` to finish before proceeding. - * Returns immediately if no stream is active. Gives up after `timeoutMs`. + * Abort any in-flight stream on `chatId` and wait for it to fully settle + * (including onComplete and Go-side persistence). Returns immediately if + * no stream is active. Gives up after `timeoutMs`. */ export async function waitForPendingChatStream( chatId: string, @@ -51,6 +55,11 @@ export async function waitForPendingChatStream( ): Promise { const entry = pendingChatStreams.get(chatId) if (!entry) return + + // Force-abort the previous stream so we don't passively wait for it to + // finish naturally (which could take tens of seconds for a subagent). + abortActiveStream(entry.streamId) + await Promise.race([entry.promise, new Promise((r) => setTimeout(r, timeoutMs))]) } @@ -147,7 +156,7 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS activeStreams.set(streamId, abortController) if (chatId) { - registerPendingChatStream(chatId) + registerPendingChatStream(chatId, streamId) } return new ReadableStream({ From 866fec6d314c54848618c6de04a74373440afc79 Mon Sep 17 00:00:00 2001 From: Siddharth Ganesan Date: Mon, 16 Mar 2026 23:42:58 -0700 Subject: [PATCH 3/6] Fix --- apps/sim/lib/copilot/chat-streaming.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/apps/sim/lib/copilot/chat-streaming.ts b/apps/sim/lib/copilot/chat-streaming.ts index 3282f8298a1..6c08a1c21a5 100644 --- a/apps/sim/lib/copilot/chat-streaming.ts +++ b/apps/sim/lib/copilot/chat-streaming.ts @@ -269,7 +269,6 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS }, cancel() { clientDisconnected = true - abortController.abort() if (eventWriter) { eventWriter.flush().catch(() => {}) } From 9efff136852ed8cd4c07cab77408293a7ab61fac Mon Sep 17 00:00:00 2001 From: Siddharth Ganesan Date: Mon, 16 Mar 2026 23:56:19 -0700 Subject: [PATCH 4/6] Fix --- .../sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts index 2aef0e02c24..d118ecb6809 100644 --- a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts +++ b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts @@ -1088,6 +1088,13 @@ export function useChat( }) const stopGeneration = useCallback(async () => { + if (sendingRef.current && !chatIdRef.current) { + const start = Date.now() + while (!chatIdRef.current && sendingRef.current && Date.now() - start < 3000) { + await new Promise((r) => setTimeout(r, 50)) + } + } + if (sendingRef.current) { await persistPartialResponse() } From 4f94368237c3b2a36924aac0cac975a068c6b000 Mon Sep 17 00:00:00 2001 From: Siddharth Ganesan Date: Tue, 17 Mar 2026 00:04:50 -0700 Subject: [PATCH 5/6] Fix lint --- apps/sim/lib/copilot/chat-streaming.ts | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/apps/sim/lib/copilot/chat-streaming.ts b/apps/sim/lib/copilot/chat-streaming.ts index 6c08a1c21a5..5f2e34ace48 100644 --- a/apps/sim/lib/copilot/chat-streaming.ts +++ b/apps/sim/lib/copilot/chat-streaming.ts @@ -49,10 +49,7 @@ function resolvePendingChatStream(chatId: string): void { * (including onComplete and Go-side persistence). Returns immediately if * no stream is active. Gives up after `timeoutMs`. */ -export async function waitForPendingChatStream( - chatId: string, - timeoutMs = 5_000 -): Promise { +export async function waitForPendingChatStream(chatId: string, timeoutMs = 5_000): Promise { const entry = pendingChatStreams.get(chatId) if (!entry) return From 5acc22988956057d2f4f543f3c6e31f371807327 Mon Sep 17 00:00:00 2001 From: Siddharth Ganesan Date: Tue, 17 Mar 2026 10:31:37 -0700 Subject: [PATCH 6/6] Fix --- .../workspace/[workspaceId]/home/hooks/use-chat.ts | 1 + apps/sim/lib/copilot/chat-streaming.ts | 13 ++++++++----- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts index d118ecb6809..299c8f0f852 100644 --- a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts +++ b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts @@ -1093,6 +1093,7 @@ export function useChat( while (!chatIdRef.current && sendingRef.current && Date.now() - start < 3000) { await new Promise((r) => setTimeout(r, 50)) } + if (!chatIdRef.current) return } if (sendingRef.current) { diff --git a/apps/sim/lib/copilot/chat-streaming.ts b/apps/sim/lib/copilot/chat-streaming.ts index 5f2e34ace48..2f4e6d12516 100644 --- a/apps/sim/lib/copilot/chat-streaming.ts +++ b/apps/sim/lib/copilot/chat-streaming.ts @@ -29,16 +29,19 @@ const pendingChatStreams = new Map< >() function registerPendingChatStream(chatId: string, streamId: string): void { - let resolve: () => void + if (pendingChatStreams.has(chatId)) { + logger.warn(`registerPendingChatStream: overwriting existing entry for chatId ${chatId}`) + } + let resolve!: () => void const promise = new Promise((r) => { resolve = r }) - pendingChatStreams.set(chatId, { promise, resolve: resolve!, streamId }) + pendingChatStreams.set(chatId, { promise, resolve, streamId }) } -function resolvePendingChatStream(chatId: string): void { +function resolvePendingChatStream(chatId: string, streamId: string): void { const entry = pendingChatStreams.get(chatId) - if (entry) { + if (entry && entry.streamId === streamId) { entry.resolve() pendingChatStreams.delete(chatId) } @@ -255,7 +258,7 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS } finally { activeStreams.delete(streamId) if (chatId) { - resolvePendingChatStream(chatId) + resolvePendingChatStream(chatId, streamId) } try { controller.close()