diff --git a/vendor/github.com/v3io/v3io-tsdb/Jenkinsfile b/vendor/github.com/v3io/v3io-tsdb/Jenkinsfile
index 91e37379cf4..e17fae995ce 100644
--- a/vendor/github.com/v3io/v3io-tsdb/Jenkinsfile
+++ b/vendor/github.com/v3io/v3io-tsdb/Jenkinsfile
@@ -43,6 +43,11 @@ def build_v3io_tsdb(TAG_VERSION) {
github.upload_asset(git_project, git_project_user, "tsdbctl-${TAG_VERSION}-linux-amd64", RELEASE_ID, GIT_TOKEN)
github.upload_asset(git_project, git_project_user, "tsdbctl-${TAG_VERSION}-darwin-amd64", RELEASE_ID, GIT_TOKEN)
github.upload_asset(git_project, git_project_user, "tsdbctl-${TAG_VERSION}-windows-amd64", RELEASE_ID, GIT_TOKEN)
+ withCredentials([
+ string(credentialsId: pipelinex.PackagesRepo.ARTIFACTORY_IGUAZIO[2], variable: 'PACKAGES_ARTIFACTORY_PASSWORD')
+ ]) {
+ common.upload_file_to_artifactory(pipelinex.PackagesRepo.ARTIFACTORY_IGUAZIO[0], pipelinex.PackagesRepo.ARTIFACTORY_IGUAZIO[1], PACKAGES_ARTIFACTORY_PASSWORD, "iguazio-devops/k8s", "tsdbctl-${TAG_VERSION}-linux-amd64")
+ }
}
}
}
diff --git a/vendor/github.com/v3io/v3io-tsdb/README.md b/vendor/github.com/v3io/v3io-tsdb/README.md
index 95b623d78f4..b99720ebfb3 100644
--- a/vendor/github.com/v3io/v3io-tsdb/README.md
+++ b/vendor/github.com/v3io/v3io-tsdb/README.md
@@ -81,8 +81,9 @@ A user can run the CLI to add (append) or query the DB, to use the CLI, build th
it has built-in help, see the following add/query examples:
```
- # create a DB with expected ingestion rate of one sample per second and some aggregates (at 30 min interval)
- tsdbctl create -t --ingestion-rate 1/s -a count,sum,max -i 30m
+ # create a DB with expected ingestion rate of one sample per second and some aggregates (at 30 min interval)
+ # and cross-label aggregates for "label1"
+ tsdbctl create -t --ingestion-rate 1/s -a count,sum,max -i 30m -l label1
# display DB info with metric names (types)
tsdbctl info -t -n
@@ -115,7 +116,8 @@ such as partitioning strategy, retention, aggregates, etc. this can be done via
samplesIngestionRate = "1/s"
aggregationGranularity = "1h"
aggregatesList = "count,avg,min,max"
- schema, err := schema.NewSchema(v3iocfg, samplesIngestionRate, aggregationGranularity, aggregatesList)
+ crossLabel = "label1,label2;label3"
+ schema, err := schema.NewSchema(v3iocfg, samplesIngestionRate, aggregationGranularity, aggregatesList, crossLabel)
if err != nil {
// TODO: handle error
}
diff --git a/vendor/github.com/v3io/v3io-tsdb/pkg/aggregate/aggregate.go b/vendor/github.com/v3io/v3io-tsdb/pkg/aggregate/aggregate.go
index 07d492816a7..68794091c52 100644
--- a/vendor/github.com/v3io/v3io-tsdb/pkg/aggregate/aggregate.go
+++ b/vendor/github.com/v3io/v3io-tsdb/pkg/aggregate/aggregate.go
@@ -129,6 +129,25 @@ func RawAggregatesToStringList(aggregates string) ([]string, error) {
return list, nil
}
+func ParseCrossLabelSets(str string) [][]string {
+ var res [][]string
+ labelSetStrings := strings.Split(str, ";")
+ for _, labelSetString := range labelSetStrings {
+ labelSet := strings.Split(strings.TrimSpace(labelSetString), ",")
+ var trimmedLabelSet []string
+ for _, label := range labelSet {
+ trimmedLabel := strings.TrimSpace(label)
+ if trimmedLabel != "" {
+ trimmedLabelSet = append(trimmedLabelSet, trimmedLabel)
+ }
+ }
+ if len(trimmedLabelSet) > 0 {
+ res = append(res, trimmedLabelSet)
+ }
+ }
+ return res
+}
+
// Convert a comma-separated aggregation-functions string to an aggregates mask
func AggregatesFromStringListWithCount(split []string) (AggrType, []AggrType, error) {
var aggrMask AggrType
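
A minimal usage sketch of the new parser (hypothetical label names; expected output shown as a comment):

```
package main

import (
	"fmt"

	"github.com/v3io/v3io-tsdb/pkg/aggregate"
)

func main() {
	// Semicolons separate label sets; commas separate labels within a set.
	// Whitespace is trimmed and empty entries are dropped.
	sets := aggregate.ParseCrossLabelSets("label1, label2; label3")
	fmt.Println(sets) // [[label1 label2] [label3]]
}
```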
diff --git a/vendor/github.com/v3io/v3io-tsdb/pkg/appender/appender.go b/vendor/github.com/v3io/v3io-tsdb/pkg/appender/appender.go
index 533f7255bb0..0e172a0cc0b 100644
--- a/vendor/github.com/v3io/v3io-tsdb/pkg/appender/appender.go
+++ b/vendor/github.com/v3io/v3io-tsdb/pkg/appender/appender.go
@@ -220,17 +220,15 @@ func (mc *MetricsCache) Add(lset utils.LabelsIfc, t int64, v interface{}) (uint6
var aggrMetrics []*MetricState
if !ok {
- for _, layer := range mc.partitionMngr.GetConfig().TableSchemaInfo.RollupLayers {
- for _, preAggr := range layer.PreAggregates {
- subLset := lset.Filter(preAggr.Labels)
- name, key, hash := subLset.GetKey()
- aggrMetric, ok := mc.getMetric(name, hash)
- if !ok {
- aggrMetric = &MetricState{Lset: subLset, key: key, name: name, hash: hash}
- aggrMetric.store = NewChunkStore(mc.logger, subLset.LabelNames(), true)
- mc.addMetric(hash, name, aggrMetric)
- aggrMetrics = append(aggrMetrics, aggrMetric)
- }
+ for _, preAggr := range mc.partitionMngr.GetConfig().TableSchemaInfo.PreAggregates {
+ subLset := lset.Filter(preAggr.Labels)
+ name, key, hash := subLset.GetKey()
+ aggrMetric, ok := mc.getMetric(name, hash)
+ if !ok {
+ aggrMetric = &MetricState{Lset: subLset, key: key, name: name, hash: hash}
+ aggrMetric.store = NewChunkStore(mc.logger, subLset.LabelNames(), true)
+ mc.addMetric(hash, name, aggrMetric)
+ aggrMetrics = append(aggrMetrics, aggrMetric)
}
}
metric = &MetricState{Lset: lset, key: key, name: name, hash: hash, aggrs: aggrMetrics}
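
For intuition, a self-contained sketch of the label-filtering idea behind the pre-aggregated sub-metrics (not the package API; `filterLabels` is a stand-in for `lset.Filter`):

```
package main

import "fmt"

// filterLabels mimics the idea of lset.Filter: keep only the
// pre-aggregate's labels.
func filterLabels(lset map[string]string, keep []string) map[string]string {
	sub := make(map[string]string, len(keep))
	for _, name := range keep {
		if v, ok := lset[name]; ok {
			sub[name] = v
		}
	}
	return sub
}

func main() {
	// Both cpu{os=linux,host=a} and cpu{os=linux,host=b} collapse into the
	// same aggregated sub-series once only {"os"} is kept.
	a := map[string]string{"__name__": "cpu", "os": "linux", "host": "a"}
	b := map[string]string{"__name__": "cpu", "os": "linux", "host": "b"}
	fmt.Println(filterLabels(a, []string{"__name__", "os"}))
	fmt.Println(filterLabels(b, []string{"__name__", "os"}))
}
```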
diff --git a/vendor/github.com/v3io/v3io-tsdb/pkg/config/config.go b/vendor/github.com/v3io/v3io-tsdb/pkg/config/config.go
index f1d2cb61fda..0bb94f1680f 100644
--- a/vendor/github.com/v3io/v3io-tsdb/pkg/config/config.go
+++ b/vendor/github.com/v3io/v3io-tsdb/pkg/config/config.go
@@ -161,6 +161,8 @@ type V3ioConfig struct {
DisableClientAggr bool `json:"disableClientAggr,omitempty"`
// Build Info
BuildInfo *BuildInfo `json:"buildInfo,omitempty"`
+ // Disable the mitigation for a known nginx bug
+ DisableNginxMitigation bool `json:"disableNginxMitigation,omitempty"`
}
type MetricsReporterConfig struct {
@@ -183,8 +185,7 @@ type Rollup struct {
SampleRetention int `json:"sampleRetention"`
// Layer retention time, in months ('m'), days ('d'), or hours ('h').
// Format: "[0-9]+[hmd]". For example: "3h", "7d", "1m"
- LayerRetentionTime string `json:"layerRetentionTime"`
- PreAggregates []PreAggregate `json:"preAggregates"`
+ LayerRetentionTime string `json:"layerRetentionTime"`
}
type PreAggregate struct {
@@ -194,11 +195,12 @@ type PreAggregate struct {
}
type TableSchema struct {
- Version int `json:"version"`
- RollupLayers []Rollup `json:"rollupLayers"`
- ShardingBucketsCount int `json:"shardingBucketsCount"`
- PartitionerInterval string `json:"partitionerInterval"`
- ChunckerInterval string `json:"chunckerInterval"`
+ Version int `json:"version"`
+ RollupLayers []Rollup `json:"rollupLayers"`
+ ShardingBucketsCount int `json:"shardingBucketsCount"`
+ PartitionerInterval string `json:"partitionerInterval"`
+ ChunckerInterval string `json:"chunckerInterval"`
+ PreAggregates []PreAggregate `json:"preAggregates"`
}
type PartitionSchema struct {
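
A rough sketch of the resulting schema shape (illustrative field values; only the `Labels` field of `PreAggregate` is shown, per its use in the appender):

```
package main

import (
	"fmt"

	"github.com/v3io/v3io-tsdb/pkg/config"
)

func main() {
	// Pre-aggregates are now defined once at the table-schema level
	// instead of per rollup layer. Values here are illustrative only.
	schema := config.TableSchema{
		Version:              0,
		ShardingBucketsCount: 8,
		PartitionerInterval:  "2d",
		ChunckerInterval:     "1h",
		PreAggregates: []config.PreAggregate{
			{Labels: []string{"os"}},
		},
	}
	fmt.Printf("%+v\n", schema)
}
```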
diff --git a/vendor/github.com/v3io/v3io-tsdb/pkg/partmgr/partmgr.go b/vendor/github.com/v3io/v3io-tsdb/pkg/partmgr/partmgr.go
index 54a191a1517..fd9f7f6ee17 100644
--- a/vendor/github.com/v3io/v3io-tsdb/pkg/partmgr/partmgr.go
+++ b/vendor/github.com/v3io/v3io-tsdb/pkg/partmgr/partmgr.go
@@ -311,6 +311,10 @@ type DBPartition struct {
rollupBuckets int // Total number of aggregation buckets per partition
}
+func (p *DBPartition) PreAggregates() []config.PreAggregate {
+ return p.manager.GetConfig().TableSchemaInfo.PreAggregates
+}
+
func (p *DBPartition) IsCyclic() bool {
return p.manager.cyclic
}
diff --git a/vendor/github.com/v3io/v3io-tsdb/pkg/partmgr/partmgr_test.go b/vendor/github.com/v3io/v3io-tsdb/pkg/partmgr/partmgr_test.go
index 7d659e0d0f3..b0e54932660 100644
--- a/vendor/github.com/v3io/v3io-tsdb/pkg/partmgr/partmgr_test.go
+++ b/vendor/github.com/v3io/v3io-tsdb/pkg/partmgr/partmgr_test.go
@@ -79,7 +79,7 @@ func getPartitionManager(tst *testing.T) *PartitionManager {
tst.Fatalf("Failed to obtain a TSDB configuration. Error: %v", err)
}
- schm, err := schema.NewSchema(v3ioConfig, "1/s", "1h", "*")
+ schm, err := schema.NewSchema(v3ioConfig, "1/s", "1h", "*", "")
if err != nil {
tst.Fatalf("Failed to create a TSDB schema. Error: %v", err)
}
diff --git a/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/chunkIterator.go b/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/chunkIterator.go
index 8816e475c8c..a890f2b5f49 100644
--- a/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/chunkIterator.go
+++ b/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/chunkIterator.go
@@ -11,7 +11,7 @@ import (
)
// Chunk-list series iterator
-type rawChunkIterator struct {
+type RawChunkIterator struct {
mint, maxt int64
chunks []chunkenc.Chunk
@@ -33,8 +33,8 @@ func newRawChunkIterator(queryResult *qryResults, log logger.Logger) utils.Serie
maxt = int64(maxTime.(int))
}
- newIterator := rawChunkIterator{
- mint: queryResult.query.mint, maxt: maxt, log: log, encoding: queryResult.encoding}
+ newIterator := RawChunkIterator{
+ mint: queryResult.query.mint, maxt: maxt, log: log.GetChild("rawChunkIterator"), encoding: queryResult.encoding}
newIterator.AddChunks(queryResult)
@@ -48,7 +48,7 @@ func newRawChunkIterator(queryResult *qryResults, log logger.Logger) utils.Serie
}
// Advance the iterator to the specified chunk and time
-func (it *rawChunkIterator) Seek(t int64) bool {
+func (it *RawChunkIterator) Seek(t int64) bool {
// Seek time is after the item's end time (maxt)
if t > it.maxt {
@@ -93,21 +93,25 @@ func (it *rawChunkIterator) Seek(t int64) bool {
if it.chunkIndex == len(it.chunks)-1 {
return false
}
+
+ // Free up the memory of the old chunk
+ it.chunks[it.chunkIndex] = nil
+
it.chunkIndex++
it.iter = it.chunks[it.chunkIndex].Iterator()
}
}
}
-func (it *rawChunkIterator) updatePrevPoint() {
+func (it *RawChunkIterator) updatePrevPoint() {
t, v := it.At()
- if t != 0 && v != 0 {
+ if !(t == 0 && v == 0) {
it.prevT, it.prevV = t, v
}
}
// Move to the next iterator item
-func (it *rawChunkIterator) Next() bool {
+func (it *RawChunkIterator) Next() bool {
it.updatePrevPoint()
if it.iter.Next() {
t, _ := it.iter.At()
@@ -132,21 +136,24 @@ func (it *rawChunkIterator) Next() bool {
return false
}
+ // Free up the memory of the old chunk
+ it.chunks[it.chunkIndex] = nil
+
it.chunkIndex++
it.iter = it.chunks[it.chunkIndex].Iterator()
return it.Next()
}
// Read the time and value at the current location
-func (it *rawChunkIterator) At() (t int64, v float64) { return it.iter.At() }
+func (it *RawChunkIterator) At() (t int64, v float64) { return it.iter.At() }
-func (it *rawChunkIterator) AtString() (t int64, v string) { return it.iter.AtString() }
+func (it *RawChunkIterator) AtString() (t int64, v string) { return it.iter.AtString() }
-func (it *rawChunkIterator) Err() error { return it.iter.Err() }
+func (it *RawChunkIterator) Err() error { return it.iter.Err() }
-func (it *rawChunkIterator) Encoding() chunkenc.Encoding { return it.encoding }
+func (it *RawChunkIterator) Encoding() chunkenc.Encoding { return it.encoding }
-func (it *rawChunkIterator) AddChunks(item *qryResults) {
+func (it *RawChunkIterator) AddChunks(item *qryResults) {
var chunks []chunkenc.Chunk
var chunksMax []int64
if item.query.maxt > it.maxt {
@@ -184,24 +191,25 @@ func (it *rawChunkIterator) AddChunks(item *qryResults) {
it.chunksMax = append(it.chunksMax, chunksMax...)
}
-func (it *rawChunkIterator) PeakBack() (t int64, v float64) { return it.prevT, it.prevV }
+func (it *RawChunkIterator) PeakBack() (t int64, v float64) { return it.prevT, it.prevV }
func NewRawSeries(results *qryResults, logger logger.Logger) (utils.Series, error) {
- newSeries := V3ioRawSeries{fields: results.fields, logger: logger}
+ newSeries := V3ioRawSeries{fields: results.fields, logger: logger, encoding: results.encoding}
err := newSeries.initLabels()
if err != nil {
return nil, err
}
- newSeries.iter = newRawChunkIterator(results, nil)
+ newSeries.iter = newRawChunkIterator(results, logger)
return &newSeries, nil
}
type V3ioRawSeries struct {
- fields map[string]interface{}
- lset utils.Labels
- iter utils.SeriesIterator
- logger logger.Logger
- hash uint64
+ fields map[string]interface{}
+ lset utils.Labels
+ iter utils.SeriesIterator
+ logger logger.Logger
+ hash uint64
+ encoding chunkenc.Encoding
}
func (s *V3ioRawSeries) Labels() utils.Labels { return s.lset }
@@ -217,7 +225,12 @@ func (s *V3ioRawSeries) GetKey() uint64 {
func (s *V3ioRawSeries) Iterator() utils.SeriesIterator { return s.iter }
func (s *V3ioRawSeries) AddChunks(results *qryResults) {
- s.iter.(*rawChunkIterator).AddChunks(results)
+ switch iter := s.iter.(type) {
+ case *RawChunkIterator:
+ iter.AddChunks(results)
+ case utils.NullSeriesIterator:
+ s.iter = newRawChunkIterator(results, s.logger)
+ }
}
// Initialize the label set from _lset and _name attributes
diff --git a/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/chunkIterator_test.go b/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/chunkIterator_test.go
new file mode 100644
index 00000000000..ce214746241
--- /dev/null
+++ b/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/chunkIterator_test.go
@@ -0,0 +1,103 @@
+// +build integration
+
+package pquerier_test
+
+import (
+ "fmt"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/suite"
+ "github.com/v3io/v3io-tsdb/pkg/config"
+ "github.com/v3io/v3io-tsdb/pkg/pquerier"
+ "github.com/v3io/v3io-tsdb/pkg/tsdb"
+ "github.com/v3io/v3io-tsdb/pkg/tsdb/tsdbtest"
+ "github.com/v3io/v3io-tsdb/pkg/utils"
+)
+
+const baseTestTime = int64(1547510400000) // 15/01/2019 00:00:00
+
+type testRawChunkIterSuite struct {
+ suite.Suite
+ v3ioConfig *config.V3ioConfig
+ suiteTimestamp int64
+}
+
+func (suite *testRawChunkIterSuite) SetupSuite() {
+ v3ioConfig, err := tsdbtest.LoadV3ioConfig()
+ suite.Require().NoError(err)
+
+ suite.v3ioConfig = v3ioConfig
+ suite.suiteTimestamp = time.Now().Unix()
+}
+
+func (suite *testRawChunkIterSuite) SetupTest() {
+ suite.v3ioConfig.TablePath = fmt.Sprintf("%s-%v", suite.T().Name(), suite.suiteTimestamp)
+ tsdbtest.CreateTestTSDB(suite.T(), suite.v3ioConfig)
+}
+
+func (suite *testRawChunkIterSuite) TearDownTest() {
+ suite.v3ioConfig.TablePath = fmt.Sprintf("%s-%v", suite.T().Name(), suite.suiteTimestamp)
+ if !suite.T().Failed() {
+ tsdbtest.DeleteTSDB(suite.T(), suite.v3ioConfig)
+ }
+}
+
+func (suite *testRawChunkIterSuite) TestRawChunkIteratorWithZeroValue() {
+ adapter, err := tsdb.NewV3ioAdapter(suite.v3ioConfig, nil, nil)
+ suite.Require().NoError(err)
+
+ labels1 := utils.LabelsFromStringList("os", "linux")
+ numberOfEvents := 10
+ eventsInterval := 60 * 1000
+ ingestData := []tsdbtest.DataPoint{{baseTestTime, 10},
+ {baseTestTime + tsdbtest.MinuteInMillis, 0},
+ {baseTestTime + 2*tsdbtest.MinuteInMillis, 30},
+ {baseTestTime + 3*tsdbtest.MinuteInMillis, 40}}
+ testParams := tsdbtest.NewTestParams(suite.T(),
+ tsdbtest.TestOption{
+ Key: tsdbtest.OptTimeSeries,
+ Value: tsdbtest.TimeSeries{tsdbtest.Metric{
+ Name: "cpu",
+ Labels: labels1,
+ Data: ingestData},
+ }})
+ tsdbtest.InsertData(suite.T(), testParams)
+
+ querierV2, err := adapter.QuerierV2()
+ suite.Require().NoError(err)
+
+ params, _, _ := pquerier.ParseQuery("select cpu")
+ params.From = baseTestTime
+ params.To = baseTestTime + int64(numberOfEvents*eventsInterval)
+
+ set, err := querierV2.Select(params)
+ suite.Require().NoError(err)
+
+ var seriesCount int
+ for set.Next() {
+ seriesCount++
+ iter := set.At().Iterator().(*pquerier.RawChunkIterator)
+
+ var index int
+ for iter.Next() {
+ t, v := iter.At()
+ prevT, prevV := iter.PeakBack()
+
+ suite.Require().Equal(ingestData[index].Time, t, "current time does not match")
+ suite.Require().Equal(ingestData[index].Value, v, "current value does not match")
+
+ if index > 0 {
+ suite.Require().Equal(ingestData[index-1].Time, prevT, "previous time does not match")
+ suite.Require().Equal(ingestData[index-1].Value, prevV, "previous value does not match")
+ }
+ index++
+ }
+ }
+
+ suite.Require().Equal(1, seriesCount, "series count didn't match expected")
+}
+
+func TestRawChunkIterSuite(t *testing.T) {
+ suite.Run(t, new(testRawChunkIterSuite))
+}
diff --git a/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/collector.go b/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/collector.go
index 723eff8c90e..05624bc87dd 100644
--- a/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/collector.go
+++ b/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/collector.go
@@ -5,6 +5,8 @@ import (
"math"
"github.com/v3io/v3io-tsdb/pkg/aggregate"
+ "github.com/v3io/v3io-tsdb/pkg/config"
+ "github.com/v3io/v3io-tsdb/pkg/utils"
)
/* main query flow logic
@@ -59,8 +61,8 @@ once collectors are done (wg.done) return SeriesSet (prom compatible) or FrameSe
func mainCollector(ctx *selectQueryContext, responseChannel chan *qryResults) {
defer ctx.wg.Done()
- lastTimePerMetric := make(map[string]int64, len(ctx.columnsSpecByMetric))
- lastValuePerMetric := make(map[string]float64, len(ctx.columnsSpecByMetric))
+ lastTimePerMetric := make(map[uint64]int64, len(ctx.columnsSpecByMetric))
+ lastValuePerMetric := make(map[uint64]float64, len(ctx.columnsSpecByMetric))
for res := range responseChannel {
if res.IsRawQuery() {
@@ -72,16 +74,25 @@ func mainCollector(ctx *selectQueryContext, responseChannel chan *qryResults) {
ctx.errorChannel <- err
return
}
+ lsetAttr, _ := res.fields[config.LabelSetAttrName].(string)
+ lset, _ := utils.LabelsFromString(lsetAttr)
+ currentResultHash := lset.Hash()
- if res.IsServerAggregates() {
- aggregateServerAggregates(ctx, res)
- } else if res.IsClientAggregates() {
- aggregateClientAggregates(ctx, res)
+ // Cross-series aggregation is currently supported only over raw data.
+ if ctx.isCrossSeriesAggregate {
+ lastTimePerMetric[currentResultHash], lastValuePerMetric[currentResultHash], _ = aggregateClientAggregatesCrossSeries(ctx, res, lastTimePerMetric[currentResultHash], lastValuePerMetric[currentResultHash])
+ } else {
+ // Standard over-time aggregation
+ if res.IsServerAggregates() {
+ aggregateServerAggregates(ctx, res)
+ } else if res.IsClientAggregates() {
+ aggregateClientAggregates(ctx, res)
+ }
}
// It is possible to query an aggregate and downsample raw chunks in the same df.
if res.IsDownsample() {
- lastTimePerMetric[res.name], lastValuePerMetric[res.name], err = downsampleRawData(ctx, res, lastTimePerMetric[res.name], lastValuePerMetric[res.name])
+ lastTimePerMetric[currentResultHash], lastValuePerMetric[currentResultHash], err = downsampleRawData(ctx, res, lastTimePerMetric[currentResultHash], lastValuePerMetric[currentResultHash])
if err != nil {
ctx.logger.Error("problem downsampling '%v', lset: %v, err:%v", res.name, res.frame.lset, err)
ctx.errorChannel <- err
@@ -110,14 +121,14 @@ func rawCollector(ctx *selectQueryContext, res *qryResults) {
func aggregateClientAggregates(ctx *selectQueryContext, res *qryResults) {
ctx.logger.Debug("using Client Aggregates Collector for metric %v", res.name)
- it := newRawChunkIterator(res, nil)
+ it := newRawChunkIterator(res, ctx.logger)
for it.Next() {
t, v := it.At()
currentCell := (t - ctx.queryParams.From) / res.query.aggregationParams.Interval
for _, col := range res.frame.columns {
if col.GetColumnSpec().metric == res.name {
- col.SetDataAt(int(currentCell), v)
+ _ = col.SetDataAt(int(currentCell), v)
}
}
}
@@ -152,7 +163,7 @@ func aggregateServerAggregates(ctx *selectQueryContext, res *qryResults) {
} else {
floatVal = math.Float64frombits(val)
}
- col.SetDataAt(int(currentCell), floatVal)
+ _ = col.SetDataAt(int(currentCell), floatVal)
}
}
}
@@ -163,9 +174,10 @@ func downsampleRawData(ctx *selectQueryContext, res *qryResults,
previousPartitionLastTime int64, previousPartitionLastValue float64) (int64, float64, error) {
ctx.logger.Debug("using Downsample Collector for metric %v", res.name)
- var lastT int64
- var lastV float64
- it := newRawChunkIterator(res, nil).(*rawChunkIterator)
+ it, ok := newRawChunkIterator(res, ctx.logger).(*RawChunkIterator)
+ if !ok {
+ return previousPartitionLastTime, previousPartitionLastValue, nil
+ }
col, err := res.frame.Column(res.name)
if err != nil {
return previousPartitionLastTime, previousPartitionLastValue, err
@@ -176,7 +188,7 @@ func downsampleRawData(ctx *selectQueryContext, res *qryResults,
t, v := it.At()
tBucketIndex := (t - ctx.queryParams.From) / ctx.queryParams.Step
if t == currBucketTime {
- col.SetDataAt(currBucket, v)
+ _ = col.SetDataAt(currBucket, v)
} else if tBucketIndex == int64(currBucket) {
prevT, prevV := it.PeakBack()
@@ -185,22 +197,73 @@ func downsampleRawData(ctx *selectQueryContext, res *qryResults,
prevT = previousPartitionLastTime
prevV = previousPartitionLastValue
}
- // If previous point is too old for interpolation
- interpolateFunc, tolerance := col.GetInterpolationFunction()
- if prevT != 0 && t-prevT > tolerance {
- col.SetDataAt(currBucket, math.NaN())
+ interpolatedT, interpolatedV := col.GetInterpolationFunction()(prevT, t, currBucketTime, prevV, v)
+
+ // A (0, 0) result means the gap exceeded the interpolation tolerance
+ if interpolatedT == 0 && interpolatedV == 0 {
+ _ = col.SetDataAt(currBucket, math.NaN())
} else {
- _, interpolatedV := interpolateFunc(prevT, t, currBucketTime, prevV, v)
- col.SetDataAt(currBucket, interpolatedV)
+ _ = col.SetDataAt(currBucket, interpolatedV)
}
} else {
- col.SetDataAt(currBucket, math.NaN())
+ _ = col.SetDataAt(currBucket, math.NaN())
+ }
+ } else {
+ _ = col.SetDataAt(currBucket, math.NaN())
+ }
+ }
+
+ lastT, lastV := it.At()
+ return lastT, lastV, nil
+}
+
+func aggregateClientAggregatesCrossSeries(ctx *selectQueryContext, res *qryResults, previousPartitionLastTime int64, previousPartitionLastValue float64) (int64, float64, error) {
+ ctx.logger.Debug("using Cross-Series Client Aggregates Collector for metric %v", res.name)
+ it := newRawChunkIterator(res, ctx.logger).(*RawChunkIterator)
+
+ var previousPartitionEndBucket int
+ if previousPartitionLastTime != 0 {
+ previousPartitionEndBucket = int((previousPartitionLastTime-ctx.queryParams.From)/ctx.queryParams.Step) + 1
+ }
+ maxBucketForPartition := int((res.query.partition.GetEndTime() - ctx.queryParams.From) / ctx.queryParams.Step)
+ if maxBucketForPartition > ctx.getResultBucketsSize() {
+ maxBucketForPartition = ctx.getResultBucketsSize()
+ }
+
+ for currBucket := previousPartitionEndBucket; currBucket < maxBucketForPartition; currBucket++ {
+ currBucketTime := int64(currBucket)*ctx.queryParams.Step + ctx.queryParams.From
+
+ if it.Seek(currBucketTime) {
+ t, v := it.At()
+ if t == currBucketTime {
+ for _, col := range res.frame.columns {
+ if col.GetColumnSpec().metric == res.name {
+ _ = col.SetDataAt(currBucket, v)
+ }
+ }
+ } else {
+ prevT, prevV := it.PeakBack()
+
+ // If this is the first point in the partition, use the last point of the previous partition for the interpolation
+ if prevT == 0 {
+ prevT = previousPartitionLastTime
+ prevV = previousPartitionLastValue
+ }
+
+ for _, col := range res.frame.columns {
+ if col.GetColumnSpec().metric == res.name {
+ interpolatedT, interpolatedV := col.GetInterpolationFunction()(prevT, t, currBucketTime, prevV, v)
+ if !(interpolatedT == 0 && interpolatedV == 0) {
+ _ = col.SetDataAt(currBucket, interpolatedV)
+ }
+ }
+ }
}
} else {
- lastT, lastV = it.At()
- col.SetDataAt(currBucket, math.NaN())
+ break
}
}
+ lastT, lastV := it.At()
return lastT, lastV, nil
}
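
The cross-series collector walks the result buckets with simple index arithmetic; a small standalone sketch of that mapping (illustrative constants, mirroring the expressions in the code above):

```
package main

import "fmt"

func main() {
	const (
		from = int64(1547510400000) // query start in ms (illustrative)
		step = int64(2 * 60 * 1000) // 2-minute step
	)

	// A sample 5 minutes into the query window...
	t := from + 5*60*1000

	// ...maps to a bucket index, and the index maps back to the bucket's
	// start time, exactly as in the collector's expressions.
	bucket := (t - from) / step
	bucketTime := bucket*step + from

	fmt.Println(bucket, bucketTime) // 2 1547510640000
}
```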
diff --git a/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/frames.go b/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/frames.go
index c9db07a3a47..df38d21ee43 100644
--- a/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/frames.go
+++ b/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/frames.go
@@ -97,8 +97,8 @@ func (fi *frameIterator) Err() error {
}
// data frame, holds multiple value columns and an index (time) column
-func NewDataFrame(columnsSpec []columnMeta, indexColumn Column, lset utils.Labels, hash uint64, isRawQuery, getAllMetrics bool, columnSize int, useServerAggregates, showAggregateLabel bool, encoding chunkenc.Encoding) (*dataFrame, error) {
- df := &dataFrame{lset: lset, hash: hash, isRawSeries: isRawQuery, showAggregateLabel: showAggregateLabel, encoding: encoding}
+func NewDataFrame(columnsSpec []columnMeta, indexColumn Column, lset utils.Labels, hash uint64, isRawQuery, getAllMetrics bool, columnSize int, useServerAggregates, showAggregateLabel bool) (*dataFrame, error) {
+ df := &dataFrame{lset: lset, hash: hash, isRawSeries: isRawQuery, showAggregateLabel: showAggregateLabel}
// is raw query
if isRawQuery {
df.columnByName = make(map[string]int, len(columnsSpec))
@@ -221,8 +221,6 @@ type dataFrame struct {
metrics map[string]struct{}
metricToCountColumn map[string]Column
-
- encoding chunkenc.Encoding
}
func (d *dataFrame) addMetricIfNotExist(metricName string, columnSize int, useServerAggregates bool) error {
@@ -324,13 +322,13 @@ func (d *dataFrame) TimeSeries(i int) (utils.Series, error) {
if err != nil {
return nil, err
}
+
return NewDataFrameColumnSeries(d.index,
currentColumn,
d.metricToCountColumn[currentColumn.GetColumnSpec().metric],
d.Labels(),
d.hash,
- d.showAggregateLabel,
- d.encoding), nil
+ d.showAggregateLabel), nil
}
}
@@ -350,15 +348,28 @@ func (d *dataFrame) rawSeriesToColumns() {
var timeData []time.Time
columns := make([][]interface{}, len(d.rawColumns))
nonExhaustedIterators := len(d.rawColumns)
-
+ seriesToDataType := make([]DType, len(d.rawColumns))
currentTime := int64(math.MaxInt64)
nextTime := int64(math.MaxInt64)
- for _, rawSeries := range d.rawColumns {
- rawSeries.Iterator().Next()
- t, _ := rawSeries.Iterator().At()
- if t < nextTime {
- nextTime = t
+ for seriesIndex, rawSeries := range d.rawColumns {
+ if rawSeries.Iterator().Next() {
+ t, _ := rawSeries.Iterator().At()
+ if t < nextTime {
+ nextTime = t
+ }
+ } else {
+ nonExhaustedIterators--
+ }
+
+ currentEnc := chunkenc.EncXOR
+ seriesToDataType[seriesIndex] = FloatType
+ if ser, ok := rawSeries.(*V3ioRawSeries); ok {
+ currentEnc = ser.encoding
+ }
+
+ if currentEnc == chunkenc.EncVariant {
+ seriesToDataType[seriesIndex] = StringType
}
}
@@ -373,7 +384,7 @@ func (d *dataFrame) rawSeriesToColumns() {
var v interface{}
var t int64
- if d.encoding == chunkenc.EncVariant {
+ if seriesToDataType[seriesIndex] == StringType {
t, v = iter.AtString()
} else {
t, v = iter.At()
@@ -405,7 +416,7 @@ func (d *dataFrame) rawSeriesToColumns() {
for i, series := range d.rawColumns {
name := series.Labels().Get(config.PrometheusMetricNameAttribute)
spec := columnMeta{metric: name}
- col := NewDataColumn(name, spec, numberOfRows, FloatType)
+ col := NewDataColumn(name, spec, numberOfRows, seriesToDataType[i])
col.SetData(columns[i], numberOfRows)
d.columns[i] = col
}
@@ -429,16 +440,15 @@ type Column interface {
GetColumnSpec() columnMeta // Get the column's metadata
SetDataAt(i int, value interface{}) error
SetData(d interface{}, size int) error
- GetInterpolationFunction() (InterpolationFunction, int64)
+ GetInterpolationFunction() InterpolationFunction
setMetricName(name string)
}
type basicColumn struct {
- name string
- size int
- spec columnMeta
- interpolationFunction InterpolationFunction
- interpolationTolerance int64
+ name string
+ size int
+ spec columnMeta
+ interpolationFunction InterpolationFunction
}
// Name returns the column name
@@ -464,8 +474,8 @@ func (c *basicColumn) SetDataAt(i int, value interface{}) error { return nil }
func (c *basicColumn) SetData(d interface{}, size int) error {
return errors.New("method not supported")
}
-func (dc *basicColumn) GetInterpolationFunction() (InterpolationFunction, int64) {
- return dc.interpolationFunction, dc.interpolationTolerance
+func (c *basicColumn) GetInterpolationFunction() InterpolationFunction {
+ return c.interpolationFunction
}
// DType is data type
@@ -473,8 +483,7 @@ type DType reflect.Type
func NewDataColumn(name string, colSpec columnMeta, size int, datatype DType) *dataColumn {
dc := &dataColumn{basicColumn: basicColumn{name: name, spec: colSpec, size: size,
- interpolationFunction: GetInterpolateFunc(colSpec.interpolationType),
- interpolationTolerance: colSpec.interpolationTolerance}}
+ interpolationFunction: GetInterpolateFunc(colSpec.interpolationType, colSpec.interpolationTolerance)}, dtype: datatype}
dc.initializeData(datatype)
return dc
@@ -482,7 +491,8 @@ func NewDataColumn(name string, colSpec columnMeta, size int, datatype DType) *d
type dataColumn struct {
basicColumn
- data interface{}
+ data interface{}
+ dtype DType
}
func (dc *dataColumn) initializeData(dataType DType) {
@@ -506,7 +516,7 @@ func (dc *dataColumn) initializeData(dataType DType) {
// DType returns the data type
func (dc *dataColumn) DType() DType {
- return reflect.TypeOf(dc.data)
+ return dc.dtype
}
// FloatAt returns float64 value at index i
@@ -603,7 +613,7 @@ func (dc *dataColumn) SetDataAt(i int, value interface{}) error {
func NewConcreteColumn(name string, colSpec columnMeta, size int, setFunc func(old, new interface{}) interface{}) *ConcreteColumn {
col := &ConcreteColumn{basicColumn: basicColumn{name: name, spec: colSpec, size: size,
- interpolationFunction: GetInterpolateFunc(interpolateNone)}, setFunc: setFunc}
+ interpolationFunction: GetInterpolateFunc(colSpec.interpolationType, colSpec.interpolationTolerance)}, setFunc: setFunc}
col.data = make([]interface{}, size)
return col
}
@@ -644,7 +654,7 @@ func (c *ConcreteColumn) SetDataAt(i int, val interface{}) error {
func NewVirtualColumn(name string, colSpec columnMeta, size int, function func([]Column, int) (interface{}, error)) Column {
col := &virtualColumn{basicColumn: basicColumn{name: name, spec: colSpec, size: size,
- interpolationFunction: GetInterpolateFunc(interpolateNone)},
+ interpolationFunction: GetInterpolateFunc(colSpec.interpolationType, colSpec.interpolationTolerance)},
function: function}
return col
}
diff --git a/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/interpolate.go b/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/interpolate.go
index 5178bb9d7ca..3eec3da89c7 100644
--- a/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/interpolate.go
+++ b/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/interpolate.go
@@ -28,6 +28,23 @@ import (
type InterpolationType uint8
+func (it InterpolationType) String() string {
+ switch it {
+ case interpolateNone:
+ return "none"
+ case interpolateNaN:
+ return "nan"
+ case interpolatePrev:
+ return "prev"
+ case interpolateNext:
+ return "next"
+ case interpolateLinear:
+ return "linear"
+ default:
+ return "unknown"
+ }
+}
+
const (
interpolateNone InterpolationType = 0
interpolateNaN InterpolationType = 1
@@ -56,46 +73,48 @@ func StrToInterpolateType(str string) (InterpolationType, error) {
}
// return line interpolation function, estimate seek value based on previous and next points
-func GetInterpolateFunc(alg InterpolationType) InterpolationFunction {
+func GetInterpolateFunc(alg InterpolationType, tolerance int64) InterpolationFunction {
switch alg {
case interpolateNaN:
- return projectNaN
+ return func(tprev, tnext, tseek int64, vprev, vnext float64) (int64, float64) {
+ return tseek, math.NaN()
+ }
case interpolatePrev:
- return projectPrev
+ return func(tprev, tnext, tseek int64, vprev, vnext float64) (int64, float64) {
+ if absoluteDiff(tseek, tprev) > tolerance {
+ return 0, 0
+ }
+ return tseek, vprev
+ }
case interpolateNext:
- return projectNext
+ return func(tprev, tnext, tseek int64, vprev, vnext float64) (int64, float64) {
+ if absoluteDiff(tnext, tseek) > tolerance {
+ return 0, 0
+ }
+ return tseek, vnext
+ }
case interpolateLinear:
- return projectLinear
+ return func(tprev, tnext, tseek int64, vprev, vnext float64) (int64, float64) {
+ if absoluteDiff(tseek, tprev) > tolerance || absoluteDiff(tnext, tseek) > tolerance {
+ return 0, 0
+ }
+ if math.IsNaN(vprev) || math.IsNaN(vnext) {
+ return tseek, math.NaN()
+ }
+ v := vprev + (vnext-vprev)*float64(tseek-tprev)/float64(tnext-tprev)
+ return tseek, v
+ }
default:
- return projectNone
+ // No interpolation; skip to the next point
+ return func(tprev, tnext, tseek int64, vprev, vnext float64) (int64, float64) {
+ return tnext, vnext
+ }
}
}
-// skip to next point
-func projectNone(tprev, tnext, tseek int64, vprev, vnext float64) (int64, float64) {
- return tnext, vnext
-}
-
-// return NaN, there is no valid data in this point
-func projectNaN(tprev, tnext, tseek int64, vprev, vnext float64) (int64, float64) {
- return tseek, math.NaN()
-}
-
-// return same value as in previous point
-func projectPrev(tprev, tnext, tseek int64, vprev, vnext float64) (int64, float64) {
- return tseek, vprev
-}
-
-// return the same value as in the next point
-func projectNext(tprev, tnext, tseek int64, vprev, vnext float64) (int64, float64) {
- return tseek, vnext
-}
-
-// linear estimator (smooth graph)
-func projectLinear(tprev, tnext, tseek int64, vprev, vnext float64) (int64, float64) {
- if math.IsNaN(vprev) || math.IsNaN(vnext) {
- return tseek, math.NaN()
+func absoluteDiff(a, b int64) int64 {
+ if a > b {
+ return a - b
}
- v := vprev + (vnext-vprev)*float64(tseek-tprev)/float64(tnext-tprev)
- return tseek, v
+ return b - a
}
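
A minimal sketch of the tolerance-aware behavior (hypothetical timestamps, in milliseconds): within tolerance the function interpolates; beyond it, it returns the `(0, 0)` sentinel that the collectors check for.

```
package main

import (
	"fmt"

	"github.com/v3io/v3io-tsdb/pkg/pquerier"
)

func main() {
	fntype, err := pquerier.StrToInterpolateType("prev")
	if err != nil {
		panic(err)
	}
	fn := pquerier.GetInterpolateFunc(fntype, 2*60*1000) // 2-minute tolerance

	// Previous point is 1 minute before the seek time: within tolerance.
	t, v := fn(0, 300000, 60000, 10.0, 20.0)
	fmt.Println(t, v) // 60000 10

	// Previous point is 10 minutes before the seek time: tolerance exceeded,
	// so the (0, 0) sentinel is returned.
	t, v = fn(-540000, 300000, 60000, 10.0, 20.0)
	fmt.Println(t, v) // 0 0
}
```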
diff --git a/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/interpolate_test.go b/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/interpolate_test.go
index f89431b4b24..468c957fbee 100644
--- a/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/interpolate_test.go
+++ b/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/interpolate_test.go
@@ -34,7 +34,7 @@ type testInterpolationSuite struct {
func (suite *testInterpolationSuite) TestNone() {
fntype, err := StrToInterpolateType("")
suite.Require().Nil(err)
- fn := GetInterpolateFunc(fntype)
+ fn := GetInterpolateFunc(fntype, math.MaxInt64)
t, v := fn(10, 110, 60, 100, 200)
suite.Require().Equal(t, int64(110))
suite.Require().Equal(v, 200.0)
@@ -43,7 +43,7 @@ func (suite *testInterpolationSuite) TestNone() {
func (suite *testInterpolationSuite) TestNaN() {
fntype, err := StrToInterpolateType("nan")
suite.Require().Nil(err)
- fn := GetInterpolateFunc(fntype)
+ fn := GetInterpolateFunc(fntype, math.MaxInt64)
t, v := fn(10, 110, 60, 100, 200)
suite.Require().Equal(t, int64(60))
suite.Require().Equal(math.IsNaN(v), true)
@@ -52,7 +52,7 @@ func (suite *testInterpolationSuite) TestNaN() {
func (suite *testInterpolationSuite) TestPrev() {
fntype, err := StrToInterpolateType("prev")
suite.Require().Nil(err)
- fn := GetInterpolateFunc(fntype)
+ fn := GetInterpolateFunc(fntype, math.MaxInt64)
t, v := fn(10, 110, 60, 100, 200)
suite.Require().Equal(t, int64(60))
suite.Require().Equal(v, 100.0)
@@ -61,7 +61,7 @@ func (suite *testInterpolationSuite) TestPrev() {
func (suite *testInterpolationSuite) TestNext() {
fntype, err := StrToInterpolateType("next")
suite.Require().Nil(err)
- fn := GetInterpolateFunc(fntype)
+ fn := GetInterpolateFunc(fntype, math.MaxInt64)
t, v := fn(10, 110, 60, 100, 200)
suite.Require().Equal(t, int64(60))
suite.Require().Equal(v, 200.0)
@@ -70,7 +70,7 @@ func (suite *testInterpolationSuite) TestNext() {
func (suite *testInterpolationSuite) TestLin() {
fntype, err := StrToInterpolateType("lin")
suite.Require().Nil(err)
- fn := GetInterpolateFunc(fntype)
+ fn := GetInterpolateFunc(fntype, math.MaxInt64)
t, v := fn(10, 110, 60, 100, 200)
suite.Require().Equal(t, int64(60))
suite.Require().Equal(v, 150.0)
diff --git a/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/querier.go b/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/querier.go
index 78f9e067bf9..17649b453b4 100644
--- a/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/querier.go
+++ b/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/querier.go
@@ -63,7 +63,7 @@ func (s *SelectParams) getRequestedColumns() []RequestedColumn {
for _, function := range functions {
trimmed := strings.TrimSpace(function)
metricName := strings.TrimSpace(metric)
- newCol := RequestedColumn{Function: trimmed, Metric: metricName, Interpolator: "next"}
+ newCol := RequestedColumn{Function: trimmed, Metric: metricName, Interpolator: defaultInterpolation.String()}
columns[index] = newCol
index++
}
diff --git a/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/query_integration_test.go b/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/query_integration_test.go
index 38f495cd88c..6db8d1f5e9e 100644
--- a/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/query_integration_test.go
+++ b/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/query_integration_test.go
@@ -1880,12 +1880,900 @@ func (suite *testQuerySuite) TestGroupByNotExistingLabel() {
}
}
-func (suite *testQuerySuite) toMillis(date string) int64 {
- t, err := time.Parse(time.RFC3339, date)
+func (suite *testQuerySuite) TestAggregatesWithZeroStep() {
+ adapter, err := tsdb.NewV3ioAdapter(suite.v3ioConfig, nil, nil)
if err != nil {
- suite.T().Fatal(err)
+ suite.T().Fatalf("failed to create v3io adapter. reason: %s", err)
}
- return t.Unix() * 1000
+
+ labels1 := utils.LabelsFromStringList("os", "linux")
+ numberOfEvents := 10
+ eventsInterval := 60 * 1000
+ baseTime := tsdbtest.NanosToMillis(time.Now().UnixNano()) - int64(numberOfEvents*eventsInterval)
+
+ ingestedData := []tsdbtest.DataPoint{{baseTime, 10},
+ {int64(baseTime + tsdbtest.MinuteInMillis), 20},
+ {baseTime + 2*tsdbtest.MinuteInMillis, 30},
+ {baseTime + 3*tsdbtest.MinuteInMillis, 40}}
+ testParams := tsdbtest.NewTestParams(suite.T(),
+ tsdbtest.TestOption{
+ Key: tsdbtest.OptTimeSeries,
+ Value: tsdbtest.TimeSeries{tsdbtest.Metric{
+ Name: "cpu",
+ Labels: labels1,
+ Data: ingestedData},
+ }})
+ tsdbtest.InsertData(suite.T(), testParams)
+
+ expected := map[string][]tsdbtest.DataPoint{"max": {{Time: baseTime, Value: 40}},
+ "min": {{Time: baseTime, Value: 10}},
+ "sum": {{Time: baseTime, Value: 100}},
+ "count": {{Time: baseTime, Value: 4}},
+ }
+
+ querierV2, err := adapter.QuerierV2()
+ if err != nil {
+ suite.T().Fatalf("Failed to create querier v2, err: %v", err)
+ }
+
+ params := &pquerier.SelectParams{Name: "cpu", Functions: "max, sum,count,min", Step: 0, From: baseTime, To: baseTime + int64(numberOfEvents*eventsInterval)}
+ set, err := querierV2.Select(params)
+ if err != nil {
+ suite.T().Fatalf("Failed to exeute query, err: %v", err)
+ }
+
+ var seriesCount int
+ for set.Next() {
+ seriesCount++
+ iter := set.At().Iterator()
+
+ data, err := tsdbtest.IteratorToSlice(iter)
+ agg := set.At().Labels().Get(aggregate.AggregateLabel)
+ if err != nil {
+ suite.T().Fatal(err)
+ }
+
+ assert.Equal(suite.T(), expected[agg], data, "queried data does not match expected")
+ }
+
+ assert.Equal(suite.T(), 4, seriesCount, "series count didn't match expected")
+}
+
+func (suite *testQuerySuite) TestAggregatesWithZeroStepSelectDataframe() {
+ adapter, err := tsdb.NewV3ioAdapter(suite.v3ioConfig, nil, nil)
+ if err != nil {
+ suite.T().Fatalf("failed to create v3io adapter. reason: %s", err)
+ }
+
+ labels1 := utils.LabelsFromStringList("os", "linux")
+ numberOfEvents := 10
+ eventsInterval := 60 * 1000
+ baseTime := tsdbtest.NanosToMillis(time.Now().UnixNano()) - int64(numberOfEvents*eventsInterval)
+
+ ingestedData := []tsdbtest.DataPoint{{baseTime, 10},
+ {int64(baseTime + tsdbtest.MinuteInMillis), 20},
+ {baseTime + 2*tsdbtest.MinuteInMillis, 30},
+ {baseTime + 3*tsdbtest.MinuteInMillis, 40}}
+ testParams := tsdbtest.NewTestParams(suite.T(),
+ tsdbtest.TestOption{
+ Key: tsdbtest.OptTimeSeries,
+ Value: tsdbtest.TimeSeries{tsdbtest.Metric{
+ Name: "cpu",
+ Labels: labels1,
+ Data: ingestedData},
+ }})
+ tsdbtest.InsertData(suite.T(), testParams)
+
+ expected := map[string]tsdbtest.DataPoint{"max": {Time: baseTime, Value: 40},
+ "min": {Time: baseTime, Value: 10},
+ "sum": {Time: baseTime, Value: 100},
+ "count": {Time: baseTime, Value: 4},
+ }
+
+ querierV2, err := adapter.QuerierV2()
+ if err != nil {
+ suite.T().Fatalf("Failed to create querier v2, err: %v", err)
+ }
+
+ params := &pquerier.SelectParams{Name: "cpu", Functions: "max, sum,count,min", Step: 0, From: baseTime, To: baseTime + int64(numberOfEvents*eventsInterval)}
+ set, err := querierV2.SelectDataFrame(params)
+ if err != nil {
+ suite.T().Fatalf("Failed to exeute query, err: %v", err)
+ }
+
+ var seriesCount int
+ for set.NextFrame() {
+ seriesCount++
+ frame := set.GetFrame()
+
+ assert.Equal(suite.T(), 1, frame.Index().Len())
+ t, err := frame.Index().TimeAt(0)
+ assert.NoError(suite.T(), err)
+ assert.Equal(suite.T(), baseTime, t.UnixNano()/int64(time.Millisecond))
+
+ for _, col := range frame.Columns() {
+ assert.Equal(suite.T(), 1, col.Len())
+ currentColAggregate := strings.Split(col.Name(), "(")[0]
+ f, err := col.FloatAt(0)
+ assert.NoError(suite.T(), err)
+ assert.Equal(suite.T(), expected[currentColAggregate].Value, f)
+ }
+ }
+
+ assert.Equal(suite.T(), 1, seriesCount, "series count didn't match expected")
+}
+
+func (suite *testQuerySuite) TestEmptyRawDataSelectDataframe() {
+ adapter, err := tsdb.NewV3ioAdapter(suite.v3ioConfig, nil, nil)
+ if err != nil {
+ suite.T().Fatalf("failed to create v3io adapter. reason: %s", err)
+ }
+
+ labels1 := utils.LabelsFromStringList("os", "linux")
+ numberOfEvents := 10
+ eventsInterval := 60 * 1000
+ baseTime := tsdbtest.NanosToMillis(time.Now().UnixNano()) - int64(numberOfEvents*eventsInterval)
+
+ ingestedData := []tsdbtest.DataPoint{{baseTime, 10},
+ {int64(baseTime + tsdbtest.MinuteInMillis), 20},
+ {baseTime + 2*tsdbtest.MinuteInMillis, 30},
+ {baseTime + 3*tsdbtest.MinuteInMillis, 40}}
+ testParams := tsdbtest.NewTestParams(suite.T(),
+ tsdbtest.TestOption{
+ Key: tsdbtest.OptTimeSeries,
+ Value: tsdbtest.TimeSeries{tsdbtest.Metric{
+ Name: "cpu",
+ Labels: labels1,
+ Data: ingestedData},
+ }})
+ tsdbtest.InsertData(suite.T(), testParams)
+
+ querierV2, err := adapter.QuerierV2()
+ if err != nil {
+ suite.T().Fatalf("Failed to create querier v2, err: %v", err)
+ }
+
+ params := &pquerier.SelectParams{Name: "cpu", From: baseTime - 10*tsdbtest.MinuteInMillis, To: baseTime - 1*tsdbtest.MinuteInMillis}
+ set, err := querierV2.SelectDataFrame(params)
+ if err != nil {
+ suite.T().Fatalf("Failed to exeute query, err: %v", err)
+ }
+
+ var seriesCount int
+ for set.NextFrame() {
+ seriesCount++
+ frame := set.GetFrame()
+
+ assert.Equal(suite.T(), 0, frame.Index().Len())
+
+ for _, col := range frame.Columns() {
+ assert.Equal(suite.T(), 0, col.Len())
+ }
+ }
+
+ assert.Equal(suite.T(), 1, seriesCount, "series count didn't match expected")
+}
+
+func (suite *testQuerySuite) Test2Series1EmptySelectDataframe() {
+ adapter, err := tsdb.NewV3ioAdapter(suite.v3ioConfig, nil, nil)
+ if err != nil {
+ suite.T().Fatalf("failed to create v3io adapter. reason: %s", err)
+ }
+
+ labels1 := utils.LabelsFromStringList("os", "linux")
+ numberOfEvents := 10
+ eventsInterval := 60 * 1000
+ baseTime := tsdbtest.NanosToMillis(time.Now().UnixNano()) - int64(numberOfEvents*eventsInterval)
+
+ ingestedData := []tsdbtest.DataPoint{{baseTime, 10},
+ {int64(baseTime + tsdbtest.MinuteInMillis), 20},
+ {baseTime + 2*tsdbtest.MinuteInMillis, 30},
+ {baseTime + 3*tsdbtest.MinuteInMillis, 40}}
+ testParams := tsdbtest.NewTestParams(suite.T(),
+ tsdbtest.TestOption{
+ Key: tsdbtest.OptTimeSeries,
+ Value: tsdbtest.TimeSeries{tsdbtest.Metric{
+ Name: "cpu",
+ Labels: labels1,
+ Data: ingestedData},
+ tsdbtest.Metric{
+ Name: "diskio",
+ Labels: labels1,
+ Data: []tsdbtest.DataPoint{{baseTime + 10*tsdbtest.MinuteInMillis, 10}}},
+ }})
+ tsdbtest.InsertData(suite.T(), testParams)
+
+ expected := map[string][]tsdbtest.DataPoint{"cpu": ingestedData,
+ "diskio": {{baseTime, math.NaN()},
+ {int64(baseTime + tsdbtest.MinuteInMillis), math.NaN()},
+ {baseTime + 2*tsdbtest.MinuteInMillis, math.NaN()},
+ {baseTime + 3*tsdbtest.MinuteInMillis, math.NaN()}},
+ }
+
+ querierV2, err := adapter.QuerierV2()
+ if err != nil {
+ suite.T().Fatalf("Failed to create querier v2, err: %v", err)
+ }
+
+ params, _, _ := pquerier.ParseQuery("select cpu,diskio")
+ params.From = baseTime
+ params.To = baseTime + 4*tsdbtest.MinuteInMillis
+
+ set, err := querierV2.SelectDataFrame(params)
+ if err != nil {
+ suite.T().Fatalf("Failed to exeute query, err: %v", err)
+ }
+
+ var seriesCount int
+ for set.NextFrame() {
+ seriesCount++
+ frame := set.GetFrame()
+
+ assert.Equal(suite.T(), len(ingestedData), frame.Index().Len())
+ for i := 0; i < frame.Index().Len(); i++ {
+ t, err := frame.Index().TimeAt(i)
+ assert.NoError(suite.T(), err)
+ assert.Equal(suite.T(), ingestedData[i].Time, t.UnixNano()/int64(time.Millisecond))
+ }
+
+ for _, col := range frame.Columns() {
+ assert.Equal(suite.T(), len(ingestedData), col.Len())
+ for i := 0; i < col.Len(); i++ {
+ currentExpected := expected[col.Name()][i].Value
+ f, err := col.FloatAt(i)
+ assert.NoError(suite.T(), err)
+
+ if !(math.IsNaN(currentExpected) && math.IsNaN(f)) {
+ assert.Equal(suite.T(), currentExpected, f)
+ }
+ }
+ }
+ }
+
+ assert.Equal(suite.T(), 1, seriesCount, "series count didn't match expected")
+}
+
+func (suite *testQuerySuite) TestAggregateSeriesWithAlias() {
+ adapter, err := tsdb.NewV3ioAdapter(suite.v3ioConfig, nil, nil)
+ if err != nil {
+ suite.T().Fatalf("failed to create v3io adapter. reason: %s", err)
+ }
+
+ labels1 := utils.LabelsFromStringList("os", "linux")
+ numberOfEvents := 10
+ eventsInterval := 60 * 1000
+ baseTime := tsdbtest.NanosToMillis(time.Now().UnixNano()) - int64(numberOfEvents*eventsInterval)
+ ingestData := []tsdbtest.DataPoint{{baseTime, 10},
+ {int64(baseTime + tsdbtest.MinuteInMillis), 20},
+ {baseTime + 2*tsdbtest.MinuteInMillis, 30},
+ {baseTime + 3*tsdbtest.MinuteInMillis, 40}}
+ testParams := tsdbtest.NewTestParams(suite.T(),
+ tsdbtest.TestOption{
+ Key: tsdbtest.OptTimeSeries,
+ Value: tsdbtest.TimeSeries{tsdbtest.Metric{
+ Name: "cpu",
+ Labels: labels1,
+ Data: ingestData},
+ }})
+ tsdbtest.InsertData(suite.T(), testParams)
+ expectedResult := 40.0
+
+ querierV2, err := adapter.QuerierV2()
+ if err != nil {
+ suite.T().Fatalf("Failed to create querier v2, err: %v", err)
+ }
+
+ aliasName := "iguaz"
+ params, _, _ := pquerier.ParseQuery(fmt.Sprintf("select max(cpu) as %v", aliasName))
+
+ params.From = baseTime
+ params.To = baseTime + int64(numberOfEvents*eventsInterval)
+
+ set, err := querierV2.Select(params)
+ if err != nil {
+ suite.T().Fatalf("Failed to exeute query, err: %v", err)
+ }
+
+ var seriesCount int
+ for set.Next() {
+ seriesCount++
+ iter := set.At().Iterator()
+ data, err := tsdbtest.IteratorToSlice(iter)
+ if err != nil {
+ suite.T().Fatal(err)
+ }
+ assert.Equal(suite.T(), 1, len(data), "queried data does not match expected")
+ assert.Equal(suite.T(), expectedResult, data[0].Value, "queried data does not match expected")
+
+ seriesName := set.At().Labels().Get(config.PrometheusMetricNameAttribute)
+ suite.Equal(aliasName, seriesName)
+ }
+
+ assert.Equal(suite.T(), 1, seriesCount, "series count didn't match expected")
+}
+
+func (suite *testQuerySuite) TestStringAndFloatMetricsDataframe() {
+ adapter, err := tsdb.NewV3ioAdapter(suite.v3ioConfig, nil, nil)
+ suite.NoError(err, "failed to create v3io adapter")
+
+ metricName1 := "cpu"
+ metricName2 := "log"
+ labels := utils.LabelsFromStringList("os", "linux")
+ labelsWithName := append(labels, utils.LabelsFromStringList("__name__", metricName2)...)
+
+ baseTime := suite.toMillis("2019-01-21T00:00:00Z")
+ expectedTimeColumn := []int64{baseTime, baseTime + tsdbtest.MinuteInMillis, baseTime + 2*tsdbtest.MinuteInMillis}
+ logData := []interface{}{"a", "b", "c"}
+ expectedColumns := map[string][]interface{}{metricName1: {10.0, 20.0, 30.0},
+ metricName2: logData}
+ appender, err := adapter.Appender()
+ suite.NoError(err, "failed to create v3io appender")
+
+ ref, err := appender.Add(labelsWithName, expectedTimeColumn[0], logData[0])
+ suite.NoError(err, "failed to add data to the TSDB appender")
+ for i := 1; i < len(expectedTimeColumn); i++ {
+ appender.AddFast(labels, ref, expectedTimeColumn[i], logData[i])
+ }
+
+ _, err = appender.WaitForCompletion(0)
+ suite.NoError(err, "failed to wait for TSDB append completion")
+
+ testParams := tsdbtest.NewTestParams(suite.T(),
+ tsdbtest.TestOption{
+ Key: tsdbtest.OptTimeSeries,
+ Value: tsdbtest.TimeSeries{tsdbtest.Metric{
+ Name: metricName1,
+ Labels: labels,
+ Data: []tsdbtest.DataPoint{{baseTime, 10},
+ {int64(baseTime + tsdbtest.MinuteInMillis), 20},
+ {baseTime + 2*tsdbtest.MinuteInMillis, 30}}},
+ }})
+
+ tsdbtest.InsertData(suite.T(), testParams)
+
+ querierV2, err := adapter.QuerierV2()
+ suite.NoError(err, "failed to create querier")
+
+ params := &pquerier.SelectParams{RequestedColumns: []pquerier.RequestedColumn{{Metric: metricName1}, {Metric: metricName2}},
+ From: baseTime, To: baseTime + 5*tsdbtest.MinuteInMillis}
+ iter, err := querierV2.SelectDataFrame(params)
+ suite.NoError(err, "failed to execute query")
+
+ var seriesCount int
+ for iter.NextFrame() {
+ seriesCount++
+ frame := iter.GetFrame()
+ in := frame.Index()
+ cols := frame.Columns()
+
+ for i := 0; i < frame.Index().Len(); i++ {
+ t, _ := in.TimeAt(i)
+ timeMillis := t.UnixNano() / int64(time.Millisecond)
+ suite.Require().Equal(expectedTimeColumn[i], timeMillis, "time column does not match at index %v", i)
+ for _, column := range cols {
+ var v interface{}
+
+ if column.DType() == pquerier.FloatType {
+ v, _ = column.FloatAt(i)
+ } else if column.DType() == pquerier.StringType {
+ v, _ = column.StringAt(i)
+ } else {
+ suite.Failf("column type is not as expected: %v", column.DType().String())
+ }
+
+ suite.Require().Equal(expectedColumns[column.Name()][i], v, "column %v does not match at index %v", column.Name(), i)
+ }
+ }
+ }
+}
+
+func (suite *testQuerySuite) toMillis(date string) int64 {
+ t, err := time.Parse(time.RFC3339, date)
+ if err != nil {
+ suite.T().Fatal(err)
+ }
+ return t.Unix() * 1000
+}
+
+func (suite *testQuerySuite) TestCrossSeriesAggregatesTimesFallsOnStep() {
+ adapter, err := tsdb.NewV3ioAdapter(suite.v3ioConfig, nil, nil)
+ suite.Require().NoError(err, "failed to create v3io adapter")
+
+ labels1 := utils.LabelsFromStringList("os", "linux")
+ labels2 := utils.LabelsFromStringList("os", "mac")
+ numberOfEvents := 10
+ eventsInterval := 60 * 1000
+ baseTime := tsdbtest.NanosToMillis(time.Now().UnixNano()) - int64(numberOfEvents*eventsInterval)
+
+ ingestedData := []tsdbtest.DataPoint{{baseTime, 10},
+ {baseTime + 2*tsdbtest.MinuteInMillis, 20},
+ {baseTime + 4*tsdbtest.MinuteInMillis, 30}}
+ ingestedData2 := []tsdbtest.DataPoint{{baseTime, 20},
+ {baseTime + 2*tsdbtest.MinuteInMillis, 30},
+ {baseTime + 4*tsdbtest.MinuteInMillis, 40}}
+ testParams := tsdbtest.NewTestParams(suite.T(),
+ tsdbtest.TestOption{
+ Key: tsdbtest.OptTimeSeries,
+ Value: tsdbtest.TimeSeries{tsdbtest.Metric{
+ Name: "cpu",
+ Labels: labels1,
+ Data: ingestedData},
+ tsdbtest.Metric{
+ Name: "cpu",
+ Labels: labels2,
+ Data: ingestedData2},
+ }})
+ tsdbtest.InsertData(suite.T(), testParams)
+
+ expected := map[string][]tsdbtest.DataPoint{
+ "sum": {{Time: baseTime, Value: 30},
+ {Time: baseTime + 2*tsdbtest.MinuteInMillis, Value: 50},
+ {Time: baseTime + 4*tsdbtest.MinuteInMillis, Value: 70}},
+ "min": {{Time: baseTime, Value: 10},
+ {Time: baseTime + 2*tsdbtest.MinuteInMillis, Value: 20},
+ {Time: baseTime + 4*tsdbtest.MinuteInMillis, Value: 30}},
+ "avg": {{Time: baseTime, Value: 15},
+ {Time: baseTime + 2*tsdbtest.MinuteInMillis, Value: 25},
+ {Time: baseTime + 4*tsdbtest.MinuteInMillis, Value: 35}}}
+
+ querierV2, err := adapter.QuerierV2()
+ suite.Require().NoError(err, "failed to create querier v2")
+
+ params := &pquerier.SelectParams{Name: "cpu", Functions: "sum_all,min_all,avg_all", Step: 2 * 60 * 1000, From: baseTime, To: baseTime + int64(numberOfEvents*eventsInterval)}
+ set, err := querierV2.Select(params)
+ suite.Require().NoError(err, "Failed to execute query")
+
+ var seriesCount int
+ for set.Next() {
+ seriesCount++
+ iter := set.At().Iterator()
+
+ data, err := tsdbtest.IteratorToSlice(iter)
+ agg := set.At().Labels().Get(aggregate.AggregateLabel)
+ if err != nil {
+ suite.T().Fatal(err)
+ }
+
+ suite.Require().Equal(expected[agg], data, "queried data does not match expected")
+ }
+
+ suite.Require().Equal(len(expected), seriesCount, "series count didn't match expected")
+}
+
+func (suite *testQuerySuite) TestCrossSeriesAggregates() {
+ adapter, err := tsdb.NewV3ioAdapter(suite.v3ioConfig, nil, nil)
+ suite.Require().NoError(err, "failed to create v3io adapter")
+
+ labels1 := utils.LabelsFromStringList("os", "linux")
+ labels2 := utils.LabelsFromStringList("os", "mac")
+ numberOfEvents := 10
+ eventsInterval := 60 * 1000
+ baseTime := tsdbtest.NanosToMillis(time.Now().UnixNano()) - int64(numberOfEvents*eventsInterval)
+
+ ingestedData := []tsdbtest.DataPoint{{baseTime, 10},
+ {baseTime + 1*tsdbtest.MinuteInMillis, 1},
+ {baseTime + 2*tsdbtest.MinuteInMillis, 20},
+ {baseTime + 3*tsdbtest.MinuteInMillis, 1},
+ {baseTime + 4*tsdbtest.MinuteInMillis, 30}}
+ ingestedData2 := []tsdbtest.DataPoint{{baseTime, 20},
+ {baseTime + 1*tsdbtest.MinuteInMillis, 1},
+ {baseTime + 2*tsdbtest.MinuteInMillis, 30},
+ {baseTime + 3*tsdbtest.MinuteInMillis, 1},
+ {baseTime + 4*tsdbtest.MinuteInMillis, 40}}
+ testParams := tsdbtest.NewTestParams(suite.T(),
+ tsdbtest.TestOption{
+ Key: tsdbtest.OptTimeSeries,
+ Value: tsdbtest.TimeSeries{tsdbtest.Metric{
+ Name: "cpu",
+ Labels: labels1,
+ Data: ingestedData},
+ tsdbtest.Metric{
+ Name: "cpu",
+ Labels: labels2,
+ Data: ingestedData2},
+ }})
+ tsdbtest.InsertData(suite.T(), testParams)
+
+ expected := map[string][]tsdbtest.DataPoint{
+ "sum": {{Time: baseTime, Value: 30},
+ {Time: baseTime + 2*tsdbtest.MinuteInMillis, Value: 50},
+ {Time: baseTime + 4*tsdbtest.MinuteInMillis, Value: 70}},
+ "min": {{Time: baseTime, Value: 10},
+ {Time: baseTime + 2*tsdbtest.MinuteInMillis, Value: 20},
+ {Time: baseTime + 4*tsdbtest.MinuteInMillis, Value: 30}},
+ "avg": {{Time: baseTime, Value: 15},
+ {Time: baseTime + 2*tsdbtest.MinuteInMillis, Value: 25},
+ {Time: baseTime + 4*tsdbtest.MinuteInMillis, Value: 35}}}
+
+ querierV2, err := adapter.QuerierV2()
+ suite.Require().NoError(err, "failed to create querier v2")
+
+ params := &pquerier.SelectParams{Name: "cpu", Functions: "sum_all,min_all,avg_all", Step: 2 * 60 * 1000, From: baseTime, To: baseTime + int64(numberOfEvents*eventsInterval)}
+ set, err := querierV2.Select(params)
+ suite.Require().NoError(err, "Failed to execute query")
+
+ var seriesCount int
+ for set.Next() {
+ seriesCount++
+ iter := set.At().Iterator()
+
+ data, err := tsdbtest.IteratorToSlice(iter)
+ agg := set.At().Labels().Get(aggregate.AggregateLabel)
+ if err != nil {
+ suite.T().Fatal(err)
+ }
+
+ suite.Require().Equal(expected[agg], data, "queried data does not match expected")
+ }
+
+ suite.Require().Equal(len(expected), seriesCount, "series count didn't match expected")
+}
+
+func (suite *testQuerySuite) TestCrossSeriesAggregatesMultiPartition() {
+ adapter, err := tsdb.NewV3ioAdapter(suite.v3ioConfig, nil, nil)
+ suite.Require().NoError(err, "failed to create v3io adapter")
+
+ labels1 := utils.LabelsFromStringList("os", "linux")
+ labels2 := utils.LabelsFromStringList("os", "mac")
+ numberOfEvents := 10
+ eventsInterval := 60 * 1000
+ baseTime := tsdbtest.NanosToMillis(time.Now().UnixNano()) - int64(numberOfEvents*eventsInterval)
+
+ ingestedData := []tsdbtest.DataPoint{{baseTime - 7*tsdbtest.DaysInMillis, 10},
+ {baseTime - 7*tsdbtest.DaysInMillis + 1*tsdbtest.MinuteInMillis, 1},
+ {baseTime, 20},
+ {baseTime + 1*tsdbtest.MinuteInMillis, 1},
+ {baseTime + 2*tsdbtest.MinuteInMillis, 60}}
+ ingestedData2 := []tsdbtest.DataPoint{{baseTime - 7*tsdbtest.DaysInMillis, 20},
+ {baseTime - 7*tsdbtest.DaysInMillis + 1*tsdbtest.MinuteInMillis, 1},
+ {baseTime, 30},
+ {baseTime + 1*tsdbtest.MinuteInMillis, 1},
+ {baseTime + 2*tsdbtest.MinuteInMillis, 40}}
+ testParams := tsdbtest.NewTestParams(suite.T(),
+ tsdbtest.TestOption{
+ Key: tsdbtest.OptTimeSeries,
+ Value: tsdbtest.TimeSeries{tsdbtest.Metric{
+ Name: "cpu",
+ Labels: labels1,
+ Data: ingestedData},
+ tsdbtest.Metric{
+ Name: "cpu",
+ Labels: labels2,
+ Data: ingestedData2},
+ }})
+ tsdbtest.InsertData(suite.T(), testParams)
+
+ expected := map[string][]tsdbtest.DataPoint{
+ "max": {{Time: baseTime - 7*tsdbtest.DaysInMillis, Value: 20},
+ {Time: baseTime, Value: 30},
+ {Time: baseTime + 2*tsdbtest.MinuteInMillis, Value: 60}}}
+
+ querierV2, err := adapter.QuerierV2()
+ suite.Require().NoError(err, "failed to create querier v2")
+
+ params := &pquerier.SelectParams{Name: "cpu", Functions: "max_all", Step: 2 * 60 * 1000, From: baseTime - 7*tsdbtest.DaysInMillis, To: baseTime + int64(numberOfEvents*eventsInterval)}
+ set, err := querierV2.Select(params)
+ suite.Require().NoError(err, "Failed to execute query")
+
+ var seriesCount int
+ for set.Next() {
+ seriesCount++
+ iter := set.At().Iterator()
+
+ data, err := tsdbtest.IteratorToSlice(iter)
+ agg := set.At().Labels().Get(aggregate.AggregateLabel)
+ if err != nil {
+ suite.T().Fatal(err)
+ }
+
+ suite.Require().Equal(expected[agg], data, "queried data does not match expected")
+ }
+
+ suite.Require().Equal(len(expected), seriesCount, "series count didn't match expected")
+}
+
+func (suite *testQuerySuite) TestCrossSeriesAggregatesWithInterpolation() {
+ adapter, err := tsdb.NewV3ioAdapter(suite.v3ioConfig, nil, nil)
+ suite.Require().NoError(err, "failed to create v3io adapter")
+
+ labels1 := utils.LabelsFromStringList("os", "linux")
+ labels2 := utils.LabelsFromStringList("os", "mac")
+ numberOfEvents := 10
+ eventsInterval := 60 * 1000
+ baseTime := tsdbtest.NanosToMillis(time.Now().UnixNano()) - int64(numberOfEvents*eventsInterval)
+
+ ingestedData := []tsdbtest.DataPoint{{baseTime, 10},
+ {baseTime + 1*tsdbtest.MinuteInMillis, 20},
+ {baseTime + 3*tsdbtest.MinuteInMillis, 30},
+ {baseTime + 5*tsdbtest.MinuteInMillis, 40}}
+ ingestedData2 := []tsdbtest.DataPoint{{baseTime, 20},
+ {baseTime + 2*tsdbtest.MinuteInMillis, 30},
+ {baseTime + 4*tsdbtest.MinuteInMillis, 40}}
+ testParams := tsdbtest.NewTestParams(suite.T(),
+ tsdbtest.TestOption{
+ Key: tsdbtest.OptTimeSeries,
+ Value: tsdbtest.TimeSeries{tsdbtest.Metric{
+ Name: "cpu",
+ Labels: labels1,
+ Data: ingestedData},
+ tsdbtest.Metric{
+ Name: "cpu",
+ Labels: labels2,
+ Data: ingestedData2},
+ }})
+ tsdbtest.InsertData(suite.T(), testParams)
+
+ expected := map[string][]tsdbtest.DataPoint{
+ "sum": {{Time: baseTime, Value: 30},
+ {Time: baseTime + 2*tsdbtest.MinuteInMillis, Value: 50},
+ {Time: baseTime + 4*tsdbtest.MinuteInMillis, Value: 70}},
+ "min": {{Time: baseTime, Value: 10},
+ {Time: baseTime + 2*tsdbtest.MinuteInMillis, Value: 20},
+ {Time: baseTime + 4*tsdbtest.MinuteInMillis, Value: 30}},
+ "max": {{Time: baseTime, Value: 20},
+ {Time: baseTime + 2*tsdbtest.MinuteInMillis, Value: 30},
+ {Time: baseTime + 4*tsdbtest.MinuteInMillis, Value: 40}}}
+
+ querierV2, err := adapter.QuerierV2()
+ suite.Require().NoError(err, "failed to create querier v2")
+
+ selectParams, _, err := pquerier.ParseQuery("select sum_all(prev(cpu)), min_all(prev(cpu)), max_all(prev(cpu))")
+ suite.NoError(err)
+ selectParams.Step = 2 * tsdbtest.MinuteInMillis
+ selectParams.From = baseTime
+ selectParams.To = baseTime + 5*tsdbtest.MinuteInMillis
+ set, err := querierV2.Select(selectParams)
+ suite.Require().NoError(err, "Failed to execute query")
+
+ var seriesCount int
+ for set.Next() {
+ seriesCount++
+ iter := set.At().Iterator()
+
+ data, err := tsdbtest.IteratorToSlice(iter)
+ if err != nil {
+ suite.T().Fatal(err)
+ }
+ agg := set.At().Labels().Get(aggregate.AggregateLabel)
+
+ suite.Require().Equal(expected[agg], data, "queried data does not match expected")
+ }
+
+ suite.Require().Equal(len(expected), seriesCount, "series count didn't match expected")
+}
+
+func (suite *testQuerySuite) TestCrossSeriesAggregatesMultiPartitionExactlyOnStep() {
+ adapter, err := tsdb.NewV3ioAdapter(suite.v3ioConfig, nil, nil)
+ suite.Require().NoError(err, "failed to create v3io adapter")
+
+ labels1 := utils.LabelsFromStringList("os", "linux")
+ labels2 := utils.LabelsFromStringList("os", "mac")
+ numberOfEvents := 10
+ eventsInterval := 60 * 1000
+ baseTime := tsdbtest.NanosToMillis(time.Now().UnixNano()) - int64(numberOfEvents*eventsInterval)
+
+ ingestedData := []tsdbtest.DataPoint{{baseTime - 7*tsdbtest.DaysInMillis, 10},
+ {baseTime - 7*tsdbtest.DaysInMillis + 1*tsdbtest.MinuteInMillis, 1},
+ {baseTime, 20},
+ {baseTime + 1*tsdbtest.MinuteInMillis, 1},
+ {baseTime + 2*tsdbtest.MinuteInMillis, 60}}
+ ingestedData2 := []tsdbtest.DataPoint{{baseTime - 7*tsdbtest.DaysInMillis, 20},
+ {baseTime - 7*tsdbtest.DaysInMillis + 1*tsdbtest.MinuteInMillis, 1},
+ {baseTime, 30},
+ {baseTime + 1*tsdbtest.MinuteInMillis, 1},
+ {baseTime + 2*tsdbtest.MinuteInMillis, 40}}
+ testParams := tsdbtest.NewTestParams(suite.T(),
+ tsdbtest.TestOption{
+ Key: tsdbtest.OptTimeSeries,
+ Value: tsdbtest.TimeSeries{tsdbtest.Metric{
+ Name: "cpu",
+ Labels: labels1,
+ Data: ingestedData},
+ tsdbtest.Metric{
+ Name: "cpu",
+ Labels: labels2,
+ Data: ingestedData2},
+ }})
+ tsdbtest.InsertData(suite.T(), testParams)
+
+ expected := map[string][]tsdbtest.DataPoint{
+ "sum": {{Time: baseTime - 7*tsdbtest.DaysInMillis, Value: 30},
+ {Time: baseTime, Value: 50},
+ {Time: baseTime + 2*tsdbtest.MinuteInMillis, Value: 100}},
+ "min": {{Time: baseTime - 7*tsdbtest.DaysInMillis, Value: 10},
+ {Time: baseTime, Value: 20},
+ {Time: baseTime + 2*tsdbtest.MinuteInMillis, Value: 40}},
+ "avg": {{Time: baseTime - 7*tsdbtest.DaysInMillis, Value: 15},
+ {Time: baseTime, Value: 25},
+ {Time: baseTime + 2*tsdbtest.MinuteInMillis, Value: 50}}}
+
+ querierV2, err := adapter.QuerierV2()
+ suite.Require().NoError(err, "failed to create querier v2")
+
+ selectParams, _, err := pquerier.ParseQuery("select sum_all(prev(cpu)), min_all(prev(cpu)),avg_all(prev(cpu))")
+ suite.NoError(err)
+ selectParams.Step = 2 * tsdbtest.MinuteInMillis
+ selectParams.From = baseTime - 7*tsdbtest.DaysInMillis
+ selectParams.To = baseTime + 5*tsdbtest.MinuteInMillis
+ set, err := querierV2.Select(selectParams)
+ suite.Require().NoError(err, "Failed to execute query")
+
+ var seriesCount int
+ for set.Next() {
+ seriesCount++
+ iter := set.At().Iterator()
+
+ data, err := tsdbtest.IteratorToSlice(iter)
+ if err != nil {
+ suite.T().Fatal(err)
+ }
+ agg := set.At().Labels().Get(aggregate.AggregateLabel)
+
+ suite.Require().Equal(expected[agg], data, "queried data does not match expected")
+ }
+
+ suite.Require().Equal(len(expected), seriesCount, "series count didn't match expected")
+}
+
+func (suite *testQuerySuite) TestCrossSeriesAggregatesMultiPartitionWithInterpolation() {
+ adapter, err := tsdb.NewV3ioAdapter(suite.v3ioConfig, nil, nil)
+ suite.Require().NoError(err, "failed to create v3io adapter")
+
+ labels1 := utils.LabelsFromStringList("os", "linux")
+ labels2 := utils.LabelsFromStringList("os", "mac")
+ numberOfEvents := 10
+ eventsInterval := 60 * 1000
+ baseTime := tsdbtest.NanosToMillis(time.Now().UnixNano()) - int64(numberOfEvents*eventsInterval)
+
+ ingestedData := []tsdbtest.DataPoint{{baseTime - 7*tsdbtest.DaysInMillis, 10},
+ {baseTime - 7*tsdbtest.DaysInMillis + 1*tsdbtest.MinuteInMillis, 1},
+ {baseTime - 7*tsdbtest.DaysInMillis + 3*tsdbtest.MinuteInMillis, 20},
+ {baseTime, 20},
+ {baseTime + 1*tsdbtest.MinuteInMillis, 1},
+ {baseTime + 2*tsdbtest.MinuteInMillis, 60}}
+ ingestedData2 := []tsdbtest.DataPoint{{baseTime - 7*tsdbtest.DaysInMillis, 20},
+ {baseTime - 7*tsdbtest.DaysInMillis + 2*tsdbtest.MinuteInMillis, 1},
+ {baseTime, 30},
+ {baseTime + 1*tsdbtest.MinuteInMillis, 1},
+ {baseTime + 3*tsdbtest.MinuteInMillis, 40}}
+ testParams := tsdbtest.NewTestParams(suite.T(),
+ tsdbtest.TestOption{
+ Key: tsdbtest.OptTimeSeries,
+ Value: tsdbtest.TimeSeries{tsdbtest.Metric{
+ Name: "cpu",
+ Labels: labels1,
+ Data: ingestedData},
+ tsdbtest.Metric{
+ Name: "cpu",
+ Labels: labels2,
+ Data: ingestedData2},
+ }})
+ tsdbtest.InsertData(suite.T(), testParams)
+
+ expected := map[string][]tsdbtest.DataPoint{
+ "sum": {{Time: baseTime - 7*tsdbtest.DaysInMillis, Value: 30},
+ {Time: baseTime - 7*tsdbtest.DaysInMillis + 2*tsdbtest.MinuteInMillis, Value: 2},
+ {Time: baseTime - 7*tsdbtest.DaysInMillis + 4*tsdbtest.MinuteInMillis, Value: 21},
+ {Time: baseTime - 7*tsdbtest.DaysInMillis + 6*tsdbtest.MinuteInMillis, Value: 21},
+ {Time: baseTime, Value: 50},
+ {Time: baseTime + 2*tsdbtest.MinuteInMillis, Value: 61}},
+ "count": {{Time: baseTime - 7*tsdbtest.DaysInMillis, Value: 2},
+ {Time: baseTime - 7*tsdbtest.DaysInMillis + 2*tsdbtest.MinuteInMillis, Value: 2},
+ {Time: baseTime - 7*tsdbtest.DaysInMillis + 4*tsdbtest.MinuteInMillis, Value: 2},
+ {Time: baseTime - 7*tsdbtest.DaysInMillis + 6*tsdbtest.MinuteInMillis, Value: 2},
+ {Time: baseTime, Value: 2},
+ {Time: baseTime + 2*tsdbtest.MinuteInMillis, Value: 2}},
+ "min": {{Time: baseTime - 7*tsdbtest.DaysInMillis, Value: 10},
+ {Time: baseTime - 7*tsdbtest.DaysInMillis + 2*tsdbtest.MinuteInMillis, Value: 1},
+ {Time: baseTime - 7*tsdbtest.DaysInMillis + 4*tsdbtest.MinuteInMillis, Value: 1},
+ {Time: baseTime - 7*tsdbtest.DaysInMillis + 6*tsdbtest.MinuteInMillis, Value: 1},
+ {Time: baseTime, Value: 20},
+ {Time: baseTime + 2*tsdbtest.MinuteInMillis, Value: 1}},
+ "avg": {{Time: baseTime - 7*tsdbtest.DaysInMillis, Value: 15},
+ {Time: baseTime - 7*tsdbtest.DaysInMillis + 2*tsdbtest.MinuteInMillis, Value: 1},
+ {Time: baseTime - 7*tsdbtest.DaysInMillis + 4*tsdbtest.MinuteInMillis, Value: 10.5},
+ {Time: baseTime - 7*tsdbtest.DaysInMillis + 6*tsdbtest.MinuteInMillis, Value: 10.5},
+ {Time: baseTime, Value: 25},
+ {Time: baseTime + 2*tsdbtest.MinuteInMillis, Value: 30.5}}}
+
+ querierV2, err := adapter.QuerierV2()
+ suite.Require().NoError(err, "failed to create querier v2")
+
+ selectParams, _, err := pquerier.ParseQuery("select sum_all(prev(cpu)), min_all(prev(cpu)),avg_all(prev(cpu)),count_all(prev(cpu))")
+ suite.NoError(err)
+ selectParams.Step = 2 * tsdbtest.MinuteInMillis
+ selectParams.From = baseTime - 7*tsdbtest.DaysInMillis
+ selectParams.To = baseTime + 5*tsdbtest.MinuteInMillis
+ set, err := querierV2.Select(selectParams)
+ suite.Require().NoError(err, "Failed to execute query")
+
+ var seriesCount int
+ for set.Next() {
+ seriesCount++
+ iter := set.At().Iterator()
+
+ data, err := tsdbtest.IteratorToSlice(iter)
+ if err != nil {
+ suite.T().Fatal(err)
+ }
+ agg := set.At().Labels().Get(aggregate.AggregateLabel)
+
+ suite.Require().Equal(expected[agg], data, "queried data does not match expected")
+ }
+
+ suite.Require().Equal(len(expected), seriesCount, "series count didn't match expected")
+}
+
+func (suite *testQuerySuite) TestCrossSeriesAggregatesWithInterpolationOverTolerance() {
+ adapter, err := tsdb.NewV3ioAdapter(suite.v3ioConfig, nil, nil)
+ suite.Require().NoError(err, "failed to create v3io adapter")
+
+ labels1 := utils.LabelsFromStringList("os", "linux")
+ labels2 := utils.LabelsFromStringList("os", "mac")
+ numberOfEvents := 10
+ eventsInterval := 60 * 1000
+ baseTime := tsdbtest.NanosToMillis(time.Now().UnixNano()) - int64(numberOfEvents*eventsInterval)
+
+ ingestedData := []tsdbtest.DataPoint{{baseTime, 10},
+ {baseTime + 1*tsdbtest.MinuteInMillis, 20},
+ {baseTime + 10*tsdbtest.MinuteInMillis, 30}}
+ ingestedData2 := []tsdbtest.DataPoint{{baseTime, 20},
+ {baseTime + 5*tsdbtest.MinuteInMillis, 30},
+ {baseTime + 10*tsdbtest.MinuteInMillis, 40}}
+ testParams := tsdbtest.NewTestParams(suite.T(),
+ tsdbtest.TestOption{
+ Key: tsdbtest.OptTimeSeries,
+ Value: tsdbtest.TimeSeries{tsdbtest.Metric{
+ Name: "cpu",
+ Labels: labels1,
+ Data: ingestedData},
+ tsdbtest.Metric{
+ Name: "cpu",
+ Labels: labels2,
+ Data: ingestedData2},
+ }})
+ tsdbtest.InsertData(suite.T(), testParams)
+
+ expected := map[string][]tsdbtest.DataPoint{
+ "sum": {{Time: baseTime, Value: 30},
+ {Time: baseTime + 5*tsdbtest.MinuteInMillis, Value: 30},
+ {Time: baseTime + 10*tsdbtest.MinuteInMillis, Value: 70}},
+ "min": {{Time: baseTime, Value: 10},
+ {Time: baseTime + 5*tsdbtest.MinuteInMillis, Value: 30},
+ {Time: baseTime + 10*tsdbtest.MinuteInMillis, Value: 30}},
+ "max": {{Time: baseTime, Value: 20},
+ {Time: baseTime + 5*tsdbtest.MinuteInMillis, Value: 30},
+ {Time: baseTime + 10*tsdbtest.MinuteInMillis, Value: 40}}}
+
+ querierV2, err := adapter.QuerierV2()
+ suite.Require().NoError(err, "failed to create querier v2")
+
+ selectParams, _, err := pquerier.ParseQuery("select sum_all(prev(cpu)), min_all(prev(cpu)), max_all(prev(cpu))")
+ suite.NoError(err)
+ selectParams.Step = 5 * tsdbtest.MinuteInMillis
+ selectParams.From = baseTime
+ selectParams.To = baseTime + 10*tsdbtest.MinuteInMillis
+ for i := 0; i < len(selectParams.RequestedColumns); i++ {
+ selectParams.RequestedColumns[i].InterpolationTolerance = tsdbtest.MinuteInMillis
+ }
+ set, err := querierV2.Select(selectParams)
+ suite.Require().NoError(err, "Failed to execute query")
+
+ var seriesCount int
+ for set.Next() {
+ seriesCount++
+ iter := set.At().Iterator()
+
+ data, err := tsdbtest.IteratorToSlice(iter)
+ if err != nil {
+ suite.T().Fatal(err)
+ }
+ agg := set.At().Labels().Get(aggregate.AggregateLabel)
+
+ suite.Require().Equal(expected[agg], data, "queried data does not match expected")
+ }
+
+ suite.Require().Equal(len(expected), seriesCount, "series count didn't match expected")
}
func TestQueryV2Suite(t *testing.T) {
diff --git a/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/select.go b/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/select.go
index 3d3f6d4ef50..961cc176f6a 100644
--- a/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/select.go
+++ b/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/select.go
@@ -173,19 +173,48 @@ func (queryCtx *selectQueryContext) queryPartition(partition *partmgr.DBPartitio
newQuery := &partQuery{mint: mint, maxt: maxt, partition: partition, step: step}
if aggregationParams != nil {
- newQuery.preAggregated = aggregationParams.CanAggregate(partition.AggrType())
- if newQuery.preAggregated || !queryCtx.queryParams.disableClientAggr {
+ // Cross-series aggregations cannot use server-side aggregates.
+ newQuery.useServerSideAggregates = aggregationParams.CanAggregate(partition.AggrType()) && !queryCtx.isCrossSeriesAggregate
+ if newQuery.useServerSideAggregates || !queryCtx.queryParams.disableClientAggr {
newQuery.aggregationParams = aggregationParams
}
}
- err = newQuery.getItems(queryCtx, metric, requestAggregatesAndRaw)
+ var preAggregateLabels []string
+ if newQuery.useServerSideAggregates && !requestAggregatesAndRaw {
+ preAggregateLabels = queryCtx.parsePreAggregateLabels(partition)
+ }
+ err = newQuery.getItems(queryCtx, metric, preAggregateLabels, requestAggregatesAndRaw)
queries = append(queries, newQuery)
}
return queries, err
}
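+// parsePreAggregateLabels returns the sorted GROUP BY labels when the
+// partition has a pre-aggregate whose label set exactly matches the GROUP BY
+// label set; otherwise it returns nil and the query falls back to regular
+// aggregation.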
+func (queryCtx *selectQueryContext) parsePreAggregateLabels(partition *partmgr.DBPartition) []string {
+ if queryCtx.queryParams.GroupBy != "" {
+ groupByLabelSlice := strings.Split(queryCtx.queryParams.GroupBy, ",")
+ groupByLabelSet := make(map[string]bool)
+ for _, groupByLabel := range groupByLabelSlice {
+ groupByLabelSet[groupByLabel] = true
+ }
+ outer:
+ for _, preAggr := range partition.PreAggregates() {
+ if len(preAggr.Labels) != len(groupByLabelSet) {
+ continue
+ }
+ for _, label := range preAggr.Labels {
+ if !groupByLabelSet[label] {
+ continue outer
+ }
+ }
+ sort.Strings(groupByLabelSlice)
+ return groupByLabelSlice
+ }
+ }
+ return nil
+}
+
func (queryCtx *selectQueryContext) startCollectors() error {
queryCtx.requestChannels = make([]chan *qryResults, queryCtx.workers)
@@ -278,8 +307,7 @@ func (queryCtx *selectQueryContext) processQueryResults(query *partQuery) error
queryCtx.isAllMetrics,
queryCtx.getResultBucketsSize(),
results.IsServerAggregates(),
- queryCtx.showAggregateLabel,
- encoding)
+ queryCtx.showAggregateLabel)
if err != nil {
return err
}
@@ -341,9 +369,20 @@ func (queryCtx *selectQueryContext) createColumnSpecs() ([]columnMeta, map[strin
for metric, cols := range columnsSpecByMetric {
var aggregatesMask aggregate.AggrType
var aggregates []aggregate.AggrType
+ var metricInterpolationType InterpolationType
for _, colSpec := range cols {
aggregatesMask |= colSpec.function
aggregates = append(aggregates, colSpec.function)
+
+ if metricInterpolationType == 0 {
+ if colSpec.interpolationType != 0 {
+ metricInterpolationType = colSpec.interpolationType
+ }
+ } else if colSpec.interpolationType != 0 && colSpec.interpolationType != metricInterpolationType {
+ return nil, nil, fmt.Errorf("multiple interpolation types for the same metric are not supported, got %v and %v",
+ metricInterpolationType.String(),
+ colSpec.interpolationType.String())
+ }
}
// Add hidden aggregates only if the user specified aggregations
@@ -355,6 +394,16 @@ func (queryCtx *selectQueryContext) createColumnSpecs() ([]columnMeta, map[strin
columnsSpecByMetric[metric] = append(columnsSpecByMetric[metric], hiddenCol)
}
}
+
+ // After creating all the columns, set their interpolation type
+ for i := 0; i < len(columnsSpecByMetric[metric]); i++ {
+ columnsSpecByMetric[metric][i].interpolationType = metricInterpolationType
+ }
+ for i, col := range columnsSpec {
+ if col.metric == metric {
+ columnsSpec[i].interpolationType = metricInterpolationType
+ }
+ }
}
if len(columnsSpec) == 0 {
@@ -405,6 +454,9 @@ func (queryCtx *selectQueryContext) getResultBucketsSize() int {
if queryCtx.isRawQuery() {
return 0
}
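+ // A range that spans exactly one step produces a single bucket
+ // (the general formula below would yield two).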
+ if queryCtx.queryParams.To-queryCtx.queryParams.From == queryCtx.queryParams.Step {
+ return 1
+ }
return int((queryCtx.queryParams.To-queryCtx.queryParams.From)/queryCtx.queryParams.Step + 1)
}
@@ -420,27 +472,31 @@ type partQuery struct {
attrs []string
step int64
- chunk0Time int64
- chunkTime int64
- preAggregated bool
- aggregationParams *aggregate.AggregationParams
+ chunk0Time int64
+ chunkTime int64
+ useServerSideAggregates bool
+ aggregationParams *aggregate.AggregationParams
}
-func (query *partQuery) getItems(ctx *selectQueryContext, name string, aggregatesAndChunk bool) error {
+func (query *partQuery) getItems(ctx *selectQueryContext, name string, preAggregateLabels []string, aggregatesAndChunk bool) error {
path := query.partition.GetTablePath()
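+ // Pre-aggregated data is stored under "<table-path>agg/<label1,label2>/".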
+ if len(preAggregateLabels) > 0 {
+ path = fmt.Sprintf("%sagg/%s/", path, strings.Join(preAggregateLabels, ","))
+ }
+
var shardingKeys []string
if name != "" {
shardingKeys = query.partition.GetShardingKeys(name)
}
- attrs := []string{config.LabelSetAttrName, config.EncodingAttrName, config.MetricNameAttrName, config.MaxTimeAttrName}
+ attrs := []string{config.LabelSetAttrName, config.EncodingAttrName, config.MetricNameAttrName, config.MaxTimeAttrName, config.ObjectNameAttrName}
- if query.preAggregated {
+ if query.useServerSideAggregates {
query.attrs = query.aggregationParams.GetAttrNames()
}
// It is possible to request both server aggregates and raw chunk data (to downsample) for the same metric
// example: `select max(cpu), avg(cpu), cpu` with step = 1h
- if !query.preAggregated || aggregatesAndChunk {
+ if !query.useServerSideAggregates || aggregatesAndChunk {
chunkAttr, chunk0Time := query.partition.Range2Attrs("v", query.mint, query.maxt)
query.chunk0Time = chunk0Time
query.attrs = append(query.attrs, chunkAttr...)
diff --git a/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/selectQueryContext_test.go b/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/selectQueryContext_test.go
index 543c4091f37..9aed2e6b2bf 100644
--- a/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/selectQueryContext_test.go
+++ b/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/selectQueryContext_test.go
@@ -26,19 +26,19 @@ func TestCreateColumnSpecs(t *testing.T) {
{params: SelectParams{Name: "cpu", Functions: "avg"},
expectedSpecs: []columnMeta{{metric: "cpu", function: toAggr("avg"), interpolationType: interpolateNext},
- {metric: "cpu", function: toAggr("count"), isHidden: true},
- {metric: "cpu", function: toAggr("sum"), isHidden: true}},
+ {metric: "cpu", function: toAggr("count"), isHidden: true, interpolationType: interpolateNext},
+ {metric: "cpu", function: toAggr("sum"), isHidden: true, interpolationType: interpolateNext}},
expectedSpecsMap: map[string][]columnMeta{"cpu": {{metric: "cpu", function: toAggr("avg"), interpolationType: interpolateNext},
- {metric: "cpu", function: toAggr("count"), isHidden: true},
- {metric: "cpu", function: toAggr("sum"), isHidden: true}}}},
+ {metric: "cpu", function: toAggr("count"), isHidden: true, interpolationType: interpolateNext},
+ {metric: "cpu", function: toAggr("sum"), isHidden: true, interpolationType: interpolateNext}}}},
{params: SelectParams{Name: "cpu", Functions: "avg,count"},
expectedSpecs: []columnMeta{{metric: "cpu", function: toAggr("avg"), interpolationType: interpolateNext},
{metric: "cpu", function: toAggr("count"), interpolationType: interpolateNext},
- {metric: "cpu", function: toAggr("sum"), isHidden: true}},
+ {metric: "cpu", function: toAggr("sum"), isHidden: true, interpolationType: interpolateNext}},
expectedSpecsMap: map[string][]columnMeta{"cpu": {{metric: "cpu", function: toAggr("avg"), interpolationType: interpolateNext},
{metric: "cpu", function: toAggr("count"), interpolationType: interpolateNext},
- {metric: "cpu", function: toAggr("sum"), isHidden: true}}}},
+ {metric: "cpu", function: toAggr("sum"), isHidden: true, interpolationType: interpolateNext}}}},
{params: SelectParams{RequestedColumns: []RequestedColumn{{Metric: "cpu", Function: "count"}}},
expectedSpecs: []columnMeta{{metric: "cpu", function: toAggr("count")}},
@@ -77,6 +77,40 @@ func TestCreateColumnSpecs(t *testing.T) {
{metric: "cpu", function: toAggr("count"), interpolationType: interpolateNext}},
"diskio": {{metric: "diskio", function: toAggr("sum"), interpolationType: interpolateNext},
{metric: "diskio", function: toAggr("count"), interpolationType: interpolateNext}}}},
+
+ {params: SelectParams{RequestedColumns: []RequestedColumn{{Metric: "cpu", Function: "sum", Interpolator: "linear"},
+ {Metric: "cpu", Function: "count", Interpolator: "linear"}}},
+ expectedSpecs: []columnMeta{{metric: "cpu", function: toAggr("sum"), interpolationType: interpolateLinear},
+ {metric: "cpu", function: toAggr("count"), interpolationType: interpolateLinear}},
+ expectedSpecsMap: map[string][]columnMeta{"cpu": {{metric: "cpu", function: toAggr("sum"), interpolationType: interpolateLinear},
+ {metric: "cpu", function: toAggr("count"), interpolationType: interpolateLinear}}}},
+
+ {params: SelectParams{RequestedColumns: []RequestedColumn{{Metric: "cpu", Function: "sum", Interpolator: "linear"},
+ {Metric: "cpu", Function: "count"}}},
+ expectedSpecs: []columnMeta{{metric: "cpu", function: toAggr("sum"), interpolationType: interpolateLinear},
+ {metric: "cpu", function: toAggr("count"), interpolationType: interpolateLinear}},
+ expectedSpecsMap: map[string][]columnMeta{"cpu": {{metric: "cpu", function: toAggr("sum"), interpolationType: interpolateLinear},
+ {metric: "cpu", function: toAggr("count"), interpolationType: interpolateLinear}}}},
+
+ {params: SelectParams{RequestedColumns: []RequestedColumn{{Metric: "cpu", Function: "avg", Interpolator: "linear"},
+ {Metric: "cpu", Function: "count"}}},
+ expectedSpecs: []columnMeta{{metric: "cpu", function: toAggr("avg"), interpolationType: interpolateLinear},
+ {metric: "cpu", function: toAggr("sum"), interpolationType: interpolateLinear, isHidden: true},
+ {metric: "cpu", function: toAggr("count"), interpolationType: interpolateLinear}},
+ expectedSpecsMap: map[string][]columnMeta{"cpu": {{metric: "cpu", function: toAggr("avg"), interpolationType: interpolateLinear},
+ {metric: "cpu", function: toAggr("count"), interpolationType: interpolateLinear},
+ {metric: "cpu", function: toAggr("sum"), interpolationType: interpolateLinear, isHidden: true}}}},
+
+ {params: SelectParams{RequestedColumns: []RequestedColumn{{Metric: "cpu", Function: "count", Interpolator: "linear"},
+ {Metric: "diskio", Function: "count", Interpolator: "prev"},
+ {Metric: "diskio", Function: "sum"}}},
+ expectedSpecs: []columnMeta{{metric: "cpu", function: toAggr("count"), interpolationType: interpolateLinear},
+ {metric: "diskio", function: toAggr("count"), interpolationType: interpolatePrev},
+ {metric: "diskio", function: toAggr("sum"), interpolationType: interpolatePrev}},
+ expectedSpecsMap: map[string][]columnMeta{"cpu": {{metric: "cpu", function: toAggr("count"), interpolationType: interpolateLinear}},
+ "diskio": {
+ {metric: "diskio", function: toAggr("count"), interpolationType: interpolatePrev},
+ {metric: "diskio", function: toAggr("sum"), interpolationType: interpolatePrev}}}},
}
for _, test := range testCases {
t.Run(test.desc, func(t *testing.T) {
diff --git a/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/series.go b/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/series.go
index b2ef256291b..498ba5243a4 100644
--- a/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/series.go
+++ b/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/series.go
@@ -10,16 +10,21 @@ import (
"github.com/v3io/v3io-tsdb/pkg/utils"
)
-func NewDataFrameColumnSeries(indexColumn, dataColumn, countColumn Column, labels utils.Labels, hash uint64, showAggregateLabel bool, encoding chunkenc.Encoding) *DataFrameColumnSeries {
+func NewDataFrameColumnSeries(indexColumn, dataColumn, countColumn Column, labels utils.Labels, hash uint64, showAggregateLabel bool) *DataFrameColumnSeries {
// If we need to return the Aggregate label then add it, otherwise (for example in prometheus) return labels without it
if showAggregateLabel {
labels = append(labels, utils.LabelsFromStringList(aggregate.AggregateLabel, dataColumn.GetColumnSpec().function.String())...)
}
+ wantedMetricName := dataColumn.GetColumnSpec().alias
+ if wantedMetricName == "" {
+ wantedMetricName = dataColumn.GetColumnSpec().metric
+ }
+
// The labels we get from the Dataframe are agnostic to the metric name, since there might be several metrics in one Dataframe
- labels = append(labels, utils.LabelsFromStringList(config.PrometheusMetricNameAttribute, dataColumn.GetColumnSpec().metric)...)
+ labels = append(labels, utils.LabelsFromStringList(config.PrometheusMetricNameAttribute, wantedMetricName)...)
s := &DataFrameColumnSeries{labels: labels, key: hash}
- s.iter = &dataFrameColumnSeriesIterator{indexColumn: indexColumn, dataColumn: dataColumn, countColumn: countColumn, currentIndex: -1, encoding: encoding}
+ s.iter = &dataFrameColumnSeriesIterator{indexColumn: indexColumn, dataColumn: dataColumn, countColumn: countColumn, currentIndex: -1}
return s
}
@@ -43,7 +48,6 @@ type dataFrameColumnSeriesIterator struct {
currentIndex int
err error
- encoding chunkenc.Encoding
}
func (it *dataFrameColumnSeriesIterator) Seek(seekT int64) bool {
@@ -101,7 +105,13 @@ func (it *dataFrameColumnSeriesIterator) Next() bool {
func (it *dataFrameColumnSeriesIterator) Err() error { return it.err }
-func (it *dataFrameColumnSeriesIterator) Encoding() chunkenc.Encoding { return it.encoding }
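+// Encoding derives the chunk encoding from the column's data type: string
+// columns use variant encoding, numeric columns use XOR compression.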
+func (it *dataFrameColumnSeriesIterator) Encoding() chunkenc.Encoding {
+ enc := chunkenc.EncXOR
+ if it.dataColumn.DType() == StringType {
+ enc = chunkenc.EncVariant
+ }
+ return enc
+}
func (it *dataFrameColumnSeriesIterator) getNextValidCell(from int) (nextIndex int) {
for nextIndex = from + 1; nextIndex < it.dataColumn.Len() && !it.doesCellHasData(nextIndex); nextIndex++ {
diff --git a/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/sql_parser.go b/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/sql_parser.go
index ab28b21c99f..55618c03e4d 100644
--- a/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/sql_parser.go
+++ b/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/sql_parser.go
@@ -4,6 +4,7 @@ import (
"fmt"
"strings"
+ "github.com/v3io/v3io-tsdb/pkg/utils"
"github.com/xwb1989/sqlparser"
)
@@ -43,19 +44,9 @@ func ParseQuery(sql string) (*SelectParams, string, error) {
switch expr := col.Expr.(type) {
case *sqlparser.FuncExpr:
- switch firstExpr := expr.Exprs[0].(type) {
- case *sqlparser.AliasedExpr:
- cc := firstExpr.Expr.(*sqlparser.ColName)
- currCol.Function = sqlparser.String(expr.Name)
- currCol.Interpolator = removeBackticks(sqlparser.String(cc.Qualifier.Name)) // Some of the interpolators are parsed with a `
- currCol.Metric = sqlparser.String(cc.Name)
- case *sqlparser.StarExpr:
- // Appending column with empty metric name, meaning a column template with the given aggregate
- currCol.Function = sqlparser.String(expr.Name)
- }
+ parseFuncExpr(expr, &currCol)
case *sqlparser.ColName:
currCol.Metric = sqlparser.String(expr.Name)
- currCol.Interpolator = removeBackticks(sqlparser.String(expr.Qualifier.Name)) // Some of the interpolators are parsed with a `
default:
return nil, "", fmt.Errorf("unknown columns type - %T", col.Expr)
}
@@ -82,6 +73,47 @@ func ParseQuery(sql string) (*SelectParams, string, error) {
return selectParams, fromTable, nil
}
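+// parseFuncExpr handles both interpolation and aggregation functions. An
+// interpolator wraps a metric with an optional tolerance, e.g. prev(columnA)
+// or prev(columnA, '1h'), and may itself be wrapped by an aggregation, e.g.
+// max(prev(columnA, '1h')); the recursive call handles that nesting.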
+func parseFuncExpr(expr *sqlparser.FuncExpr, destCol *RequestedColumn) error {
+ possibleInterpolator := removeBackticks(sqlparser.String(expr.Name))
+ if _, err := StrToInterpolateType(possibleInterpolator); err == nil {
+ destCol.Interpolator = possibleInterpolator
+ numOfParameters := len(expr.Exprs)
+ if numOfParameters == 1 {
+ colName := expr.Exprs[0].(*sqlparser.AliasedExpr).Expr.(*sqlparser.ColName)
+ destCol.Metric = sqlparser.String(colName)
+ } else if numOfParameters == 2 {
+ colName := expr.Exprs[0].(*sqlparser.AliasedExpr).Expr.(*sqlparser.ColName)
+ destCol.Metric = sqlparser.String(colName)
+ toleranceVal := expr.Exprs[1].(*sqlparser.AliasedExpr).Expr.(*sqlparser.SQLVal)
+ toleranceString := sqlparser.String(toleranceVal)
+
+ // A SQLVal cannot start with a number, so it is surrounded with ticks;
+ // strip the ticks before parsing the duration.
+ tolerance, err := utils.Str2duration(toleranceString[1 : len(toleranceString)-1])
+ if err != nil {
+ return err
+ }
+ destCol.InterpolationTolerance = tolerance
+ } else {
+ return fmt.Errorf("unssoported number of parameters for function %v", possibleInterpolator)
+ }
+ } else {
+ destCol.Function = sqlparser.String(expr.Name)
+
+ switch firstExpr := expr.Exprs[0].(type) {
+ case *sqlparser.AliasedExpr:
+ switch innerExpr := firstExpr.Expr.(type) {
+ case *sqlparser.ColName:
+ destCol.Metric = sqlparser.String(innerExpr.Name)
+ case *sqlparser.FuncExpr:
+ if err := parseFuncExpr(innerExpr, destCol); err != nil {
+ return err
+ }
+ }
+ }
+ }
+
+ return nil
+}
+
func getTableName(slct *sqlparser.Select) (string, error) {
if nTables := len(slct.From); nTables != 1 {
return "", fmt.Errorf("select from multiple tables is not supported (got %d)", nTables)
diff --git a/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/sql_parser_test.go b/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/sql_parser_test.go
index 8b106ed54e3..2f2324010bf 100644
--- a/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/sql_parser_test.go
+++ b/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/sql_parser_test.go
@@ -1,56 +1,68 @@
// +build unit
-package pquerier
+package pquerier_test
import (
"testing"
"github.com/stretchr/testify/assert"
+ "github.com/v3io/v3io-tsdb/pkg/pquerier"
+ "github.com/v3io/v3io-tsdb/pkg/tsdb/tsdbtest"
)
func TestParseQuery(t *testing.T) {
testCases := []struct {
input string
- output *SelectParams
+ output *pquerier.SelectParams
outputTable string
}{
{input: "select columnA, columnB",
- output: &SelectParams{RequestedColumns: []RequestedColumn{{Metric: "columnA"}, {Metric: "columnB"}}}},
+ output: &pquerier.SelectParams{RequestedColumns: []pquerier.RequestedColumn{{Metric: "columnA"}, {Metric: "columnB"}}}},
- {input: "select linear.columnA",
- output: &SelectParams{RequestedColumns: []RequestedColumn{{Metric: "columnA", Interpolator: "linear"}}}},
+ {input: "select linear(columnA, '10m')",
+ output: &pquerier.SelectParams{RequestedColumns: []pquerier.RequestedColumn{{Metric: "columnA",
+ Interpolator: "linear",
+ InterpolationTolerance: 10 * tsdbtest.MinuteInMillis}}}},
- {input: "select max(prev.columnA), avg(columnB)",
- output: &SelectParams{RequestedColumns: []RequestedColumn{{Metric: "columnA", Interpolator: "prev", Function: "max"},
+ {input: "select max(prev(columnA)), avg(columnB)",
+ output: &pquerier.SelectParams{RequestedColumns: []pquerier.RequestedColumn{{Metric: "columnA", Interpolator: "prev", Function: "max"},
+ {Metric: "columnB", Function: "avg"}}}},
+
+ {input: "select max(prev(columnA, '1h')) as ahsheli, avg(columnB)",
+ output: &pquerier.SelectParams{RequestedColumns: []pquerier.RequestedColumn{{Metric: "columnA",
+ Interpolator: "prev",
+ Function: "max",
+ Alias: "ahsheli",
+ InterpolationTolerance: tsdbtest.HoursInMillis},
{Metric: "columnB", Function: "avg"}}}},
{input: "select columnA where columnB = 'tal' and columnC < 'Neiman'",
- output: &SelectParams{RequestedColumns: []RequestedColumn{{Metric: "columnA"}}, Filter: "columnB == 'tal' and columnC < 'Neiman'"}},
+ output: &pquerier.SelectParams{RequestedColumns: []pquerier.RequestedColumn{{Metric: "columnA"}}, Filter: "columnB == 'tal' and columnC < 'Neiman'"}},
{input: "select max(columnA) group by columnB",
- output: &SelectParams{RequestedColumns: []RequestedColumn{{Metric: "columnA", Function: "max"}}, GroupBy: "columnB"}},
+ output: &pquerier.SelectParams{RequestedColumns: []pquerier.RequestedColumn{{Metric: "columnA", Function: "max"}}, GroupBy: "columnB"}},
- {input: "select min(columnA) as bambi, max(linear.columnB) as bimba where columnB >= 123 group by columnB,columnC ",
- output: &SelectParams{RequestedColumns: []RequestedColumn{{Metric: "columnA", Function: "min", Alias: "bambi"},
+ {input: "select min(columnA) as bambi, max(linear(columnB)) as bimba where columnB >= 123 group by columnB,columnC ",
+ output: &pquerier.SelectParams{RequestedColumns: []pquerier.RequestedColumn{{Metric: "columnA", Function: "min", Alias: "bambi"},
{Metric: "columnB", Function: "max", Interpolator: "linear", Alias: "bimba"}},
Filter: "columnB >= 123", GroupBy: "columnB, columnC"}},
{input: "select min(columnA) from my_table where columnB >= 123",
- output: &SelectParams{RequestedColumns: []RequestedColumn{{Metric: "columnA", Function: "min"}},
+ output: &pquerier.SelectParams{RequestedColumns: []pquerier.RequestedColumn{{Metric: "columnA", Function: "min"}},
Filter: "columnB >= 123"},
outputTable: "my_table"},
{input: "select * from my_table",
- output: &SelectParams{RequestedColumns: []RequestedColumn{{Metric: ""}}},
+ output: &pquerier.SelectParams{RequestedColumns: []pquerier.RequestedColumn{{Metric: ""}}},
outputTable: "my_table"},
{input: "select max(*), avg(*) from my_table",
- output: &SelectParams{RequestedColumns: []RequestedColumn{{Metric: "", Function: "max"}, {Metric: "", Function: "avg"}}},
+ output: &pquerier.SelectParams{RequestedColumns: []pquerier.RequestedColumn{{Metric: "", Function: "max"}, {Metric: "", Function: "avg"}}},
outputTable: "my_table"},
}
for _, test := range testCases {
t.Run(test.input, func(tt *testing.T) {
- queryParams, table, err := ParseQuery(test.input)
+ queryParams, table, err := pquerier.ParseQuery(test.input)
if err != nil {
tt.Fatal(err)
}
diff --git a/vendor/github.com/v3io/v3io-tsdb/pkg/tsdb/schema/schema.go b/vendor/github.com/v3io/v3io-tsdb/pkg/tsdb/schema/schema.go
index c8a9af0d4c7..f769f4b664b 100644
--- a/vendor/github.com/v3io/v3io-tsdb/pkg/tsdb/schema/schema.go
+++ b/vendor/github.com/v3io/v3io-tsdb/pkg/tsdb/schema/schema.go
@@ -16,11 +16,12 @@ const (
Version = 2
)
-func NewSchema(v3ioCfg *config.V3ioConfig, samplesIngestionRate, aggregationGranularity, aggregatesList string) (*config.Schema, error) {
+func NewSchema(v3ioCfg *config.V3ioConfig, samplesIngestionRate, aggregationGranularity, aggregatesList string, crossLabelSets string) (*config.Schema, error) {
return newSchema(
samplesIngestionRate,
aggregationGranularity,
aggregatesList,
+ crossLabelSets,
v3ioCfg.MinimumChunkSize,
v3ioCfg.MaximumChunkSize,
v3ioCfg.MaximumSampleSize,
@@ -29,7 +30,7 @@ func NewSchema(v3ioCfg *config.V3ioConfig, samplesIngestionRate, aggregationGran
v3ioCfg.ShardingBucketsCount)
}
-func newSchema(samplesIngestionRate, aggregationGranularity, aggregatesList string, minChunkSize, maxChunkSize, maxSampleSize, maxPartitionSize, sampleRetention, shardingBucketsCount int) (*config.Schema, error) {
+func newSchema(samplesIngestionRate, aggregationGranularity, aggregatesList string, crossLabelSets string, minChunkSize, maxChunkSize, maxSampleSize, maxPartitionSize, sampleRetention, shardingBucketsCount int) (*config.Schema, error) {
rateInHours, err := rateToHours(samplesIngestionRate)
if err != nil {
return nil, errors.Wrapf(err, "Invalid samples ingestion rate (%s).", samplesIngestionRate)
@@ -49,6 +50,12 @@ func newSchema(samplesIngestionRate, aggregationGranularity, aggregatesList stri
return nil, errors.Wrapf(err, "Failed to parse aggregates list '%s'.", aggregatesList)
}
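+ // Each semicolon-separated group becomes one pre-aggregate label set,
+ // e.g. "label1,label2;label3" -> [["label1","label2"], ["label3"]].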
+ parsedCrossLabelSets := aggregate.ParseCrossLabelSets(crossLabelSets)
+
+ if len(parsedCrossLabelSets) > 0 && len(aggregates) == 0 {
+ return nil, errors.New("Cross label aggregations must be used in conjunction with aggregations.")
+ }
+
defaultRollup := config.Rollup{
Aggregates: []string{},
AggregationGranularity: aggregationGranularity,
@@ -57,12 +64,23 @@ func newSchema(samplesIngestionRate, aggregationGranularity, aggregatesList stri
LayerRetentionTime: config.DefaultLayerRetentionTime, //TODO: make configurable
}
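+ // Build one pre-aggregate per requested cross-label set, reusing the
+ // aggregation granularity and aggregates of the default rollup.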
+ var preaggregates []config.PreAggregate
+ for _, labelSet := range parsedCrossLabelSets {
+ preaggregate := config.PreAggregate{
+ Labels: labelSet,
+ Granularity: aggregationGranularity,
+ Aggregates: aggregates,
+ }
+ preaggregates = append(preaggregates, preaggregate)
+ }
+
tableSchema := config.TableSchema{
Version: Version,
RollupLayers: []config.Rollup{defaultRollup},
ShardingBucketsCount: shardingBucketsCount,
PartitionerInterval: partitionInterval,
ChunckerInterval: chunkInterval,
+ PreAggregates: preaggregates,
}
if len(aggregates) == 0 {
diff --git a/vendor/github.com/v3io/v3io-tsdb/pkg/tsdb/tsdbtest/testutils/schema.go b/vendor/github.com/v3io/v3io-tsdb/pkg/tsdb/tsdbtest/testutils/schema.go
index 42f7c2167a5..bae13b4bd0a 100644
--- a/vendor/github.com/v3io/v3io-tsdb/pkg/tsdb/tsdbtest/testutils/schema.go
+++ b/vendor/github.com/v3io/v3io-tsdb/pkg/tsdb/tsdbtest/testutils/schema.go
@@ -13,7 +13,7 @@ func CreateSchema(t testing.TB, aggregates string) *config.Schema {
t.Fatalf("Failed to obtain a TSDB configuration. Error: %v", err)
}
- schm, err := schema.NewSchema(v3ioCfg, "1/s", "1h", aggregates)
+ schm, err := schema.NewSchema(v3ioCfg, "1/s", "1h", aggregates, "")
if err != nil {
t.Fatalf("Failed to create a TSDB schema. Error: %v", err)
}
diff --git a/vendor/github.com/v3io/v3io-tsdb/pkg/tsdbctl/create.go b/vendor/github.com/v3io/v3io-tsdb/pkg/tsdbctl/create.go
index 9434c10860d..eaa420473d7 100644
--- a/vendor/github.com/v3io/v3io-tsdb/pkg/tsdbctl/create.go
+++ b/vendor/github.com/v3io/v3io-tsdb/pkg/tsdbctl/create.go
@@ -34,6 +34,7 @@ type createCommandeer struct {
path string
storageClass string
defaultRollups string
+ crossLabelSets string
aggregationGranularity string
shardingBucketsCount int
sampleRetention int
@@ -50,7 +51,7 @@ func newCreateCommandeer(rootCommandeer *RootCommandeer) *createCommandeer {
Short: "Create a new TSDB instance",
Long: `Create a new TSDB instance (table) according to the provided configuration.`,
Example: `- tsdbctl create -s 192.168.1.100:8081 -u myuser -p mypassword -c mycontainer -t my_tsdb -r 1/s
-- tsdbctl create -s 192.168.204.14:8081 -u janed -p OpenSesame -c bigdata -t my_dbs/metrics_table -r 60/m -a "min,avg,stddev" -i 3h
+- tsdbctl create -s 192.168.204.14:8081 -u janed -p OpenSesame -c bigdata -t my_dbs/metrics_table -r 60/m -a "min,avg,stddev" -i 3h -l "label1,label2;label3"
- tsdbctl create -g ~/my_tsdb_cfg.yaml -k 9c1f3e75-a521-4b0d-b640-68c86417df2f -c admin_container -t perf_metrics -r "100/h"
(where ~/my_tsdb_cfg.yaml sets "webApiEndpoint" to the endpoint of the web-gateway service)`,
RunE: func(cmd *cobra.Command, args []string) error {
@@ -62,6 +63,8 @@ func newCreateCommandeer(rootCommandeer *RootCommandeer) *createCommandeer {
cmd.Flags().StringVarP(&commandeer.defaultRollups, "aggregates", "a", "",
"Default aggregates to calculate in real time during\nthe samples ingestion, as a comma-separated list of\nsupported aggregation functions - count | avg | sum |\nmin | max | stddev | stdvar | last | rate.\nExample: \"sum,avg,max\".")
+ cmd.Flags().StringVarP(&commandeer.crossLabelSets, "cross-label", "l", "",
+ "Label sets for which cross-label pre-aggregations should be created. Must be used in conjunction with -a.\nExample: \"label1,label2;label3\".")
cmd.Flags().StringVarP(&commandeer.aggregationGranularity, "aggregation-granularity", "i", config.DefaultAggregationGranularity,
"Aggregation granularity - a time interval for applying\nthe aggregation functions (if configured - see the\n-a|--aggregates flag), of the format \"[0-9]+[mhd]\"\n(where 'm' = minutes, 'h' = hours, and 'd' = days).\nExamples: \"2h\"; \"90m\".")
cmd.Flags().IntVarP(&commandeer.shardingBucketsCount, "sharding-buckets", "b", config.DefaultShardingBucketsCount,
@@ -88,7 +91,8 @@ func (cc *createCommandeer) create() error {
cc.rootCommandeer.v3iocfg,
cc.samplesIngestionRate,
cc.aggregationGranularity,
- cc.defaultRollups)
+ cc.defaultRollups,
+ cc.crossLabelSets)
if err != nil {
return errors.Wrap(err, "Failed to create a TSDB schema.")
diff --git a/vendor/github.com/v3io/v3io-tsdb/pkg/utils/asynciter.go b/vendor/github.com/v3io/v3io-tsdb/pkg/utils/asynciter.go
index 7b2b2c135e7..e786ccfbf7d 100644
--- a/vendor/github.com/v3io/v3io-tsdb/pkg/utils/asynciter.go
+++ b/vendor/github.com/v3io/v3io-tsdb/pkg/utils/asynciter.go
@@ -21,11 +21,14 @@ such restriction.
package utils
import (
+ "fmt"
"net/http"
+ "strings"
"github.com/nuclio/logger"
"github.com/pkg/errors"
"github.com/v3io/v3io-go-http"
+ "github.com/v3io/v3io-tsdb/pkg/config"
)
type ItemsCursor interface {
@@ -132,25 +135,33 @@ func (ic *AsyncItemsCursor) Next() bool {
// NextItem gets the next matching item. this may potentially block as this lazy loads items from the collection
func (ic *AsyncItemsCursor) NextItem() (v3io.Item, error) {
+ for {
+ // are there any more items left in the previous response we received?
+ if ic.itemIndex < len(ic.items) {
+ ic.currentItem = ic.items[ic.itemIndex]
+ ic.currentError = nil
- // are there any more items left in the previous response we received?
- if ic.itemIndex < len(ic.items) {
- ic.currentItem = ic.items[ic.itemIndex]
- ic.currentError = nil
+ // next time we'll give next item
+ ic.itemIndex++
+ ic.Cnt++
- // next time we'll give next item
- ic.itemIndex++
- ic.Cnt++
+ return ic.currentItem, nil
+ }
- return ic.currentItem, nil
- }
+ // Are there any more items upstream? Did all the shards complete?
+ if ic.lastShards == ic.workers {
+ ic.currentError = nil
+ return nil, nil
+ }
- // are there any more items up stream? did all the shards complete ?
- if ic.lastShards == ic.workers {
- ic.currentError = nil
- return nil, nil
+ err := ic.processResponse()
+ if err != nil {
+ return nil, err
+ }
}
+}
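+
+// processResponse reads a single response from the response channel, stores
+// its items on the cursor, and schedules the next GetItems request for the
+// shard when more data may be available.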
+func (ic *AsyncItemsCursor) processResponse() error {
// Read response from channel
resp := <-ic.responseChan
defer resp.Release()
@@ -159,11 +170,11 @@ func (ic *AsyncItemsCursor) NextItem() (v3io.Item, error) {
if e, hasErrorCode := resp.Error.(v3io.ErrorWithStatusCode); hasErrorCode && e.StatusCode() == http.StatusNotFound {
ic.logger.Debug("Got 404 - error: %v, request: %v", resp.Error, resp.Request().Input)
ic.lastShards++
- return ic.NextItem()
+ return nil
}
if resp.Error != nil {
ic.logger.Warn("error reading from response channel: %v, error: %v, request: %v", resp, resp.Error, resp.Request().Input)
- return nil, errors.Wrap(resp.Error, "Failed to get next items")
+ return errors.Wrap(resp.Error, "Failed to get next items")
}
getItemsResp := resp.Output.(*v3io.GetItemsOutput)
@@ -172,6 +183,23 @@ func (ic *AsyncItemsCursor) NextItem() (v3io.Item, error) {
ic.items = getItemsResp.Items
ic.itemIndex = 0
+ conf, err := config.GetOrDefaultConfig()
+ if err != nil {
+ return err
+ }
+
+ // Until IGZ-2.0 there is a bug in Nginx regarding range scans; the following code mitigates it.
+ if conf.DisableNginxMitigation {
+ return ic.sendNextGetItemsOld(resp)
+ }
+ return ic.sendNextGetItemsNew(resp)
+}
+
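+// sendNextGetItemsOld requests the next batch using the response's NextMarker;
+// this is the original, pre-mitigation behavior.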
+func (ic *AsyncItemsCursor) sendNextGetItemsOld(resp *v3io.Response) error {
+ getItemsResp := resp.Output.(*v3io.GetItemsOutput)
if !getItemsResp.Last {
// if not last, make a new request to that shard
@@ -182,7 +210,54 @@ func (ic *AsyncItemsCursor) NextItem() (v3io.Item, error) {
_, err := ic.container.GetItems(input, input, ic.responseChan)
if err != nil {
- return nil, errors.Wrap(err, "Failed to request next items")
+ return errors.Wrap(err, "Failed to request next items")
+ }
+
+ } else {
+ // Mark one more shard as completed
+ ic.lastShards++
+ }
+
+ return nil
+}
+
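+// sendNextGetItemsNew requests the next batch, working around the Nginx bug:
+// when NextMarker is empty or unchanged, the next position is derived from the
+// last item's object name and the scan continues via SortKeyRangeStart.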
+func (ic *AsyncItemsCursor) sendNextGetItemsNew(resp *v3io.Response) error {
+ getItemsResp := resp.Output.(*v3io.GetItemsOutput)
+ if len(getItemsResp.Items) > 0 {
+
+ // More items may be available; prepare the next request to that shard
+ input := resp.Context.(*v3io.GetItemsInput)
+
+ if getItemsResp.NextMarker == "" || getItemsResp.NextMarker == input.Marker {
+ lastItemObjectName, err := ic.items[len(ic.items)-1].GetFieldString(config.ObjectNameAttrName)
+
+ splitObjectName := strings.Split(lastItemObjectName, ".")
+
+ // If we have the __name attribute, the query was a range-scan query, and the data is in range-scan format
+ if len(splitObjectName) == 2 && input.ShardingKey != "" && err == nil {
+ lastSortingKey := splitObjectName[1]
+
+ ic.logger.Debug("getting next items for %v%v; calculated next sort key %v from object %v", input.Path, input.ShardingKey, lastSortingKey, lastItemObjectName)
+ // Advance past the last returned sort key by appending "0"
+ input.SortKeyRangeStart = lastSortingKey + "0"
+ input.Marker = ""
+ } else {
+ // In case it is a names query
+ if getItemsResp.Last {
+ ic.lastShards++
+ return nil
+ } else {
+ input.Marker = getItemsResp.NextMarker
+ }
+ }
+ } else {
+ // set next marker
+ input.Marker = getItemsResp.NextMarker
+ ic.logger.Debug("getting next items for %v%v with given next marker %v", input.Path, input.ShardingKey, input.Marker)
+ }
+
+ _, err := ic.container.GetItems(input, input, ic.responseChan)
+ if err != nil {
+ return errors.Wrap(err, fmt.Sprintf("Failed to request next items for input=%v", input))
}
} else {
@@ -190,8 +265,7 @@ func (ic *AsyncItemsCursor) NextItem() (v3io.Item, error) {
ic.lastShards++
}
- // and recurse into next now that we repopulated response
- return ic.NextItem()
+ return nil
}
// gets all items
diff --git a/vendor/github.com/v3io/v3io-tsdb/travis_v3io.yaml b/vendor/github.com/v3io/v3io-tsdb/travis_v3io.yaml
index 6ca83d72841..49654869e97 100644
--- a/vendor/github.com/v3io/v3io-tsdb/travis_v3io.yaml
+++ b/vendor/github.com/v3io/v3io-tsdb/travis_v3io.yaml
@@ -1,6 +1,6 @@
container: "bigdata"
workers: 1
-webApiEndpoint: "webapi.iguazio.app.fnqtlobecymq.dev.trl.iguazio.com:8081"
+webApiEndpoint: "webapi.iguazio.app.tsdb-integration.iguazio-cd0.com"
username: "iguazio"
-password: "IqXUHmIe2op3Q02S"
+password: "e244737b57d86fde05f747e47514c4c2"
loglevel: "debug"
diff --git a/vendor/github.com/v3io/v3io-tsdb/travis_v3io_bench.yaml b/vendor/github.com/v3io/v3io-tsdb/travis_v3io_bench.yaml
index 2dee2c7829d..b9c08721875 100644
--- a/vendor/github.com/v3io/v3io-tsdb/travis_v3io_bench.yaml
+++ b/vendor/github.com/v3io/v3io-tsdb/travis_v3io_bench.yaml
@@ -1,8 +1,8 @@
container: "bigdata"
workers: 8
-webApiEndpoint: "webapi.iguazio.app.fnqtlobecymq.dev.trl.iguazio.com:8081"
+webApiEndpoint: "webapi.iguazio.app.tsdb-integration.iguazio-cd0.com"
username: "iguazio"
-password: "IqXUHmIe2op3Q02S"
+password: "e244737b57d86fde05f747e47514c4c2"
batchSize: 2048
performance:
diff --git a/vendor/github.com/v3io/v3io-tsdb/vendor/github.com/v3io/v3io-go-http/synccontainer.go b/vendor/github.com/v3io/v3io-tsdb/vendor/github.com/v3io/v3io-go-http/synccontainer.go
index 5ae5b479d6d..ba8f9f9097c 100644
--- a/vendor/github.com/v3io/v3io-tsdb/vendor/github.com/v3io/v3io-go-http/synccontainer.go
+++ b/vendor/github.com/v3io/v3io-tsdb/vendor/github.com/v3io/v3io-go-http/synccontainer.go
@@ -207,6 +207,14 @@ func (sc *SyncContainer) GetItems(input *GetItemsInput) (*Response, error) {
body["Segment"] = input.Segment
}
+ if input.SortKeyRangeStart != "" {
+ body["SortKeyRangeStart"] = input.SortKeyRangeStart
+ }
+
+ if input.SortKeyRangeEnd != "" {
+ body["SortKeyRangeEnd"] = input.SortKeyRangeEnd
+ }
+
marshalledBody, err := json.Marshal(body)
if err != nil {
return nil, err
@@ -236,6 +244,13 @@ func (sc *SyncContainer) GetItems(input *GetItemsInput) (*Response, error) {
return nil, err
}
+ // Validate the getItems response to avoid an infinite loop
+ if getItemsResponse.LastItemIncluded != "TRUE" && (getItemsResponse.NextMarker == "" || getItemsResponse.NextMarker == input.Marker) {
+ errMsg := fmt.Sprintf("Invalid getItems response: lastItemIncluded='false' and nextMarker='%s', "+
+ "startMarker='%s', probably due to an object bigger than 2M. Query is: %+v", getItemsResponse.NextMarker, input.Marker, input)
+ sc.logger.Warn(errMsg)
+ }
+
getItemsOutput := GetItemsOutput{
NextMarker: getItemsResponse.NextMarker,
Last: getItemsResponse.LastItemIncluded == "TRUE",
diff --git a/vendor/github.com/v3io/v3io-tsdb/vendor/github.com/v3io/v3io-go-http/types.go b/vendor/github.com/v3io/v3io-tsdb/vendor/github.com/v3io/v3io-go-http/types.go
index 9efe18c47ec..bef5c993304 100644
--- a/vendor/github.com/v3io/v3io-tsdb/vendor/github.com/v3io/v3io-go-http/types.go
+++ b/vendor/github.com/v3io/v3io-tsdb/vendor/github.com/v3io/v3io-go-http/types.go
@@ -178,14 +178,16 @@ type GetItemOutput struct {
}
type GetItemsInput struct {
- Path string
- AttributeNames []string
- Filter string
- Marker string
- ShardingKey string
- Limit int
- Segment int
- TotalSegments int
+ Path string
+ AttributeNames []string
+ Filter string
+ Marker string
+ ShardingKey string
+ Limit int
+ Segment int
+ TotalSegments int
+ SortKeyRangeStart string
+ SortKeyRangeEnd string
}
type GetItemsOutput struct {
@@ -229,7 +231,7 @@ type DeleteStreamInput struct {
type SeekShardInputType int
const (
- SeekShardInputTypeTime SeekShardInputType = iota
+ SeekShardInputTypeTime SeekShardInputType = iota
SeekShardInputTypeSequence
SeekShardInputTypeLatest
SeekShardInputTypeEarliest