Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions apps/sim/app/api/schedules/execute/route.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,15 @@ vi.mock('@/background/schedule-execution', () => ({
executeScheduleJob: mockExecuteScheduleJob,
executeJobInline: mockExecuteJobInline,
releaseScheduleLock: mockReleaseScheduleLock,
buildScheduleFailureUpdate: (now: Date, nextRunAt: Date | null) => ({
updatedAt: now,
lastQueuedAt: null,
nextRunAt,
failedCount: { type: 'sql' },
lastFailedAt: now,
status: { type: 'sql' },
infraRetryCount: 0,
}),
}))

vi.mock('@/lib/core/config/feature-flags', () => mockFeatureFlags)
Expand Down
22 changes: 3 additions & 19 deletions apps/sim/app/api/schedules/execute/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ import {
SCHEDULE_WORKFLOW_ENQUEUE_LIMIT,
} from '@/lib/workflows/schedules/execution-limits'
import {
buildScheduleFailureUpdate,
executeJobInline,
executeScheduleJob,
releaseScheduleLock,
type ScheduleExecutionPayload,
} from '@/background/schedule-execution'
import { MAX_CONSECUTIVE_FAILURES } from '@/triggers/constants'

export const dynamic = 'force-dynamic'
export const maxDuration = 3600
Expand Down Expand Up @@ -321,15 +321,7 @@ async function markClaimedScheduleFailed(
const now = new Date()
await db
.update(workflowSchedule)
.set({
updatedAt: now,
lastQueuedAt: null,
lastFailedAt: now,
nextRunAt: getScheduleNextRunAt(schedule, now),
failedCount: sql`COALESCE(${workflowSchedule.failedCount}, 0) + 1`,
status: sql`CASE WHEN COALESCE(${workflowSchedule.failedCount}, 0) + 1 >= ${MAX_CONSECUTIVE_FAILURES} THEN 'disabled' ELSE 'active' END`,
infraRetryCount: 0,
})
.set(buildScheduleFailureUpdate(now, getScheduleNextRunAt(schedule, now)))
.where(
and(
eq(workflowSchedule.id, schedule.id),
Expand Down Expand Up @@ -482,15 +474,7 @@ async function recoverStaleDatabaseScheduleJobs(now: Date): Promise<void> {

await tx
.update(workflowSchedule)
.set({
updatedAt: now,
lastQueuedAt: null,
lastFailedAt: now,
nextRunAt: getScheduleNextRunAt(payload, now),
failedCount: sql`COALESCE(${workflowSchedule.failedCount}, 0) + 1`,
status: sql`CASE WHEN COALESCE(${workflowSchedule.failedCount}, 0) + 1 >= ${MAX_CONSECUTIVE_FAILURES} THEN 'disabled' ELSE 'active' END`,
infraRetryCount: 0,
})
.set(buildScheduleFailureUpdate(now, getScheduleNextRunAt(payload, now)))
.where(
and(
eq(workflowSchedule.id, payload.scheduleId),
Expand Down
82 changes: 39 additions & 43 deletions apps/sim/background/schedule-execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,29 @@ function resetScheduleInfraRetryCount(): Pick<WorkflowScheduleUpdate, 'infraRetr
return { infraRetryCount: 0 }
}

/**
* Builds the schedule update shared by every path that treats a run as a failure:
* clears the claim, advances to `nextRunAt`, increments the consecutive-failure
* counter, stamps `lastFailedAt`, and auto-disables once `MAX_CONSECUTIVE_FAILURES`
* is reached. Centralizing this keeps all failure branches (preprocessing,
* execution, exhausted infra retries, usage limit) from diverging — only the
* `nextRunAt` cadence differs per caller.
*/
export function buildScheduleFailureUpdate(
now: Date,
nextRunAt: Date | null
): WorkflowScheduleUpdate {
return {
updatedAt: now,
lastQueuedAt: null,
nextRunAt,
failedCount: incrementScheduleFailedCount(),
lastFailedAt: now,
status: scheduleStatusAfterFailedCountIncrement(),
...resetScheduleInfraRetryCount(),
}
}

type RunWorkflowResult =
| {
status: 'skip'
Expand Down Expand Up @@ -191,15 +214,7 @@ async function retryScheduleAfterInfraFailure({
const nextRunAt = await determineNextRunAfterError(payload, now, requestId)
await applyScheduleUpdate(
payload.scheduleId,
{
updatedAt: now,
nextRunAt,
lastQueuedAt: null,
failedCount: incrementScheduleFailedCount(),
lastFailedAt: now,
status: scheduleStatusAfterFailedCountIncrement(),
...resetScheduleInfraRetryCount(),
},
buildScheduleFailureUpdate(now, nextRunAt),
requestId,
`Error updating schedule ${payload.scheduleId} after exhausted infrastructure retries`,
{ expectedLastQueuedAt: claimedAt }
Expand Down Expand Up @@ -777,17 +792,22 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
}

case 402: {
logger.warn(`[${requestId}] Usage limit exceeded, scheduling next run`)
/**
* Usage limits are a billing state, not a broken workflow, but they only
* clear on billing-period rollover or upgrade. Keep retrying at the normal
* cadence, but count each hit toward the shared auto-disable threshold so an
* abandoned over-limit schedule eventually stops instead of running forever.
* A successful run resets failedCount, so transient overages self-heal.
*/
const nextRunAt =
(await calculateNextRunFromDeployment(payload, requestId)) ??
new Date(now.getTime() + 60 * 60 * 1000)
logger.warn(`[${requestId}] Usage limit exceeded, counting as failed run`, {
scheduleId: payload.scheduleId,
nextRunAt: nextRunAt.toISOString(),
})
await updateClaimedSchedule(
{
updatedAt: now,
lastQueuedAt: null,
nextRunAt,
...resetScheduleInfraRetryCount(),
},
buildScheduleFailureUpdate(now, nextRunAt),
`Error updating schedule ${payload.scheduleId} after usage limit check`
)
return
Expand All @@ -809,15 +829,7 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
const nextRunAt = await determineNextRunAfterError(payload, now, requestId)

await updateClaimedSchedule(
{
updatedAt: now,
lastQueuedAt: null,
nextRunAt,
failedCount: incrementScheduleFailedCount(),
lastFailedAt: now,
status: scheduleStatusAfterFailedCountIncrement(),
...resetScheduleInfraRetryCount(),
},
buildScheduleFailureUpdate(now, nextRunAt),
`Error updating schedule ${payload.scheduleId} after preprocessing failure`
)
return
Expand Down Expand Up @@ -914,15 +926,7 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
const nextRunAt = calculateNextRunTime(payload, executionResult.blocks)

await updateClaimedSchedule(
{
updatedAt: now,
lastQueuedAt: null,
nextRunAt,
failedCount: incrementScheduleFailedCount(),
lastFailedAt: now,
status: scheduleStatusAfterFailedCountIncrement(),
...resetScheduleInfraRetryCount(),
},
buildScheduleFailureUpdate(now, nextRunAt),
`Error updating schedule ${payload.scheduleId} after failure`
)
} catch (error: unknown) {
Expand All @@ -934,15 +938,7 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
const nextRunAt = await determineNextRunAfterError(payload, now, requestId)

await updateClaimedSchedule(
{
updatedAt: now,
lastQueuedAt: null,
nextRunAt,
failedCount: incrementScheduleFailedCount(),
lastFailedAt: now,
status: scheduleStatusAfterFailedCountIncrement(),
...resetScheduleInfraRetryCount(),
},
buildScheduleFailureUpdate(now, nextRunAt),
`Error updating schedule ${payload.scheduleId} after execution error`
)
}
Expand Down
Loading