Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 46 additions & 47 deletions apps/sim/app/api/chat/[identifier]/route.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,36 @@
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
import { createMockRequest } from '@/app/api/__test-utils__/utils'

vi.mock('@/lib/execution/preprocessing', () => ({
preprocessExecution: vi.fn().mockResolvedValue({
success: true,
actorUserId: 'test-user-id',
workflowRecord: {
id: 'test-workflow-id',
userId: 'test-user-id',
isDeployed: true,
workspaceId: 'test-workspace-id',
variables: {},
},
userSubscription: {
plan: 'pro',
status: 'active',
},
rateLimitInfo: {
allowed: true,
remaining: 100,
resetAt: new Date(),
},
}),
}))

vi.mock('@/lib/logs/execution/logging-session', () => ({
LoggingSession: vi.fn().mockImplementation(() => ({
safeStart: vi.fn().mockResolvedValue(undefined),
safeCompleteWithError: vi.fn().mockResolvedValue(undefined),
})),
}))

describe('Chat Identifier API Route', () => {
const createMockStream = () => {
return new ReadableStream({
Expand Down Expand Up @@ -307,48 +337,16 @@ describe('Chat Identifier API Route', () => {
})

it('should return 503 when workflow is not available', async () => {
// Override the default workflow result to return non-deployed
vi.doMock('@sim/db', () => {
// Track call count to return different results
let callCount = 0

const mockLimit = vi.fn().mockImplementation(() => {
callCount++
if (callCount === 1) {
// First call - chat query
return [
{
id: 'chat-id',
workflowId: 'unavailable-workflow',
userId: 'user-id',
isActive: true,
authType: 'public',
outputConfigs: [{ blockId: 'block-1', path: 'output' }],
},
]
}
if (callCount === 2) {
// Second call - workflow query
return [
{
isDeployed: false,
},
]
}
return []
})

const mockWhere = vi.fn().mockReturnValue({ limit: mockLimit })
const mockFrom = vi.fn().mockReturnValue({ where: mockWhere })
const mockSelect = vi.fn().mockReturnValue({ from: mockFrom })

return {
db: {
select: mockSelect,
},
chat: {},
workflow: {},
}
const { preprocessExecution } = await import('@/lib/execution/preprocessing')
const originalImplementation = vi.mocked(preprocessExecution).getMockImplementation()

vi.mocked(preprocessExecution).mockResolvedValueOnce({
success: false,
error: {
message: 'Workflow is not deployed',
statusCode: 403,
logCreated: true,
},
})

const req = createMockRequest('POST', { input: 'Hello' })
Expand All @@ -358,11 +356,15 @@ describe('Chat Identifier API Route', () => {

const response = await POST(req, { params })

expect(response.status).toBe(503)
expect(response.status).toBe(403)

const data = await response.json()
expect(data).toHaveProperty('error')
expect(data).toHaveProperty('message', 'Chat workflow is not available')
expect(data).toHaveProperty('message', 'Workflow is not deployed')

if (originalImplementation) {
vi.mocked(preprocessExecution).mockImplementation(originalImplementation)
}
})

it('should return streaming response for valid chat messages', async () => {
Expand All @@ -378,7 +380,6 @@ describe('Chat Identifier API Route', () => {
expect(response.headers.get('Cache-Control')).toBe('no-cache')
expect(response.headers.get('Connection')).toBe('keep-alive')

// Verify createStreamingResponse was called with correct workflow info
expect(mockCreateStreamingResponse).toHaveBeenCalledWith(
expect.objectContaining({
workflow: expect.objectContaining({
Expand Down Expand Up @@ -408,7 +409,6 @@ describe('Chat Identifier API Route', () => {
expect(response.status).toBe(200)
expect(response.body).toBeInstanceOf(ReadableStream)

// Test that we can read from the response stream
if (response.body) {
const reader = response.body.getReader()
const { value, done } = await reader.read()
Expand Down Expand Up @@ -447,7 +447,6 @@ describe('Chat Identifier API Route', () => {
})

it('should handle invalid JSON in request body', async () => {
// Create a request with invalid JSON
const req = {
method: 'POST',
headers: new Headers(),
Expand Down
125 changes: 35 additions & 90 deletions apps/sim/app/api/chat/[identifier]/route.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import { randomUUID } from 'crypto'
import { db } from '@sim/db'
import { chat, workflow, workspace } from '@sim/db/schema'
import { chat } from '@sim/db/schema'
import { eq } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { v4 as uuidv4 } from 'uuid'
import { z } from 'zod'
import { preprocessExecution } from '@/lib/execution/preprocessing'
import { createLogger } from '@/lib/logs/console/logger'
import { LoggingSession } from '@/lib/logs/execution/logging-session'
import { ChatFiles } from '@/lib/uploads'
Expand Down Expand Up @@ -93,7 +94,7 @@ export async function POST(
if (!deployment.isActive) {
logger.warn(`[${requestId}] Chat is not active: ${identifier}`)

const executionId = uuidv4()
const executionId = randomUUID()
const loggingSession = new LoggingSession(
deployment.workflowId,
executionId,
Expand Down Expand Up @@ -140,82 +141,35 @@ export async function POST(
return addCorsHeaders(createErrorResponse('No input provided', 400), request)
}

const workflowResult = await db
.select({
isDeployed: workflow.isDeployed,
workspaceId: workflow.workspaceId,
variables: workflow.variables,
})
.from(workflow)
.where(eq(workflow.id, deployment.workflowId))
.limit(1)
const executionId = randomUUID()

if (workflowResult.length === 0 || !workflowResult[0].isDeployed) {
logger.warn(`[${requestId}] Workflow not found or not deployed: ${deployment.workflowId}`)

const executionId = uuidv4()
const loggingSession = new LoggingSession(
deployment.workflowId,
executionId,
'chat',
requestId
)
const loggingSession = new LoggingSession(deployment.workflowId, executionId, 'chat', requestId)

await loggingSession.safeStart({
userId: deployment.userId,
workspaceId: workflowResult[0]?.workspaceId || '',
variables: {},
})
const preprocessResult = await preprocessExecution({
workflowId: deployment.workflowId,
userId: deployment.userId,
triggerType: 'chat',
executionId,
requestId,
checkRateLimit: false, // Chat bypasses rate limits
checkDeployment: true, // Chat requires deployed workflows
loggingSession,
})

await loggingSession.safeCompleteWithError({
error: {
message: 'Chat workflow is not available. The workflow is not deployed.',
stackTrace: undefined,
},
traceSpans: [],
})

return addCorsHeaders(createErrorResponse('Chat workflow is not available', 503), request)
if (!preprocessResult.success) {
logger.warn(`[${requestId}] Preprocessing failed: ${preprocessResult.error?.message}`)
return addCorsHeaders(
createErrorResponse(
preprocessResult.error?.message || 'Failed to process request',
preprocessResult.error?.statusCode || 500
),
request
)
}

let workspaceOwnerId = deployment.userId
if (workflowResult[0].workspaceId) {
const workspaceData = await db
.select({ ownerId: workspace.ownerId })
.from(workspace)
.where(eq(workspace.id, workflowResult[0].workspaceId))
.limit(1)

if (workspaceData.length === 0) {
logger.error(`[${requestId}] Workspace not found for workflow ${deployment.workflowId}`)

const executionId = uuidv4()
const loggingSession = new LoggingSession(
deployment.workflowId,
executionId,
'chat',
requestId
)

await loggingSession.safeStart({
userId: deployment.userId,
workspaceId: workflowResult[0].workspaceId || '',
variables: {},
})

await loggingSession.safeCompleteWithError({
error: {
message: 'Workspace not found. Critical configuration error - please contact support.',
stackTrace: undefined,
},
traceSpans: [],
})

return addCorsHeaders(createErrorResponse('Workspace not found', 500), request)
}

workspaceOwnerId = workspaceData[0].ownerId
}
const { actorUserId, workflowRecord } = preprocessResult
const workspaceOwnerId = actorUserId!
const workspaceId = workflowRecord?.workspaceId || ''

try {
const selectedOutputs: string[] = []
Expand All @@ -232,12 +186,10 @@ export async function POST(
const { SSE_HEADERS } = await import('@/lib/utils')
const { createFilteredResult } = await import('@/app/api/workflows/[id]/execute/route')

const executionId = crypto.randomUUID()

const workflowInput: any = { input, conversationId }
if (files && Array.isArray(files) && files.length > 0) {
const executionContext = {
workspaceId: workflowResult[0].workspaceId || '',
workspaceId,
workflowId: deployment.workflowId,
executionId,
}
Expand All @@ -257,20 +209,13 @@ export async function POST(
} catch (fileError: any) {
logger.error(`[${requestId}] Failed to process chat files:`, fileError)

const fileLoggingSession = new LoggingSession(
deployment.workflowId,
executionId,
'chat',
requestId
)

await fileLoggingSession.safeStart({
await loggingSession.safeStart({
userId: workspaceOwnerId,
workspaceId: workflowResult[0].workspaceId || '',
workspaceId,
variables: {},
})

await fileLoggingSession.safeCompleteWithError({
await loggingSession.safeCompleteWithError({
error: {
message: `File upload failed: ${fileError.message || 'Unable to process uploaded files'}`,
stackTrace: fileError.stack,
Expand All @@ -285,9 +230,9 @@ export async function POST(
const workflowForExecution = {
id: deployment.workflowId,
userId: deployment.userId,
workspaceId: workflowResult[0].workspaceId,
isDeployed: true,
variables: workflowResult[0].variables || {},
workspaceId,
isDeployed: workflowRecord?.isDeployed ?? false,
variables: workflowRecord?.variables || {},
}

const stream = await createStreamingResponse({
Expand Down
Loading