diff --git a/api/cache.go b/api/cache.go index a9204c9e..311f89ed 100644 --- a/api/cache.go +++ b/api/cache.go @@ -16,8 +16,6 @@ type TempCache struct { ctx context.Context items map[string]models.ConfigItem aliases map[string]string - - changes map[string]struct{} } func (t TempCache) FindExternal(ext v1.ExternalID) (*models.ConfigItem, error) { @@ -78,38 +76,6 @@ func (t TempCache) Insert(item models.ConfigItem) { t.items[strings.ToLower(item.ID)] = item } -func (t TempCache) IsChangePersisted(configID, externalChangeID string) (bool, error) { - if configID == "" || externalChangeID == "" { - return false, nil - } - - configID = strings.ToLower(configID) - externalChangeID = strings.ToLower(externalChangeID) - - if t.changes == nil { - t.changes = make(map[string]struct{}) - } - - if _, ok := t.changes[configID+externalChangeID]; ok { - return true, nil - } - - var result models.ConfigChange - if err := t.ctx.DB().Select("id").Where("config_id = ?", configID). - Where("external_change_id = ?", externalChangeID). - Limit(1). - Find(&result).Error; err != nil { - return false, err - } - - if result.ID != "" { - t.changes[configID+externalChangeID] = struct{}{} - return true, nil - } - - return false, nil -} - func (t TempCache) Get(id string) (*models.ConfigItem, error) { id = strings.ToLower(id) if id == "" { diff --git a/db/changes.go b/db/changes.go index d097d9d2..c4a954cf 100644 --- a/db/changes.go +++ b/db/changes.go @@ -5,6 +5,8 @@ import ( "time" sw "github.com/RussellLuo/slidingwindow" + "github.com/patrickmn/go-cache" + "github.com/samber/lo" "github.com/flanksource/commons/collections" "github.com/flanksource/config-db/api" @@ -19,6 +21,8 @@ const ( ChangeTypeTooManyChanges = "TooManyChanges" ) +var configChangesCache = cache.New(time.Hour*24, time.Hour*24) + func GetWorkflowRunCount(ctx api.ScrapeContext, workflowID string) (int64, error) { var count int64 err := ctx.DB().Table("config_changes"). @@ -122,6 +126,7 @@ func syncCurrentlyRateLimitedConfigs(ctx api.ScrapeContext, window time.Duration if err != nil { return nil, err } + defer rows.Close() output := make(map[string]struct{}) for rows.Next() { @@ -156,6 +161,7 @@ func syncWindow(ctx api.ScrapeContext, max int, window time.Duration) error { if err != nil { return err } + defer rows.Close() for rows.Next() { var configID string @@ -179,3 +185,54 @@ func syncWindow(ctx api.ScrapeContext, max int, window time.Duration) error { return rows.Err() } + +// filterOutPersistedChanges returns only those changes that weren't seen in the db. +func filterOutPersistedChanges(ctx api.ScrapeContext, changes []*models.ConfigChange) ([]*models.ConfigChange, error) { + // use cache to filter out ones that we've already seen before + changes = lo.Filter(changes, func(c *models.ConfigChange, _ int) bool { + _, found := configChangesCache.Get(c.ConfigID + c.ExternalChangeId) + if found { + _ = found + } + return !found + }) + + if len(changes) == 0 { + return nil, nil + } + + query := `SELECT config_id, external_change_id + FROM config_changes + WHERE (config_id, external_change_id) IN ?` + args := lo.Map(changes, func(c *models.ConfigChange, _ int) []string { + return []string{c.ConfigID, c.ExternalChangeId} + }) + + rows, err := ctx.DB().Raw(query, args).Rows() + if err != nil { + return nil, err + } + defer rows.Close() + + existing := make(map[string]struct{}) + for rows.Next() { + var configID, externalChangeID string + if err := rows.Scan(&configID, &externalChangeID); err != nil { + return nil, err + } + + configChangesCache.SetDefault(configID+externalChangeID, struct{}{}) + existing[configID+externalChangeID] = struct{}{} + } + + newOnes := lo.Filter(changes, func(c *models.ConfigChange, _ int) bool { + _, found := existing[c.ConfigID+c.ExternalChangeId] + return !found + }) + + if len(newOnes) > 0 { + _ = query + } + + return newOnes, nil +} diff --git a/db/update.go b/db/update.go index 5af25d87..b8b8cbce 100644 --- a/db/update.go +++ b/db/update.go @@ -216,8 +216,8 @@ func shouldExcludeChange(result *v1.ScrapeResult, changeResult v1.ChangeResult) func extractChanges(ctx api.ScrapeContext, result *v1.ScrapeResult, ci *models.ConfigItem) ([]*models.ConfigChange, []*models.ConfigChange, error) { var ( - newOnes = []*models.ConfigChange{} - updates = []*models.ConfigChange{} + toInsert = []*models.ConfigChange{} + toUpdate = []*models.ConfigChange{} ) changes.ProcessRules(result, result.BaseScraper.Transform.Change.Mapping...) @@ -274,17 +274,19 @@ func extractChanges(ctx api.ScrapeContext, result *v1.ScrapeResult, ci *models.C } if changeResult.UpdateExisting { - updates = append(updates, change) + toUpdate = append(toUpdate, change) } else { - if ok, err := ctx.TempCache().IsChangePersisted(change.ConfigID, change.ExternalChangeId); err != nil { - return nil, nil, fmt.Errorf("failed to check if change is persisted: %w", err) - } else if !ok { - newOnes = append(newOnes, change) - } + toInsert = append(toInsert, change) } } - return newOnes, updates, nil + // Remove the changes that have already been inserted. + newOnes, err := filterOutPersistedChanges(ctx, toInsert) + if err != nil { + return nil, nil, err + } + + return newOnes, toUpdate, nil } func upsertAnalysis(ctx api.ScrapeContext, result *v1.ScrapeResult) error {