Skip to content

Commit

Permalink
Add CallId to Scan Iterator
Browse files Browse the repository at this point in the history
  • Loading branch information
kaidaguerre committed Feb 28, 2023
1 parent 0ca8f8c commit 9646a61
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 9 deletions.
13 changes: 6 additions & 7 deletions hub/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -648,24 +648,23 @@ func (h *Hub) shouldPushdownLimit(table string, qualMap map[string]*proto.Quals,

// StartScan starts a scan (for scanIterators only = legacy iterators will have already started)
func (h *Hub) StartScan(i Iterator) error {
// iterator must be a scan iterator
// if iterator is not a scan iterator, do nothing
iterator, ok := i.(*scanIterator)
if !ok {
return nil
}

// iterator must be a scan iterator
// ensure we do not call execute too frequently
h.throttle()

table := iterator.table
connectionPlugin := iterator.connectionPlugin
callId := grpc.BuildCallId()

req := &proto.ExecuteRequest{
Table: table,
QueryContext: iterator.queryContext,
CallId: callId,
Table: table,
QueryContext: iterator.queryContext,
CallId: iterator.callId,
// pass connection name - used for aggregators
Connection: iterator.ConnectionName(),
TraceContext: grpc.CreateCarrierFromContext(iterator.traceCtx.Ctx),
Expand All @@ -684,7 +683,7 @@ func (h *Hub) StartScan(i Iterator) error {
req.ExecuteConnectionData[connectionName] = data
}

log.Printf("[INFO] StartScan for table: %s, callId %s, cache enabled: %v, iterator %p", table, callId, req.CacheEnabled, iterator)
log.Printf("[INFO] StartScan for table: %s, cache enabled: %v, iterator %p, %d quals (%s)", table, req.CacheEnabled, iterator, len(iterator.queryContext.Quals), iterator.callId)
stream, ctx, cancel, err := connectionPlugin.PluginClient.Execute(req)
// format GRPC errors and ignore not implemented errors for backwards compatibility
err = grpc.HandleGrpcError(err, connectionPlugin.PluginName, "Execute")
Expand Down Expand Up @@ -773,7 +772,7 @@ func (h *Hub) cacheTTL(connectionName string) time.Duration {
// ask the steampipe config for resolved plugin options - this will use default values where needed
connectionOptions := steampipeconfig.GlobalConfig.GetConnectionOptions(connectionName)

// the config loading code shouls ALWAYS populate the connection options, using defaults if needed
// the config loading code should ALWAYS populate the connection options, using defaults if needed
if connectionOptions.CacheTTL == nil {
panic(fmt.Sprintf("No cache options found for connection %s", connectionName))
}
Expand Down
7 changes: 5 additions & 2 deletions hub/scan_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/turbot/steampipe-plugin-sdk/v5/grpc"
"log"
"time"

Expand Down Expand Up @@ -45,6 +46,7 @@ type scanIterator struct {
queryContext *proto.QueryContext

startTime time.Time
callId string
}

func newScanIterator(hub *Hub, connectionPlugin *steampipeconfig.ConnectionPlugin, connectionName, table string, connectionLimitMap map[string]int64, qualMap map[string]*proto.Quals, columns []string, limit int64, traceCtx *telemetry.TraceCtx) *scanIterator {
Expand All @@ -60,6 +62,7 @@ func newScanIterator(hub *Hub, connectionPlugin *steampipeconfig.ConnectionPlugi
traceCtx: traceCtx,
startTime: time.Now(),
queryContext: proto.NewQueryContext(columns, qualMap, limit),
callId: grpc.BuildCallId(),
}
}

Expand Down Expand Up @@ -269,7 +272,7 @@ func (i *scanIterator) readPluginResult(ctx context.Context) bool {
continueReading = false
case rowResult := <-rcvChan:
if rowResult == nil {
log.Printf("[TRACE] readPluginResult nil row received - stop reading (%p)", i)
log.Printf("[TRACE] readPluginResult nil row received - stop reading (%p) (%s)", i, i.callId)
// stop reading
continueReading = false
} else {
Expand All @@ -281,7 +284,7 @@ func (i *scanIterator) readPluginResult(ctx context.Context) bool {
}
case err := <-errChan:
if err.Error() == "EOF" {
log.Printf("[TRACE] readPluginResult EOF error received - stop reading (%p)", i)
log.Printf("[TRACE] readPluginResult EOF error received - stop reading (%p) (%s)", i, i.callId)
} else {
log.Printf("[WARN] stream receive error %v (%p)\n", err, i)
i.setError(err)
Expand Down

0 comments on commit 9646a61

Please sign in to comment.