diff --git a/bun.lock b/bun.lock index f85d99fbe..8704bd613 100644 --- a/bun.lock +++ b/bun.lock @@ -1,5 +1,6 @@ { "lockfileVersion": 1, + "configVersion": 0, "workspaces": { "": { "name": "@coder/cmux", diff --git a/src/browser/components/AIView.tsx b/src/browser/components/AIView.tsx index 92d27fdb5..6470bf680 100644 --- a/src/browser/components/AIView.tsx +++ b/src/browser/components/AIView.tsx @@ -50,6 +50,9 @@ const AIViewInner: React.FC = ({ }) => { const chatAreaRef = useRef(null); + // Track whether user has interrupted once (for soft vs hard interrupt) + const [hasInterruptedOnce, setHasInterruptedOnce] = useState(false); + // Track active tab to conditionally enable resize functionality // RightSidebar notifies us of tab changes via onTabChange callback const [activeTab, setActiveTab] = useState("costs"); @@ -188,6 +191,16 @@ const AIViewInner: React.FC = ({ setEditingMessage(undefined); }, []); + const handleInterrupt = useCallback(async () => { + const soft = !hasInterruptedOnce; // First press = soft, second = hard + + if (soft) { + setHasInterruptedOnce(true); // Mark for next press + } + + await window.api.workspace.interruptStream(workspaceId, { soft }); + }, [workspaceId, hasInterruptedOnce]); + const handleMessageSent = useCallback(() => { // Enable auto-scroll when user sends a message setAutoScroll(true); @@ -246,6 +259,12 @@ const AIViewInner: React.FC = ({ // eslint-disable-next-line react-hooks/exhaustive-deps }, [workspaceId, workspaceState?.loading]); + // Reset interrupt flag when stream ends (allows fresh start for next stream) + useEffect(() => { + if (workspaceState && !workspaceState.canInterrupt) { + setHasInterruptedOnce(false); + } + }, [workspaceState, workspaceState?.canInterrupt]); // Compute showRetryBarrier once for both keybinds and UI // Track if last message was interrupted or errored (for RetryBarrier) // Uses same logic as useResumeManager for DRY @@ -258,7 +277,7 @@ const AIViewInner: React.FC = ({ useAIViewKeybinds({ workspaceId, currentModel: workspaceState?.currentModel ?? null, - canInterrupt: workspaceState?.canInterrupt ?? false, + canInterrupt: workspaceState.canInterrupt, showRetryBarrier, currentWorkspaceThinking, setThinkingLevel, @@ -269,6 +288,7 @@ const AIViewInner: React.FC = ({ aggregator, setEditingMessage, vimEnabled, + onInterrupt: handleInterrupt, }); // Clear editing state if the message being edited no longer exists @@ -307,7 +327,6 @@ const AIViewInner: React.FC = ({ ); } - // Extract state from workspace state const { messages, canInterrupt, isCompacting, loading, currentModel } = workspaceState; // Get active stream message ID for token counting @@ -320,6 +339,15 @@ const AIViewInner: React.FC = ({ // Merge consecutive identical stream errors const mergedMessages = mergeConsecutiveStreamErrors(messages); + const model = currentModel ? getModelName(currentModel) : ""; + // Determine if we're in "interrupting" state (second press, waiting for hard abort) + const interrupting = canInterrupt && hasInterruptedOnce; + + const prefix = interrupting ? "⏸️ Interrupting " : ""; + const action = interrupting ? "" : isCompacting ? "compacting..." : "streaming..."; + + const statusText = `${prefix}${model} ${action}`.trim(); + // When editing, find the cutoff point const editCutoffHistoryId = editingMessage ? mergedMessages.find( @@ -454,19 +482,12 @@ const AIViewInner: React.FC = ({ {canInterrupt && ( = ({ @@ -15,11 +16,13 @@ export const StreamingBarrier: React.FC = ({ cancelText, tokenCount, tps, + interrupting, }) => { + const color = interrupting ? "var(--color-interrupted)" : "var(--color-assistant-border)"; return (
- + {tokenCount !== undefined && ( ~{tokenCount.toLocaleString()} tokens diff --git a/src/browser/hooks/useAIViewKeybinds.ts b/src/browser/hooks/useAIViewKeybinds.ts index 4032379c3..049935d1c 100644 --- a/src/browser/hooks/useAIViewKeybinds.ts +++ b/src/browser/hooks/useAIViewKeybinds.ts @@ -24,6 +24,7 @@ interface UseAIViewKeybindsParams { aggregator: StreamingMessageAggregator; // For compaction detection setEditingMessage: (editing: { id: string; content: string } | undefined) => void; vimEnabled: boolean; // For vim-aware interrupt keybind + onInterrupt: () => Promise; // Callback to handle interrupt } /** @@ -52,6 +53,7 @@ export function useAIViewKeybinds({ aggregator, setEditingMessage, vimEnabled, + onInterrupt, }: UseAIViewKeybindsParams): void { useEffect(() => { const handleKeyDown = (e: KeyboardEvent) => { @@ -81,7 +83,7 @@ export function useAIViewKeybinds({ if (canInterrupt || showRetryBarrier) { e.preventDefault(); setAutoRetry(false); // User explicitly stopped - don't auto-retry - void window.api.workspace.interruptStream(workspaceId); + void onInterrupt(); return; } } @@ -95,7 +97,7 @@ export function useAIViewKeybinds({ // No flag set - handleCompactionAbort will perform compaction with [truncated] e.preventDefault(); setAutoRetry(false); - void window.api.workspace.interruptStream(workspaceId); + void onInterrupt(); } // Let browser handle Ctrl+A (select all) when not compacting return; @@ -175,5 +177,6 @@ export function useAIViewKeybinds({ aggregator, setEditingMessage, vimEnabled, + onInterrupt, ]); } diff --git a/src/browser/utils/commands/sources.ts b/src/browser/utils/commands/sources.ts index 2c3aa9348..396c3a591 100644 --- a/src/browser/utils/commands/sources.ts +++ b/src/browser/utils/commands/sources.ts @@ -333,7 +333,7 @@ export function buildCoreSources(p: BuildSourcesParams): Array<() => CommandActi title: "Interrupt Streaming", section: section.chat, run: async () => { - await window.api.workspace.interruptStream(id); + await window.api.workspace.interruptStream(id, { soft: false }); // hard interrupt }, }); list.push({ diff --git a/src/common/types/ipc.ts b/src/common/types/ipc.ts index 339da510d..ce7727986 100644 --- a/src/common/types/ipc.ts +++ b/src/common/types/ipc.ts @@ -300,7 +300,7 @@ export interface IPCApi { ): Promise>; interruptStream( workspaceId: string, - options?: { abandonPartial?: boolean } + options?: { soft?: boolean; abandonPartial?: boolean } ): Promise>; clearQueue(workspaceId: string): Promise>; truncateHistory(workspaceId: string, percentage?: number): Promise>; diff --git a/src/common/utils/compaction/handler.ts b/src/common/utils/compaction/handler.ts index adcc68bed..17fbd0a0e 100644 --- a/src/common/utils/compaction/handler.ts +++ b/src/common/utils/compaction/handler.ts @@ -101,7 +101,7 @@ export async function cancelCompaction( // Interrupt stream with abandonPartial flag // This tells backend to DELETE the partial instead of committing it // Result: history ends with the compaction-request user message (which is fine - just a user message) - await window.api.workspace.interruptStream(workspaceId, { abandonPartial: true }); + await window.api.workspace.interruptStream(workspaceId, { soft: false, abandonPartial: true }); // Enter edit mode on the compaction-request message with original command // This lets user immediately edit the message or delete it diff --git a/src/desktop/preload.ts b/src/desktop/preload.ts index b8a910bd5..56c18f915 100644 --- a/src/desktop/preload.ts +++ b/src/desktop/preload.ts @@ -72,7 +72,7 @@ const api: IPCApi = { ipcRenderer.invoke(IPC_CHANNELS.WORKSPACE_SEND_MESSAGE, workspaceId, message, options), resumeStream: (workspaceId, options) => ipcRenderer.invoke(IPC_CHANNELS.WORKSPACE_RESUME_STREAM, workspaceId, options), - interruptStream: (workspaceId: string, options?: { abandonPartial?: boolean }) => + interruptStream: (workspaceId, options) => ipcRenderer.invoke(IPC_CHANNELS.WORKSPACE_INTERRUPT_STREAM, workspaceId, options), clearQueue: (workspaceId: string) => ipcRenderer.invoke(IPC_CHANNELS.WORKSPACE_QUEUE_CLEAR, workspaceId), diff --git a/src/node/services/agentSession.ts b/src/node/services/agentSession.ts index f49f1d61c..db24835c4 100644 --- a/src/node/services/agentSession.ts +++ b/src/node/services/agentSession.ts @@ -337,14 +337,17 @@ export class AgentSession { return this.streamWithHistory(model, options); } - async interruptStream(): Promise> { + async interruptStream(options?: { + soft?: boolean; + abandonPartial?: boolean; + }): Promise> { this.assertNotDisposed("interruptStream"); if (!this.aiService.isStreaming(this.workspaceId)) { return Ok(undefined); } - const stopResult = await this.aiService.stopStream(this.workspaceId); + const stopResult = await this.aiService.stopStream(this.workspaceId, options); if (!stopResult.success) { return Err(stopResult.error); } diff --git a/src/node/services/aiService.ts b/src/node/services/aiService.ts index 56b0355f9..2f8b38abd 100644 --- a/src/node/services/aiService.ts +++ b/src/node/services/aiService.ts @@ -896,12 +896,15 @@ export class AIService extends EventEmitter { } } - async stopStream(workspaceId: string): Promise> { + async stopStream( + workspaceId: string, + options?: { soft?: boolean; abandonPartial?: boolean } + ): Promise> { if (this.mockModeEnabled && this.mockScenarioPlayer) { this.mockScenarioPlayer.stop(workspaceId); return Ok(undefined); } - return this.streamManager.stopStream(workspaceId); + return this.streamManager.stopStream(workspaceId, options); } /** diff --git a/src/node/services/ipcMain.ts b/src/node/services/ipcMain.ts index d76819023..b887fe0b8 100644 --- a/src/node/services/ipcMain.ts +++ b/src/node/services/ipcMain.ts @@ -975,11 +975,15 @@ export class IpcMain { ipcMain.handle( IPC_CHANNELS.WORKSPACE_INTERRUPT_STREAM, - async (_event, workspaceId: string, options?: { abandonPartial?: boolean }) => { + async ( + _event, + workspaceId: string, + options?: { soft?: boolean; abandonPartial?: boolean } + ) => { log.debug("interruptStream handler: Received", { workspaceId, options }); try { const session = this.getOrCreateSession(workspaceId); - const stopResult = await session.interruptStream(); + const stopResult = await session.interruptStream(options); if (!stopResult.success) { log.error("Failed to stop stream:", stopResult.error); return { success: false, error: stopResult.error }; diff --git a/src/node/services/streamManager.ts b/src/node/services/streamManager.ts index 0faebea56..e20a6f349 100644 --- a/src/node/services/streamManager.ts +++ b/src/node/services/streamManager.ts @@ -107,6 +107,8 @@ interface WorkspaceStreamInfo { partialWritePromise?: Promise; // Track background processing promise for guaranteed cleanup processingPromise: Promise; + // Flag for soft-interrupt: when true, stream will end at next block boundary + softInterruptPending: boolean; // Temporary directory for tool outputs (auto-cleaned when stream ends) runtimeTempDir: string; // Runtime for temp directory cleanup @@ -412,30 +414,37 @@ export class StreamManager extends EventEmitter { ): Promise { try { streamInfo.state = StreamState.STOPPING; - // Flush any pending partial write immediately (preserves work on interruption) await this.flushPartialWrite(workspaceId, streamInfo); streamInfo.abortController.abort(); - // CRITICAL: Wait for processing to fully complete before cleanup - // This prevents race conditions where the old stream is still running - // while a new stream starts (e.g., old stream writing to partial.json) - await streamInfo.processingPromise; + await this.cleanupStream(workspaceId, streamInfo); + } catch (error) { + console.error("Error during stream cancellation:", error); + // Force cleanup even if cancellation fails + this.workspaceStreams.delete(workspaceId); + } + } - // Get usage and duration metadata (usage may be undefined if aborted early) - const { usage, duration } = await this.getStreamMetadata(streamInfo); + // Checks if a soft interrupt is necessary, and performs one if so + // Similar to cancelStreamSafely but performs cleanup without blocking + private async checkSoftCancelStream( + workspaceId: WorkspaceId, + streamInfo: WorkspaceStreamInfo + ): Promise { + if (!streamInfo.softInterruptPending) return; + try { + streamInfo.state = StreamState.STOPPING; - // Emit abort event with usage if available - this.emit("stream-abort", { - type: "stream-abort", - workspaceId: workspaceId as string, - messageId: streamInfo.messageId, - metadata: { usage, duration }, - }); + // Flush any pending partial write immediately (preserves work on interruption) + await this.flushPartialWrite(workspaceId, streamInfo); - // Clean up immediately - this.workspaceStreams.delete(workspaceId); + streamInfo.abortController.abort(); + + // Return back to the stream loop so we can wait for it to finish before + // sending the stream abort event. + void this.cleanupStream(workspaceId, streamInfo); } catch (error) { console.error("Error during stream cancellation:", error); // Force cleanup even if cancellation fails @@ -443,6 +452,28 @@ export class StreamManager extends EventEmitter { } } + private async cleanupStream( + workspaceId: WorkspaceId, + streamInfo: WorkspaceStreamInfo + ): Promise { + // CRITICAL: Wait for processing to fully complete before cleanup + // This prevents race conditions where the old stream is still running + // while a new stream starts (e.g., old stream writing to partial.json) + await streamInfo.processingPromise; + // Get usage and duration metadata (usage may be undefined if aborted early) + const { usage, duration } = await this.getStreamMetadata(streamInfo); + // Emit abort event with usage if available + this.emit("stream-abort", { + type: "stream-abort", + workspaceId: workspaceId as string, + messageId: streamInfo.messageId, + metadata: { usage, duration }, + }); + + // Clean up immediately + this.workspaceStreams.delete(workspaceId); + } + /** * Atomically creates a new stream with all necessary setup */ @@ -529,6 +560,7 @@ export class StreamManager extends EventEmitter { lastPartialWriteTime: 0, // Initialize to 0 to allow immediate first write partialWritePromise: undefined, // No write in flight initially processingPromise: Promise.resolve(), // Placeholder, overwritten in startStream + softInterruptPending: false, runtimeTempDir, // Stream-scoped temp directory for tool outputs runtime, // Runtime for temp directory cleanup }; @@ -692,6 +724,7 @@ export class StreamManager extends EventEmitter { workspaceId: workspaceId as string, messageId: streamInfo.messageId, }); + await this.checkSoftCancelStream(workspaceId, streamInfo); break; } @@ -746,6 +779,7 @@ export class StreamManager extends EventEmitter { strippedOutput ); } + await this.checkSoftCancelStream(workspaceId, streamInfo); break; } @@ -782,6 +816,7 @@ export class StreamManager extends EventEmitter { toolErrorPart.toolName, errorOutput ); + await this.checkSoftCancelStream(workspaceId, streamInfo); break; } @@ -827,9 +862,14 @@ export class StreamManager extends EventEmitter { case "start-step": case "text-start": case "finish": - case "finish-step": + case "tool-input-end": // These events can be logged or handled if needed break; + + case "finish-step": + case "text-end": + await this.checkSoftCancelStream(workspaceId, streamInfo); + break; } } @@ -1317,13 +1357,28 @@ export class StreamManager extends EventEmitter { /** * Stops an active stream for a workspace + * First call: Sets soft interrupt and emits delta event → frontend shows "Interrupting..." + * Second call: Hard aborts the stream immediately */ - async stopStream(workspaceId: string): Promise> { + async stopStream( + workspaceId: string, + options?: { soft?: boolean; abandonPartial?: boolean } + ): Promise> { const typedWorkspaceId = workspaceId as WorkspaceId; try { const streamInfo = this.workspaceStreams.get(typedWorkspaceId); - if (streamInfo) { + if (!streamInfo) { + return Ok(undefined); // No active stream + } + + const soft = options?.soft ?? false; + + if (soft) { + // Soft interrupt: set flag, will cancel at next block boundary + streamInfo.softInterruptPending = true; + } else { + // Hard interrupt: cancel immediately await this.cancelStreamSafely(typedWorkspaceId, streamInfo); } return Ok(undefined); diff --git a/tests/ipcMain/sendMessage.test.ts b/tests/ipcMain/sendMessage.test.ts index 61d4113d6..fd3bc54e1 100644 --- a/tests/ipcMain/sendMessage.test.ts +++ b/tests/ipcMain/sendMessage.test.ts @@ -105,14 +105,23 @@ describeIntegration("IpcMain sendMessage integration tests", () => { const collector = createEventCollector(env.sentEvents, workspaceId); await collector.waitForEvent("stream-start", 5000); - // Use interruptStream() to interrupt - const interruptResult = await env.mockIpcRenderer.invoke( + // Use interruptStream() to soft-interrupt + const softInterruptResult = await env.mockIpcRenderer.invoke( IPC_CHANNELS.WORKSPACE_INTERRUPT_STREAM, - workspaceId + workspaceId, + { soft: true } ); // Should succeed (interrupt is not an error) - expect(interruptResult.success).toBe(true); + expect(softInterruptResult.success).toBe(true); + + // Then hard-interrupt + const hardInterruptResult = await env.mockIpcRenderer.invoke( + IPC_CHANNELS.WORKSPACE_INTERRUPT_STREAM, + workspaceId, + { soft: false } + ); + expect(hardInterruptResult.success).toBe(true); // Wait for abort or end event const abortOrEndReceived = await waitFor(() => { @@ -162,7 +171,8 @@ describeIntegration("IpcMain sendMessage integration tests", () => { // Interrupt the stream const interruptResult = await env.mockIpcRenderer.invoke( IPC_CHANNELS.WORKSPACE_INTERRUPT_STREAM, - workspaceId + workspaceId, + { soft: false } // hard interrupt for immediate cancellation ); const interruptDuration = performance.now() - interruptStartTime; @@ -292,7 +302,8 @@ describeIntegration("IpcMain sendMessage integration tests", () => { // Interrupt the stream with interruptStream() const interruptResult = await env.mockIpcRenderer.invoke( IPC_CHANNELS.WORKSPACE_INTERRUPT_STREAM, - workspaceId + workspaceId, + { soft: false } // hard interrupt ); expect(interruptResult.success).toBe(true);