fix: respect some errors from scanning measurements #24995

Closed · wants to merge 2 commits
13 changes: 11 additions & 2 deletions cmd/influx_inspect/buildtsi/buildtsi.go
@@ -413,8 +413,12 @@ func IndexShard(sfile *tsdb.SeriesFile, dataDir, walDir string, maxLogFileSize i

for _, key := range cache.Keys() {
seriesKey, _ := tsm1.SeriesAndFieldFromCompositeKey(key)
name, tags := models.ParseKeyBytes(seriesKey)
name, tags, err := models.ParseKeyBytes(seriesKey)

if err != nil {
log.Error("Error parsing key while creating series", zap.String("key", string(seriesKey)), zap.Error(err))
return err
}
if verboseLogging {
log.Info("Series", zap.String("name", string(name)), zap.String("tags", tags.String()))
}
@@ -496,11 +500,16 @@ func IndexTSMFile(index *tsi1.Index, path string, batchSize int, log *zap.Logger
tagsBatch := make([]models.Tags, batchSize)
var ti int
for i := 0; i < r.KeyCount(); i++ {
var err error
key, _ := r.KeyAt(i)
seriesKey, _ := tsm1.SeriesAndFieldFromCompositeKey(key)
var name []byte
name, tagsBatch[ti] = models.ParseKeyBytesWithTags(seriesKey, tagsBatch[ti])
name, tagsBatch[ti], err = models.ParseKeyBytesWithTags(seriesKey, tagsBatch[ti])

if err != nil {
log.Error("Error parsing key while creating series", zap.String("key", string(seriesKey)), zap.Error(err))
return err
}
if verboseLogging {
log.Info("Series", zap.String("name", string(name)), zap.String("tags", tagsBatch[ti].String()))
}
2 changes: 1 addition & 1 deletion cmd/influx_tools/importer/series_writer.go
@@ -38,7 +38,7 @@ func (sw *seriesWriter) AddSeries(key []byte) error {
seriesKey, _ := tsm1.SeriesAndFieldFromCompositeKey(key)
sw.keys = append(sw.keys, seriesKey)

name, tag := models.ParseKeyBytes(seriesKey)
name, tag, _ := models.ParseKeyBytes(seriesKey)
sw.names = append(sw.names, name)
sw.tags = append(sw.tags, tag)

20 changes: 13 additions & 7 deletions models/points.go
@@ -293,19 +293,22 @@ func ParsePointsString(buf string) ([]Point, error) {
// NOTE: to minimize heap allocations, the returned Tags will refer to subslices of buf.
// This can have the unintended effect of preventing buf from being garbage collected.
func ParseKey(buf []byte) (string, Tags) {
name, tags := ParseKeyBytes(buf)
name, tags, _ := ParseKeyBytes(buf)
return string(name), tags
}

func ParseKeyBytes(buf []byte) ([]byte, Tags) {
func ParseKeyBytes(buf []byte) ([]byte, Tags, error) {
return ParseKeyBytesWithTags(buf, nil)
}

func ParseKeyBytesWithTags(buf []byte, tags Tags) ([]byte, Tags) {
func ParseKeyBytesWithTags(buf []byte, tags Tags) ([]byte, Tags, error) {
// scanMeasurement returns ErrNoFields for a key with no tags (e.g. just a measurement name);
// that is expected when parsing only a key, so ErrNoFields is ignored and any other error is returned.
state, i, _ := scanMeasurement(buf, 0)
state, i, err := scanMeasurement(buf, 0)

if err != nil && !errors.Is(err, ErrNoFields) {
return nil, nil, err
}
var name []byte
if state == tagKeyState {
tags = parseTags(buf, tags)
@@ -314,7 +317,7 @@ func ParseKeyBytesWithTags(buf []byte, tags Tags) ([]byte, Tags) {
} else {
name = buf[:i]
}
return unescapeMeasurement(name), tags
return unescapeMeasurement(name), tags, nil
}

func ParseTags(buf []byte) Tags {
@@ -593,21 +596,24 @@ const (
fieldsState
)

var ErrNoMeasurement = errors.New("missing measurement")
var ErrNoFields = errors.New("missing fields")

// scanMeasurement examines the measurement part of a Point, returning
// the next state to move to, and the current location in the buffer.
func scanMeasurement(buf []byte, i int) (int, int, error) {
// Check first byte of measurement, anything except a comma is fine.
// It can't be a space, since whitespace is stripped prior to this
// function call.
if i >= len(buf) || buf[i] == ',' {
return -1, i, fmt.Errorf("missing measurement")
return -1, i, ErrNoMeasurement
}

for {
i++
if i >= len(buf) {
// cpu
return -1, i, fmt.Errorf("missing fields")
return -1, i, ErrNoFields
}

if buf[i-1] == '\\' {
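For reference, here is a minimal sketch (not part of this diff) of how a caller might use the updated ParseKeyBytes signature; the sample key, the main function, and the import path are illustrative assumptions:

```go
package main

import (
	"errors"
	"fmt"

	"github.com/influxdata/influxdb/models" // assumed 1.x import path
)

func main() {
	// A composite series key: measurement plus tag pairs, no field part.
	key := []byte("cpu,host=server01,region=us-west")

	name, tags, err := models.ParseKeyBytes(key)
	if err != nil {
		// With this change, a key that starts with ',' surfaces ErrNoMeasurement;
		// ErrNoFields is still swallowed inside ParseKeyBytesWithTags.
		if errors.Is(err, models.ErrNoMeasurement) {
			fmt.Println("key has no measurement:", string(key))
			return
		}
		panic(err)
	}

	// tags reference subslices of key; Clone them if key must not be retained.
	fmt.Println(string(name), tags.Clone().String())
}
```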
2 changes: 1 addition & 1 deletion models/points_test.go
@@ -2419,7 +2419,7 @@ func TestParseKeyBytes(t *testing.T) {

for _, testCase := range testCases {
t.Run(testCase.input, func(t *testing.T) {
name, tags := models.ParseKeyBytes([]byte(testCase.input))
name, tags, _ := models.ParseKeyBytes([]byte(testCase.input))
if !bytes.Equal([]byte(testCase.expectedName), name) {
t.Errorf("%s produced measurement %s but expected %s", testCase.input, string(name), testCase.expectedName)
}
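The test above still discards the error; a hypothetical extra case (illustrative only, relying on the test file's existing models import plus the standard errors package) could assert the new sentinel directly:

```go
func TestParseKeyBytes_MissingMeasurement(t *testing.T) {
	// A key that begins with a comma has no measurement name.
	_, _, err := models.ParseKeyBytes([]byte(",host=server01"))
	if !errors.Is(err, models.ErrNoMeasurement) {
		t.Fatalf("expected ErrNoMeasurement, got %v", err)
	}
}
```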
2 changes: 1 addition & 1 deletion services/storage/series_cursor_test.go
@@ -77,7 +77,7 @@ func TestSeriesCursorValuer(t *testing.T) {
for _, tc := range tests {
t.Run(tc.n, func(t *testing.T) {
var sc indexSeriesCursor
sc.row.Name, sc.row.SeriesTags = models.ParseKeyBytes([]byte(tc.m))
sc.row.Name, sc.row.SeriesTags, _ = models.ParseKeyBytes([]byte(tc.m))
sc.field.n = sc.row.SeriesTags.GetString(fieldKey)
sc.row.SeriesTags.Delete(fieldKeyBytes)

2 changes: 1 addition & 1 deletion storage/reads/aggregate_resultset_test.go
@@ -110,7 +110,7 @@ type mockReadCursor struct {
func newMockReadCursor(keys ...string) mockReadCursor {
rows := make([]reads.SeriesRow, len(keys))
for i := range keys {
rows[i].Name, rows[i].SeriesTags = models.ParseKeyBytes([]byte(keys[i]))
rows[i].Name, rows[i].SeriesTags, _ = models.ParseKeyBytes([]byte(keys[i]))
rows[i].Tags = rows[i].SeriesTags.Clone()
var itrs cursors.CursorIterators
cur := &mockCursorIterator{
2 changes: 1 addition & 1 deletion storage/reads/group_resultset_test.go
@@ -412,7 +412,7 @@ type sliceSeriesCursor struct {
func newSeriesRows(keys ...string) []reads.SeriesRow {
rows := make([]reads.SeriesRow, len(keys))
for i := range keys {
rows[i].Name, rows[i].SeriesTags = models.ParseKeyBytes([]byte(keys[i]))
rows[i].Name, rows[i].SeriesTags, _ = models.ParseKeyBytes([]byte(keys[i]))
rows[i].Tags = rows[i].SeriesTags.Clone()
rows[i].Tags.Set([]byte("_m"), rows[i].Name)
}
2 changes: 1 addition & 1 deletion tsdb/engine/tsm1/engine.go
@@ -1732,7 +1732,7 @@ func (e *Engine) deleteSeriesRange(seriesKeys [][]byte, min, max int64) error {
continue // This key was wiped because it shouldn't be removed from index.
}

name, tags := models.ParseKeyBytes(k)
name, tags, _ := models.ParseKeyBytes(k)
sid := e.sfile.SeriesID(name, tags, buf)
if sid == 0 {
continue
2 changes: 1 addition & 1 deletion tsdb/engine/tsm1/engine_test.go
@@ -2926,7 +2926,7 @@ func (itr *seriesIterator) Next() (tsdb.SeriesElem, error) {
if len(itr.keys) == 0 {
return nil, nil
}
name, tags := models.ParseKeyBytes(itr.keys[0])
name, tags, _ := models.ParseKeyBytes(itr.keys[0])
s := series{name: name, tags: tags}
itr.keys = itr.keys[1:]
return s, nil
2 changes: 1 addition & 1 deletion tsdb/index/tsi1/index.go
@@ -810,7 +810,7 @@ func (i *Index) DropSeries(seriesID uint64, key []byte, cascade bool) error {
}

// Extract measurement name & tags.
name, tags := models.ParseKeyBytes(key)
name, tags, _ := models.ParseKeyBytes(key)

// If there are cached sets for any of the tag pairs, they will need to be
// updated with the series id.
2 changes: 1 addition & 1 deletion tsdb/shard_test.go
@@ -2558,7 +2558,7 @@ func (itr *seriesIterator) Next() (tsdb.SeriesElem, error) {
if len(itr.keys) == 0 {
return nil, nil
}
name, tags := models.ParseKeyBytes(itr.keys[0])
name, tags, _ := models.ParseKeyBytes(itr.keys[0])
s := series{name: name, tags: tags}
itr.keys = itr.keys[1:]
return s, nil