Pull request #85: Upgrade Go dependencies
Merge in PRODUCT/squirreldb from upgrade-deps-2024-03-15 to main

* commit 'd7c3744ddfd715f88bb7ebcc6d6b0a9405b0f846':
  Remove commented code
  Add test trying to produce the labels mutation bug
  Return request errors under Prometheus format
  Make WrapperEngine use perRequest data
  Fix usage of Store.ContextFromRequest() in test
  Fix linter's context warnings
  Refactor per-request related code
  Upgrade & please linters
  Refactor per-request and querier related code
  Remove debug method
  Fix labels mutation issue with Prometheus engine
  Move querier per-request data to a new type
  Upgrade koanf to v2
  Fix comment
  Upgrade linter & Go dependencies
  Fix caching reader when using Thanos parallelism
  Please linter
  WIP: upgrade Go dependencies
Thomas Delbende authored and PierreF committed Apr 5, 2024
2 parents e2c91b7 + d7c3744 commit 9f1d30f
Showing 26 changed files with 687 additions and 583 deletions.
36 changes: 33 additions & 3 deletions api/api.go
@@ -100,7 +100,7 @@ func NewPrometheus(
queryLogger := apiLogger.With().Str("component", "query_engine").Logger()

queryEngine := promql.NewEngine(queryLogger, useThanosPromQLEngine, metricRegistry)
-queryEngine = wrapperEngine{QueryEngine: queryEngine, logger: apiLogger}
+queryEngine = promql.WrapEngine(queryEngine, apiLogger)

scrapePoolRetrieverFunc := func(_ context.Context) v1.ScrapePoolsRetriever { return mockScrapePoolRetriever{} }
targetRetrieverFunc := func(context.Context) v1.TargetRetriever { return mockTargetRetriever{} }
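
The change above replaces the wrapperEngine struct literal (formerly in api/engine.go, deleted below) with the promql.WrapEngine constructor. Both are the decorator pattern: the wrapper satisfies the same engine interface and delegates to the engine it wraps. A minimal sketch, assuming illustrative names (queryEngine, Eval, and loggingEngine are stand-ins, not SquirrelDB's real API):

```go
package main

import (
	"errors"
	"fmt"
)

// queryEngine is an illustrative stand-in for the engine interface.
type queryEngine interface {
	Eval(query string) (string, error)
}

type failingEngine struct{}

func (failingEngine) Eval(string) (string, error) { return "", errors.New("boom") }

// loggingEngine delegates to the wrapped engine and logs failures,
// while still satisfying the same interface.
type loggingEngine struct{ inner queryEngine }

func (e loggingEngine) Eval(query string) (string, error) {
	res, err := e.inner.Eval(query)
	if err != nil {
		fmt.Printf("query %q failed: %v\n", query, err)
	}

	return res, err
}

func main() {
	var engine queryEngine = failingEngine{}
	engine = loggingEngine{inner: engine} // same shape as the queryEngine reassignment above

	_, _ = engine.Eval(`up{job="node"}`)
}
```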
@@ -251,8 +251,17 @@ func (a *API) init() {
a.metrics.RequestsSeconds.WithLabelValues(operation).Observe(time.Since(t0).Seconds())
}()

-ctx := r.Context()
-r = r.WithContext(types.WrapContext(ctx, r))
+// We must create the cachingReader at this stage
+// so that only one is allocated per request,
+// allowing the whole request to benefit from its cache.
+ctx, err := queryable.ContextFromRequest(r)
+if err != nil {
+a.respondError(rw, http.StatusUnprocessableEntity, err)
+
+return
+}
+
+r = r.WithContext(ctx) //nolint: contextcheck
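
ContextFromRequest attaches per-request state to the context so that every component handling the request shares a single cachingReader. A minimal sketch of that pattern, with hypothetical names (contextFromRequest and perRequest stand in for queryable.ContextFromRequest and the real per-request data):

```go
package main

import (
	"context"
	"fmt"
	"net/http"
)

type ctxKey struct{}

// perRequest is illustrative request-scoped state, playing the role
// of the per-request cachingReader.
type perRequest struct{ cacheHits int }

// contextFromRequest mimics the shape of queryable.ContextFromRequest:
// allocate the per-request data once and stash it in the context.
func contextFromRequest(r *http.Request) (context.Context, error) {
	return context.WithValue(r.Context(), ctxKey{}, &perRequest{}), nil
}

func handler(w http.ResponseWriter, r *http.Request) {
	ctx, err := contextFromRequest(r)
	if err != nil {
		http.Error(w, err.Error(), http.StatusUnprocessableEntity)

		return
	}

	r = r.WithContext(ctx)

	// Every component invoked for this request now sees the same instance.
	pr, _ := r.Context().Value(ctxKey{}).(*perRequest)
	pr.cacheHits++
	fmt.Fprintf(w, "hits: %d\n", pr.cacheHits)
}

func main() {
	http.HandleFunc("/", handler)
	_ = http.ListenAndServe("localhost:8080", nil)
}
```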

// Prometheus always returns a status 500 when write fails, but if
// the labels are invalid we want to return a status 400, so we use the
@@ -1015,6 +1024,27 @@ func (a *API) mutableLabelNamesDeleteHandler(w http.ResponseWriter, req *http.Request) {
fmt.Fprint(w, "ok")
}

+// respondError mimics the Prometheus v1.API behavior for returning an error to the client.
+func (a *API) respondError(w http.ResponseWriter, status int, err error) {
+b, err := json.Marshal(&v1.Response{
+Status: "error",
+Error: err.Error(),
+})
+if err != nil {
+a.Logger.Err(err).Msg("error marshaling json response")
+http.Error(w, err.Error(), http.StatusInternalServerError)
+
+return
+}
+
+w.Header().Set("Content-Type", "application/json")
+w.WriteHeader(status)
+
+if _, err := w.Write(b); err != nil {
+a.Logger.Err(err).Msg("error writing response")
+}
+}

// interceptor implements the http.ResponseWriter interface,
// it allows to catch and modify the response status code.
type interceptor struct {
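The new respondError helper produces the error envelope Prometheus API clients already parse. A self-contained sketch of the resulting wire format; the anonymous struct below mirrors only the two v1.Response fields set in the diff and is not the real Prometheus type:

```go
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Illustrative stand-in for v1.Response, limited to the fields
	// respondError sets; the JSON tags mirror the Prometheus format.
	body := struct {
		Status string `json:"status"`
		Error  string `json:"error"`
	}{
		Status: "error",
		Error:  `invalid label name "foo bar"`, // hypothetical error message
	}

	b, _ := json.Marshal(body)

	// A client receiving HTTP 422 would read:
	// {"status":"error","error":"invalid label name \"foo bar\""}
	fmt.Println(string(b))
}
```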
97 changes: 0 additions & 97 deletions api/engine.go

This file was deleted.

2 changes: 1 addition & 1 deletion api/promql/caching_reader.go
@@ -7,7 +7,7 @@ import (
"sync"
)

-// cachingReader kept the last MetricData returned in cache and re-use it if that query exactly match the request.
+// cachingReader keeps the last MetricData returned in cache and re-uses it if that query exactly matches the request.
type cachingReader struct {
reader types.MetricReader

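A minimal sketch of the last-result caching idea described in the comment above, with simplified stand-ins for types.MetricReader and MetricData (not the real cachingReader implementation):

```go
package main

import (
	"fmt"
	"sync"
)

// lastResultCache keeps only the most recent result and serves it again
// when the next key matches exactly.
type lastResultCache struct {
	mu       sync.Mutex
	lastKey  string
	lastData []float64
}

func (c *lastResultCache) read(key string, fetch func() []float64) []float64 {
	c.mu.Lock()
	defer c.mu.Unlock()

	if key == c.lastKey && c.lastData != nil {
		return c.lastData // exact match: no storage round-trip
	}

	c.lastKey, c.lastData = key, fetch()

	return c.lastData
}

func main() {
	cache := &lastResultCache{}
	fetches := 0
	fetch := func() []float64 { fetches++; return []float64{1, 2, 3} }

	cache.read("metricID1/12:00-12:30", fetch)
	cache.read("metricID1/12:00-12:30", fetch) // served from cache

	fmt.Println("storage fetches:", fetches) // prints 1
}
```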
154 changes: 125 additions & 29 deletions api/promql/caching_reader_test.go
@@ -25,7 +25,9 @@ import (
// * myname{item="1"}: 4 points every 10 seconds from 2023-05-10T12:00:00Z to 2023-05-10T12:00:30Z
// * myname{item="2"}: 4 points every 10 seconds from 2023-05-10T12:00:20Z to 2023-05-10T12:00:50Z
// * myname{item="3"}: 4 points every 1 hour from 2023-05-10T11:00:10Z to 2023-05-10T13:00:10Z
-// Metrics use respectively the ID, metricID1, metricID2, metricID3.
+// * myname2{item="4"}: 4 points every 10 seconds from 2023-05-10T12:00:20Z to 2023-05-10T12:00:50Z
+// Metrics use, respectively, the IDs metricID1, metricID2, metricID3, metricID4 & metricID4b.
+// metricID4b doesn't have points.
func createTSDB() (*dummy.Index, *dummy.MemoryTSDB) {
metric1 := types.MetricLabel{
Labels: labels.FromMap(map[string]string{
@@ -48,8 +50,23 @@
}),
ID: metricID3,
}
+metric4 := types.MetricLabel{
+Labels: labels.FromMap(map[string]string{
+"__name__": "myname2",
+"item": "4",
+}),
+ID: metricID4,
+}
+metric4b := types.MetricLabel{
+Labels: labels.FromMap(map[string]string{
+"__name__": "myname2",
+"item": "4",
+"extraLabel": "b",
+}),
+ID: metricID4b,
+}

-index := dummy.NewIndex([]types.MetricLabel{metric1, metric2, metric3})
+index := dummy.NewIndex([]types.MetricLabel{metric1, metric2, metric3, metric4, metric4b})
tsdb := &dummy.MemoryTSDB{
Data: map[types.MetricID]types.MetricData{
metricID1: {
@@ -114,6 +131,28 @@ func createTSDB() (*dummy.Index, *dummy.MemoryTSDB) {
},
TimeToLive: 86400,
},
+metricID4: {
+ID: metricID4,
+Points: []types.MetricPoint{
+{
+Timestamp: time.Date(2023, 5, 10, 12, 0, 20, 0, time.UTC).UnixMilli(),
+Value: 10,
+},
+{
+Timestamp: time.Date(2023, 5, 10, 12, 0, 30, 0, time.UTC).UnixMilli(),
+Value: 20,
+},
+{
+Timestamp: time.Date(2023, 5, 10, 12, 0, 40, 0, time.UTC).UnixMilli(),
+Value: 30,
+},
+{
+Timestamp: time.Date(2023, 5, 10, 12, 0, 50, 0, time.UTC).UnixMilli(),
+Value: 40,
+},
+},
+TimeToLive: 86400,
+},
},
}

@@ -1389,46 +1428,38 @@ func Test_cachingReader_Querier(t *testing.T) { //nolint:maintidx
prometheus.NewRegistry(),
)

-reqCtx := types.WrapContext(context.Background(), httptest.NewRequest(http.MethodGet, "/", nil))
+req := httptest.NewRequest(http.MethodGet, "/", nil)

-querierIntf, err := store.Querier(tt.minTime.UnixMilli(), tt.maxTime.UnixMilli())
+cachingCtx, err := store.ContextFromRequest(req)
+if err != nil {
+t.Fatal("Failed to parse request:", err)
+}
+
+querier, err := store.Querier(tt.minTime.UnixMilli(), tt.maxTime.UnixMilli())
if err != nil {
t.Fatal(err)
}

openSelect := make(map[int]storage.SeriesSet)
-closes := make([]func() error, 0)
validatorSelect := make(map[int]storage.SeriesSet)

for _, action := range tt.actions {
ptsBefore := countingReader.PointsRead()

switch action.action {
case actionCallSelect:
-result := querierIntf.Select(reqCtx, true, action.selectHints, action.selectMatcher...)
+result := querier.Select(cachingCtx, true, action.selectHints, action.selectMatcher...)
openSelect[action.selectIdx] = result

-// Open another Querier, because cache is never shared between two Querier (it is only between Select()
-// in the same querier) we use this other Querier as validator.
-validator, err := unCountedStore.Querier(tt.minTime.UnixMilli(), tt.maxTime.UnixMilli())
-if err != nil {
-t.Fatal(err)
-}
-
-closes = append(closes, validator.Close)
-
-validatorSelect[action.selectIdx] = validator.Select(reqCtx, true, action.selectHints, action.selectMatcher...)
+// Create another context to use a different cache each time.
+// We ignore the error since the request was known to be valid a few lines ago.
+validatorCtx, _ := unCountedStore.ContextFromRequest(req)
+validatorSelect[action.selectIdx] = querier.Select(validatorCtx, true, action.selectHints, action.selectMatcher...) //nolint:lll
case actionClose:
-err := querierIntf.Close()
+err := querier.Close()
if err != nil {
t.Errorf("%s: %v", action.description, err)
}

-for _, f := range closes {
-if err := f(); err != nil {
-t.Errorf("%s: %v", action.description, err)
-}
-}
case actionCallNextOnSerieSet:
seriesSet := openSelect[action.selectIdx]
if seriesSet == nil {
@@ -1513,7 +1544,7 @@
}
}

-err = querierIntf.Close()
+err = querier.Close()
if err != nil {
t.Error(err)
}
@@ -1542,6 +1573,42 @@ func Test_cachingReaderFromEngine(t *testing.T) {
start: time.Date(2023, 5, 10, 12, 0, 10, 0, time.UTC),
pointsRead: 2, // the PromQL engine has a 5-minute look-back
},
+{
+name: "range1",
+isInstant: false,
+query: `avg_over_time(myname{item="2"}[60s])`,
+start: time.Date(2023, 5, 10, 12, 0, 0, 0, time.UTC),
+end: time.Date(2023, 5, 10, 13, 0, 0, 0, time.UTC),
+step: 10 * time.Second,
+pointsRead: 4,
+},
+{
+name: "range1b",
+isInstant: false,
+query: `myname{item="2"}`,
+start: time.Date(2023, 5, 10, 12, 0, 0, 0, time.UTC),
+end: time.Date(2023, 5, 10, 13, 0, 0, 0, time.UTC),
+step: 10 * time.Second,
+pointsRead: 4,
+},
+{
+name: "range2",
+isInstant: false,
+query: `myname2{item="4"}`,
+start: time.Date(2023, 5, 10, 12, 0, 0, 0, time.UTC),
+end: time.Date(2023, 5, 10, 13, 0, 0, 0, time.UTC),
+step: 10 * time.Second,
+pointsRead: 4,
+},
+{
+name: "range2b",
+isInstant: false,
+query: `myname2{item="4"}`,
+start: time.Date(2023, 5, 10, 12, 0, 0, 0, time.UTC),
+end: time.Date(2023, 5, 10, 13, 0, 0, 0, time.UTC),
+step: 10 * time.Second,
+pointsRead: 4,
+},
{
name: "no-cache-between-promql",
isInstant: true,
@@ -1626,12 +1693,14 @@
)

for _, req := range tests {
-/*if useThanos || req.name != "count_over_time with filter, cachable" {
-continue
-}*/
countBefore := countingReader.PointsRead()

-reqCtx := types.WrapContext(context.Background(), httptest.NewRequest(http.MethodGet, "/", nil))
+testReq := httptest.NewRequest(http.MethodGet, "/", nil)
+
+reqCtx, err := store.ContextFromRequest(testReq)
+if err != nil {
+t.Fatal("Failed to parse request:", err)
+}

if req.isInstant { //nolint:nestif
query, err := engine.NewInstantQuery(reqCtx, store, nil, req.query, req.start)
@@ -1671,10 +1740,37 @@

countAfter := countingReader.PointsRead()

-pointRead := int(math.Round((countAfter - countBefore)))
+pointRead := int(math.Round(countAfter - countBefore))
if pointRead != req.pointsRead {
t.Errorf("req %s: points read = %d, want %d", req.name, pointRead, req.pointsRead)
}

+// Make sure __name__ isn't removed from Labels inside the index. We had a case where the labels.Labels
+// was mutated.
+// time.Time{} is used because the dummy index ignores min/max time.
+metrics, err := index.Search(context.Background(), time.Time{}, time.Time{}, nil)
+if err != nil {
+t.Fatal(err)
+}
+
+countMetrics := 0
+for metrics.Next() {
+countMetrics++
+
+metric := metrics.At()
+
+if metric.Labels.Get("__name__") == "" {
+t.Errorf("Metric ID %d had empty name. Labels=%v", metric.ID, metric.Labels)
+}
+}
+
+if err := metrics.Err(); err != nil {
+t.Errorf("metrics.Err() = %v", err)
+}
+
+if countMetrics != 5 { // 5 matches the number of metrics in createTSDB
+t.Errorf("countMetrics = %d, want 5", countMetrics)
+}
}
})
}
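
The __name__ assertions added to this test guard against a slice-aliasing hazard: a labels.Labels value shares its backing array, so code mutating its view of the labels can corrupt the copy held by the index. A simplified illustration with a stand-in label type (not the Prometheus labels package):

```go
package main

import "fmt"

type label struct{ Name, Value string }

func main() {
	// The "index" owns this slice.
	stored := []label{{"__name__", "myname"}, {"item", "1"}}

	// Copying the slice header does not copy the backing array.
	engineView := stored

	// Dropping __name__ in place writes through the shared backing
	// array, so the index's labels are corrupted too.
	engineView[0] = label{}

	fmt.Println(stored[0].Name == "") // true: the index lost __name__
}
```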
