Skip to content

Commit

Permalink
feat(tsm): Allow for deletion of series outside default rp (#25312)
Browse files Browse the repository at this point in the history
* feat(tsm): Allow for deletion of series outside default RP
9d116f6
This PR adds the ability for deletion of series that are outside
of the default retention policy. This updates InfluxQL to include changes
from: influxdata/influxql#71

closes: influxdata/feature-requests#175

* feat(tsm): Allow for deletion of series outside default RP
9d116f6
This PR adds the ability for deletion of series that are outside
of the default retention policy. This updates InfluxQL to include changes
from: influxdata/influxql#71

closes: influxdata/feature-requests#175
  • Loading branch information
devanbenz authored Sep 17, 2024
1 parent fdf0df7 commit 8eaa24d
Show file tree
Hide file tree
Showing 5 changed files with 195 additions and 18 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ require (
github.com/google/go-cmp v0.5.9
github.com/influxdata/flux v0.194.5
github.com/influxdata/httprouter v1.3.1-0.20191122104820-ee83e2772f69
github.com/influxdata/influxql v1.2.0
github.com/influxdata/influxql v1.3.0
github.com/influxdata/pkg-config v0.2.11
github.com/influxdata/roaring v0.4.13-0.20180809181101-fc520f41fab6
github.com/influxdata/usage-client v0.0.0-20160829180054-6d3895376368
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -627,8 +627,8 @@ github.com/influxdata/influxdb-iox-client-go v1.0.0-beta.1 h1:zDmAiE2o3Y/YZinI6C
github.com/influxdata/influxdb-iox-client-go v1.0.0-beta.1/go.mod h1:Chl4pz0SRqoPmEavex4vZaQlunqXqrtEPWAN54THFfo=
github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo=
github.com/influxdata/influxql v1.1.0/go.mod h1:KpVI7okXjK6PRi3Z5B+mtKZli+R1DnZgb3N+tzevNgo=
github.com/influxdata/influxql v1.2.0 h1:EkgnTLCmaXeZKEjA6G+B7a/HH+Gl7GVLO0k2AoZbJMU=
github.com/influxdata/influxql v1.2.0/go.mod h1:nISAma2m+CbSt/y3GrehnHKWJRXdTTMZn+iSGroMmJw=
github.com/influxdata/influxql v1.3.0 h1:z2VZEK1LGItlUW8LLiKzX8duC5t6/SDM7Doj2rKs8Kw=
github.com/influxdata/influxql v1.3.0/go.mod h1:VqxAKyQz5p8GzgGsxWalCWYGxEqw6kvJo2IickMQiQk=
github.com/influxdata/line-protocol v0.0.0-20180522152040-32c6aa80de5e/go.mod h1:4kt73NQhadE3daL3WhR5EJ/J2ocX0PZzwxQ0gXJ7oFE=
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 h1:W9WBk7wlPfJLvMCdtV4zPulc4uCPrlywQOmbFOhgQNU=
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo=
Expand Down
112 changes: 108 additions & 4 deletions tests/server_suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,18 @@ func init() {
&Write{data: fmt.Sprintf(`cpu,host=serverA,region=uswest val=23.2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano())},
&Write{data: fmt.Sprintf(`cpu,host=serverA,region=uswest val=100 %d`, mustParseTime(time.RFC3339Nano, "2000-01-02T00:00:00Z").UnixNano())},
&Write{data: fmt.Sprintf(`cpu,host=serverA,region=uswest val=200 %d`, mustParseTime(time.RFC3339Nano, "2000-01-03T00:00:00Z").UnixNano())},
&Write{db: "db0", rp: "rp1", data: fmt.Sprintf(`cpu,host=serverA,region=uswest val=200 %d`, mustParseTime(time.RFC3339Nano, "2000-01-03T00:00:00Z").UnixNano())},
&Write{db: "db0", rp: "rp1", data: fmt.Sprintf(`cpu,host=serverA,region=uswest val=200 %d`, mustParseTime(time.RFC3339Nano, "2000-01-04T00:00:00Z").UnixNano())},
&Write{db: "db0", rp: "rp1", data: fmt.Sprintf(`cpu,host=serverA,region=uswest val=200 %d`, mustParseTime(time.RFC3339Nano, "2000-01-05T00:00:00Z").UnixNano())},
&Write{db: "db1", data: fmt.Sprintf(`cpu,host=serverA,region=uswest val=23.2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano())},
&Write{db: "db1", rp: "rp1", data: fmt.Sprintf(`cpu,host=serverA,region=uswest val=23.2 %d`, mustParseTime(time.RFC3339Nano, "2000-03-01T00:00:00Z").UnixNano())},
		// Seed data (writes) exercised by the wildcard-delete queries below
&Write{db: "db2", rp: "rp2", data: fmt.Sprintf(`cpu1,host=serverA,region=uswest val=200 %d`, mustParseTime(time.RFC3339Nano, "2000-01-05T00:00:00Z").UnixNano())},
&Write{db: "db2", rp: "rp2", data: fmt.Sprintf(`cpu2,host=serverA,region=uswest val=200 %d`, mustParseTime(time.RFC3339Nano, "2000-01-06T00:00:00Z").UnixNano())},
&Write{db: "db2", rp: "rp2", data: fmt.Sprintf(`gpu,host=serverA,region=uswest val=200 %d`, mustParseTime(time.RFC3339Nano, "2000-01-06T00:00:00Z").UnixNano())},
&Write{db: "db2", rp: "rp3", data: fmt.Sprintf(`cpu1,host=serverA,region=uswest val=200 %d`, mustParseTime(time.RFC3339Nano, "2000-01-05T00:00:00Z").UnixNano())},
&Write{db: "db2", rp: "rp3", data: fmt.Sprintf(`cpu2,host=serverA,region=uswest val=200 %d`, mustParseTime(time.RFC3339Nano, "2000-01-06T00:00:00Z").UnixNano())},
&Write{db: "db2", rp: "rp3", data: fmt.Sprintf(`gpu,host=serverA,region=uswest val=200 %d`, mustParseTime(time.RFC3339Nano, "2000-01-06T00:00:00Z").UnixNano())},
},
queries: []*Query{
&Query{
Expand All @@ -231,6 +242,43 @@ func init() {
exp: `{"results":[{"statement_id":0,"series":[{"columns":["key"],"values":[["cpu,host=serverA,region=uswest"]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
name: "Show series is present in retention policy 0",
command: `SHOW SERIES`,
exp: `{"results":[{"statement_id":0,"series":[{"columns":["key"],"values":[["cpu,host=serverA,region=uswest"]]}]}]}`,
params: url.Values{"db": []string{"db0"}, "rp": []string{"rp0"}},
},
&Query{
name: "Show series is present in retention policy 1",
command: `SHOW SERIES`,
exp: `{"results":[{"statement_id":0,"series":[{"columns":["key"],"values":[["cpu,host=serverA,region=uswest"]]}]}]}`,
params: url.Values{"db": []string{"db0"}, "rp": []string{"rp1"}},
},
&Query{
name: "Delete series in retention policy only across shards",
command: `DELETE FROM rp1.cpu WHERE time < '2000-01-05T00:00:00Z'`,
exp: `{"results":[{"statement_id":0}]}`,
params: url.Values{"db": []string{"db0"}},
once: true,
},
&Query{
name: "Series is not deleted from db1",
command: `SELECT * FROM cpu`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","host","region","val"],"values":[["2000-03-01T00:00:00Z","serverA","uswest",23.2]]}]}]}`,
params: url.Values{"db": []string{"db1"}, "rp": []string{"rp1"}},
},
&Query{
name: "First two series across first two shards are deleted in rp1 on db0 and third shard still has data",
command: `SELECT * FROM cpu`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","host","region","val"],"values":[["2000-01-05T00:00:00Z","serverA","uswest",200]]}]}]}`,
params: url.Values{"db": []string{"db0"}, "rp": []string{"rp1"}},
},
&Query{
name: "Show series is present in database",
command: `SHOW SERIES`,
exp: `{"results":[{"statement_id":0,"series":[{"columns":["key"],"values":[["cpu,host=serverA,region=uswest"]]}]}]}`,
params: url.Values{"db": []string{"db0"}, "rp": []string{"rp0"}},
},
&Query{
name: "Delete series",
command: `DELETE FROM cpu WHERE time < '2000-01-03T00:00:00Z'`,
Expand All @@ -242,23 +290,23 @@ func init() {
name: "Show series still exists",
command: `SHOW SERIES`,
exp: `{"results":[{"statement_id":0,"series":[{"columns":["key"],"values":[["cpu,host=serverA,region=uswest"]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
params: url.Values{"db": []string{"db0"}, "rp": []string{"rp0"}},
},
&Query{
name: "Make sure last point still exists",
command: `SELECT * FROM cpu`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","host","region","val"],"values":[["2000-01-03T00:00:00Z","serverA","uswest",200]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
params: url.Values{"db": []string{"db0"}, "rp": []string{"rp0"}},
},
&Query{
name: "Make sure data wasn't deleted from other database.",
command: `SELECT * FROM cpu`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","host","region","val"],"values":[["2000-01-01T00:00:00Z","serverA","uswest",23.2]]}]}]}`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","host","region","val"],"values":[["2000-03-01T00:00:00Z","serverA","uswest",23.2]]}]}]}`,
params: url.Values{"db": []string{"db1"}},
},
&Query{
name: "Delete remaining instances of series",
command: `DELETE FROM cpu WHERE time < '2000-01-04T00:00:00Z'`,
command: `DELETE FROM cpu WHERE time < '2000-01-06T00:00:00Z'`,
exp: `{"results":[{"statement_id":0}]}`,
params: url.Values{"db": []string{"db0"}},
},
Expand All @@ -268,6 +316,62 @@ func init() {
exp: `{"results":[{"statement_id":0}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
name: "Delete cpu* wildcard from series in rp2 db2",
command: `DELETE FROM rp2./cpu*/`,
exp: `{"results":[{"statement_id":0}]}`,
params: url.Values{"db": []string{"db2"}},
once: true,
},
{
name: "Show that cpu is not in series for rp2 db2",
command: `SELECT * FROM rp2./cpu*/`,
exp: `{"results":[{"statement_id":0}]}`,
params: url.Values{"db": []string{"db2"}},
once: true,
},
&Query{
name: "Show that gpu is still in series for rp2 db2",
command: `SELECT * FROM rp2./gpu*/`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"gpu","columns":["time","host","region","val"],"values":[["2000-01-06T00:00:00Z","serverA","uswest",200]]}]}]}`,
params: url.Values{"db": []string{"db2"}},
once: true,
},
{
name: "Show that cpu is in series for rp3 db2",
command: `SELECT * FROM rp3./cpu*/`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu1","columns":["time","host","region","val"],"values":[["2000-01-05T00:00:00Z","serverA","uswest",200]]},{"name":"cpu2","columns":["time","host","region","val"],"values":[["2000-01-06T00:00:00Z","serverA","uswest",200]]}]}]}`,
params: url.Values{"db": []string{"db2"}},
once: true,
},
&Query{
name: "Show that gpu is still in series for rp3 db2",
command: `SELECT * FROM rp3./gpu*/`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"gpu","columns":["time","host","region","val"],"values":[["2000-01-06T00:00:00Z","serverA","uswest",200]]}]}]}`,
params: url.Values{"db": []string{"db2"}},
once: true,
},
&Query{
name: "Delete with wildcard in retention policy should fail parsing",
command: `DELETE FROM /rp*/.gpu`,
exp: `{"error":"error parsing query: found ., expected ; at line 1, char 18"}`,
params: url.Values{"db": []string{"db2"}},
once: true,
},
&Query{
name: "Delete with database in query should fail parsing",
command: `DELETE FROM db2..gpu`,
exp: `{"error":"error parsing query: database not supported at line 1, char 1"}`,
params: url.Values{"db": []string{"db2"}},
once: true,
},
&Query{
name: "Delete with empty retention policy and database should fail parsing",
command: `DELETE FROM ...gpu`,
exp: `{"error":"error parsing query: found ., expected identifier at line 1, char 13"}`,
params: url.Values{"db": []string{"db2"}},
once: true,
},
},
}

Expand Down
65 changes: 54 additions & 11 deletions tsdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1050,9 +1050,7 @@ func (s *Store) DeleteRetentionPolicy(database, name string) error {
// unknown database, nothing to do
return nil
}
shards := s.filterShards(func(sh *Shard) bool {
return sh.database == database && sh.retentionPolicy == name
})
shards := s.filterShards(ComposeShardFilter(byDatabase(database), byRetentionPolicy(name)))
s.mu.RUnlock()

// Close and delete all shards under the retention policy on the
Expand Down Expand Up @@ -1144,14 +1142,42 @@ func (s *Store) filterShards(fn func(sh *Shard) bool) []*Shard {
return shards
}

// ShardPredicate is the filter-function type accepted by filterShards;
// it reports whether a shard should be included in the result.
type ShardPredicate = func(sh *Shard) bool

// ComposeShardFilter combines any number of ShardPredicates into a single
// predicate that matches only when every component predicate matches
// (logical AND). With no arguments the composed predicate matches all shards.
func ComposeShardFilter(preds ...ShardPredicate) ShardPredicate {
	return func(sh *Shard) bool {
		for _, pred := range preds {
			if !pred(sh) {
				return false
			}
		}
		return true
	}
}

// byDatabase provides a predicate for filterShards that matches on the name of
// the database passed in.
func byDatabase(name string) func(sh *Shard) bool {
func byDatabase(name string) ShardPredicate {
return func(sh *Shard) bool {
return sh.database == name
}
}

// byRetentionPolicy returns a ShardPredicate for filterShards that matches
// shards belonging to the retention policy with the given name.
func byRetentionPolicy(name string) ShardPredicate {
	return func(sh *Shard) bool { return sh.retentionPolicy == name }
}

// byIndexType returns a ShardPredicate for filterShards that matches shards
// whose index implementation has the given name (e.g. InmemIndexName).
func byIndexType(name string) ShardPredicate {
	return func(sh *Shard) bool {
		return sh.IndexType() == name
	}
}

// walkShards apply a function to each shard in parallel. fn must be safe for
// concurrent use. If any of the functions return an error, the first error is
// returned.
Expand Down Expand Up @@ -1496,7 +1522,28 @@ func (s *Store) DeleteSeries(database string, sources []influxql.Source, conditi
// No series file means nothing has been written to this DB and thus nothing to delete.
return nil
}
shards := s.filterShards(byDatabase(database))

shardFilterFn := byDatabase(database)
if len(sources) != 0 {
var rp string
for idx, source := range sources {
if measurement, ok := source.(*influxql.Measurement); ok {
if idx == 0 {
rp = measurement.RetentionPolicy
} else if rp != measurement.RetentionPolicy {
return fmt.Errorf("mixed retention policies not supported, wanted %q got %q", rp, measurement.RetentionPolicy)
}
} else {
return fmt.Errorf("unsupported source type in delete %v", source)
}
}

if rp != "" {
shardFilterFn = ComposeShardFilter(shardFilterFn, byRetentionPolicy(rp))
}
}
shards := s.filterShards(shardFilterFn)

epochs := s.epochsForShards(shards)
s.mu.RUnlock()

Expand Down Expand Up @@ -1684,9 +1731,7 @@ func (s *Store) MeasurementNames(ctx context.Context, auth query.FineAuthorizer,
if s.EngineOptions.IndexVersion != TSI1IndexName {
return nil, fmt.Errorf("retention policy filter for measurements not supported for index %s", s.EngineOptions.IndexVersion)
}
filterFunc = func(sh *Shard) bool {
return sh.Database() == database && sh.RetentionPolicy() == retentionPolicy
}
filterFunc = ComposeShardFilter(byDatabase(database), byRetentionPolicy(retentionPolicy))
}

s.mu.RLock()
Expand Down Expand Up @@ -2166,9 +2211,7 @@ func (s *Store) monitorShards() {
}

s.mu.RLock()
shards := s.filterShards(func(sh *Shard) bool {
return sh.IndexType() == InmemIndexName
})
shards := s.filterShards(byIndexType(InmemIndexName))
s.mu.RUnlock()

// No inmem shards...
Expand Down
30 changes: 30 additions & 0 deletions tsdb/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,36 @@ func TestStore_DeleteSeries_NonExistentDB(t *testing.T) {
}
}

// Ensure the delete series method does not return an error when multiple sources are passed.
func TestStore_DeleteSeries_MultipleSources(t *testing.T) {
	t.Parallel()

	test := func(index string) {
		s := MustOpenStore(index)
		defer s.Close()

		// Create two shards in the same database/retention policy so the
		// delete has to consider more than one shard.
		if err := s.CreateShard("db0", "rp0", 1, true); err != nil {
			t.Fatal(err)
		} else if sh := s.Shard(1); sh == nil {
			t.Fatal("expected shard")
		}

		if err := s.CreateShard("db0", "rp0", 2, true); err != nil {
			t.Fatal(err)
		} else if sh := s.Shard(2); sh == nil {
			t.Fatal("expected shard")
		}

		// Multiple measurement sources sharing a retention policy (here both
		// have none set) must be accepted by DeleteSeries.
		if err := s.DeleteSeries("db0", []influxql.Source{&influxql.Measurement{Name: "foo"}, &influxql.Measurement{Name: "bar"}}, nil); err != nil {
			t.Fatal("DeleteSeries should not fail with multiple sources")
		}
	}

	for _, index := range tsdb.RegisteredIndexes() {
		t.Run(index, func(t *testing.T) { test(index) })
	}
}

// Ensure the store can delete an existing shard.
func TestStore_DeleteShard(t *testing.T) {
t.Parallel()
Expand Down

0 comments on commit 8eaa24d

Please sign in to comment.