From 57eafa7b9e95bcf7bc821af4505a5aa2f8374f33 Mon Sep 17 00:00:00 2001 From: waleed Date: Wed, 3 Jun 2026 13:01:41 -0700 Subject: [PATCH 1/3] fix(dev): use globalThis for singleton state to prevent HMR memory leaks --- .../copilot/persistence/tool-confirm/index.ts | 18 ++- apps/sim/lib/copilot/tasks.ts | 20 +++- apps/sim/lib/core/config/redis.ts | 108 ++++++++++-------- apps/sim/lib/execution/cancellation.ts | 12 +- apps/sim/lib/mcp/connection-manager.ts | 13 ++- apps/sim/lib/mcp/pubsub.ts | 44 ++++--- 6 files changed, 134 insertions(+), 81 deletions(-) diff --git a/apps/sim/lib/copilot/persistence/tool-confirm/index.ts b/apps/sim/lib/copilot/persistence/tool-confirm/index.ts index cd13ee31057..09f5fd2ee93 100644 --- a/apps/sim/lib/copilot/persistence/tool-confirm/index.ts +++ b/apps/sim/lib/copilot/persistence/tool-confirm/index.ts @@ -9,16 +9,24 @@ import { import { getAsyncToolCalls } from '@/lib/copilot/async-runs/repository' import { MothershipStreamV1ToolOutcome } from '@/lib/copilot/generated/mothership-stream-v1' import { getRedisClient } from '@/lib/core/config/redis' -import { createPubSubChannel } from '@/lib/events/pubsub' +import { createPubSubChannel, type PubSubChannel } from '@/lib/events/pubsub' const logger = createLogger('CopilotOrchestratorPersistence') const TOOL_CONFIRMATION_TTL_SECONDS = 60 * 10 const toolConfirmationKey = (toolCallId: string) => `copilot:tool-confirmation:${toolCallId}` -const toolConfirmationChannel = createPubSubChannel({ - channel: 'copilot:tool-confirmation', - label: 'CopilotToolConfirmation', -}) +type ToolConfirmGlobal = typeof globalThis & { + _toolConfirmationChannel?: PubSubChannel +} + +const _g = globalThis as ToolConfirmGlobal +if (!_g._toolConfirmationChannel) { + _g._toolConfirmationChannel = createPubSubChannel({ + channel: 'copilot:tool-confirmation', + label: 'CopilotToolConfirmation', + }) +} +const toolConfirmationChannel = _g._toolConfirmationChannel /** * Get a tool call confirmation state from the durable async tool row. diff --git a/apps/sim/lib/copilot/tasks.ts b/apps/sim/lib/copilot/tasks.ts index db6594ebf28..252b82a61ae 100644 --- a/apps/sim/lib/copilot/tasks.ts +++ b/apps/sim/lib/copilot/tasks.ts @@ -7,7 +7,7 @@ * Channel: `task:status_changed` */ -import { createPubSubChannel } from '@/lib/events/pubsub' +import { createPubSubChannel, type PubSubChannel } from '@/lib/events/pubsub' interface TaskStatusEvent { workspaceId: string @@ -16,10 +16,20 @@ interface TaskStatusEvent { streamId?: string } -const channel = - typeof window !== 'undefined' - ? null - : createPubSubChannel({ channel: 'task:status_changed', label: 'task' }) +type TaskPubSubGlobal = typeof globalThis & { + _taskStatusChannel?: PubSubChannel | null +} + +const g = globalThis as TaskPubSubGlobal + +if (!('_taskStatusChannel' in g)) { + g._taskStatusChannel = + typeof window !== 'undefined' + ? null + : createPubSubChannel({ channel: 'task:status_changed', label: 'task' }) +} + +const channel = g._taskStatusChannel export const taskPubSub = channel ? { diff --git a/apps/sim/lib/core/config/redis.ts b/apps/sim/lib/core/config/redis.ts index 04f976b44ae..820c0215c1e 100644 --- a/apps/sim/lib/core/config/redis.ts +++ b/apps/sim/lib/core/config/redis.ts @@ -54,55 +54,65 @@ export function getRedisConnectionDefaults( } } -let globalRedisClient: Redis | null = null -let pingFailures = 0 -let pingInterval: NodeJS.Timeout | null = null -let pingInFlight = false +interface RedisState { + client: Redis | null + pingFailures: number + pingInterval: NodeJS.Timeout | null + pingInFlight: boolean + reconnectListeners: Array<() => void> +} + +const g = globalThis as typeof globalThis & { _redisState?: RedisState } +if (!g._redisState) { + g._redisState = { + client: null, + pingFailures: 0, + pingInterval: null, + pingInFlight: false, + reconnectListeners: [], + } +} +const state = g._redisState const PING_INTERVAL_MS = 15_000 const MAX_PING_FAILURES = 2 -/** Callbacks invoked when the PING health check forces a reconnect. */ -const reconnectListeners: Array<() => void> = [] - /** * Register a callback that fires when the PING health check forces a reconnect. * Useful for resetting cached adapters that hold a stale Redis reference. */ export function onRedisReconnect(cb: () => void): void { - reconnectListeners.push(cb) + state.reconnectListeners.push(cb) } function startPingHealthCheck(redis: Redis): void { - if (pingInterval) return + if (state.pingInterval) return - pingInterval = setInterval(async () => { - if (pingInFlight) return - pingInFlight = true + state.pingInterval = setInterval(async () => { + if (state.pingInFlight) return + state.pingInFlight = true try { await redis.ping() - pingFailures = 0 + state.pingFailures = 0 } catch (error) { - pingFailures++ + state.pingFailures++ logger.warn('Redis PING failed', { - consecutiveFailures: pingFailures, + consecutiveFailures: state.pingFailures, error: toError(error).message, }) - if (pingFailures >= MAX_PING_FAILURES) { + if (state.pingFailures >= MAX_PING_FAILURES) { logger.error('Redis PING failed consecutive times — forcing reconnect', { - consecutiveFailures: pingFailures, + consecutiveFailures: state.pingFailures, }) - pingFailures = 0 - // Drop the cached client and stop this health check before disconnecting, - // so the next getRedisClient() builds a fresh client and a fresh PING loop. - // Listeners may call getRedisClient() and must observe the cleared global. - globalRedisClient = null - if (pingInterval) { - clearInterval(pingInterval) - pingInterval = null + state.pingFailures = 0 + // Clear before notifying listeners — they may call getRedisClient() and must see the reset state. + state.client = null + if (state.pingInterval) { + clearInterval(state.pingInterval) + state.pingInterval = null } - for (const cb of reconnectListeners) { + for (const cb of state.reconnectListeners) { try { cb() } catch (cbError) { @@ -116,7 +126,7 @@ function startPingHealthCheck(redis: Redis): void { } } } finally { - pingInFlight = false + state.pingInFlight = false } }, PING_INTERVAL_MS) } @@ -131,7 +141,7 @@ function startPingHealthCheck(redis: Redis): void { export function getRedisClient(): Redis | null { if (typeof window !== 'undefined') return null if (!redisUrl) return null - if (globalRedisClient) return globalRedisClient + if (state.client) return state.client // Outside the try/catch so config errors aren't silently swallowed. const defaults = getRedisConnectionDefaults(redisUrl) @@ -139,7 +149,7 @@ export function getRedisClient(): Redis | null { try { logger.info('Initializing Redis client') - globalRedisClient = new Redis(redisUrl, { + state.client = new Redis(redisUrl, { ...defaults, commandTimeout: 5000, maxRetriesPerRequest: 5, @@ -162,17 +172,17 @@ export function getRedisClient(): Redis | null { }, }) - globalRedisClient.on('connect', () => logger.info('Redis connected')) - globalRedisClient.on('ready', () => logger.info('Redis ready')) - globalRedisClient.on('error', (err: Error) => { + state.client.on('connect', () => logger.info('Redis connected')) + state.client.on('ready', () => logger.info('Redis ready')) + state.client.on('error', (err: Error) => { logger.error('Redis error', { error: err.message, code: (err as any).code }) }) - globalRedisClient.on('close', () => logger.warn('Redis connection closed')) - globalRedisClient.on('end', () => logger.error('Redis connection ended')) + state.client.on('close', () => logger.warn('Redis connection closed')) + state.client.on('end', () => logger.error('Redis connection ended')) - startPingHealthCheck(globalRedisClient) + startPingHealthCheck(state.client) - return globalRedisClient + return state.client } catch (error) { logger.error('Failed to initialize Redis client', { error }) return null @@ -274,18 +284,18 @@ export async function extendLock( * Use for graceful shutdown. */ export async function closeRedisConnection(): Promise { - if (pingInterval) { - clearInterval(pingInterval) - pingInterval = null + if (state.pingInterval) { + clearInterval(state.pingInterval) + state.pingInterval = null } - if (globalRedisClient) { + if (state.client) { try { - await globalRedisClient.quit() + await state.client.quit() } catch (error) { logger.error('Error closing Redis connection', { error }) } finally { - globalRedisClient = null + state.client = null } } } @@ -294,12 +304,12 @@ export async function closeRedisConnection(): Promise { * Reset all module-level state. Only intended for use in tests. */ export function resetForTesting(): void { - if (pingInterval) { - clearInterval(pingInterval) - pingInterval = null + if (state.pingInterval) { + clearInterval(state.pingInterval) + state.pingInterval = null } - globalRedisClient = null - pingFailures = 0 - pingInFlight = false - reconnectListeners.length = 0 + state.client = null + state.pingFailures = 0 + state.pingInFlight = false + state.reconnectListeners.length = 0 } diff --git a/apps/sim/lib/execution/cancellation.ts b/apps/sim/lib/execution/cancellation.ts index ffa8ad9d444..a08ea280ed4 100644 --- a/apps/sim/lib/execution/cancellation.ts +++ b/apps/sim/lib/execution/cancellation.ts @@ -19,16 +19,20 @@ export type ExecutionCancellationRecordResult = reason: 'redis_unavailable' | 'redis_write_failed' } -let sharedChannel: PubSubChannel | null = null +type CancellationGlobal = typeof globalThis & { + _executionCancelChannel?: PubSubChannel +} + +const _g = globalThis as CancellationGlobal export function getCancellationChannel(): PubSubChannel { - if (!sharedChannel) { - sharedChannel = createPubSubChannel({ + if (!_g._executionCancelChannel) { + _g._executionCancelChannel = createPubSubChannel({ channel: EXECUTION_CANCEL_CHANNEL, label: 'execution-cancel', }) } - return sharedChannel + return _g._executionCancelChannel } export function isRedisCancellationEnabled(): boolean { diff --git a/apps/sim/lib/mcp/connection-manager.ts b/apps/sim/lib/mcp/connection-manager.ts index 178a3f54564..a6618a31c7c 100644 --- a/apps/sim/lib/mcp/connection-manager.ts +++ b/apps/sim/lib/mcp/connection-manager.ts @@ -461,6 +461,13 @@ export class McpConnectionManager { } } -export const mcpConnectionManager: McpConnectionManager | null = isTest - ? null - : new McpConnectionManager() +type McpManagerGlobal = typeof globalThis & { + _mcpConnectionManager?: McpConnectionManager | null +} + +const _g = globalThis as McpManagerGlobal +if (!('_mcpConnectionManager' in _g)) { + _g._mcpConnectionManager = isTest ? null : new McpConnectionManager() +} + +export const mcpConnectionManager: McpConnectionManager | null = _g._mcpConnectionManager diff --git a/apps/sim/lib/mcp/pubsub.ts b/apps/sim/lib/mcp/pubsub.ts index 725857ae9c7..43bae170979 100644 --- a/apps/sim/lib/mcp/pubsub.ts +++ b/apps/sim/lib/mcp/pubsub.ts @@ -11,7 +11,7 @@ * (published by serve route, consumed by serve route on other processes to push to local SSE clients) */ -import { createPubSubChannel } from '@/lib/events/pubsub' +import { createPubSubChannel, type PubSubChannel } from '@/lib/events/pubsub' import type { ToolsChangedEvent, WorkflowToolsChangedEvent } from '@/lib/mcp/types' interface McpPubSubAdapter { @@ -22,21 +22,35 @@ interface McpPubSubAdapter { dispose(): void } -const toolsChannel = - typeof window !== 'undefined' - ? null - : createPubSubChannel({ - channel: 'mcp:tools_changed', - label: 'mcp-tools', - }) +type McpPubSubGlobal = typeof globalThis & { + _mcpToolsChannel?: PubSubChannel | null + _mcpWorkflowToolsChannel?: PubSubChannel | null +} -const workflowToolsChannel = - typeof window !== 'undefined' - ? null - : createPubSubChannel({ - channel: 'mcp:workflow_tools_changed', - label: 'mcp-workflow-tools', - }) +const g = globalThis as McpPubSubGlobal + +if (!('_mcpToolsChannel' in g)) { + g._mcpToolsChannel = + typeof window !== 'undefined' + ? null + : createPubSubChannel({ + channel: 'mcp:tools_changed', + label: 'mcp-tools', + }) +} + +if (!('_mcpWorkflowToolsChannel' in g)) { + g._mcpWorkflowToolsChannel = + typeof window !== 'undefined' + ? null + : createPubSubChannel({ + channel: 'mcp:workflow_tools_changed', + label: 'mcp-workflow-tools', + }) +} + +const toolsChannel = g._mcpToolsChannel +const workflowToolsChannel = g._mcpWorkflowToolsChannel export const mcpPubSub: McpPubSubAdapter | null = typeof window !== 'undefined' || !toolsChannel || !workflowToolsChannel From 54d4446d375caa1f9ceaf41b52f00473195d9da7 Mon Sep 17 00:00:00 2001 From: waleed Date: Wed, 3 Jun 2026 13:23:45 -0700 Subject: [PATCH 2/3] fix(dev): apply globalThis guard to rate-limiter storage factory to prevent listener accumulation --- .../lib/core/rate-limiter/storage/factory.ts | 34 ++++++++++++------- 1 file changed, 21 insertions(+), 13 deletions(-) diff --git a/apps/sim/lib/core/rate-limiter/storage/factory.ts b/apps/sim/lib/core/rate-limiter/storage/factory.ts index 948e51ad907..9c712ad91f2 100644 --- a/apps/sim/lib/core/rate-limiter/storage/factory.ts +++ b/apps/sim/lib/core/rate-limiter/storage/factory.ts @@ -7,19 +7,27 @@ import { RedisTokenBucket } from './redis-token-bucket' const logger = createLogger('RateLimitStorage') -let cachedAdapter: RateLimitStorageAdapter | null = null -let reconnectListenerRegistered = false +type FactoryGlobal = typeof globalThis & { + _rlCachedAdapter?: RateLimitStorageAdapter | null + _rlReconnectListenerRegistered?: boolean +} + +const g = globalThis as FactoryGlobal +if (!('_rlCachedAdapter' in g)) { + g._rlCachedAdapter = null + g._rlReconnectListenerRegistered = false +} export function createStorageAdapter(): RateLimitStorageAdapter { - if (cachedAdapter) { - return cachedAdapter + if (g._rlCachedAdapter) { + return g._rlCachedAdapter } - if (!reconnectListenerRegistered) { + if (!g._rlReconnectListenerRegistered) { onRedisReconnect(() => { - cachedAdapter = null + g._rlCachedAdapter = null }) - reconnectListenerRegistered = true + g._rlReconnectListenerRegistered = true } const storageMethod = getStorageMethod() @@ -30,17 +38,17 @@ export function createStorageAdapter(): RateLimitStorageAdapter { logger.warn( 'Redis configured but client unavailable - falling back to PostgreSQL for rate limiting' ) - cachedAdapter = new DbTokenBucket() + g._rlCachedAdapter = new DbTokenBucket() } else { logger.info('Rate limiting: Using Redis') - cachedAdapter = new RedisTokenBucket(redis) + g._rlCachedAdapter = new RedisTokenBucket(redis) } } else { logger.info('Rate limiting: Using PostgreSQL') - cachedAdapter = new DbTokenBucket() + g._rlCachedAdapter = new DbTokenBucket() } - return cachedAdapter + return g._rlCachedAdapter! } export function getAdapterType(): StorageMethod { @@ -48,9 +56,9 @@ export function getAdapterType(): StorageMethod { } export function resetStorageAdapter(): void { - cachedAdapter = null + g._rlCachedAdapter = null } export function setStorageAdapter(adapter: RateLimitStorageAdapter): void { - cachedAdapter = adapter + g._rlCachedAdapter = adapter } From 138f146365a592720189e65599250fbf75e3d85d Mon Sep 17 00:00:00 2001 From: waleed Date: Wed, 3 Jun 2026 13:31:13 -0700 Subject: [PATCH 3/3] fix(types): resolve McpConnectionManager globalThis undefined type error --- apps/sim/lib/mcp/connection-manager.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/sim/lib/mcp/connection-manager.ts b/apps/sim/lib/mcp/connection-manager.ts index a6618a31c7c..158983739b2 100644 --- a/apps/sim/lib/mcp/connection-manager.ts +++ b/apps/sim/lib/mcp/connection-manager.ts @@ -470,4 +470,4 @@ if (!('_mcpConnectionManager' in _g)) { _g._mcpConnectionManager = isTest ? null : new McpConnectionManager() } -export const mcpConnectionManager: McpConnectionManager | null = _g._mcpConnectionManager +export const mcpConnectionManager: McpConnectionManager | null = _g._mcpConnectionManager ?? null