From 4268465734b8ac099f53f7c84813b0325c0b0915 Mon Sep 17 00:00:00 2001 From: waleed Date: Wed, 8 Apr 2026 21:47:20 -0700 Subject: [PATCH 1/9] improvement(polling): fix correctness and efficiency across all polling handlers - Gmail: paginate history API, add historyTypes filter, differentiate 403/429, fetch fresh historyId on fallback to break 404 retry loop - Outlook: follow @odata.nextLink pagination, use fetchWithRetry for all Graph calls, fix $top alignment, skip folder filter on partial resolution failure, remove Content-Type from GET requests - RSS: add conditional GET (ETag/If-None-Match), raise GUID cap to 500, fix 304 ETag capture per RFC 9111, align GUID tracking with idempotency fallback key - IMAP: single connection reuse, UIDVALIDITY tracking per mailbox, advance UID only on successful fetch, fix messageFlagsAdd range type, remove cross-mailbox legacy UID fallback - Dispatch polling via trigger.dev task with per-provider concurrency key; fall back to synchronous Redis-locked polling for self-hosted --- .../app/api/webhooks/poll/[provider]/route.ts | 91 +++-- apps/sim/background/provider-polling.ts | 34 ++ apps/sim/lib/webhooks/polling/gmail.ts | 103 ++++-- apps/sim/lib/webhooks/polling/imap.ts | 311 +++++++++--------- apps/sim/lib/webhooks/polling/outlook.ts | 111 ++++--- apps/sim/lib/webhooks/polling/rss.ts | 79 ++++- 6 files changed, 473 insertions(+), 256 deletions(-) create mode 100644 apps/sim/background/provider-polling.ts diff --git a/apps/sim/app/api/webhooks/poll/[provider]/route.ts b/apps/sim/app/api/webhooks/poll/[provider]/route.ts index d314e8563b..5cbeec2b7c 100644 --- a/apps/sim/app/api/webhooks/poll/[provider]/route.ts +++ b/apps/sim/app/api/webhooks/poll/[provider]/route.ts @@ -1,9 +1,11 @@ import { createLogger } from '@sim/logger' import { type NextRequest, NextResponse } from 'next/server' import { verifyCronAuth } from '@/lib/auth/internal' +import { isTriggerDevEnabled } from '@/lib/core/config/feature-flags' import { acquireLock, releaseLock } from '@/lib/core/config/redis' import { generateShortId } from '@/lib/core/utils/uuid' import { pollProvider, VALID_POLLING_PROVIDERS } from '@/lib/webhooks/polling' +import { providerPolling } from '@/background/provider-polling' const logger = createLogger('PollingAPI') @@ -20,9 +22,6 @@ export async function GET( const { provider } = await params const requestId = generateShortId() - const LOCK_KEY = `${provider}-polling-lock` - let lockValue: string | undefined - try { const authError = verifyCronAuth(request, `${provider} webhook polling`) if (authError) return authError @@ -31,29 +30,75 @@ export async function GET( return NextResponse.json({ error: `Unknown polling provider: ${provider}` }, { status: 404 }) } - lockValue = requestId - const locked = await acquireLock(LOCK_KEY, lockValue, LOCK_TTL_SECONDS) - if (!locked) { - return NextResponse.json( - { + // When trigger.dev is enabled, dispatch polling as an async task and return immediately. + // Per-provider concurrency (concurrencyKey) ensures only one poll per provider runs at a time, + // while different providers (gmail vs outlook) can poll in parallel. + if (isTriggerDevEnabled) { + try { + const handle = await providerPolling.trigger( + { provider, requestId }, + { + concurrencyKey: provider, + tags: [`provider:${provider}`], + } + ) + + logger.info(`[${requestId}] Dispatched ${provider} polling to trigger.dev`, { + runId: handle.id, + }) + + return NextResponse.json({ success: true, - message: 'Polling already in progress – skipped', + message: `${provider} polling dispatched`, requestId, - status: 'skip', - }, - { status: 202 } - ) + runId: handle.id, + status: 'dispatched', + }) + } catch (triggerError) { + // If trigger.dev is unavailable, fall through to synchronous polling below. + logger.warn( + `[${requestId}] Trigger.dev dispatch failed for ${provider}, falling back to synchronous polling`, + { + error: triggerError instanceof Error ? triggerError.message : String(triggerError), + } + ) + } } - const results = await pollProvider(provider) + // Fallback: synchronous polling when trigger.dev is not enabled (self-hosted). + // Redis lock prevents concurrent polls for the same provider. + const LOCK_KEY = `${provider}-polling-lock` + let lockValue: string | undefined + + try { + lockValue = requestId + const locked = await acquireLock(LOCK_KEY, lockValue, LOCK_TTL_SECONDS) + if (!locked) { + return NextResponse.json( + { + success: true, + message: 'Polling already in progress – skipped', + requestId, + status: 'skip', + }, + { status: 202 } + ) + } - return NextResponse.json({ - success: true, - message: `${provider} polling completed`, - requestId, - status: 'completed', - ...results, - }) + const results = await pollProvider(provider) + + return NextResponse.json({ + success: true, + message: `${provider} polling completed`, + requestId, + status: 'completed', + ...results, + }) + } finally { + if (lockValue) { + await releaseLock(LOCK_KEY, lockValue).catch(() => {}) + } + } } catch (error) { logger.error(`Error during ${provider} polling (${requestId}):`, error) return NextResponse.json( @@ -65,9 +110,5 @@ export async function GET( }, { status: 500 } ) - } finally { - if (lockValue) { - await releaseLock(LOCK_KEY, lockValue).catch(() => {}) - } } } diff --git a/apps/sim/background/provider-polling.ts b/apps/sim/background/provider-polling.ts new file mode 100644 index 0000000000..b9a6dace77 --- /dev/null +++ b/apps/sim/background/provider-polling.ts @@ -0,0 +1,34 @@ +import { createLogger } from '@sim/logger' +import { task } from '@trigger.dev/sdk' +import { pollProvider } from '@/lib/webhooks/polling' + +const logger = createLogger('TriggerProviderPolling') + +export type ProviderPollingPayload = { + provider: string + requestId: string +} + +export const providerPolling = task({ + id: 'provider-polling', + machine: 'medium-1x', + maxDuration: 300, + retry: { + maxAttempts: 1, + }, + queue: { + name: 'provider-polling', + concurrencyLimit: 1, + }, + run: async (payload: ProviderPollingPayload) => { + const { provider, requestId } = payload + + logger.info(`[${requestId}] Starting ${provider} polling`) + + const result = await pollProvider(provider) + + logger.info(`[${requestId}] ${provider} polling completed`, result) + + return result + }, +}) diff --git a/apps/sim/lib/webhooks/polling/gmail.ts b/apps/sim/lib/webhooks/polling/gmail.ts index 7db8587d2c..8bbfd6f73a 100644 --- a/apps/sim/lib/webhooks/polling/gmail.ts +++ b/apps/sim/lib/webhooks/polling/gmail.ts @@ -151,44 +151,70 @@ async function fetchNewEmails( let latestHistoryId = config.historyId if (useHistoryApi) { - const historyUrl = `https://gmail.googleapis.com/gmail/v1/users/me/history?startHistoryId=${config.historyId}` + const messageIds = new Set() + let pageToken: string | undefined - const historyResponse = await fetch(historyUrl, { - headers: { Authorization: `Bearer ${accessToken}` }, - }) + do { + let historyUrl = `https://gmail.googleapis.com/gmail/v1/users/me/history?startHistoryId=${config.historyId}&historyTypes=messageAdded` + if (pageToken) { + historyUrl += `&pageToken=${pageToken}` + } - if (!historyResponse.ok) { - const errorData = await historyResponse.json() - logger.error(`[${requestId}] Gmail history API error:`, { - status: historyResponse.status, - statusText: historyResponse.statusText, - error: errorData, + const historyResponse = await fetch(historyUrl, { + headers: { Authorization: `Bearer ${accessToken}` }, }) - logger.info(`[${requestId}] Falling back to search API after history API failure`) - return searchEmails(accessToken, config, requestId, logger) - } + if (!historyResponse.ok) { + const status = historyResponse.status + const errorData = await historyResponse.json().catch(() => ({})) + logger.error(`[${requestId}] Gmail history API error:`, { + status, + statusText: historyResponse.statusText, + error: errorData, + }) + + if (status === 403 || status === 429) { + throw new Error( + `Gmail API error ${status} — skipping to retry next poll cycle: ${JSON.stringify(errorData)}` + ) + } - const historyData = await historyResponse.json() + logger.info(`[${requestId}] Falling back to search API after history API error ${status}`) + const searchResult = await searchEmails(accessToken, config, requestId, logger) + // When search finds 0 emails after a history API failure, the stored historyId is likely + // invalid. Fetch a fresh one from the profile API to break the potential 404 retry loop. + if (searchResult.emails.length === 0) { + const freshHistoryId = await getGmailProfileHistoryId(accessToken, requestId, logger) + if (freshHistoryId) { + logger.info( + `[${requestId}] Fetched fresh historyId ${freshHistoryId} after invalid historyId (was: ${config.historyId})` + ) + return { emails: [], latestHistoryId: freshHistoryId } + } + } + return searchResult + } - if (!historyData.history || !historyData.history.length) { - return { emails: [], latestHistoryId } - } + const historyData = await historyResponse.json() - if (historyData.historyId) { - latestHistoryId = historyData.historyId - } + if (historyData.historyId) { + latestHistoryId = historyData.historyId + } - const messageIds = new Set() - for (const history of historyData.history) { - if (history.messagesAdded) { - for (const messageAdded of history.messagesAdded) { - messageIds.add(messageAdded.message.id) + if (historyData.history) { + for (const history of historyData.history) { + if (history.messagesAdded) { + for (const messageAdded of history.messagesAdded) { + messageIds.add(messageAdded.message.id) + } + } } } - } - if (messageIds.size === 0) { + pageToken = historyData.nextPageToken + } while (pageToken) + + if (!messageIds.size) { return { emails: [], latestHistoryId } } @@ -352,6 +378,29 @@ async function searchEmails( } } +async function getGmailProfileHistoryId( + accessToken: string, + requestId: string, + logger: ReturnType +): Promise { + try { + const response = await fetch('https://gmail.googleapis.com/gmail/v1/users/me/profile', { + headers: { Authorization: `Bearer ${accessToken}` }, + }) + if (!response.ok) { + logger.warn( + `[${requestId}] Failed to fetch Gmail profile for fresh historyId: ${response.status}` + ) + return null + } + const profile = await response.json() + return (profile.historyId as string | undefined) ?? null + } catch (error) { + logger.warn(`[${requestId}] Error fetching Gmail profile:`, error) + return null + } +} + async function getEmailDetails(accessToken: string, messageId: string): Promise { const messageUrl = `https://gmail.googleapis.com/gmail/v1/users/me/messages/${messageId}?format=full` diff --git a/apps/sim/lib/webhooks/polling/imap.ts b/apps/sim/lib/webhooks/polling/imap.ts index e5822aa888..fa69ab0fe4 100644 --- a/apps/sim/lib/webhooks/polling/imap.ts +++ b/apps/sim/lib/webhooks/polling/imap.ts @@ -22,6 +22,8 @@ interface ImapWebhookConfig { includeAttachments: boolean lastProcessedUid?: number lastProcessedUidByMailbox?: Record + /** Stores the UIDVALIDITY value per mailbox as strings (bigint cannot be JSON-serialized). */ + uidValidityByMailbox?: Record lastCheckedTimestamp?: string maxEmailsPerPoll?: number } @@ -90,48 +92,92 @@ export const imapPollingHandler: PollingProviderHandler = { return 'failure' } - const { emails, latestUidByMailbox } = await fetchNewEmails( - config, - requestId, - hostValidation.resolvedIP!, - logger - ) - const pollTimestamp = new Date().toISOString() + const client = new ImapFlow({ + host: hostValidation.resolvedIP!, + servername: config.host, + port: config.port || 993, + secure: config.secure ?? true, + auth: { + user: config.username, + pass: config.password, + }, + tls: { rejectUnauthorized: true }, + logger: false, + }) + + let emails: Awaited>['emails'] = [] + let latestUidByMailbox: Record = {} + let uidValidityByMailbox: Record = {} - if (!emails || !emails.length) { - await updateImapState(webhookId, latestUidByMailbox, pollTimestamp, config, logger) - await markWebhookSuccess(webhookId, logger) - logger.info(`[${requestId}] No new emails found for webhook ${webhookId}`) - return 'success' - } + try { + await client.connect() + + const result = await fetchNewEmails(client, config, requestId, logger) + emails = result.emails + latestUidByMailbox = result.latestUidByMailbox + uidValidityByMailbox = result.uidValidityByMailbox + + const pollTimestamp = new Date().toISOString() + + if (!emails.length) { + await updateImapState( + webhookId, + latestUidByMailbox, + pollTimestamp, + config, + logger, + uidValidityByMailbox + ) + await markWebhookSuccess(webhookId, logger) + logger.info(`[${requestId}] No new emails found for webhook ${webhookId}`) + await client.logout() + return 'success' + } + + logger.info(`[${requestId}] Found ${emails.length} new emails for webhook ${webhookId}`) + + const { processedCount, failedCount } = await processEmails( + emails, + webhookData, + workflowData, + config, + client, + requestId, + logger + ) - logger.info(`[${requestId}] Found ${emails.length} new emails for webhook ${webhookId}`) + await updateImapState( + webhookId, + latestUidByMailbox, + pollTimestamp, + config, + logger, + uidValidityByMailbox + ) - const { processedCount, failedCount } = await processEmails( - emails, - webhookData, - workflowData, - config, - requestId, - hostValidation.resolvedIP!, - logger - ) + await client.logout() - await updateImapState(webhookId, latestUidByMailbox, pollTimestamp, config, logger) + if (failedCount > 0 && processedCount === 0) { + await markWebhookFailed(webhookId, logger) + logger.warn( + `[${requestId}] All ${failedCount} emails failed to process for webhook ${webhookId}` + ) + return 'failure' + } - if (failedCount > 0 && processedCount === 0) { - await markWebhookFailed(webhookId, logger) - logger.warn( - `[${requestId}] All ${failedCount} emails failed to process for webhook ${webhookId}` + await markWebhookSuccess(webhookId, logger) + logger.info( + `[${requestId}] Successfully processed ${processedCount} emails for webhook ${webhookId}${failedCount > 0 ? ` (${failedCount} failed)` : ''}` ) - return 'failure' + return 'success' + } catch (innerError) { + try { + await client.logout() + } catch { + // Ignore logout errors + } + throw innerError } - - await markWebhookSuccess(webhookId, logger) - logger.info( - `[${requestId}] Successfully processed ${processedCount} emails for webhook ${webhookId}${failedCount > 0 ? ` (${failedCount} failed)` : ''}` - ) - return 'success' } catch (error) { logger.error(`[${requestId}] Error processing IMAP webhook ${webhookId}:`, error) await markWebhookFailed(webhookId, logger) @@ -145,7 +191,8 @@ async function updateImapState( uidByMailbox: Record, timestamp: string, config: ImapWebhookConfig, - logger: ReturnType + logger: ReturnType, + uidValidityByMailbox: Record ) { const existingUidByMailbox = config.lastProcessedUidByMailbox || {} const mergedUidByMailbox = { ...existingUidByMailbox } @@ -159,30 +206,18 @@ async function updateImapState( { lastProcessedUidByMailbox: mergedUidByMailbox, lastCheckedTimestamp: timestamp, + uidValidityByMailbox, }, logger ) } async function fetchNewEmails( + client: ImapFlow, config: ImapWebhookConfig, requestId: string, - resolvedIP: string, logger: ReturnType ) { - const client = new ImapFlow({ - host: resolvedIP, - servername: config.host, - port: config.port || 993, - secure: config.secure ?? true, - auth: { - user: config.username, - pass: config.password, - }, - tls: { rejectUnauthorized: true }, - logger: false, - }) - const emails: Array<{ uid: number mailboxPath: string @@ -193,97 +228,93 @@ async function fetchNewEmails( const mailboxes = getMailboxesToCheck(config) const latestUidByMailbox: Record = { ...(config.lastProcessedUidByMailbox || {}) } + const uidValidityByMailbox: Record = { ...(config.uidValidityByMailbox || {}) } - try { - await client.connect() - - const maxEmails = config.maxEmailsPerPoll || 25 - let totalEmailsCollected = 0 + const maxEmails = config.maxEmailsPerPoll || 25 + let totalEmailsCollected = 0 - for (const mailboxPath of mailboxes) { - if (totalEmailsCollected >= maxEmails) break + for (const mailboxPath of mailboxes) { + if (totalEmailsCollected >= maxEmails) break - try { - await client.mailboxOpen(mailboxPath) - - let searchCriteria: Record = { unseen: true } - if (config.searchCriteria) { - if (typeof config.searchCriteria === 'object') { - searchCriteria = config.searchCriteria as unknown as Record - } else if (typeof config.searchCriteria === 'string') { - try { - searchCriteria = JSON.parse(config.searchCriteria) - } catch { - logger.warn(`[${requestId}] Invalid search criteria JSON, using default`) - } - } - } + try { + const mailbox = await client.mailboxOpen(mailboxPath) - const lastUidForMailbox = latestUidByMailbox[mailboxPath] || config.lastProcessedUid + const currentUidValidity = mailbox.uidValidity.toString() + const storedUidValidity = uidValidityByMailbox[mailboxPath] - if (lastUidForMailbox) { - searchCriteria = { ...searchCriteria, uid: `${lastUidForMailbox + 1}:*` } - } + if (storedUidValidity && storedUidValidity !== currentUidValidity) { + logger.warn( + `[${requestId}] UIDVALIDITY changed for ${mailboxPath} (${storedUidValidity} -> ${currentUidValidity}), discarding stored UID` + ) + delete latestUidByMailbox[mailboxPath] + } + uidValidityByMailbox[mailboxPath] = currentUidValidity - if (config.lastCheckedTimestamp) { - const lastChecked = new Date(config.lastCheckedTimestamp) - const bufferTime = new Date(lastChecked.getTime() - 60000) - searchCriteria = { ...searchCriteria, since: bufferTime } - } else { - const oneDayAgo = new Date(Date.now() - 24 * 60 * 60 * 1000) - searchCriteria = { ...searchCriteria, since: oneDayAgo } + let searchCriteria: Record = { unseen: true } + if (config.searchCriteria) { + if (typeof config.searchCriteria === 'object') { + searchCriteria = config.searchCriteria as unknown as Record + } else if (typeof config.searchCriteria === 'string') { + try { + searchCriteria = JSON.parse(config.searchCriteria) + } catch { + logger.warn(`[${requestId}] Invalid search criteria JSON, using default`) + } } + } - let messageUids: number[] = [] - try { - const searchResult = await client.search(searchCriteria, { uid: true }) - messageUids = searchResult === false ? [] : searchResult - } catch { - continue - } + const lastUidForMailbox = latestUidByMailbox[mailboxPath] - if (messageUids.length === 0) continue + if (lastUidForMailbox) { + searchCriteria = { ...searchCriteria, uid: `${lastUidForMailbox + 1}:*` } + } - messageUids.sort((a, b) => a - b) - const remainingSlots = maxEmails - totalEmailsCollected - const uidsToProcess = messageUids.slice(0, remainingSlots) + if (config.lastCheckedTimestamp) { + const lastChecked = new Date(config.lastCheckedTimestamp) + const bufferTime = new Date(lastChecked.getTime() - 60000) + searchCriteria = { ...searchCriteria, since: bufferTime } + } else { + const oneDayAgo = new Date(Date.now() - 24 * 60 * 60 * 1000) + searchCriteria = { ...searchCriteria, since: oneDayAgo } + } - if (uidsToProcess.length > 0) { - latestUidByMailbox[mailboxPath] = Math.max( - ...uidsToProcess, - latestUidByMailbox[mailboxPath] || 0 - ) - } + let messageUids: number[] = [] + try { + const searchResult = await client.search(searchCriteria, { uid: true }) + messageUids = searchResult === false ? [] : searchResult + } catch { + continue + } - for await (const msg of client.fetch( - uidsToProcess, - { uid: true, envelope: true, bodyStructure: true, source: true }, - { uid: true } - )) { - emails.push({ - uid: msg.uid, - mailboxPath, - envelope: msg.envelope, - bodyStructure: msg.bodyStructure, - source: msg.source, - }) - totalEmailsCollected++ + if (messageUids.length === 0) continue + + messageUids.sort((a, b) => a - b) + const remainingSlots = maxEmails - totalEmailsCollected + const uidsToProcess = messageUids.slice(0, remainingSlots) + + for await (const msg of client.fetch( + uidsToProcess, + { uid: true, envelope: true, bodyStructure: true, source: true }, + { uid: true } + )) { + emails.push({ + uid: msg.uid, + mailboxPath, + envelope: msg.envelope, + bodyStructure: msg.bodyStructure, + source: msg.source, + }) + if (msg.uid > (latestUidByMailbox[mailboxPath] || 0)) { + latestUidByMailbox[mailboxPath] = msg.uid } - } catch (mailboxError) { - logger.warn(`[${requestId}] Error processing mailbox ${mailboxPath}:`, mailboxError) + totalEmailsCollected++ } + } catch (mailboxError) { + logger.warn(`[${requestId}] Error processing mailbox ${mailboxPath}:`, mailboxError) } - - await client.logout() - return { emails, latestUidByMailbox } - } catch (error) { - try { - await client.logout() - } catch { - // Ignore logout errors - } - throw error } + + return { emails, latestUidByMailbox, uidValidityByMailbox } } function getMailboxesToCheck(config: ImapWebhookConfig): string[] { @@ -437,34 +468,17 @@ async function processEmails( webhookData: PollWebhookContext['webhookData'], workflowData: PollWebhookContext['workflowData'], config: ImapWebhookConfig, + client: ImapFlow, requestId: string, - resolvedIP: string, logger: ReturnType ) { let processedCount = 0 let failedCount = 0 - const client = new ImapFlow({ - host: resolvedIP, - servername: config.host, - port: config.port || 993, - secure: config.secure ?? true, - auth: { - user: config.username, - pass: config.password, - }, - tls: { rejectUnauthorized: true }, - logger: false, - }) - let currentOpenMailbox: string | null = null const lockState: { lock: MailboxLockObject | null } = { lock: null } try { - if (config.markAsRead) { - await client.connect() - } - for (const email of emails) { try { await pollingIdempotency.executeWithIdempotency( @@ -541,7 +555,7 @@ async function processEmails( lockState.lock = await client.getMailboxLock(email.mailboxPath) currentOpenMailbox = email.mailboxPath } - await client.messageFlagsAdd({ uid: email.uid }, ['\\Seen'], { uid: true }) + await client.messageFlagsAdd(email.uid, ['\\Seen'], { uid: true }) } catch (flagError) { logger.warn( `[${requestId}] Failed to mark message ${email.uid} as read:`, @@ -565,14 +579,11 @@ async function processEmails( } } } finally { - if (config.markAsRead) { + if (lockState.lock) { try { - if (lockState.lock) { - lockState.lock.release() - } - await client.logout() + lockState.lock.release() } catch { - // Ignore logout errors + // Ignore lock release errors } } } diff --git a/apps/sim/lib/webhooks/polling/outlook.ts b/apps/sim/lib/webhooks/polling/outlook.ts index e6874940c6..c2f7f756b7 100644 --- a/apps/sim/lib/webhooks/polling/outlook.ts +++ b/apps/sim/lib/webhooks/polling/outlook.ts @@ -1,5 +1,6 @@ import { htmlToText } from 'html-to-text' import { pollingIdempotency } from '@/lib/core/idempotency/service' +import { fetchWithRetry } from '@/lib/knowledge/documents/utils' import type { PollingProviderHandler, PollWebhookContext } from '@/lib/webhooks/polling/types' import { markWebhookFailed, @@ -166,6 +167,9 @@ export const outlookPollingHandler: PollingProviderHandler = { }, } +/** Hard cap on total emails fetched per poll to prevent unbounded pagination loops. */ +const OUTLOOK_HARD_MAX_EMAILS = 200 + async function fetchNewOutlookEmails( accessToken: string, config: OutlookWebhookConfig, @@ -181,53 +185,76 @@ async function fetchNewOutlookEmails( 'id,conversationId,subject,bodyPreview,body,from,toRecipients,ccRecipients,receivedDateTime,sentDateTime,hasAttachments,isRead,parentFolderId' ) params.append('$orderby', 'receivedDateTime desc') - params.append('$top', (config.maxEmailsPerPoll || 25).toString()) + const maxEmails = Math.min(config.maxEmailsPerPoll || 25, OUTLOOK_HARD_MAX_EMAILS) + params.append('$top', maxEmails.toString()) if (config.lastCheckedTimestamp) { const lastChecked = new Date(config.lastCheckedTimestamp) const bufferTime = new Date(lastChecked.getTime() - 60000) params.append('$filter', `receivedDateTime gt ${bufferTime.toISOString()}`) } + const allEmails: OutlookEmail[] = [] + let nextUrl: string | undefined = `${apiUrl}?${params.toString()}` + logger.info(`[${requestId}] Fetching emails from: ${nextUrl}`) - const fullUrl = `${apiUrl}?${params.toString()}` - logger.info(`[${requestId}] Fetching emails from: ${fullUrl}`) + while (nextUrl && allEmails.length < maxEmails) { + const response = await fetchWithRetry(nextUrl, { + headers: { + Authorization: `Bearer ${accessToken}`, + }, + }) - const response = await fetch(fullUrl, { - headers: { - Authorization: `Bearer ${accessToken}`, - 'Content-Type': 'application/json', - }, - }) + if (!response.ok) { + const errorData = await response + .json() + .catch(() => ({ error: { message: 'Unknown error' } })) + logger.error(`[${requestId}] Microsoft Graph API error:`, { + status: response.status, + statusText: response.statusText, + error: errorData, + }) + throw new Error( + `Microsoft Graph API error: ${response.status} ${response.statusText} - ${JSON.stringify(errorData)}` + ) + } - if (!response.ok) { - const errorData = await response.json().catch(() => ({ error: { message: 'Unknown error' } })) - logger.error(`[${requestId}] Microsoft Graph API error:`, { - status: response.status, - statusText: response.statusText, - error: errorData, - }) - throw new Error( - `Microsoft Graph API error: ${response.status} ${response.statusText} - ${JSON.stringify(errorData)}` - ) + const data = await response.json() + const pageEmails: OutlookEmail[] = data.value || [] + allEmails.push(...pageEmails) + + nextUrl = + allEmails.length < maxEmails ? (data['@odata.nextLink'] as string | undefined) : undefined + + if (pageEmails.length === 0) break } - const data = await response.json() - const emails = data.value || [] + logger.info(`[${requestId}] Fetched ${allEmails.length} emails total`) + + const emails = allEmails.slice(0, maxEmails) let resolvedFolderIds: Map | undefined + let skipFolderFilter = false if (config.folderIds && config.folderIds.length > 0) { - const hasWellKnownFolders = config.folderIds.some(isWellKnownFolderName) - if (hasWellKnownFolders) { + const wellKnownFolders = config.folderIds.filter(isWellKnownFolderName) + if (wellKnownFolders.length > 0) { resolvedFolderIds = await resolveWellKnownFolderIds( accessToken, config.folderIds, requestId, logger ) + if (resolvedFolderIds.size < wellKnownFolders.length) { + logger.warn( + `[${requestId}] Could not resolve all well-known folders (${resolvedFolderIds.size}/${wellKnownFolders.length}) — skipping folder filter to avoid incorrect results` + ) + skipFolderFilter = true + } } } - const filteredEmails = filterEmailsByFolder(emails, config, resolvedFolderIds) + const filteredEmails = skipFolderFilter + ? emails + : filterEmailsByFolder(emails, config, resolvedFolderIds) logger.info( `[${requestId}] Fetched ${emails.length} emails, ${filteredEmails.length} after filtering` @@ -262,12 +289,14 @@ async function resolveWellKnownFolderId( logger: ReturnType ): Promise { try { - const response = await fetch(`https://graph.microsoft.com/v1.0/me/mailFolders/${folderName}`, { - headers: { - Authorization: `Bearer ${accessToken}`, - 'Content-Type': 'application/json', - }, - }) + const response = await fetchWithRetry( + `https://graph.microsoft.com/v1.0/me/mailFolders/${folderName}`, + { + headers: { + Authorization: `Bearer ${accessToken}`, + }, + } + ) if (!response.ok) { logger.warn( @@ -455,12 +484,11 @@ async function downloadOutlookAttachments( const attachments: OutlookAttachment[] = [] try { - const response = await fetch( + const response = await fetchWithRetry( `https://graph.microsoft.com/v1.0/me/messages/${messageId}/attachments`, { headers: { Authorization: `Bearer ${accessToken}`, - 'Content-Type': 'application/json', }, } ) @@ -511,14 +539,17 @@ async function markOutlookEmailAsRead( logger: ReturnType ) { try { - const response = await fetch(`https://graph.microsoft.com/v1.0/me/messages/${messageId}`, { - method: 'PATCH', - headers: { - Authorization: `Bearer ${accessToken}`, - 'Content-Type': 'application/json', - }, - body: JSON.stringify({ isRead: true }), - }) + const response = await fetchWithRetry( + `https://graph.microsoft.com/v1.0/me/messages/${messageId}`, + { + method: 'PATCH', + headers: { + Authorization: `Bearer ${accessToken}`, + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ isRead: true }), + } + ) if (!response.ok) { logger.error( diff --git a/apps/sim/lib/webhooks/polling/rss.ts b/apps/sim/lib/webhooks/polling/rss.ts index 31044fc892..f182bf247d 100644 --- a/apps/sim/lib/webhooks/polling/rss.ts +++ b/apps/sim/lib/webhooks/polling/rss.ts @@ -12,7 +12,7 @@ import { } from '@/lib/webhooks/polling/utils' import { processPolledWebhookEvent } from '@/lib/webhooks/processor' -const MAX_GUIDS_TO_TRACK = 100 +const MAX_GUIDS_TO_TRACK = 500 interface RssWebhookConfig { feedUrl: string @@ -87,10 +87,15 @@ export const rssPollingHandler: PollingProviderHandler = { } const now = new Date() - const { feed, items: newItems } = await fetchNewRssItems(config, requestId, logger) + const { + feed, + items: newItems, + etag, + lastModified, + } = await fetchNewRssItems(config, requestId, logger) if (!newItems.length) { - await updateRssState(webhookId, now.toISOString(), [], config, logger) + await updateRssState(webhookId, now.toISOString(), [], config, logger, etag, lastModified) await markWebhookSuccess(webhookId, logger) logger.info(`[${requestId}] No new items found for webhook ${webhookId}`) return 'success' @@ -108,10 +113,23 @@ export const rssPollingHandler: PollingProviderHandler = { ) const newGuids = newItems - .map((item) => item.guid || item.link || '') + .map( + (item) => + item.guid || + item.link || + (item.title && item.pubDate ? `${item.title}-${item.pubDate}` : '') + ) .filter((guid) => guid.length > 0) - await updateRssState(webhookId, now.toISOString(), newGuids, config, logger) + await updateRssState( + webhookId, + now.toISOString(), + newGuids, + config, + logger, + etag, + lastModified + ) if (failedCount > 0 && processedCount === 0) { await markWebhookFailed(webhookId, logger) @@ -139,7 +157,9 @@ async function updateRssState( timestamp: string, newGuids: string[], config: RssWebhookConfig, - logger: ReturnType + logger: ReturnType, + etag?: string, + lastModified?: string ) { const existingGuids = config.lastSeenGuids || [] const allGuids = [...newGuids, ...existingGuids].slice(0, MAX_GUIDS_TO_TRACK) @@ -149,6 +169,8 @@ async function updateRssState( { lastCheckedTimestamp: timestamp, lastSeenGuids: allGuids, + ...(etag !== undefined ? { etag } : {}), + ...(lastModified !== undefined ? { lastModified } : {}), }, logger ) @@ -158,7 +180,7 @@ async function fetchNewRssItems( config: RssWebhookConfig, requestId: string, logger: ReturnType -): Promise<{ feed: RssFeed; items: RssItem[] }> { +): Promise<{ feed: RssFeed; items: RssItem[]; etag?: string; lastModified?: string }> { try { const urlValidation = await validateUrlWithDNS(config.feedUrl, 'feedUrl') if (!urlValidation.isValid) { @@ -166,24 +188,45 @@ async function fetchNewRssItems( throw new Error(`Invalid RSS feed URL: ${urlValidation.error}`) } + const headers: Record = { + 'User-Agent': 'Sim/1.0 RSS Poller', + Accept: 'application/rss+xml, application/xml, text/xml, */*', + } + if (config.etag) { + headers['If-None-Match'] = config.etag + } + if (config.lastModified) { + headers['If-Modified-Since'] = config.lastModified + } + const response = await secureFetchWithPinnedIP(config.feedUrl, urlValidation.resolvedIP!, { - headers: { - 'User-Agent': 'Sim/1.0 RSS Poller', - Accept: 'application/rss+xml, application/xml, text/xml, */*', - }, + headers, timeout: 30000, }) + if (response.status === 304) { + logger.info(`[${requestId}] RSS feed not modified (304) for ${config.feedUrl}`) + return { + feed: { items: [] } as RssFeed, + items: [], + etag: response.headers.get('etag') ?? config.etag, + lastModified: response.headers.get('last-modified') ?? config.lastModified, + } + } + if (!response.ok) { await response.text().catch(() => {}) throw new Error(`Failed to fetch RSS feed: ${response.status} ${response.statusText}`) } + const newEtag = response.headers.get('etag') ?? undefined + const newLastModified = response.headers.get('last-modified') ?? undefined + const xmlContent = await response.text() const feed = await parser.parseString(xmlContent) if (!feed.items || !feed.items.length) { - return { feed: feed as RssFeed, items: [] } + return { feed: feed as RssFeed, items: [], etag: newEtag, lastModified: newLastModified } } const lastCheckedTime = config.lastCheckedTimestamp @@ -192,7 +235,10 @@ async function fetchNewRssItems( const lastSeenGuids = new Set(config.lastSeenGuids || []) const newItems = feed.items.filter((item) => { - const itemGuid = item.guid || item.link || '' + const itemGuid = + item.guid || + item.link || + (item.title && item.pubDate ? `${item.title}-${item.pubDate}` : '') if (itemGuid && lastSeenGuids.has(itemGuid)) { return false @@ -220,7 +266,12 @@ async function fetchNewRssItems( `[${requestId}] Found ${newItems.length} new items (processing ${limitedItems.length})` ) - return { feed: feed as RssFeed, items: limitedItems as RssItem[] } + return { + feed: feed as RssFeed, + items: limitedItems as RssItem[], + etag: newEtag, + lastModified: newLastModified, + } } catch (error) { const errorMessage = error instanceof Error ? error.message : 'Unknown error' logger.error(`[${requestId}] Error fetching RSS feed:`, errorMessage) From f69895f213d4b326d0d79ba8ca83e7e35df946c7 Mon Sep 17 00:00:00 2001 From: waleed Date: Wed, 8 Apr 2026 22:05:34 -0700 Subject: [PATCH 2/9] fix(rss): align idempotency key GUID fallback with tracking/filter guard --- apps/sim/lib/webhooks/polling/rss.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/apps/sim/lib/webhooks/polling/rss.ts b/apps/sim/lib/webhooks/polling/rss.ts index f182bf247d..8c0833d646 100644 --- a/apps/sim/lib/webhooks/polling/rss.ts +++ b/apps/sim/lib/webhooks/polling/rss.ts @@ -292,7 +292,10 @@ async function processRssItems( for (const item of items) { try { - const itemGuid = item.guid || item.link || `${item.title}-${item.pubDate}` + const itemGuid = + item.guid || + item.link || + (item.title && item.pubDate ? `${item.title}-${item.pubDate}` : '') await pollingIdempotency.executeWithIdempotency( 'rss', From a0c2a75bef5e30d45a9d156f62e3610dbcedc47e Mon Sep 17 00:00:00 2001 From: waleed Date: Wed, 8 Apr 2026 22:17:27 -0700 Subject: [PATCH 3/9] removed comments --- .../app/api/webhooks/poll/[provider]/route.ts | 6 ------ apps/sim/lib/webhooks/polling/gmail.ts | 6 +----- apps/sim/lib/webhooks/polling/imap.ts | 21 +++++-------------- 3 files changed, 6 insertions(+), 27 deletions(-) diff --git a/apps/sim/app/api/webhooks/poll/[provider]/route.ts b/apps/sim/app/api/webhooks/poll/[provider]/route.ts index 5cbeec2b7c..c49d950661 100644 --- a/apps/sim/app/api/webhooks/poll/[provider]/route.ts +++ b/apps/sim/app/api/webhooks/poll/[provider]/route.ts @@ -30,9 +30,6 @@ export async function GET( return NextResponse.json({ error: `Unknown polling provider: ${provider}` }, { status: 404 }) } - // When trigger.dev is enabled, dispatch polling as an async task and return immediately. - // Per-provider concurrency (concurrencyKey) ensures only one poll per provider runs at a time, - // while different providers (gmail vs outlook) can poll in parallel. if (isTriggerDevEnabled) { try { const handle = await providerPolling.trigger( @@ -55,7 +52,6 @@ export async function GET( status: 'dispatched', }) } catch (triggerError) { - // If trigger.dev is unavailable, fall through to synchronous polling below. logger.warn( `[${requestId}] Trigger.dev dispatch failed for ${provider}, falling back to synchronous polling`, { @@ -65,8 +61,6 @@ export async function GET( } } - // Fallback: synchronous polling when trigger.dev is not enabled (self-hosted). - // Redis lock prevents concurrent polls for the same provider. const LOCK_KEY = `${provider}-polling-lock` let lockValue: string | undefined diff --git a/apps/sim/lib/webhooks/polling/gmail.ts b/apps/sim/lib/webhooks/polling/gmail.ts index 8bbfd6f73a..7ca379194f 100644 --- a/apps/sim/lib/webhooks/polling/gmail.ts +++ b/apps/sim/lib/webhooks/polling/gmail.ts @@ -181,8 +181,6 @@ async function fetchNewEmails( logger.info(`[${requestId}] Falling back to search API after history API error ${status}`) const searchResult = await searchEmails(accessToken, config, requestId, logger) - // When search finds 0 emails after a history API failure, the stored historyId is likely - // invalid. Fetch a fresh one from the profile API to break the potential 404 retry loop. if (searchResult.emails.length === 0) { const freshHistoryId = await getGmailProfileHistoryId(accessToken, requestId, logger) if (freshHistoryId) { @@ -491,9 +489,7 @@ async function processEmails( if (headers.date) { try { date = new Date(headers.date).toISOString() - } catch (_e) { - // Keep date as null if parsing fails - } + } catch (_e) {} } else if (email.internalDate) { date = new Date(Number.parseInt(email.internalDate)).toISOString() } diff --git a/apps/sim/lib/webhooks/polling/imap.ts b/apps/sim/lib/webhooks/polling/imap.ts index fa69ab0fe4..e3e087c631 100644 --- a/apps/sim/lib/webhooks/polling/imap.ts +++ b/apps/sim/lib/webhooks/polling/imap.ts @@ -22,7 +22,6 @@ interface ImapWebhookConfig { includeAttachments: boolean lastProcessedUid?: number lastProcessedUidByMailbox?: Record - /** Stores the UIDVALIDITY value per mailbox as strings (bigint cannot be JSON-serialized). */ uidValidityByMailbox?: Record lastCheckedTimestamp?: string maxEmailsPerPoll?: number @@ -173,9 +172,7 @@ export const imapPollingHandler: PollingProviderHandler = { } catch (innerError) { try { await client.logout() - } catch { - // Ignore logout errors - } + } catch {} throw innerError } } catch (error) { @@ -362,9 +359,7 @@ function extractTextFromSource(source: Buffer): { text: string; html: string } { if (lowerPart.includes('base64')) { try { text = Buffer.from(text.replace(/\s/g, ''), 'base64').toString('utf-8') - } catch { - // Keep as-is if base64 decode fails - } + } catch {} } } } else if (lowerPart.includes('content-type: text/html')) { @@ -379,9 +374,7 @@ function extractTextFromSource(source: Buffer): { text: string; html: string } { if (lowerPart.includes('base64')) { try { html = Buffer.from(html.replace(/\s/g, ''), 'base64').toString('utf-8') - } catch { - // Keep as-is if base64 decode fails - } + } catch {} } } } @@ -436,9 +429,7 @@ function extractAttachmentsFromSource( mimeType, size: buffer.length, }) - } catch { - // Skip if decode fails - } + } catch {} } } } @@ -582,9 +573,7 @@ async function processEmails( if (lockState.lock) { try { lockState.lock.release() - } catch { - // Ignore lock release errors - } + } catch {} } } From cc1e911e017650ce0af8f80b37e9ae9b90e8d57b Mon Sep 17 00:00:00 2001 From: waleed Date: Wed, 8 Apr 2026 22:34:49 -0700 Subject: [PATCH 4/9] fix(imap): clear stale UID when UIDVALIDITY changes during state merge --- apps/sim/lib/webhooks/polling/imap.ts | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/apps/sim/lib/webhooks/polling/imap.ts b/apps/sim/lib/webhooks/polling/imap.ts index e3e087c631..28043cd68a 100644 --- a/apps/sim/lib/webhooks/polling/imap.ts +++ b/apps/sim/lib/webhooks/polling/imap.ts @@ -198,6 +198,13 @@ async function updateImapState( mergedUidByMailbox[mailbox] = Math.max(uid, mergedUidByMailbox[mailbox] || 0) } + const prevUidValidity = config.uidValidityByMailbox || {} + for (const [mailbox, validity] of Object.entries(uidValidityByMailbox)) { + if (prevUidValidity[mailbox] !== undefined && prevUidValidity[mailbox] !== validity) { + delete mergedUidByMailbox[mailbox] + } + } + await updateWebhookProviderConfig( webhookId, { From c8f5ed32dd262eb558c3e3ad8ccde256adefc5a6 Mon Sep 17 00:00:00 2001 From: waleed Date: Wed, 8 Apr 2026 23:37:28 -0700 Subject: [PATCH 5/9] fix(rss): skip items with no identifiable GUID to avoid idempotency key collisions --- apps/sim/lib/webhooks/polling/rss.ts | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/apps/sim/lib/webhooks/polling/rss.ts b/apps/sim/lib/webhooks/polling/rss.ts index 8c0833d646..5f52ad8afd 100644 --- a/apps/sim/lib/webhooks/polling/rss.ts +++ b/apps/sim/lib/webhooks/polling/rss.ts @@ -297,6 +297,13 @@ async function processRssItems( item.link || (item.title && item.pubDate ? `${item.title}-${item.pubDate}` : '') + if (!itemGuid) { + logger.warn( + `[${requestId}] Skipping RSS item with no identifiable GUID for webhook ${webhookData.id}` + ) + continue + } + await pollingIdempotency.executeWithIdempotency( 'rss', `${webhookData.id}:${itemGuid}`, From 992935f96d8e269f27f462bfaf9d01eb14727277 Mon Sep 17 00:00:00 2001 From: waleed Date: Wed, 8 Apr 2026 23:39:55 -0700 Subject: [PATCH 6/9] fix(schedules): convert dynamic import of getWorkflowById to static import --- apps/sim/app/api/schedules/execute/route.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/sim/app/api/schedules/execute/route.ts b/apps/sim/app/api/schedules/execute/route.ts index 0d1e41a9e1..176103d682 100644 --- a/apps/sim/app/api/schedules/execute/route.ts +++ b/apps/sim/app/api/schedules/execute/route.ts @@ -8,6 +8,7 @@ import { createBullMQJobData, isBullMQEnabled } from '@/lib/core/bullmq' import { generateRequestId } from '@/lib/core/utils/request' import { generateId } from '@/lib/core/utils/uuid' import { enqueueWorkspaceDispatch } from '@/lib/core/workspace-dispatch' +import { getWorkflowById } from '@/lib/workflows/utils' import { executeJobInline, executeScheduleJob, @@ -115,7 +116,6 @@ export async function GET(request: NextRequest) { } try { - const { getWorkflowById } = await import('@/lib/workflows/utils') const resolvedWorkflow = schedule.workflowId ? await getWorkflowById(schedule.workflowId) : null From 9ed0b82f86a818dda445b912b3fac1fa141babeb Mon Sep 17 00:00:00 2001 From: waleed Date: Wed, 8 Apr 2026 23:58:07 -0700 Subject: [PATCH 7/9] fix(imap): preserve fresh UID after UIDVALIDITY reset in state merge --- apps/sim/lib/webhooks/polling/imap.ts | 28 ++++++++++++++++++++------- 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/apps/sim/lib/webhooks/polling/imap.ts b/apps/sim/lib/webhooks/polling/imap.ts index 28043cd68a..f82a8bb0bb 100644 --- a/apps/sim/lib/webhooks/polling/imap.ts +++ b/apps/sim/lib/webhooks/polling/imap.ts @@ -192,16 +192,30 @@ async function updateImapState( uidValidityByMailbox: Record ) { const existingUidByMailbox = config.lastProcessedUidByMailbox || {} - const mergedUidByMailbox = { ...existingUidByMailbox } + const prevUidValidity = config.uidValidityByMailbox || {} - for (const [mailbox, uid] of Object.entries(uidByMailbox)) { - mergedUidByMailbox[mailbox] = Math.max(uid, mergedUidByMailbox[mailbox] || 0) + const resetMailboxes = new Set( + Object.entries(uidValidityByMailbox) + .filter( + ([mailbox, validity]) => + prevUidValidity[mailbox] !== undefined && prevUidValidity[mailbox] !== validity + ) + .map(([mailbox]) => mailbox) + ) + + const mergedUidByMailbox: Record = {} + + for (const [mailbox, uid] of Object.entries(existingUidByMailbox)) { + if (!resetMailboxes.has(mailbox)) { + mergedUidByMailbox[mailbox] = uid + } } - const prevUidValidity = config.uidValidityByMailbox || {} - for (const [mailbox, validity] of Object.entries(uidValidityByMailbox)) { - if (prevUidValidity[mailbox] !== undefined && prevUidValidity[mailbox] !== validity) { - delete mergedUidByMailbox[mailbox] + for (const [mailbox, uid] of Object.entries(uidByMailbox)) { + if (resetMailboxes.has(mailbox)) { + mergedUidByMailbox[mailbox] = uid + } else { + mergedUidByMailbox[mailbox] = Math.max(uid, mergedUidByMailbox[mailbox] || 0) } } From 2949517b4cf856f8b7fa58fd94ab51478bf0e4de Mon Sep 17 00:00:00 2001 From: waleed Date: Thu, 9 Apr 2026 10:47:03 -0700 Subject: [PATCH 8/9] improvement(polling): remove trigger.dev dispatch, use synchronous Redis-locked polling --- .../app/api/webhooks/poll/[provider]/route.ts | 33 ------------------ apps/sim/background/provider-polling.ts | 34 ------------------- 2 files changed, 67 deletions(-) delete mode 100644 apps/sim/background/provider-polling.ts diff --git a/apps/sim/app/api/webhooks/poll/[provider]/route.ts b/apps/sim/app/api/webhooks/poll/[provider]/route.ts index c49d950661..053d328b0d 100644 --- a/apps/sim/app/api/webhooks/poll/[provider]/route.ts +++ b/apps/sim/app/api/webhooks/poll/[provider]/route.ts @@ -1,11 +1,9 @@ import { createLogger } from '@sim/logger' import { type NextRequest, NextResponse } from 'next/server' import { verifyCronAuth } from '@/lib/auth/internal' -import { isTriggerDevEnabled } from '@/lib/core/config/feature-flags' import { acquireLock, releaseLock } from '@/lib/core/config/redis' import { generateShortId } from '@/lib/core/utils/uuid' import { pollProvider, VALID_POLLING_PROVIDERS } from '@/lib/webhooks/polling' -import { providerPolling } from '@/background/provider-polling' const logger = createLogger('PollingAPI') @@ -30,37 +28,6 @@ export async function GET( return NextResponse.json({ error: `Unknown polling provider: ${provider}` }, { status: 404 }) } - if (isTriggerDevEnabled) { - try { - const handle = await providerPolling.trigger( - { provider, requestId }, - { - concurrencyKey: provider, - tags: [`provider:${provider}`], - } - ) - - logger.info(`[${requestId}] Dispatched ${provider} polling to trigger.dev`, { - runId: handle.id, - }) - - return NextResponse.json({ - success: true, - message: `${provider} polling dispatched`, - requestId, - runId: handle.id, - status: 'dispatched', - }) - } catch (triggerError) { - logger.warn( - `[${requestId}] Trigger.dev dispatch failed for ${provider}, falling back to synchronous polling`, - { - error: triggerError instanceof Error ? triggerError.message : String(triggerError), - } - ) - } - } - const LOCK_KEY = `${provider}-polling-lock` let lockValue: string | undefined diff --git a/apps/sim/background/provider-polling.ts b/apps/sim/background/provider-polling.ts deleted file mode 100644 index b9a6dace77..0000000000 --- a/apps/sim/background/provider-polling.ts +++ /dev/null @@ -1,34 +0,0 @@ -import { createLogger } from '@sim/logger' -import { task } from '@trigger.dev/sdk' -import { pollProvider } from '@/lib/webhooks/polling' - -const logger = createLogger('TriggerProviderPolling') - -export type ProviderPollingPayload = { - provider: string - requestId: string -} - -export const providerPolling = task({ - id: 'provider-polling', - machine: 'medium-1x', - maxDuration: 300, - retry: { - maxAttempts: 1, - }, - queue: { - name: 'provider-polling', - concurrencyLimit: 1, - }, - run: async (payload: ProviderPollingPayload) => { - const { provider, requestId } = payload - - logger.info(`[${requestId}] Starting ${provider} polling`) - - const result = await pollProvider(provider) - - logger.info(`[${requestId}] ${provider} polling completed`, result) - - return result - }, -}) From f82c7f7b6cbb397ac4285852a9330c5dd5e57731 Mon Sep 17 00:00:00 2001 From: waleed Date: Thu, 9 Apr 2026 10:53:31 -0700 Subject: [PATCH 9/9] fix(polling): decouple outlook page size from total email cap so pagination works --- apps/sim/lib/webhooks/polling/outlook.ts | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/apps/sim/lib/webhooks/polling/outlook.ts b/apps/sim/lib/webhooks/polling/outlook.ts index c2f7f756b7..faef69776e 100644 --- a/apps/sim/lib/webhooks/polling/outlook.ts +++ b/apps/sim/lib/webhooks/polling/outlook.ts @@ -170,6 +170,9 @@ export const outlookPollingHandler: PollingProviderHandler = { /** Hard cap on total emails fetched per poll to prevent unbounded pagination loops. */ const OUTLOOK_HARD_MAX_EMAILS = 200 +/** Number of items to request per Graph API page. Decoupled from the total cap so pagination actually runs. */ +const OUTLOOK_PAGE_SIZE = 50 + async function fetchNewOutlookEmails( accessToken: string, config: OutlookWebhookConfig, @@ -186,7 +189,7 @@ async function fetchNewOutlookEmails( ) params.append('$orderby', 'receivedDateTime desc') const maxEmails = Math.min(config.maxEmailsPerPoll || 25, OUTLOOK_HARD_MAX_EMAILS) - params.append('$top', maxEmails.toString()) + params.append('$top', OUTLOOK_PAGE_SIZE.toString()) if (config.lastCheckedTimestamp) { const lastChecked = new Date(config.lastCheckedTimestamp) @@ -220,7 +223,8 @@ async function fetchNewOutlookEmails( const data = await response.json() const pageEmails: OutlookEmail[] = data.value || [] - allEmails.push(...pageEmails) + const remaining = maxEmails - allEmails.length + allEmails.push(...pageEmails.slice(0, remaining)) nextUrl = allEmails.length < maxEmails ? (data['@odata.nextLink'] as string | undefined) : undefined @@ -230,7 +234,7 @@ async function fetchNewOutlookEmails( logger.info(`[${requestId}] Fetched ${allEmails.length} emails total`) - const emails = allEmails.slice(0, maxEmails) + const emails = allEmails let resolvedFolderIds: Map | undefined let skipFolderFilter = false