Skip to content

Commit

Permalink
Row count is incorrect when using aggregator connections. Closes #402
Browse files Browse the repository at this point in the history
  • Loading branch information
kaidaguerre authored Dec 22, 2023
1 parent 778b52f commit fbbf0cc
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 36 deletions.
8 changes: 8 additions & 0 deletions .idea/modules.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions .idea/steampipe-postgres-fdw.iml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 17 additions & 19 deletions hub/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,26 +195,24 @@ func (h *Hub) AddScanMetadata(iter Iterator) {
return
}

// get list of scan metadata from iterator (may be more than 1 for group_iterator)
scanMetadata := iter.GetScanMetadata()
for _, m := range scanMetadata {
// set ID
m.Id = id
id++
log.Printf("[TRACE] got metadata table: %s cache hit: %v, rows fetched %d, hydrate calls: %d",
m.Table, m.CacheHit, m.RowsFetched, m.HydrateCalls)
// read the scan metadata from the iterator and add to our stack
h.scanMetadata = append(h.scanMetadata, m)

// hydrate metric labels
labels := []attribute.KeyValue{
attribute.String("table", m.Table),
attribute.String("connection", connectionName),
attribute.String("plugin", connectionPlugin.PluginName),
}
log.Printf("[TRACE] update hydrate calls counter with %d", m.HydrateCalls)
h.hydrateCallsCounter.Add(ctx, m.HydrateCalls, metric.WithAttributes(labels...))
// get scan metadata from iterator
m := iter.GetScanMetadata()

// set ID
m.Id = id
log.Printf("[TRACE] got metadata table: %s cache hit: %v, rows fetched %d, hydrate calls: %d",
m.Table, m.CacheHit, m.RowsFetched, m.HydrateCalls)
// read the scan metadata from the iterator and add to our stack
h.scanMetadata = append(h.scanMetadata, m)

// hydrate metric labels
labels := []attribute.KeyValue{
attribute.String("table", m.Table),
attribute.String("connection", connectionName),
attribute.String("plugin", connectionPlugin.PluginName),
}
log.Printf("[TRACE] update hydrate calls counter with %d", m.HydrateCalls)
h.hydrateCallsCounter.Add(ctx, m.HydrateCalls, metric.WithAttributes(labels...))

// now trim scan metadata - max 1000 items
const maxMetadataItems = 1000
Expand Down
4 changes: 2 additions & 2 deletions hub/in_memory_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ func (i *inMemoryIterator) CanIterate() bool {
}
}

func (i *inMemoryIterator) GetScanMetadata() []ScanMetadata {
return nil
func (i *inMemoryIterator) GetScanMetadata() ScanMetadata {
return ScanMetadata{}
}
func (i *inMemoryIterator) GetTraceContext() *telemetry.TraceCtx {
return &telemetry.TraceCtx{Ctx: context.Background()}
Expand Down
2 changes: 1 addition & 1 deletion hub/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,6 @@ type Iterator interface {
Status() queryStatus
Error() error
CanIterate() bool
GetScanMetadata() []ScanMetadata
GetScanMetadata() ScanMetadata
GetTraceContext() *telemetry.TraceCtx
}
29 changes: 15 additions & 14 deletions hub/scan_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,23 +173,24 @@ func (i *scanIterator) CanIterate() bool {

}

func (i *scanIterator) GetScanMetadata() []ScanMetadata {
res := make([]ScanMetadata, len(i.scanMetadata))
idx := 0
// GetScanMetadata returns the scan metadata for this iterator
// note: if this is an aggregator query, we will have a scan metadata for each connection
// we need to combine them into a single scan metadata object
func (i *scanIterator) GetScanMetadata() ScanMetadata {
res := ScanMetadata{
Table: i.table,
Columns: i.queryContext.Columns,
Quals: i.queryContext.Quals,
StartTime: i.startTime,
Duration: time.Since(i.startTime),
}
for _, m := range i.scanMetadata {
res[idx] = ScanMetadata{
Table: i.table,
CacheHit: m.CacheHit,
RowsFetched: m.RowsFetched,
HydrateCalls: m.HydrateCalls,
Columns: i.queryContext.Columns,
Quals: i.queryContext.Quals,
StartTime: i.startTime,
Duration: time.Since(i.startTime),
}
idx++
res.CacheHit = res.CacheHit || m.CacheHit
res.RowsFetched += m.RowsFetched
res.HydrateCalls += m.HydrateCalls
}
return res

}

func (i *scanIterator) GetTraceContext() *telemetry.TraceCtx {
Expand Down

0 comments on commit fbbf0cc

Please sign in to comment.