Skip to content

Commit

Permalink
refactor: refactoring google_drive connector, support token_refresh, … (
Browse files Browse the repository at this point in the history
#61)

* refactor: refactoring google_drive connector, support token_refresh, handle categories

* chore: remove unncessary code

* chore: save datasource with new refres token

* chore: remove unused field

* chore: fix refresh token update
  • Loading branch information
medcl authored Jan 20, 2025
1 parent f389302 commit edf3055
Show file tree
Hide file tree
Showing 8 changed files with 250 additions and 45 deletions.
21 changes: 8 additions & 13 deletions docs/content.en/docs/references/connectors/google_drive.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,17 @@ To use the Google Drive Connector, follow these steps to obtain your token:

## Obtain Google Drive credentials

1. Set the **Authorized Redirect URIs** as shown in the following screenshot:
1. Create your own Google OAuth App, set the **Data Access** as shown in the following screenshot:

{{% load-img "/img/google_drive_scopes.png" "Create a APP" %}}

2. Set the **Authorized Redirect URIs** as shown in the following screenshot:

{{% load-img "/img/google_drive_token.jpg" "Authorized Redirect URIs" %}}

2. The Google Drive connector uses `/connector/google_drive/oauth_redirect` as the callback URL to receive authorization responses.
3. The Google Drive connector uses `/connector/google_drive/oauth_redirect` as the callback URL to receive authorization responses.

3. Once the token is successfully obtained, download the `credentials.json` file.
4. Once the token is successfully obtained, download the `credentials.json` file.

{{% load-img "/img/download_google_drive_token.png" "credentials.json" %}}

Expand All @@ -79,16 +83,7 @@ connector:
enabled: true
queue:
name: indexing_documents
# credential_file: credentials.json
credential:
"client_id": "YOUR_XXX.apps.googleusercontent.com"
"project_id": "infini-YOUR_XXX"
"auth_uri": "https://accounts.google.com/o/oauth2/auth"
"token_uri": "https://oauth2.googleapis.com/token"
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs"
"client_secret": "YOUR_XXX-YOUR_XXX"
"redirect_uris": "http://localhost:9000/connector/google_drive/oauth_redirect"
"javascript_origins": [ "http://localhost:9000" ]
credential_file: credentials.json
interval: 10s
skip_invalid_token: true
```
Expand Down
Binary file added docs/static/img/google_drive_scopes.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
4 changes: 2 additions & 2 deletions modules/common/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,6 @@ type DataSource struct {
}

type ConnectorConfig struct {
ConnectorID string `json:"id,omitempty" elastic_mapping:"id:{type:keyword}"` // Connector ID for the datasource
Config map[string]interface{} `json:"config,omitempty" elastic_mapping:"config:{enabled:false}"` // Configs for this Connector
ConnectorID string `json:"id,omitempty" elastic_mapping:"id:{type:keyword}"` // Connector ID for the datasource
Config interface{} `json:"config,omitempty" elastic_mapping:"config:{enabled:false}"` // Configs for this Connector
}
31 changes: 20 additions & 11 deletions modules/search/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,17 +71,18 @@ type SearchResponse struct {

func (h APIHandler) search(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
var (
query = h.GetParameterOrDefault(req, "query", "")
from = h.GetIntOrDefault(req, "from", 0)
size = h.GetIntOrDefault(req, "size", 10)
datasource = h.GetParameterOrDefault(req, "datasource", "")
category = h.GetParameterOrDefault(req, "category", "")
username = h.GetParameterOrDefault(req, "username", "")
userid = h.GetParameterOrDefault(req, "userid", "")
tags = h.GetParameterOrDefault(req, "tags", "")
subcategory = h.GetParameterOrDefault(req, "subcategory", "")
field = h.GetParameterOrDefault(req, "search_field", "title")
source = h.GetParameterOrDefault(req, "source_fields", "*")
query = h.GetParameterOrDefault(req, "query", "")
from = h.GetIntOrDefault(req, "from", 0)
size = h.GetIntOrDefault(req, "size", 10)
datasource = h.GetParameterOrDefault(req, "datasource", "")
category = h.GetParameterOrDefault(req, "category", "")
username = h.GetParameterOrDefault(req, "username", "")
userid = h.GetParameterOrDefault(req, "userid", "")
tags = h.GetParameterOrDefault(req, "tags", "")
subcategory = h.GetParameterOrDefault(req, "subcategory", "")
richCategory = h.GetParameterOrDefault(req, "rich_category", "")
field = h.GetParameterOrDefault(req, "search_field", "title")
source = h.GetParameterOrDefault(req, "source_fields", "*")
)

mustClauses := []interface{}{}
Expand Down Expand Up @@ -111,6 +112,14 @@ func (h APIHandler) search(w http.ResponseWriter, req *http.Request, ps httprout
})
}

if richCategory != "" {
mustClauses = append(mustClauses, map[string]interface{}{
"term": map[string]interface{}{
"rich_categories.key": richCategory,
},
})
}

if username != "" {
mustClauses = append(mustClauses, map[string]interface{}{
"term": map[string]interface{}{
Expand Down
62 changes: 59 additions & 3 deletions plugins/connectors/google_drive/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ package google_drive

import (
"encoding/base64"
"encoding/json"
"fmt"
log "github.com/cihub/seelog"
"golang.org/x/oauth2"
"infini.sh/coco/modules/common"
httprouter "infini.sh/framework/core/api/router"
Expand All @@ -17,6 +19,7 @@ import (
"net/http"
"net/url"
"strings"
"time"
)

func encodeState(args map[string]string) string {
Expand Down Expand Up @@ -62,6 +65,21 @@ func (h *Plugin) connect(w http.ResponseWriter, req *http.Request, _ httprouter.

// Generate OAuth URL
authURL := h.oAuthConfig.AuthCodeURL(state, oauth2.AccessTypeOffline)

// Parse the generated URL to append additional parameters
parsedURL, err := url.Parse(authURL)
if err != nil {
panic("Failed to parse auth URL")
}

// Add the `approval_prompt=force` parameter to ensure the refresh token is returned
query := parsedURL.Query()
query.Set("approval_prompt", "force")
parsedURL.RawQuery = query.Encode()

// Return the updated URL with the necessary parameters
authURL = parsedURL.String()

http.Redirect(w, req, authURL, http.StatusFound)
}

Expand Down Expand Up @@ -97,17 +115,55 @@ func (h *Plugin) oAuthRedirect(w http.ResponseWriter, req *http.Request, _ httpr
panic(err)
}

// Retrieve user info from Google
client := h.oAuthConfig.Client(req.Context(), token)
resp, err := client.Get("https://www.googleapis.com/oauth2/v3/userinfo")
if err != nil {
panic(fmt.Errorf("failed to fetch user info: %w", err))
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
panic(fmt.Errorf("unexpected status code fetching user info: %d", resp.StatusCode))
}

// Parse the user info
var userInfo struct {
Sub string `json:"sub"` // Unique Google user ID
Email string `json:"email"` // User's email address
VerifiedEmail bool `json:"email_verified"` // Whether email is verified
Name string `json:"name"` // User's full name
Picture string `json:"picture"` // User's profile picture URL
}
if err := json.NewDecoder(resp.Body).Decode(&userInfo); err != nil {
panic(fmt.Errorf("failed to parse user info: %w", err))
}
// Use the unique user ID (userInfo.Sub) or email (userInfo.Email) as the unique identifier
log.Infof("google drive authenticated user: ID=%s, Email=%s", userInfo.Sub, userInfo.Email)

datasource := common.DataSource{}
datasource.ID = util.GetUUID() //TODO routing to single task, if connect multi-times
datasource.ID = util.MD5digest(fmt.Sprintf("%v,%v,%v", "google_drive", userInfo.Sub, userInfo.Email))
datasource.Type = "connector"
datasource.Name = "My Google Drive" //TODO, input from user
if userInfo.Name != "" {
datasource.Name = userInfo.Name + "'s Google Drive"
} else {
datasource.Name = "My Google Drive"
}
datasource.Connector = common.ConnectorConfig{
ConnectorID: "google_drive",
Config: util.MapStr{
"token": util.MustToJSON(token),
"access_token": token.AccessToken, // Store access token
"refresh_token": token.RefreshToken, // Store refresh token
"token_expiry": token.Expiry.Format(time.RFC3339), // Format using RFC3339
"profile": userInfo,
},
}

// Check if refresh token is missing or empty
if token.RefreshToken == "" {
log.Warnf("refresh token was not granted for: %v", datasource.Name)
}

err = orm.Save(nil, &datasource)
if err != nil {
panic(err)
Expand Down
79 changes: 77 additions & 2 deletions plugins/connectors/google_drive/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,15 @@ func getIcon(fileType string) string {
}
}

// Function to get the root folder ID
func getRootFolderID(srv *drive.Service) (string, string) {
rootFolder, err := srv.Files.Get("root").Fields("id, name").Do()
if err != nil {
panic(errors.Errorf("Unable to get root folder ID: %v", err))
}
return rootFolder.Id, rootFolder.Name
}

func (this *Plugin) startIndexingFiles(connector *common.Connector, datasource *common.DataSource, tenantID, userID string, tok *oauth2.Token) {
var filesProcessed = 0
defer func() {
Expand Down Expand Up @@ -93,6 +102,47 @@ func (this *Plugin) startIndexingFiles(connector *common.Connector, datasource *
panic(err)
}

// All directories
var directoryMap = map[string]common.RichLabel{}

// Root Folder
rootFolderID, rootFolderName := getRootFolderID(srv)
directoryMap[rootFolderID] = common.RichLabel{Key: rootFolderID, Label: rootFolderName, Icon: "folder"}

// Fetch all directories
var nextPageToken string
for {
if global.ShuttingDown() {
break
}

call := srv.Files.List().
PageSize(int64(this.PageSize)).
OrderBy("name").
Q("mimeType='application/vnd.google-apps.folder' and trashed=false").
Fields("nextPageToken, files(id, name, parents)")

r, err := call.PageToken(nextPageToken).Do()
if err != nil {
panic(errors.Errorf("Failed to fetch directories: %v", err))
}

// Save directories in the map
for _, i := range r.Files {
//TODO, should save to store in case there are so many crazy directories, OOM risk
directoryMap[i.Id] = common.RichLabel{Key: i.Id, Label: i.Name, Icon: "folder"}
log.Debugf("google drive directory: ID=%s, Name=%s, Parents=%v", i.Id, i.Name, i.Parents)
}

nextPageToken = r.NextPageToken
if nextPageToken == "" {
break
}
}

log.Infof("fetched %d google drive directories", len(directoryMap))

// Fetch all files
var query string

//get last access time from kv
Expand All @@ -113,8 +163,12 @@ func (this *Plugin) startIndexingFiles(connector *common.Connector, datasource *
var lastModifyTime *time.Time

// Start pagination loop
var nextPageToken string
nextPageToken = ""
for {
if global.ShuttingDown() {
break
}

call := srv.Files.List().PageSize(int64(this.PageSize)).OrderBy("modifiedTime asc")

if query != "" {
Expand Down Expand Up @@ -154,7 +208,24 @@ func (this *Plugin) startIndexingFiles(connector *common.Connector, datasource *
}
}

log.Infof("File: %s (ID: %s) | CreatedAt: %s | UpdatedAt: %s", i.Name, i.Id, createdAt, updatedAt)
if i.MimeType == "application/vnd.google-apps.folder" {
directoryMap[i.Id] = common.RichLabel{Key: i.Id, Label: i.Name, Icon: "folder"}
}
categories := []common.RichLabel{}
if len(i.Parents) > 0 {
for _, v := range i.Parents {
folderName, ok := directoryMap[v]
if ok {
//log.Debugf("folder: %v, %v", folderName, v)
categories = append(categories, folderName)
} else {
log.Errorf("missing folder info: %v", v)
//TODO, if the parent_id is not found, delay to handle this file, maybe newly added file, and the folder meta is aware
}
}
}

log.Debugf("Google Drive File: %s (ID: %s) | CreatedAt: %s | UpdatedAt: %s", i.Name, i.Id, createdAt, updatedAt)

// Map Google Drive file to Document struct
document := common.Document{
Expand All @@ -177,6 +248,10 @@ func (this *Plugin) startIndexingFiles(connector *common.Connector, datasource *
Thumbnail: i.ThumbnailLink,
}

if len(categories) > 0 {
document.RichCategories = categories
}

document.ID = i.Id //add tenant namespace and then hash
document.Created = createdAt
document.Updated = updatedAt
Expand Down
Loading

0 comments on commit edf3055

Please sign in to comment.