Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/sim/app/api/schedules/execute/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { createBullMQJobData, isBullMQEnabled } from '@/lib/core/bullmq'
import { generateRequestId } from '@/lib/core/utils/request'
import { generateId } from '@/lib/core/utils/uuid'
import { enqueueWorkspaceDispatch } from '@/lib/core/workspace-dispatch'
import { getWorkflowById } from '@/lib/workflows/utils'
import {
executeJobInline,
executeScheduleJob,
Expand Down Expand Up @@ -115,7 +116,6 @@ export async function GET(request: NextRequest) {
}

try {
const { getWorkflowById } = await import('@/lib/workflows/utils')
const resolvedWorkflow = schedule.workflowId
? await getWorkflowById(schedule.workflowId)
: null
Expand Down
58 changes: 30 additions & 28 deletions apps/sim/app/api/webhooks/poll/[provider]/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@ export async function GET(
const { provider } = await params
const requestId = generateShortId()

const LOCK_KEY = `${provider}-polling-lock`
let lockValue: string | undefined

try {
const authError = verifyCronAuth(request, `${provider} webhook polling`)
if (authError) return authError
Expand All @@ -31,29 +28,38 @@ export async function GET(
return NextResponse.json({ error: `Unknown polling provider: ${provider}` }, { status: 404 })
}

lockValue = requestId
const locked = await acquireLock(LOCK_KEY, lockValue, LOCK_TTL_SECONDS)
if (!locked) {
return NextResponse.json(
{
success: true,
message: 'Polling already in progress – skipped',
requestId,
status: 'skip',
},
{ status: 202 }
)
}
const LOCK_KEY = `${provider}-polling-lock`
let lockValue: string | undefined

try {
lockValue = requestId
const locked = await acquireLock(LOCK_KEY, lockValue, LOCK_TTL_SECONDS)
if (!locked) {
return NextResponse.json(
{
success: true,
message: 'Polling already in progress – skipped',
requestId,
status: 'skip',
},
{ status: 202 }
)
}

const results = await pollProvider(provider)
const results = await pollProvider(provider)

return NextResponse.json({
success: true,
message: `${provider} polling completed`,
requestId,
status: 'completed',
...results,
})
return NextResponse.json({
success: true,
message: `${provider} polling completed`,
requestId,
status: 'completed',
...results,
})
} finally {
if (lockValue) {
await releaseLock(LOCK_KEY, lockValue).catch(() => {})
}
}
} catch (error) {
logger.error(`Error during ${provider} polling (${requestId}):`, error)
return NextResponse.json(
Expand All @@ -65,9 +71,5 @@ export async function GET(
},
{ status: 500 }
)
} finally {
if (lockValue) {
await releaseLock(LOCK_KEY, lockValue).catch(() => {})
}
}
}
105 changes: 75 additions & 30 deletions apps/sim/lib/webhooks/polling/gmail.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,44 +151,68 @@ async function fetchNewEmails(
let latestHistoryId = config.historyId

if (useHistoryApi) {
const historyUrl = `https://gmail.googleapis.com/gmail/v1/users/me/history?startHistoryId=${config.historyId}`
const messageIds = new Set<string>()
let pageToken: string | undefined

const historyResponse = await fetch(historyUrl, {
headers: { Authorization: `Bearer ${accessToken}` },
})
do {
let historyUrl = `https://gmail.googleapis.com/gmail/v1/users/me/history?startHistoryId=${config.historyId}&historyTypes=messageAdded`
if (pageToken) {
historyUrl += `&pageToken=${pageToken}`
}

if (!historyResponse.ok) {
const errorData = await historyResponse.json()
logger.error(`[${requestId}] Gmail history API error:`, {
status: historyResponse.status,
statusText: historyResponse.statusText,
error: errorData,
const historyResponse = await fetch(historyUrl, {
headers: { Authorization: `Bearer ${accessToken}` },
})

logger.info(`[${requestId}] Falling back to search API after history API failure`)
return searchEmails(accessToken, config, requestId, logger)
}
if (!historyResponse.ok) {
const status = historyResponse.status
const errorData = await historyResponse.json().catch(() => ({}))
logger.error(`[${requestId}] Gmail history API error:`, {
status,
statusText: historyResponse.statusText,
error: errorData,
})

if (status === 403 || status === 429) {
throw new Error(
`Gmail API error ${status} — skipping to retry next poll cycle: ${JSON.stringify(errorData)}`
)
}

const historyData = await historyResponse.json()
logger.info(`[${requestId}] Falling back to search API after history API error ${status}`)
const searchResult = await searchEmails(accessToken, config, requestId, logger)
if (searchResult.emails.length === 0) {
const freshHistoryId = await getGmailProfileHistoryId(accessToken, requestId, logger)
if (freshHistoryId) {
logger.info(
`[${requestId}] Fetched fresh historyId ${freshHistoryId} after invalid historyId (was: ${config.historyId})`
)
return { emails: [], latestHistoryId: freshHistoryId }
}
}
return searchResult
}

if (!historyData.history || !historyData.history.length) {
return { emails: [], latestHistoryId }
}
const historyData = await historyResponse.json()

if (historyData.historyId) {
latestHistoryId = historyData.historyId
}
if (historyData.historyId) {
latestHistoryId = historyData.historyId
}

const messageIds = new Set<string>()
for (const history of historyData.history) {
if (history.messagesAdded) {
for (const messageAdded of history.messagesAdded) {
messageIds.add(messageAdded.message.id)
if (historyData.history) {
for (const history of historyData.history) {
if (history.messagesAdded) {
for (const messageAdded of history.messagesAdded) {
messageIds.add(messageAdded.message.id)
}
}
}
}
}

if (messageIds.size === 0) {
pageToken = historyData.nextPageToken
} while (pageToken)

if (!messageIds.size) {
return { emails: [], latestHistoryId }
}

Expand Down Expand Up @@ -352,6 +376,29 @@ async function searchEmails(
}
}

async function getGmailProfileHistoryId(
accessToken: string,
requestId: string,
logger: ReturnType<typeof import('@sim/logger').createLogger>
): Promise<string | null> {
try {
const response = await fetch('https://gmail.googleapis.com/gmail/v1/users/me/profile', {
headers: { Authorization: `Bearer ${accessToken}` },
})
if (!response.ok) {
logger.warn(
`[${requestId}] Failed to fetch Gmail profile for fresh historyId: ${response.status}`
)
return null
}
const profile = await response.json()
return (profile.historyId as string | undefined) ?? null
} catch (error) {
logger.warn(`[${requestId}] Error fetching Gmail profile:`, error)
return null
}
}

async function getEmailDetails(accessToken: string, messageId: string): Promise<GmailEmail> {
const messageUrl = `https://gmail.googleapis.com/gmail/v1/users/me/messages/${messageId}?format=full`

Expand Down Expand Up @@ -442,9 +489,7 @@ async function processEmails(
if (headers.date) {
try {
date = new Date(headers.date).toISOString()
} catch (_e) {
// Keep date as null if parsing fails
}
} catch (_e) {}
} else if (email.internalDate) {
date = new Date(Number.parseInt(email.internalDate)).toISOString()
}
Expand Down
Loading
Loading