From 6e4e90ab263e03815cf10f686322ece07d1e32d0 Mon Sep 17 00:00:00 2001
From: SungJin1212
Date: Tue, 25 Feb 2025 19:09:05 +0900
Subject: [PATCH] Fix querier panic when marshaling QueryResultRequest (issue: #6599)

Signed-off-by: SungJin1212
---
 CHANGELOG.md                              |   1 +
 pkg/querier/stats/stats.go                |  10 +
 pkg/querier/stats/stats_test.go           |  20 ++
 pkg/querier/worker/scheduler_processor.go |  16 +-
 .../worker/scheduler_processor_test.go    | 174 ++++++++++++++++++
 5 files changed, 217 insertions(+), 4 deletions(-)
 create mode 100644 pkg/querier/worker/scheduler_processor_test.go

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9f44817f77..85623b1cc9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -15,6 +15,7 @@
 * [BUGFIX] Compactor: Cleaner would delete bucket index when there is no block in bucket store. #6577
 * [BUGFIX] Querier: Fix marshal native histogram with empty bucket when protobuf codec is enabled. #6595
 * [BUGFIX] Query Frontend: Fix samples scanned and peak samples query stats when query hits results cache. #6591
+* [BUGFIX] Querier: Fix panic when marshaling QueryResultRequest. #6601
 
 ## 1.19.0 in progress
 
diff --git a/pkg/querier/stats/stats.go b/pkg/querier/stats/stats.go
index cdf432718d..127c422878 100644
--- a/pkg/querier/stats/stats.go
+++ b/pkg/querier/stats/stats.go
@@ -49,6 +49,16 @@ func IsEnabled(ctx context.Context) bool {
 	return FromContext(ctx) != nil
 }
 
+func (s *QueryStats) Copy() *QueryStats {
+	if s == nil {
+		return nil
+	}
+
+	copied := &QueryStats{}
+	copied.Merge(s)
+	return copied
+}
+
 // AddWallTime adds some time to the counter.
 func (s *QueryStats) AddWallTime(t time.Duration) {
 	if s == nil {
diff --git a/pkg/querier/stats/stats_test.go b/pkg/querier/stats/stats_test.go
index e698d503d9..5f2e850aef 100644
--- a/pkg/querier/stats/stats_test.go
+++ b/pkg/querier/stats/stats_test.go
@@ -8,6 +8,26 @@ import (
 	"github.com/stretchr/testify/assert"
 )
 
+func TestStats_Copy(t *testing.T) {
+	t.Run("stats is nil", func(t *testing.T) {
+		var stats *QueryStats
+		copied := stats.Copy()
+		assert.Nil(t, copied)
+	})
+	t.Run("stats is not nil", func(t *testing.T) {
+		stats, _ := ContextWithEmptyStats(context.Background())
+		stats.AddWallTime(time.Second)
+		copied := stats.Copy()
+
+		// value should be the same
+		assert.Equal(t, time.Second, copied.LoadWallTime())
+		p1, p2 := copied, stats
+
+		// the copy should point to a different object than the original
+		assert.False(t, p1 == p2)
+	})
+}
+
 func TestStats_WallTime(t *testing.T) {
 	t.Run("add and load wall time", func(t *testing.T) {
 		stats, _ := ContextWithEmptyStats(context.Background())
diff --git a/pkg/querier/worker/scheduler_processor.go b/pkg/querier/worker/scheduler_processor.go
index aea9820153..0d14921028 100644
--- a/pkg/querier/worker/scheduler_processor.go
+++ b/pkg/querier/worker/scheduler_processor.go
@@ -39,6 +39,9 @@ func newSchedulerProcessor(cfg Config, handler RequestHandler, log log.Logger, r
 		querierID:     cfg.QuerierID,
 		grpcConfig:    cfg.GRPCClientConfig,
 		targetHeaders: cfg.TargetHeaders,
+		schedulerClientFactory: func(conn *grpc.ClientConn) schedulerpb.SchedulerForQuerierClient {
+			return schedulerpb.NewSchedulerForQuerierClient(conn)
+		},
 		frontendClientRequestDuration: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
 			Name: "cortex_querier_query_frontend_request_duration_seconds",
 			Help: "Time spend doing requests to frontend.",
@@ -72,12 +75,13 @@ type schedulerProcessor struct {
 	frontendPool                  *client.Pool
 	frontendClientRequestDuration *prometheus.HistogramVec
 
-	targetHeaders []string
+	targetHeaders          []string
+	schedulerClientFactory func(conn *grpc.ClientConn) schedulerpb.SchedulerForQuerierClient
 }
 
 // notifyShutdown implements processor.
 func (sp *schedulerProcessor) notifyShutdown(ctx context.Context, conn *grpc.ClientConn, address string) {
-	client := schedulerpb.NewSchedulerForQuerierClient(conn)
+	client := sp.schedulerClientFactory(conn)
 
 	req := &schedulerpb.NotifyQuerierShutdownRequest{QuerierID: sp.querierID}
 	if _, err := client.NotifyQuerierShutdown(ctx, req); err != nil {
@@ -87,7 +91,7 @@ func (sp *schedulerProcessor) notifyShutdown(ctx context.Context, conn *grpc.Cli
 }
 
 func (sp *schedulerProcessor) processQueriesOnSingleStream(ctx context.Context, conn *grpc.ClientConn, address string) {
-	schedulerClient := schedulerpb.NewSchedulerForQuerierClient(conn)
+	schedulerClient := sp.schedulerClientFactory(conn)
 
 	backoff := backoff.New(ctx, processorBackoffConfig)
 	for backoff.Ongoing() {
@@ -211,11 +215,15 @@ func (sp *schedulerProcessor) runRequest(ctx context.Context, logger log.Logger,
 
 	c, err := sp.frontendPool.GetClientFor(frontendAddress)
 	if err == nil {
+		// Copy the stats before marshaling the result. Goroutines spawned by
+		// `fetchSeriesFromStores` that have not exited yet may still increment the query stats
+		// while (*QueryResultRequest).MarshalToSizedBuffer reads them, which panics the querier.
+		copiedStats := stats.Copy()
 		// Response is empty and uninteresting.
 		_, err = c.(frontendv2pb.FrontendForQuerierClient).QueryResult(ctx, &frontendv2pb.QueryResultRequest{
 			QueryID:      queryID,
 			HttpResponse: response,
-			Stats:        stats,
+			Stats:        copiedStats,
 		})
 	}
 	if err != nil {
diff --git a/pkg/querier/worker/scheduler_processor_test.go b/pkg/querier/worker/scheduler_processor_test.go
new file mode 100644
index 0000000000..c2c1518119
--- /dev/null
+++ b/pkg/querier/worker/scheduler_processor_test.go
@@ -0,0 +1,174 @@
+package worker
+
+import (
+	"context"
+	"net"
+	"testing"
+	"time"
+
+	"github.com/go-kit/log"
+	"github.com/stretchr/testify/mock"
+	"github.com/stretchr/testify/require"
+	"github.com/weaveworks/common/httpgrpc"
+	"go.uber.org/atomic"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/metadata"
+
+	"github.com/cortexproject/cortex/pkg/frontend/v2/frontendv2pb"
+	"github.com/cortexproject/cortex/pkg/querier/stats"
+	"github.com/cortexproject/cortex/pkg/scheduler/schedulerpb"
+)
+
+// mock querier request handler
+type mockRequestHandler struct {
+	mock.Mock
+}
+
+func (m *mockRequestHandler) Handle(ctx context.Context, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) {
+	args := m.Called(ctx, req)
+	return args.Get(0).(*httpgrpc.HTTPResponse), args.Error(1)
+}
+
+type mockFrontendForQuerierServer struct {
+	mock.Mock
+}
+
+func (m *mockFrontendForQuerierServer) QueryResult(_ context.Context, _ *frontendv2pb.QueryResultRequest) (*frontendv2pb.QueryResultResponse, error) {
+	return &frontendv2pb.QueryResultResponse{}, nil
+}
+
+type mockSchedulerForQuerierClient struct {
+	mock.Mock
+}
+
+func (m *mockSchedulerForQuerierClient) QuerierLoop(ctx context.Context, opts ...grpc.CallOption) (schedulerpb.SchedulerForQuerier_QuerierLoopClient, error) {
+	args := m.Called(ctx, opts)
+	return args.Get(0).(schedulerpb.SchedulerForQuerier_QuerierLoopClient), args.Error(1)
+}
+
+func (m *mockSchedulerForQuerierClient) NotifyQuerierShutdown(ctx context.Context, in *schedulerpb.NotifyQuerierShutdownRequest, opts ...grpc.CallOption) (*schedulerpb.NotifyQuerierShutdownResponse, error) {
+	args := m.Called(ctx, in, opts)
+	return args.Get(0).(*schedulerpb.NotifyQuerierShutdownResponse), args.Error(1)
+}
+
+// mock SchedulerForQuerier_QuerierLoopClient
+type mockQuerierLoopClient struct {
+	ctx context.Context
+	mock.Mock
+}
+
+func (m *mockQuerierLoopClient) Send(msg *schedulerpb.QuerierToScheduler) error {
+	args := m.Called(msg)
+	return args.Error(0)
+}
+
+func (m *mockQuerierLoopClient) Recv() (*schedulerpb.SchedulerToQuerier, error) {
+	args := m.Called()
+
+	if fn, ok := args.Get(0).(func() (*schedulerpb.SchedulerToQuerier, error)); ok {
+		return fn()
+	}
+
+	return args.Get(0).(*schedulerpb.SchedulerToQuerier), args.Error(1)
+}
+
+func (m *mockQuerierLoopClient) Header() (metadata.MD, error) {
+	args := m.Called()
+	return args.Get(0).(metadata.MD), args.Error(1)
+}
+
+func (m *mockQuerierLoopClient) Trailer() metadata.MD {
+	args := m.Called()
+	return args.Get(0).(metadata.MD)
+}
+
+func (m *mockQuerierLoopClient) CloseSend() error {
+	args := m.Called()
+	return args.Error(0)
+}
+
+func (m *mockQuerierLoopClient) Context() context.Context {
+	args := m.Called()
+	return args.Get(0).(context.Context)
+}
+
+func (m *mockQuerierLoopClient) SendMsg(msg interface{}) error {
+	args := m.Called(msg)
+	return args.Error(0)
+}
+
+func (m *mockQuerierLoopClient) RecvMsg(msg interface{}) error {
+	args := m.Called(msg)
+	return args.Error(0)
+}
+
+// Verifies that the panic reported in https://github.com/cortexproject/cortex/issues/6599 no longer occurs.
+func Test_ToShowNotPanic_RelatedIssue6599(t *testing.T) {
+	cfg := Config{}
+	frontendAddress := ":50001"
+	userID := "user-1"
+	recvCount := 100000
+
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
+	defer cancel()
+
+	recvCall := atomic.Uint32{}
+
+	querierLoopClient := &mockQuerierLoopClient{}
+	querierLoopClient.ctx = ctx
+	querierLoopClient.On("Send", mock.Anything).Return(nil)
+	querierLoopClient.On("Context").Return(querierLoopClient.ctx)
+	querierLoopClient.On("Recv").Return(func() (*schedulerpb.SchedulerToQuerier, error) {
+		recvCall.Add(1)
+		if recvCall.Load() <= uint32(recvCount) {
+			return &schedulerpb.SchedulerToQuerier{
+				QueryID:         1,
+				HttpRequest:     &httpgrpc.HTTPRequest{},
+				FrontendAddress: frontendAddress,
+				UserID:          userID,
+				StatsEnabled:    true,
+			}, nil
+		} else {
+			<-querierLoopClient.ctx.Done()
+			return nil, context.Canceled
+		}
+
+	})
+
+	requestHandler := &mockRequestHandler{}
+	requestHandler.On("Handle", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
+		stat := stats.FromContext(args.Get(0).(context.Context))
+
+		// imitate a goroutine left behind by fetchSeriesFromStores that still adds query stats
+		go stat.AddFetchedChunkBytes(10)
+	}).Return(&httpgrpc.HTTPResponse{}, nil)
+
+	sp, _ := newSchedulerProcessor(cfg, requestHandler, log.NewNopLogger(), nil)
+	schedulerClient := &mockSchedulerForQuerierClient{}
+	schedulerClient.On("QuerierLoop", mock.Anything, mock.Anything).Return(querierLoopClient, nil)
+
+	sp.schedulerClientFactory = func(conn *grpc.ClientConn) schedulerpb.SchedulerForQuerierClient {
+		return schedulerClient
+	}
+
+	// run a real gRPC server acting as the query frontend (FrontendForQuerier)
+	grpcServer := grpc.NewServer()
+	server := &mockFrontendForQuerierServer{}
+	frontendv2pb.RegisterFrontendForQuerierServer(grpcServer, server)
+
+	lis, err := net.Listen("tcp", frontendAddress)
+	require.NoError(t, err)
+	stopChan := make(chan struct{})
+	go func() {
+		defer close(stopChan)
+		if err := grpcServer.Serve(lis); err != nil {
+			return
+		}
+	}()
+	defer func() {
+		grpcServer.GracefulStop()
+		<-stopChan // wait until the server has stopped
+	}()
+
+	sp.processQueriesOnSingleStream(ctx, nil, lis.Addr().String())
+}
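
Why the copy helps, shown with a minimal sketch (illustrative only, not part of the patch above): gogo/protobuf-generated Marshal code first computes Size(), allocates a buffer of exactly that size, and then encodes the fields via MarshalToSizedBuffer. If a goroutine left behind by fetchSeriesFromStores grows a stats counter between those two steps, the encoder can write past the end of the buffer and panic, which matches the failure mode described in runRequest. The Go program below mimics that size-then-marshal pattern with hypothetical stand-ins (counterMsg, sizeThenMarshal, snapshot); none of these exist in Cortex, they only demonstrate why marshaling a snapshot produced by stats.Copy() is safe while marshaling the live object is not.

package main

import (
	"encoding/binary"
	"fmt"
	"sync/atomic"
)

// counterMsg stands in for a message whose numeric field may still be
// incremented by background goroutines while it is being marshaled.
type counterMsg struct {
	value atomic.Uint64
}

// size mimics proto Size(): the number of bytes the varint encoding needs right now.
func (m *counterMsg) size() int {
	return binary.PutUvarint(make([]byte, binary.MaxVarintLen64), m.value.Load())
}

// snapshot returns an independent copy, playing the role of QueryStats.Copy().
func (m *counterMsg) snapshot() *counterMsg {
	c := &counterMsg{}
	c.value.Store(m.value.Load())
	return c
}

// sizeThenMarshal mimics the size-then-marshal pattern: allocate exactly size()
// bytes, then encode the re-read value into that buffer. If the value grows
// between the two reads so that its varint needs one more byte,
// binary.PutUvarint panics with an index-out-of-range error.
func sizeThenMarshal(m *counterMsg) []byte {
	buf := make([]byte, m.size())
	binary.PutUvarint(buf, m.value.Load())
	return buf
}

func main() {
	live := &counterMsg{}
	live.value.Store(127) // 127 is the largest value that fits in a single varint byte

	// A background writer, like a not-yet-exited goroutine bumping query stats.
	go func() {
		live.value.Add(10) // pushing past 127 requires a second varint byte
	}()

	// Marshaling `live` directly would race with the writer above and could panic
	// inside sizeThenMarshal. Taking a snapshot first, as runRequest now does with
	// stats.Copy(), removes the race.
	out := sizeThenMarshal(live.snapshot())
	fmt.Printf("marshaled %d byte(s) from the snapshot\n", len(out))
}

The regression test above provokes exactly this situation by calling stat.AddFetchedChunkBytes(10) from a separate goroutine while the processor marshals the QueryResultRequest.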