Skip to content

Commit

Permalink
Fix querier panic when marshaling QueryResultRequest (issue: cortexpr…
Browse files Browse the repository at this point in the history
…oject#6599)

Signed-off-by: SungJin1212 <[email protected]>
  • Loading branch information
SungJin1212 committed Feb 25, 2025
1 parent fc6c40d commit 6e4e90a
Show file tree
Hide file tree
Showing 5 changed files with 217 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* [BUGFIX] Compactor: Cleaner would delete bucket index when there is no block in bucket store. #6577
* [BUGFIX] Querier: Fix marshal native histogram with empty bucket when protobuf codec is enabled. #6595
* [BUGFIX] Query Frontend: Fix samples scanned and peak samples query stats when query hits results cache. #6591
* [BUGFIX] Querier: Fix panic when marshaling QueryResultRequest. #6601

## 1.19.0 in progress

Expand Down
10 changes: 10 additions & 0 deletions pkg/querier/stats/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,16 @@ func IsEnabled(ctx context.Context) bool {
return FromContext(ctx) != nil
}

func (s *QueryStats) Copy() *QueryStats {
if s == nil {
return nil
}

copied := &QueryStats{}
copied.Merge(s)
return copied
}

// AddWallTime adds some time to the counter.
func (s *QueryStats) AddWallTime(t time.Duration) {
if s == nil {
Expand Down
20 changes: 20 additions & 0 deletions pkg/querier/stats/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,26 @@ import (
"github.com/stretchr/testify/assert"
)

func TestStats_Copy(t *testing.T) {
t.Run("stats is nil", func(t *testing.T) {
var stats *QueryStats
copied := stats.Copy()
assert.Nil(t, copied)
})
t.Run("stats is not nil", func(t *testing.T) {
stats, _ := ContextWithEmptyStats(context.Background())
stats.AddWallTime(time.Second)
copied := stats.Copy()

// value should be the same
assert.Equal(t, time.Second, copied.LoadWallTime())
p1, p2 := &copied, &stats

// address should be different
assert.False(t, p1 == p2)
})
}

func TestStats_WallTime(t *testing.T) {
t.Run("add and load wall time", func(t *testing.T) {
stats, _ := ContextWithEmptyStats(context.Background())
Expand Down
16 changes: 12 additions & 4 deletions pkg/querier/worker/scheduler_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ func newSchedulerProcessor(cfg Config, handler RequestHandler, log log.Logger, r
querierID: cfg.QuerierID,
grpcConfig: cfg.GRPCClientConfig,
targetHeaders: cfg.TargetHeaders,
schedulerClientFactory: func(conn *grpc.ClientConn) schedulerpb.SchedulerForQuerierClient {
return schedulerpb.NewSchedulerForQuerierClient(conn)
},
frontendClientRequestDuration: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Name: "cortex_querier_query_frontend_request_duration_seconds",
Help: "Time spend doing requests to frontend.",
Expand Down Expand Up @@ -72,12 +75,13 @@ type schedulerProcessor struct {
frontendPool *client.Pool
frontendClientRequestDuration *prometheus.HistogramVec

targetHeaders []string
targetHeaders []string
schedulerClientFactory func(conn *grpc.ClientConn) schedulerpb.SchedulerForQuerierClient
}

// notifyShutdown implements processor.
func (sp *schedulerProcessor) notifyShutdown(ctx context.Context, conn *grpc.ClientConn, address string) {
client := schedulerpb.NewSchedulerForQuerierClient(conn)
client := sp.schedulerClientFactory(conn)

req := &schedulerpb.NotifyQuerierShutdownRequest{QuerierID: sp.querierID}
if _, err := client.NotifyQuerierShutdown(ctx, req); err != nil {
Expand All @@ -87,7 +91,7 @@ func (sp *schedulerProcessor) notifyShutdown(ctx context.Context, conn *grpc.Cli
}

func (sp *schedulerProcessor) processQueriesOnSingleStream(ctx context.Context, conn *grpc.ClientConn, address string) {
schedulerClient := schedulerpb.NewSchedulerForQuerierClient(conn)
schedulerClient := sp.schedulerClientFactory(conn)

backoff := backoff.New(ctx, processorBackoffConfig)
for backoff.Ongoing() {
Expand Down Expand Up @@ -211,11 +215,15 @@ func (sp *schedulerProcessor) runRequest(ctx context.Context, logger log.Logger,

c, err := sp.frontendPool.GetClientFor(frontendAddress)
if err == nil {
// To prevent querier panic, the panic could happen when the go-routines not-exited
// yet in `fetchSeriesFromStores` are increment query-stats while progressing
// (*QueryResultRequest).MarshalToSizedBuffer under the same query-stat objects are used.
copiedStats := stats.Copy()
// Response is empty and uninteresting.
_, err = c.(frontendv2pb.FrontendForQuerierClient).QueryResult(ctx, &frontendv2pb.QueryResultRequest{
QueryID: queryID,
HttpResponse: response,
Stats: stats,
Stats: copiedStats,
})
}
if err != nil {
Expand Down
174 changes: 174 additions & 0 deletions pkg/querier/worker/scheduler_processor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package worker

import (
"context"
"net"
"testing"
"time"

"github.com/go-kit/log"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/httpgrpc"
"go.uber.org/atomic"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"

"github.com/cortexproject/cortex/pkg/frontend/v2/frontendv2pb"
"github.com/cortexproject/cortex/pkg/querier/stats"
"github.com/cortexproject/cortex/pkg/scheduler/schedulerpb"
)

// mock querier request handler
type mockRequestHandler struct {
mock.Mock
}

func (m *mockRequestHandler) Handle(ctx context.Context, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) {
args := m.Called(ctx, req)
return args.Get(0).(*httpgrpc.HTTPResponse), args.Error(1)
}

type mockFrontendForQuerierServer struct {
mock.Mock
}

func (m *mockFrontendForQuerierServer) QueryResult(_ context.Context, _ *frontendv2pb.QueryResultRequest) (*frontendv2pb.QueryResultResponse, error) {
return &frontendv2pb.QueryResultResponse{}, nil
}

type mockSchedulerForQuerierClient struct {
mock.Mock
}

func (m *mockSchedulerForQuerierClient) QuerierLoop(ctx context.Context, opts ...grpc.CallOption) (schedulerpb.SchedulerForQuerier_QuerierLoopClient, error) {
args := m.Called(ctx, opts)
return args.Get(0).(schedulerpb.SchedulerForQuerier_QuerierLoopClient), args.Error(1)
}

func (m *mockSchedulerForQuerierClient) NotifyQuerierShutdown(ctx context.Context, in *schedulerpb.NotifyQuerierShutdownRequest, opts ...grpc.CallOption) (*schedulerpb.NotifyQuerierShutdownResponse, error) {
args := m.Called(ctx, in, opts)
return args.Get(0).(*schedulerpb.NotifyQuerierShutdownResponse), args.Error(1)
}

// mock SchedulerForQuerier_QuerierLoopClient
type mockQuerierLoopClient struct {
ctx context.Context
mock.Mock
}

func (m *mockQuerierLoopClient) Send(msg *schedulerpb.QuerierToScheduler) error {
args := m.Called(msg)
return args.Error(0)
}

func (m *mockQuerierLoopClient) Recv() (*schedulerpb.SchedulerToQuerier, error) {
args := m.Called()

if fn, ok := args.Get(0).(func() (*schedulerpb.SchedulerToQuerier, error)); ok {
return fn()
}

return args.Get(0).(*schedulerpb.SchedulerToQuerier), args.Error(1)
}

func (m *mockQuerierLoopClient) Header() (metadata.MD, error) {
args := m.Called()
return args.Get(0).(metadata.MD), args.Error(1)
}

func (m *mockQuerierLoopClient) Trailer() metadata.MD {
args := m.Called()
return args.Get(0).(metadata.MD)
}

func (m *mockQuerierLoopClient) CloseSend() error {
args := m.Called()
return args.Error(0)
}

func (m *mockQuerierLoopClient) Context() context.Context {
args := m.Called()
return args.Get(0).(context.Context)
}

func (m *mockQuerierLoopClient) SendMsg(msg interface{}) error {
args := m.Called(msg)
return args.Error(0)
}

func (m *mockQuerierLoopClient) RecvMsg(msg interface{}) error {
args := m.Called(msg)
return args.Error(0)
}

// To show https://github.com/cortexproject/cortex/issues/6599 issue has been resolved
func Test_ToShowNotPanic_RelatedIssue6599(t *testing.T) {
cfg := Config{}
frontendAddress := ":50001"
userID := "user-1"
recvCount := 100000

ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()

recvCall := atomic.Uint32{}

querierLoopClient := &mockQuerierLoopClient{}
querierLoopClient.ctx = ctx
querierLoopClient.On("Send", mock.Anything).Return(nil)
querierLoopClient.On("Context").Return(querierLoopClient.ctx)
querierLoopClient.On("Recv").Return(func() (*schedulerpb.SchedulerToQuerier, error) {
recvCall.Add(1)
if recvCall.Load() <= uint32(recvCount) {
return &schedulerpb.SchedulerToQuerier{
QueryID: 1,
HttpRequest: &httpgrpc.HTTPRequest{},
FrontendAddress: frontendAddress,
UserID: userID,
StatsEnabled: true,
}, nil
} else {
<-querierLoopClient.ctx.Done()
return nil, context.Canceled
}

})

requestHandler := &mockRequestHandler{}
requestHandler.On("Handle", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
stat := stats.FromContext(args.Get(0).(context.Context))

// imitate add query-stat at fetchSeriesFromStores
go stat.AddFetchedChunkBytes(10)
}).Return(&httpgrpc.HTTPResponse{}, nil)

sp, _ := newSchedulerProcessor(cfg, requestHandler, log.NewNopLogger(), nil)
schedulerClient := &mockSchedulerForQuerierClient{}
schedulerClient.On("QuerierLoop", mock.Anything, mock.Anything).Return(querierLoopClient, nil)

sp.schedulerClientFactory = func(conn *grpc.ClientConn) schedulerpb.SchedulerForQuerierClient {
return schedulerClient
}

// frontendForQuerierServer
grpcServer := grpc.NewServer()
server := &mockFrontendForQuerierServer{}
frontendv2pb.RegisterFrontendForQuerierServer(grpcServer, server)

lis, err := net.Listen("tcp", frontendAddress)
require.NoError(t, err)
stopChan := make(chan struct{})
go func() {
defer close(stopChan)
if err := grpcServer.Serve(lis); err != nil {
return
}
}()
defer func() {
grpcServer.GracefulStop()
<-stopChan // Wait util stop complete
}()

sp.processQueriesOnSingleStream(ctx, nil, lis.Addr().String())
}

0 comments on commit 6e4e90a

Please sign in to comment.