From 98acaaf6c06df3e978ec986d300609e0ea34d066 Mon Sep 17 00:00:00 2001 From: kalpadhwaryu Date: Tue, 18 Mar 2025 19:02:43 +0530 Subject: [PATCH 1/6] fix(oauth-ingestion): Get new oauthclient when 401 error occurs --- server/api/oauth.ts | 4 +- server/integrations/google/gmail-worker.ts | 25 ++++-- server/integrations/google/gmail/index.ts | 25 ++++-- server/integrations/google/index.ts | 94 ++++++++++++++++++++-- server/integrations/google/sync.ts | 31 ++++++- server/integrations/google/utils.ts | 6 +- server/integrations/google/worker-utils.ts | 5 +- server/utils.ts | 10 ++- 8 files changed, 172 insertions(+), 28 deletions(-) diff --git a/server/api/oauth.ts b/server/api/oauth.ts index b35e861d..0cec6f7c 100644 --- a/server/api/oauth.ts +++ b/server/api/oauth.ts @@ -72,7 +72,9 @@ export const OAuthCallback = async (c: Context) => { email: sub, } // Enqueue the background job within the same transaction - const jobId = await boss.send(SaaSQueue, SaasJobPayload) + const jobId = await boss.send(SaaSQueue, SaasJobPayload, { + expireInHours: 23, + }) Logger.info(`Job ${jobId} enqueued for connection ${connector.id}`) diff --git a/server/integrations/google/gmail-worker.ts b/server/integrations/google/gmail-worker.ts index 81b45dc1..7df7177f 100644 --- a/server/integrations/google/gmail-worker.ts +++ b/server/integrations/google/gmail-worker.ts @@ -110,6 +110,8 @@ export const handleGmailIngestion = async ( const profile = await retryWithBackoff( () => gmail.users.getProfile({ userId: "me" }), "Fetching Gmail user profile", + 0, + client, ) const historyId = profile.data.historyId! if (!historyId) { @@ -127,6 +129,8 @@ export const handleGmailIngestion = async ( fields: "messages(id), nextPageToken", }), `Fetching Gmail messages list (pageToken: ${nextPageToken})`, + 0, + client, ) nextPageToken = resp.data.nextPageToken ?? "" @@ -144,8 +148,10 @@ export const handleGmailIngestion = async ( format: "full", }), `Fetching Gmail message (id: ${message.id})`, + 0, + client, ) - const mail = await parseMail(msgResp.data, gmail) + const mail = await parseMail(msgResp.data, gmail, client) attachmentCount += mail.attachments.length await insert(mail, mailSchema) // updateUserStats(email, StatType.Gmail, 1) @@ -219,6 +225,7 @@ const extractEmailAddresses = (headerValue: string): string[] => { export const parseMail = async ( email: gmail_v1.Schema$Message, gmail: gmail_v1.Gmail, + client: GoogleClient, ): Promise => { const messageId = email.id const threadId = email.threadId @@ -299,12 +306,16 @@ export const parseMail = async ( ) { try { const { attachmentId, size } = body - const attachmentChunks = await getGmailAttachmentChunks(gmail, { - attachmentId: attachmentId, - filename: filename, - size: size ? size : 0, - messageId: messageId, - }) + const attachmentChunks = await getGmailAttachmentChunks( + gmail, + { + attachmentId: attachmentId, + filename: filename, + size: size ? size : 0, + messageId: messageId, + }, + client, + ) if (!attachmentChunks) continue const attachmentDoc: MailAttachment = { diff --git a/server/integrations/google/gmail/index.ts b/server/integrations/google/gmail/index.ts index 3cd7d59a..0b3becfb 100644 --- a/server/integrations/google/gmail/index.ts +++ b/server/integrations/google/gmail/index.ts @@ -46,6 +46,8 @@ export const handleGmailIngestion = async ( const profile = await retryWithBackoff( () => gmail.users.getProfile({ userId: "me" }), "Fetching Gmail user profile", + 0, + client, ) const historyId = profile.data.historyId! if (!historyId) { @@ -63,6 +65,8 @@ export const handleGmailIngestion = async ( fields: "messages(id), nextPageToken", }), `Fetching Gmail messages list (pageToken: ${nextPageToken})`, + 0, + client, ) nextPageToken = resp.data.nextPageToken ?? "" @@ -80,9 +84,11 @@ export const handleGmailIngestion = async ( format: "full", }), `Fetching Gmail message (id: ${message.id})`, + 0, + client, ) await insert( - await parseMail(msgResp.data, gmail, email), + await parseMail(msgResp.data, gmail, email, client), mailSchema, ) updateUserStats(email, StatType.Gmail, 1) @@ -144,6 +150,7 @@ export const parseMail = async ( email: gmail_v1.Schema$Message, gmail: gmail_v1.Gmail, userEmail: string, + client: GoogleClient, ): Promise => { const messageId = email.id const threadId = email.threadId @@ -230,12 +237,16 @@ export const parseMail = async ( ) { try { const { attachmentId, size } = body - const attachmentChunks = await getGmailAttachmentChunks(gmail, { - attachmentId: attachmentId, - filename: filename, - size: size ? size : 0, - messageId: messageId, - }) + const attachmentChunks = await getGmailAttachmentChunks( + gmail, + { + attachmentId: attachmentId, + filename: filename, + size: size ? size : 0, + messageId: messageId, + }, + client, + ) if (!attachmentChunks) continue const attachmentDoc: MailAttachment = { diff --git a/server/integrations/google/index.ts b/server/integrations/google/index.ts index 56d7eebf..93358cfa 100644 --- a/server/integrations/google/index.ts +++ b/server/integrations/google/index.ts @@ -38,8 +38,14 @@ import { SaaSQueue } from "@/queue" import { wsConnections } from "@/integrations/google/ws" import type { WSContext } from "hono/ws" import { db } from "@/db/client" -import { connectors, type SelectConnector } from "@/db/schema" -import { eq } from "drizzle-orm" +import { + connectors, + oauthProviders, + selectConnectorSchema, + type SelectConnector, + type SelectOAuthProvider, +} from "@/db/schema" +import { and, eq } from "drizzle-orm" import { getWorkspaceById } from "@/db/workspace" import { Apps, @@ -77,6 +83,9 @@ import { DeleteDocumentError, DownloadDocumentError, CalendarEventsListingError, + NoOauthConnectorFound, + MissingOauthConnectorCredentialsError, + FetchProviderFailed, } from "@/errors" import fs, { existsSync, mkdirSync } from "node:fs" import path, { join } from "node:path" @@ -437,6 +446,8 @@ const insertCalendarEvents = async ( fields: eventFields, }), `Fetching all calendar events`, + 0, + client, ) if (res.data.items) { events = events.concat(res.data.items) @@ -566,7 +577,50 @@ export const handleGoogleOAuthIngestion = async ( ) const userEmail = job.data.email const oauthTokens = (connector.oauthCredentials as OAuthCredentials).data - const oauth2Client = new google.auth.OAuth2() + + const connectorId = data.connectorId + const res = await db + .select() + .from(connectors) + .where( + and( + eq(connectors.id, connectorId), + eq(connectors.authType, AuthType.OAuth), + ), + ) + .limit(1) + + if (!res.length) { + throw new NoOauthConnectorFound({ + message: `Could not get the oauth connector with id: ${connectorId}`, + }) + } + + const oauthRes: SelectConnector = selectConnectorSchema.parse(res[0]) + + if (!oauthRes.oauthCredentials) { + throw new MissingOauthConnectorCredentialsError({}) + } + + const providers: SelectOAuthProvider[] = await db + .select() + .from(oauthProviders) + .where(eq(oauthProviders.connectorId, oauthRes.id)) + .limit(1) + + if (!providers.length) { + Logger.error("Could not fetch provider while refreshing Google Token") + throw new FetchProviderFailed({ + message: "Could not fetch provider while refreshing Google Token", + }) + } + const [googleProvider] = providers + + const oauth2Client = new google.auth.OAuth2({ + clientId: googleProvider.clientId!, + clientSecret: googleProvider.clientSecret, + redirectUri: `${config.host}/oauth/callback`, + }) setOAuthUser(userEmail) const interval = setInterval(() => { @@ -581,7 +635,10 @@ export const handleGoogleOAuthIngestion = async ( // we have guarantee that when we started this job access Token at least // hand one hour, we should increase this time - oauth2Client.setCredentials({ access_token: oauthTokens.access_token }) + oauth2Client.setCredentials({ + access_token: oauthTokens.access_token, + refresh_token: oauthTokens.refresh_token, + }) const driveClient = google.drive({ version: "v3", auth: oauth2Client }) const { contacts, otherContacts, contactsToken, otherContactsToken } = await listAllContacts(oauth2Client) @@ -701,6 +758,7 @@ type IngestionMetadata = { } import { z } from "zod" +import config from "@/config" const stats = z.object({ type: z.literal(WorkerResponseTypes.Stats), @@ -992,6 +1050,8 @@ export const getPresentationToBeIngested = async ( presentationId: presentation.id!, }), `Fetching presentation with id ${presentation.id}`, + 0, + client, ) const slidesData = presentationData?.data?.slides! let chunks: string[] = [] @@ -1247,6 +1307,7 @@ export const getAllSheetsFromSpreadSheet = async ( sheets: sheets_v4.Sheets, spreadsheet: sheets_v4.Schema$Spreadsheet, spreadsheetId: string, + client: GoogleClient, ) => { const allSheets = [] @@ -1268,6 +1329,8 @@ export const getAllSheetsFromSpreadSheet = async ( valueRenderOption: "FORMATTED_VALUE", }), `Fetching sheets '${ranges.join(", ")}' from spreadsheet`, + 0, + client, ) const valueRanges = response?.data?.valueRanges @@ -1303,11 +1366,14 @@ export const getAllSheetsFromSpreadSheet = async ( export const getSpreadsheet = async ( sheets: sheets_v4.Sheets, id: string, + client: GoogleClient, ): Promise | null> => { try { return retryWithBackoff( () => sheets.spreadsheets.get({ spreadsheetId: id }), `Fetching spreadsheet with ID ${id}`, + 0, + client, ) } catch (error) { if (error instanceof GaxiosError) { @@ -1383,7 +1449,11 @@ export const getSheetsListFromOneSpreadsheet = async ( ): Promise => { const sheetsArr = [] try { - const spreadSheetData = await getSpreadsheet(sheets, spreadsheet.id!) + const spreadSheetData = await getSpreadsheet( + sheets, + spreadsheet.id!, + client, + ) if (spreadSheetData) { // Now we should get all sheets inside this spreadsheet using the spreadSheetData @@ -1391,6 +1461,7 @@ export const getSheetsListFromOneSpreadsheet = async ( sheets, spreadSheetData.data, spreadsheet.id!, + client, ) // There can be multiple parents @@ -1539,6 +1610,7 @@ export const downloadPDF = async ( drive: drive_v3.Drive, fileId: string, fileName: string, + client: GoogleClient, ): Promise => { const filePath = path.join(downloadDir, fileName) const file = Bun.file(filePath) @@ -1550,6 +1622,8 @@ export const downloadPDF = async ( { responseType: "stream" }, ), `Getting PDF content of fileId ${fileId}`, + 0, + client, ) return new Promise((resolve, reject) => { res.data.on("data", (chunk) => { @@ -1606,7 +1680,7 @@ export const googlePDFsVespa = async ( const pdfFileName = `${userEmail}_${pdf.id}_${pdf.name}` const pdfPath = `${downloadDir}/${pdfFileName}` try { - await downloadPDF(drive, pdf.id!, pdfFileName) + await downloadPDF(drive, pdf.id!, pdfFileName, client) const docs: Document[] = await safeLoadPDF(pdfPath) if (!docs || docs.length === 0) { @@ -1755,6 +1829,8 @@ const listAllContacts = async ( requestSyncToken: true, }), `Fetching contacts with pageToken ${pageToken}`, + 0, + client, ) if (response.data.connections) { @@ -1779,6 +1855,8 @@ const listAllContacts = async ( sources: ["READ_SOURCE_TYPE_PROFILE", "READ_SOURCE_TYPE_CONTACT"], }), `Fetching other contacts with pageToken ${pageToken}`, + 0, + client, ) if (response.data.otherContacts) { @@ -1951,6 +2029,8 @@ export async function* listFiles( pageToken: nextPageToken, }), `Fetching all files from Google Drive`, + 0, + client, ) if (res.data.files) { @@ -1990,6 +2070,8 @@ export const googleDocsVespa = async ( documentId: doc.id as string, }), `Fetching document with documentId ${doc.id}`, + 0, + client, ) if (!docResponse || !docResponse.data) { throw new DocsParsingError( diff --git a/server/integrations/google/sync.ts b/server/integrations/google/sync.ts index a349f649..b8523faa 100644 --- a/server/integrations/google/sync.ts +++ b/server/integrations/google/sync.ts @@ -112,7 +112,7 @@ const deleteUpdateStatsForGoogleSheets = async ( const spreadsheetId = docId const sheets = google.sheets({ version: "v4", auth: client }) try { - const spreadsheet = await getSpreadsheet(sheets, spreadsheetId!) + const spreadsheet = await getSpreadsheet(sheets, spreadsheetId!, client) if (spreadsheet) { const totalSheets = spreadsheet?.data?.sheets?.length! @@ -400,6 +400,7 @@ const contactKeys = [ const getDriveChanges = async ( driveClient: drive_v3.Drive, config: GoogleChangeToken, + oauth2Client: GoogleClient, ): Promise<{ changes: drive_v3.Schema$Change[] | undefined newStartPageToken: string | null | undefined @@ -408,6 +409,8 @@ const getDriveChanges = async ( const response = await retryWithBackoff( () => driveClient.changes.list({ pageToken: config.driveToken }), `Fetching drive changes with pageToken ${config.driveToken}`, + 0, + oauth2Client, ) const { changes, newStartPageToken } = response.data return { changes, newStartPageToken } @@ -459,7 +462,11 @@ export const handleGoogleOAuthChanges = async ( const driveClient = google.drive({ version: "v3", auth: oauth2Client }) // TODO: add pagination for all the possible changes - const driveChanges = await getDriveChanges(driveClient, config) + const driveChanges = await getDriveChanges( + driveClient, + config, + oauth2Client, + ) const { changes = [], newStartPageToken = "" } = driveChanges ?? {} // there are changes @@ -508,6 +515,8 @@ export const handleGoogleOAuthChanges = async ( pageToken: nextPageToken, // Use the nextPageToken for pagination }), `Fetching contacts changes with syncToken ${config.contactsToken}`, + 0, + oauth2Client, ) contactsToken = response.data.nextSyncToken ?? contactsToken nextPageToken = response.data.nextPageToken ?? "" @@ -517,6 +526,7 @@ export const handleGoogleOAuthChanges = async ( response.data.connections, user.email, GooglePeopleEntity.Contacts, + oauth2Client, ) stats = mergeStats(stats, changeStats) changesExist = true @@ -549,6 +559,8 @@ export const handleGoogleOAuthChanges = async ( ], }), `Fetching other contacts changes with syncToken ${otherContactsToken}`, + 0, + oauth2Client, ) otherContactsToken = response.data.nextSyncToken ?? otherContactsToken nextPageToken = response.data.nextPageToken ?? "" @@ -666,6 +678,7 @@ export const handleGoogleOAuthChanges = async ( config.historyId, syncJob.id, syncJob.email, + oauth2Client, ) if (changesExist) { @@ -760,6 +773,7 @@ export const handleGoogleOAuthChanges = async ( calendar, config.calendarEventsToken, syncJob.email, + oauth2Client, ) if (changesExist) { @@ -888,6 +902,7 @@ const handleGoogleCalendarEventsChanges = async ( calendar: calendar_v3.Calendar, syncToken: string, userEmail: string, + oauth2Client?: GoogleClient, ) => { let changesExist = false const stats = newStats() @@ -909,6 +924,8 @@ const handleGoogleCalendarEventsChanges = async ( fields: eventFields, }), `Fetching calendar events changes with syncToken ${syncToken}`, + 0, + oauth2Client, ) newSyncTokenCalendarEvents = res.data.nextSyncToken ?? syncToken @@ -1086,6 +1103,7 @@ const handleGmailChanges = async ( historyId: string, syncJobId: number, userEmail: string, + client?: GoogleClient, ): Promise<{ historyId: string stats: ChangeStats @@ -1107,6 +1125,8 @@ const handleGmailChanges = async ( pageToken: nextPageToken, }), `Fetching gmail changes with historyId ${historyId}`, + 0, + client, ) newHistoryId = res.data.historyId ?? historyId @@ -1136,10 +1156,12 @@ const handleGmailChanges = async ( format: "full", }), `Fetching gmail email with id ${message?.id}`, + 0, + client, ) await insert( - await parseMail(msgResp.data, gmail, userEmail), + await parseMail(msgResp.data, gmail, userEmail, client!), mailSchema, ) stats.added += 1 @@ -1265,6 +1287,7 @@ const syncContacts = async ( contacts: people_v1.Schema$Person[], email: string, entity: GooglePeopleEntity, + oauth2Client?: GoogleClient, ): Promise => { const stats = newStats() const connections = contacts || [] // Get contacts from current page @@ -1287,6 +1310,8 @@ const syncContacts = async ( personFields: contactKeys.join(","), }), `Fetching contact with resourceName ${contact.resourceName}`, + 0, + oauth2Client, ) await insertContact(contactResp.data, entity, email) } else if (entity === GooglePeopleEntity.OtherContacts) { diff --git a/server/integrations/google/utils.ts b/server/integrations/google/utils.ts index 3d0da8cd..6894d890 100644 --- a/server/integrations/google/utils.ts +++ b/server/integrations/google/utils.ts @@ -101,6 +101,8 @@ export const getFile = async ( fields, }), `Getting file with fileId ${fileId}`, + 0, + client ) return file?.data @@ -136,6 +138,8 @@ export const getFileContent = async ( documentId: file.id as string, }), `Getting document with documentId ${file.id}`, + 0, + client ) const documentContent: docs_v1.Schema$Document = docResponse.data const rawTextContent = documentContent?.body?.content @@ -198,7 +202,7 @@ export const getPDFContent = async ( return } try { - await downloadPDF(drive, pdfFile.id!, pdfFile.name!) + await downloadPDF(drive, pdfFile.id!, pdfFile.name!, client) const pdfPath = `${downloadDir}/${pdfFile?.name}` let docs: Document[] = [] diff --git a/server/integrations/google/worker-utils.ts b/server/integrations/google/worker-utils.ts index 57aaa06e..a6c30f05 100644 --- a/server/integrations/google/worker-utils.ts +++ b/server/integrations/google/worker-utils.ts @@ -1,4 +1,4 @@ -import { Subsystem } from "@/types" +import { Subsystem, type GoogleClient } from "@/types" import fs from "node:fs/promises" import { getLogger } from "@/logger" import { gmail_v1 } from "googleapis" @@ -44,6 +44,7 @@ export const getGmailAttachmentChunks = async ( filename: string size: number }, + client: GoogleClient, ): Promise => { const { attachmentId, filename, messageId, size } = attachmentMetadata let attachmentChunks: string[] = [] @@ -68,6 +69,8 @@ export const getGmailAttachmentChunks = async ( userId: "me", }), "Fetching Gmail Attachments", + 0, + client, ) await saveGmailAttachment( diff --git a/server/utils.ts b/server/utils.ts index 3b3fa16c..13e5cb36 100644 --- a/server/utils.ts +++ b/server/utils.ts @@ -6,6 +6,7 @@ import { getLogger } from "@/logger" import { Subsystem } from "@/types" import { stopwords as englishStopwords } from "@orama/stopwords/english" import { Apps } from "@/search/types" +import type { OAuth2Client } from "google-auth-library" const Logger = getLogger(Subsystem.Utils) @@ -105,6 +106,7 @@ export const retryWithBackoff = async ( fn: () => Promise, context: string, retries = 0, + oauth2Client?: OAuth2Client, ): Promise => { try { return await fn() // Attempt the function @@ -133,8 +135,12 @@ export const retryWithBackoff = async ( )}ms (Attempt ${retries + 1}/${MAX_RETRIES})`, ) await delay(waitTime) - - return retryWithBackoff(fn, context, retries + 1) // Retry recursively + return retryWithBackoff(fn, context, retries + 1, oauth2Client) // Retry recursively + } else if (error.code === 401 && retries < MAX_RETRIES) { + Logger.info(`401 encountered, refreshing OAuth access token...`) + const { credentials } = await oauth2Client?.refreshAccessToken()! + oauth2Client?.setCredentials(credentials) + return retryWithBackoff(fn, context, retries + 1, oauth2Client) } else { Logger.error( `[${context}] Failed after ${retries} retries: ${error.message}`, From 8837b85f5328daf7ba67e8b72c1ad5063eeb1042 Mon Sep 17 00:00:00 2001 From: kalpadhwaryu Date: Tue, 18 Mar 2025 19:27:39 +0530 Subject: [PATCH 2/6] Add const for Job Expiry Hours --- server/api/oauth.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/server/api/oauth.ts b/server/api/oauth.ts index 0cec6f7c..cbdaba0d 100644 --- a/server/api/oauth.ts +++ b/server/api/oauth.ts @@ -16,6 +16,8 @@ const { JwtPayloadKey } = config const Logger = getLogger(Subsystem.Api).child({ module: "oauth" }) +const JOB_EXPIRY_HOURS = 23 + interface OAuthCallbackQuery { state: string code: string @@ -73,7 +75,7 @@ export const OAuthCallback = async (c: Context) => { } // Enqueue the background job within the same transaction const jobId = await boss.send(SaaSQueue, SaasJobPayload, { - expireInHours: 23, + expireInHours: JOB_EXPIRY_HOURS, }) Logger.info(`Job ${jobId} enqueued for connection ${connector.id}`) From cf5e434e350d1dd765d914396384fa10255f9d09 Mon Sep 17 00:00:00 2001 From: kalpadhwaryu Date: Wed, 19 Mar 2025 19:01:02 +0530 Subject: [PATCH 3/6] Make retryWithBackoff generic which accepts app as param --- server/integrations/google/gmail-worker.ts | 3 +++ server/integrations/google/gmail/index.ts | 3 +++ server/integrations/google/index.ts | 10 ++++++++++ server/integrations/google/sync.ts | 10 ++++++++++ server/integrations/google/utils.ts | 6 ++++-- server/integrations/google/worker-utils.ts | 3 ++- server/utils.ts | 23 ++++++++++++++++------ 7 files changed, 49 insertions(+), 9 deletions(-) diff --git a/server/integrations/google/gmail-worker.ts b/server/integrations/google/gmail-worker.ts index 7df7177f..52fed2d1 100644 --- a/server/integrations/google/gmail-worker.ts +++ b/server/integrations/google/gmail-worker.ts @@ -110,6 +110,7 @@ export const handleGmailIngestion = async ( const profile = await retryWithBackoff( () => gmail.users.getProfile({ userId: "me" }), "Fetching Gmail user profile", + Apps.Gmail, 0, client, ) @@ -129,6 +130,7 @@ export const handleGmailIngestion = async ( fields: "messages(id), nextPageToken", }), `Fetching Gmail messages list (pageToken: ${nextPageToken})`, + Apps.Gmail, 0, client, ) @@ -148,6 +150,7 @@ export const handleGmailIngestion = async ( format: "full", }), `Fetching Gmail message (id: ${message.id})`, + Apps.Gmail, 0, client, ) diff --git a/server/integrations/google/gmail/index.ts b/server/integrations/google/gmail/index.ts index 0b3becfb..f6d45ab4 100644 --- a/server/integrations/google/gmail/index.ts +++ b/server/integrations/google/gmail/index.ts @@ -46,6 +46,7 @@ export const handleGmailIngestion = async ( const profile = await retryWithBackoff( () => gmail.users.getProfile({ userId: "me" }), "Fetching Gmail user profile", + Apps.Gmail, 0, client, ) @@ -65,6 +66,7 @@ export const handleGmailIngestion = async ( fields: "messages(id), nextPageToken", }), `Fetching Gmail messages list (pageToken: ${nextPageToken})`, + Apps.Gmail, 0, client, ) @@ -84,6 +86,7 @@ export const handleGmailIngestion = async ( format: "full", }), `Fetching Gmail message (id: ${message.id})`, + Apps.Gmail, 0, client, ) diff --git a/server/integrations/google/index.ts b/server/integrations/google/index.ts index 93358cfa..acaf86a0 100644 --- a/server/integrations/google/index.ts +++ b/server/integrations/google/index.ts @@ -138,6 +138,7 @@ const listUsers = async ( ...(nextPageToken! ? { pageToken: nextPageToken } : {}), }), `Fetching all users`, + Apps.GoogleDrive, ) if (res.data.users) { users = users.concat(res.data.users) @@ -446,6 +447,7 @@ const insertCalendarEvents = async ( fields: eventFields, }), `Fetching all calendar events`, + Apps.GoogleCalendar, 0, client, ) @@ -1050,6 +1052,7 @@ export const getPresentationToBeIngested = async ( presentationId: presentation.id!, }), `Fetching presentation with id ${presentation.id}`, + Apps.GoogleDrive, 0, client, ) @@ -1329,6 +1332,7 @@ export const getAllSheetsFromSpreadSheet = async ( valueRenderOption: "FORMATTED_VALUE", }), `Fetching sheets '${ranges.join(", ")}' from spreadsheet`, + Apps.GoogleDrive, 0, client, ) @@ -1372,6 +1376,7 @@ export const getSpreadsheet = async ( return retryWithBackoff( () => sheets.spreadsheets.get({ spreadsheetId: id }), `Fetching spreadsheet with ID ${id}`, + Apps.GoogleDrive, 0, client, ) @@ -1622,6 +1627,7 @@ export const downloadPDF = async ( { responseType: "stream" }, ), `Getting PDF content of fileId ${fileId}`, + Apps.GoogleDrive, 0, client, ) @@ -1829,6 +1835,7 @@ const listAllContacts = async ( requestSyncToken: true, }), `Fetching contacts with pageToken ${pageToken}`, + Apps.GoogleDrive, 0, client, ) @@ -1855,6 +1862,7 @@ const listAllContacts = async ( sources: ["READ_SOURCE_TYPE_PROFILE", "READ_SOURCE_TYPE_CONTACT"], }), `Fetching other contacts with pageToken ${pageToken}`, + Apps.GoogleDrive, 0, client, ) @@ -2029,6 +2037,7 @@ export async function* listFiles( pageToken: nextPageToken, }), `Fetching all files from Google Drive`, + Apps.GoogleDrive, 0, client, ) @@ -2070,6 +2079,7 @@ export const googleDocsVespa = async ( documentId: doc.id as string, }), `Fetching document with documentId ${doc.id}`, + Apps.GoogleDrive, 0, client, ) diff --git a/server/integrations/google/sync.ts b/server/integrations/google/sync.ts index b8523faa..f68dbacf 100644 --- a/server/integrations/google/sync.ts +++ b/server/integrations/google/sync.ts @@ -409,6 +409,7 @@ const getDriveChanges = async ( const response = await retryWithBackoff( () => driveClient.changes.list({ pageToken: config.driveToken }), `Fetching drive changes with pageToken ${config.driveToken}`, + Apps.GoogleDrive, 0, oauth2Client, ) @@ -515,6 +516,7 @@ export const handleGoogleOAuthChanges = async ( pageToken: nextPageToken, // Use the nextPageToken for pagination }), `Fetching contacts changes with syncToken ${config.contactsToken}`, + Apps.GoogleDrive, 0, oauth2Client, ) @@ -559,6 +561,7 @@ export const handleGoogleOAuthChanges = async ( ], }), `Fetching other contacts changes with syncToken ${otherContactsToken}`, + Apps.GoogleDrive, 0, oauth2Client, ) @@ -924,6 +927,7 @@ const handleGoogleCalendarEventsChanges = async ( fields: eventFields, }), `Fetching calendar events changes with syncToken ${syncToken}`, + Apps.GoogleCalendar, 0, oauth2Client, ) @@ -1125,6 +1129,7 @@ const handleGmailChanges = async ( pageToken: nextPageToken, }), `Fetching gmail changes with historyId ${historyId}`, + Apps.Gmail, 0, client, ) @@ -1156,6 +1161,7 @@ const handleGmailChanges = async ( format: "full", }), `Fetching gmail email with id ${message?.id}`, + Apps.Gmail, 0, client, ) @@ -1310,6 +1316,7 @@ const syncContacts = async ( personFields: contactKeys.join(","), }), `Fetching contact with resourceName ${contact.resourceName}`, + Apps.GoogleDrive, 0, oauth2Client, ) @@ -1361,6 +1368,7 @@ export const handleGoogleServiceAccountChanges = async ( await retryWithBackoff( () => driveClient.changes.list({ pageToken: config.driveToken }), `Fetching drive changes with pageToken ${config.driveToken}`, + Apps.GoogleDrive, ) ).data // there are changes @@ -1413,6 +1421,7 @@ export const handleGoogleServiceAccountChanges = async ( pageToken: nextPageToken, // Use the nextPageToken for pagination }), `Fetching contacts changes with syncToken ${config.contactsToken}`, + Apps.GoogleDrive, ) if (response.data.connections && response.data.connections.length) { Logger.info( @@ -1465,6 +1474,7 @@ export const handleGoogleServiceAccountChanges = async ( ], }), `Fetching other contacts changes with syncToken ${otherContactsToken}`, + Apps.GoogleDrive, ) otherContactsToken = response.data.nextSyncToken ?? otherContactsToken if ( diff --git a/server/integrations/google/utils.ts b/server/integrations/google/utils.ts index 6894d890..bef5cb4e 100644 --- a/server/integrations/google/utils.ts +++ b/server/integrations/google/utils.ts @@ -101,8 +101,9 @@ export const getFile = async ( fields, }), `Getting file with fileId ${fileId}`, + Apps.GoogleDrive, 0, - client + client, ) return file?.data @@ -138,8 +139,9 @@ export const getFileContent = async ( documentId: file.id as string, }), `Getting document with documentId ${file.id}`, + Apps.GoogleDrive, 0, - client + client, ) const documentContent: docs_v1.Schema$Document = docResponse.data const rawTextContent = documentContent?.body?.content diff --git a/server/integrations/google/worker-utils.ts b/server/integrations/google/worker-utils.ts index a6c30f05..653c86db 100644 --- a/server/integrations/google/worker-utils.ts +++ b/server/integrations/google/worker-utils.ts @@ -10,7 +10,7 @@ import { } from "@/integrations/google/pdf-utils" import { retryWithBackoff } from "@/utils" import { chunkDocument } from "@/chunks" -import type { Attachment } from "@/search/types" +import { Apps, type Attachment } from "@/search/types" import { MAX_ATTACHMENT_PDF_SIZE } from "@/integrations/google/config" import path from "node:path" const Logger = getLogger(Subsystem.Integrations).child({ module: "google" }) @@ -69,6 +69,7 @@ export const getGmailAttachmentChunks = async ( userId: "me", }), "Fetching Gmail Attachments", + Apps.Gmail, 0, client, ) diff --git a/server/utils.ts b/server/utils.ts index 13e5cb36..278fd8d6 100644 --- a/server/utils.ts +++ b/server/utils.ts @@ -105,8 +105,9 @@ const delay = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)) export const retryWithBackoff = async ( fn: () => Promise, context: string, + app: Apps, retries = 0, - oauth2Client?: OAuth2Client, + googleOauth2Client?: OAuth2Client, ): Promise => { try { return await fn() // Attempt the function @@ -135,12 +136,22 @@ export const retryWithBackoff = async ( )}ms (Attempt ${retries + 1}/${MAX_RETRIES})`, ) await delay(waitTime) - return retryWithBackoff(fn, context, retries + 1, oauth2Client) // Retry recursively + return retryWithBackoff(fn, context, app, retries + 1, googleOauth2Client) // Retry recursively } else if (error.code === 401 && retries < MAX_RETRIES) { - Logger.info(`401 encountered, refreshing OAuth access token...`) - const { credentials } = await oauth2Client?.refreshAccessToken()! - oauth2Client?.setCredentials(credentials) - return retryWithBackoff(fn, context, retries + 1, oauth2Client) + if (IsGoogleApp(app)) { + Logger.info(`401 encountered, refreshing OAuth access token...`) + const { credentials } = await googleOauth2Client?.refreshAccessToken()! + googleOauth2Client?.setCredentials(credentials) + return retryWithBackoff( + fn, + context, + app, + retries + 1, + googleOauth2Client, + ) + } else { + throw new Error("401 error for unsupported app") + } } else { Logger.error( `[${context}] Failed after ${retries} retries: ${error.message}`, From c65c3ef035ffccb9c8360a87a95b067890fb50ad Mon Sep 17 00:00:00 2001 From: kalpadhwaryu Date: Thu, 20 Mar 2025 11:02:31 +0530 Subject: [PATCH 4/6] Use exisiting fns to get connector & provider --- server/db/connector.ts | 8 +++--- server/db/oauthProvider.ts | 16 ++++++++++++ server/integrations/google/index.ts | 38 +++-------------------------- 3 files changed, 22 insertions(+), 40 deletions(-) diff --git a/server/db/connector.ts b/server/db/connector.ts index b88ce90d..7d684f58 100644 --- a/server/db/connector.ts +++ b/server/db/connector.ts @@ -23,6 +23,7 @@ import { UpdateConnectorFailed, } from "@/errors" import { IsGoogleApp } from "@/utils" +import { getOAuthProviderByConnectorId } from "@/db/oauthProvider" const Logger = getLogger(Subsystem.Db).child({ module: "connector" }) export const insertConnector = async ( @@ -169,11 +170,8 @@ export const getOAuthConnectorWithCredentials = async ( // update it in place if (IsGoogleApp(oauthRes.app)) { // we will need the provider now to refresh the token - const providers: SelectOAuthProvider[] = await trx - .select() - .from(oauthProviders) - .where(eq(oauthProviders.connectorId, oauthRes.id)) - .limit(1) + const providers: SelectOAuthProvider[] = + await getOAuthProviderByConnectorId(trx, connectorId) if (!providers.length) { Logger.error("Could not fetch provider while refreshing Google Token") diff --git a/server/db/oauthProvider.ts b/server/db/oauthProvider.ts index 028c2a90..512a117a 100644 --- a/server/db/oauthProvider.ts +++ b/server/db/oauthProvider.ts @@ -48,3 +48,19 @@ export const getOAuthProvider = async ( throw new Error("Could not get the connector") } } + +export const getOAuthProviderByConnectorId = async ( + trx: TxnOrClient, + connectorId: number, +): Promise => { + const res = await trx + .select() + .from(oauthProviders) + .where(eq(oauthProviders.connectorId, connectorId)) + .limit(1) + if (res.length) { + return res + } else { + throw new Error("Could not get the provider") + } +} diff --git a/server/integrations/google/index.ts b/server/integrations/google/index.ts index acaf86a0..dc9b0ce5 100644 --- a/server/integrations/google/index.ts +++ b/server/integrations/google/index.ts @@ -113,6 +113,7 @@ import { StatType, updateUserStats, } from "./tracking" +import { getOAuthProviderByConnectorId } from "@/db/oauthProvider" const htmlToText = require("html-to-text") const Logger = getLogger(Subsystem.Integrations).child({ module: "google" }) @@ -580,42 +581,9 @@ export const handleGoogleOAuthIngestion = async ( const userEmail = job.data.email const oauthTokens = (connector.oauthCredentials as OAuthCredentials).data - const connectorId = data.connectorId - const res = await db - .select() - .from(connectors) - .where( - and( - eq(connectors.id, connectorId), - eq(connectors.authType, AuthType.OAuth), - ), - ) - .limit(1) - - if (!res.length) { - throw new NoOauthConnectorFound({ - message: `Could not get the oauth connector with id: ${connectorId}`, - }) - } - - const oauthRes: SelectConnector = selectConnectorSchema.parse(res[0]) + const providers: SelectOAuthProvider[] = + await getOAuthProviderByConnectorId(db, data.connectorId) - if (!oauthRes.oauthCredentials) { - throw new MissingOauthConnectorCredentialsError({}) - } - - const providers: SelectOAuthProvider[] = await db - .select() - .from(oauthProviders) - .where(eq(oauthProviders.connectorId, oauthRes.id)) - .limit(1) - - if (!providers.length) { - Logger.error("Could not fetch provider while refreshing Google Token") - throw new FetchProviderFailed({ - message: "Could not fetch provider while refreshing Google Token", - }) - } const [googleProvider] = providers const oauth2Client = new google.auth.OAuth2({ From fe613ce0b6595d8632ca576115158261c82f798f Mon Sep 17 00:00:00 2001 From: kalpadhwaryu Date: Thu, 20 Mar 2025 11:14:45 +0530 Subject: [PATCH 5/6] Remove unwanted code --- server/integrations/google/index.ts | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/server/integrations/google/index.ts b/server/integrations/google/index.ts index dc9b0ce5..fafbf2e7 100644 --- a/server/integrations/google/index.ts +++ b/server/integrations/google/index.ts @@ -40,12 +40,10 @@ import type { WSContext } from "hono/ws" import { db } from "@/db/client" import { connectors, - oauthProviders, - selectConnectorSchema, type SelectConnector, type SelectOAuthProvider, } from "@/db/schema" -import { and, eq } from "drizzle-orm" +import { eq } from "drizzle-orm" import { getWorkspaceById } from "@/db/workspace" import { Apps, @@ -83,9 +81,6 @@ import { DeleteDocumentError, DownloadDocumentError, CalendarEventsListingError, - NoOauthConnectorFound, - MissingOauthConnectorCredentialsError, - FetchProviderFailed, } from "@/errors" import fs, { existsSync, mkdirSync } from "node:fs" import path, { join } from "node:path" From e4d9594650fa05d2e3025866c1844ae25d2269d5 Mon Sep 17 00:00:00 2001 From: kalpadhwaryu Date: Fri, 21 Mar 2025 18:58:12 +0530 Subject: [PATCH 6/6] Remove job expiry const --- server/api/oauth.ts | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/server/api/oauth.ts b/server/api/oauth.ts index cbdaba0d..b35e861d 100644 --- a/server/api/oauth.ts +++ b/server/api/oauth.ts @@ -16,8 +16,6 @@ const { JwtPayloadKey } = config const Logger = getLogger(Subsystem.Api).child({ module: "oauth" }) -const JOB_EXPIRY_HOURS = 23 - interface OAuthCallbackQuery { state: string code: string @@ -74,9 +72,7 @@ export const OAuthCallback = async (c: Context) => { email: sub, } // Enqueue the background job within the same transaction - const jobId = await boss.send(SaaSQueue, SaasJobPayload, { - expireInHours: JOB_EXPIRY_HOURS, - }) + const jobId = await boss.send(SaaSQueue, SaasJobPayload) Logger.info(`Job ${jobId} enqueued for connection ${connector.id}`)