Skip to content

Commit

Permalink
[fix #100] support current pd's safepoint update interface (#101)
Browse files Browse the repository at this point in the history
* use current pd safepoint update interface

Signed-off-by: haojinming <[email protected]>

* add unit test for safepoint update

Signed-off-by: haojinming <[email protected]>

* fix golangci-lint check

Signed-off-by: haojinming <[email protected]>

* fix golangci-lint check

Signed-off-by: haojinming <[email protected]>

* fix review comments

Signed-off-by: haojinming <[email protected]>
  • Loading branch information
haojinming authored May 18, 2022
1 parent 760def3 commit 613073d
Show file tree
Hide file tree
Showing 4 changed files with 161 additions and 18 deletions.
9 changes: 2 additions & 7 deletions gc-worker/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,12 @@ require (
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee
github.com/stretchr/testify v1.7.0
github.com/tikv/pd v1.1.0-beta.0.20211118054146-02848d2660ee
github.com/tikv/pd v1.1.0-beta.0.20220428091252-fc74bea31d5d
github.com/tikv/pd/client v0.0.0-20220428091252-fc74bea31d5d
go.etcd.io/etcd v0.5.0-alpha.5.0.20191023171146-3cf2f69b5738
go.uber.org/atomic v1.9.0
go.uber.org/zap v1.20.0
google.golang.org/grpc v1.43.0
)

replace google.golang.org/grpc => google.golang.org/grpc v1.26.0

replace github.com/pingcap/kvproto => github.com/AmoebaProtozoa/kvproto v0.0.0-20220427045408-abeb7dbc9f22

replace github.com/tikv/pd => github.com/AmoebaProtozoa/pd v1.1.0-beta.0.20220427094035-c5944e39ae5e

replace github.com/tikv/pd/client => github.com/AmoebaProtozoa/pd/client v0.0.0-20220427094035-c5944e39ae5e
20 changes: 14 additions & 6 deletions gc-worker/go.sum
Original file line number Diff line number Diff line change
@@ -1,11 +1,5 @@
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/AlekSi/gocov-xml v1.0.0/go.mod h1:J0qYeZ6tDg4oZubW9mAAgxlqw39PDfoEkzB3HXSbEuA=
github.com/AmoebaProtozoa/kvproto v0.0.0-20220427045408-abeb7dbc9f22 h1:f32Y8ZPmZ0bpGe+FEXRu1XpHb/veNvEX3XsrH0ytMkU=
github.com/AmoebaProtozoa/kvproto v0.0.0-20220427045408-abeb7dbc9f22/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/AmoebaProtozoa/pd v1.1.0-beta.0.20220427094035-c5944e39ae5e h1:VJQvjHSwedoJUuo5FzC7WUk9avaFh+vNELPhMtesnao=
github.com/AmoebaProtozoa/pd v1.1.0-beta.0.20220427094035-c5944e39ae5e/go.mod h1:XCqfmj+6hch3za2cNF1ynL8beslaFJQGnp0eVAh+ZNE=
github.com/AmoebaProtozoa/pd/client v0.0.0-20220427094035-c5944e39ae5e h1:2XMbiFrDRqo/zFAe7vbzHIJJ0tIue5E5sMbh2Myz47U=
github.com/AmoebaProtozoa/pd/client v0.0.0-20220427094035-c5944e39ae5e/go.mod h1:42eGJQinsZvsoQcfOij9n5fMKpLVs9RMRxuiNSXbMpM=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/DATA-DOG/go-sqlmock v1.3.3/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM=
Expand Down Expand Up @@ -137,8 +131,10 @@ github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LB
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/goccy/go-graphviz v0.0.9/go.mod h1:wXVsXxmyMQU6TN3zGRttjNn3h+iCAS7xQFC6TlNvLhk=
github.com/gofrs/uuid v3.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
github.com/gogo/protobuf v0.0.0-20180717141946-636bf0302bc9/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang-jwt/jwt v3.2.1+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I=
Expand All @@ -150,6 +146,7 @@ github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903/go.mod h1:cIg4er
github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef h1:veQD95Isof8w9/WXiA+pa3tz3fJXkt5B7QaRBrM62gk=
github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/protobuf v0.0.0-20180814211427-aa810b61a9c7/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
Expand Down Expand Up @@ -253,6 +250,7 @@ github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHm
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
Expand Down Expand Up @@ -343,6 +341,10 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce/go.mod h1:w4PEZ5y16LeofeeGwdgZB4ddv9bLyDuIX+ljstgKZyk=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20220330070404-8c4cd3f93748 h1:i4MBe1zGq9/r3BH6rTRunizi4T59fpNk8hvBCrB5UAY=
github.com/pingcap/kvproto v0.0.0-20220330070404-8c4cd3f93748/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
Expand Down Expand Up @@ -449,6 +451,10 @@ github.com/tidwall/match v1.0.1/go.mod h1:LujAq0jyVjBy028G1WhWfIzbpQfMO8bBZ6Tyb0
github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tikv/pd v1.1.0-beta.0.20220428091252-fc74bea31d5d h1:qM2WjQdql0Gmfs7szh061r+R3Nw/obY2Ld0f+N0kAuI=
github.com/tikv/pd v1.1.0-beta.0.20220428091252-fc74bea31d5d/go.mod h1:BtnlCFzmxS1qStpC3GpsRKwgKBOiEaVXTJP25NDcWXE=
github.com/tikv/pd/client v0.0.0-20220428091252-fc74bea31d5d h1:wHmEG7TApku56YBYwwp8o6DXj0oTXAyVrFMMRnFx56c=
github.com/tikv/pd/client v0.0.0-20220428091252-fc74bea31d5d/go.mod h1:EpfgtTOi0/zRU8njoCkwxzGiCiuXGLkEZsE9oJQyxcs=
github.com/tklauser/go-sysconf v0.3.4/go.mod h1:Cl2c8ZRWfHD5IrfHo9VN+FX9kCFjIOyVklgXycLB6ek=
github.com/tklauser/numcpus v0.2.1/go.mod h1:9aU+wOc6WjUIZEwWMP62PL/41d65P+iks1gBkr4QyP8=
github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
Expand Down Expand Up @@ -616,6 +622,7 @@ golang.org/x/time v0.0.0-20220224211638-0e9765cccd65 h1:M73Iuj3xbbb9Uk1DYhzydths
golang.org/x/time v0.0.0-20220224211638-0e9765cccd65/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190425163242-31fd60d6bfdc/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
Expand Down Expand Up @@ -650,6 +657,7 @@ golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1N
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20181004005441-af9cb2a35e7f/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
google.golang.org/genproto v0.0.0-20190927181202-20e1ac93f88c h1:hrpEMCZ2O7DR5gC1n2AJGVhrwiEjOi35+jxtIuZpTMo=
google.golang.org/genproto v0.0.0-20190927181202-20e1ac93f88c/go.mod h1:IbNlFCBrqXvoKpeg0TB2l7cyZUmoaFKYIwrEpbDKLA8=
Expand Down
41 changes: 36 additions & 5 deletions gc-worker/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package server
import (
"context"
"fmt"
"math"
"strings"
"sync"
"sync/atomic"
Expand All @@ -36,10 +37,11 @@ import (
const (
etcdTimeout = time.Duration(3) * time.Second
// etcdElectionPath for all gcworker servers.
etcdElectionPath = "/gc-worker/election"
etcdElectionVal = "local"
maxPdMsgSize = int(128 * units.MiB)
updateSafePointRetryCnt = int(10)
etcdElectionPath = "/gc-worker/election"
etcdElectionVal = "local"
maxPdMsgSize = int(128 * units.MiB)
gcWorkerSafePointTTL = math.MaxInt64 // Sets TTL to MAX to make it permanently valid.
gcWorkerServiceID = "gc_worker" // MUST be same with definition in PD
)

// The version info is set in Makefile
Expand Down Expand Up @@ -248,12 +250,14 @@ func (s *Server) getGCWorkerSafePoint(ctx context.Context) (uint64, error) {
}

func (s *Server) calcNewGCSafePoint(serviceSafePoint, gcWorkerSafePoint uint64) uint64 {
if serviceSafePoint < gcWorkerSafePoint {
// `serviceSafePoint == 0` means no service is registered. Just use gc safe point.
if serviceSafePoint > 0 && serviceSafePoint < gcWorkerSafePoint {
return serviceSafePoint
}
return gcWorkerSafePoint
}

/* keep these codes, uncomment these when new pd interfaces are enabled.
func (s *Server) updateServiceGroupSafePointWithRetry(ctx context.Context, serviceGroup string, gcWorkerSafePoint uint64) error {
succeed := false
for i := 0; i < updateSafePointRetryCnt; i++ {
Expand Down Expand Up @@ -308,6 +312,33 @@ func (s *Server) updateRawGCSafePoint(ctx context.Context) error {
}
return nil
}
*/

func (s *Server) updateRawGCSafePoint(ctx context.Context) error {
gcWorkerSafePoint, err := s.getGCWorkerSafePoint(ctx)
if err != nil {
log.Error("calc gc-worker safe point fails.", zap.Error(err), zap.String("worker", s.cfg.Name))
return errors.Trace(err)
}
serviceSafePoint, err := s.pdClient.UpdateServiceGCSafePoint(ctx, gcWorkerServiceID, gcWorkerSafePointTTL, gcWorkerSafePoint)
if err != nil {
log.Error("update service gc safepoint fails", zap.Error(err), zap.String("worker", s.cfg.Name))
return errors.Trace(err)
}
newSafepoint := s.calcNewGCSafePoint(serviceSafePoint, gcWorkerSafePoint)
retSafePoint, err := s.pdClient.UpdateGCSafePoint(ctx, newSafepoint)
if err != nil {
log.Error("update gc safepoint fails", zap.Error(err), zap.String("worker", s.cfg.Name))
return errors.Trace(err)
}
log.Info("update gc safepoint with old finish", zap.Uint64("gcWorkerSafePoint", gcWorkerSafePoint),
zap.Uint64("serviceSafePoint", serviceSafePoint),
zap.Uint64("newSafepoint", newSafepoint),
zap.Uint64("retSafePoint", retSafePoint),
zap.Uint64("gcWorkerSafePoint", gcWorkerSafePoint),
zap.String("worker", s.cfg.Name))
return nil
}

func (s *Server) startUpdateGCSafePointLoop() {
defer s.serverLoopWg.Done()
Expand Down
109 changes: 109 additions & 0 deletions gc-worker/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,66 @@ package server
import (
"context"
"fmt"
"math"
"net/url"
"os"
"testing"
"time"

"github.com/stretchr/testify/require"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/pkg/tempurl"
"github.com/tikv/pd/pkg/tsoutil"
"github.com/tikv/pd/pkg/typeutil"
"go.etcd.io/etcd/embed"
"go.uber.org/atomic"
)

type MockPDClient struct {
pd.Client
tsLogical atomic.Uint64
// SafePoint set by `UpdateGCSafePoint`. Not to be confused with SafePointKV.
gcSafePoint uint64
// Represents the current safePoint of all services including TiDB, representing how much data they want to retain
// in GC.
serviceSafePoints map[string]uint64
}

func (c *MockPDClient) GetTS(context.Context) (int64, int64, error) {
unixTime := time.Now()
ts := tsoutil.GenerateTimestamp(unixTime, c.tsLogical.Add(1)) // set logical as 0
return ts.Physical, ts.Logical, nil
}

func (c *MockPDClient) UpdateGCSafePoint(ctx context.Context, safePoint uint64) (uint64, error) {
c.gcSafePoint = safePoint
return safePoint, nil
}

func (c *MockPDClient) UpdateServiceGCSafePoint(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) {
c.serviceSafePoints[serviceID] = safePoint
minSafePoint := uint64(math.MaxUint64)
for _, safepoint := range c.serviceSafePoints {
if safepoint < minSafePoint {
minSafePoint = safepoint
}
}
return minSafePoint, nil
}

func (c *MockPDClient) Close() {
}

// NewPDClient creates a mock pd.Client that uses local timestamp and meta data
// from a Cluster.
func NewMockPDClient() *MockPDClient {
mockClient := MockPDClient{
gcSafePoint: 0,
serviceSafePoints: make(map[string]uint64),
}
return &mockClient
}

func CreateAndStartTestServer(ctx context.Context, num uint32, cfg *Config) []*Server {
ret := []*Server{}
for i := 0; i < int(num); i++ {
Expand Down Expand Up @@ -108,3 +158,62 @@ func NewTestSingleConfig() *embed.Config {
cfg.ClusterState = embed.ClusterStateFlagNew
return cfg
}

func TestCalcNewGCSafePoint(t *testing.T) {
cfg := NewConfig()
ctx := context.Background()
s := &Server{
cfg: cfg,
ctx: ctx,
startTimestamp: time.Now().Unix(),
}
defer s.Close()
newSp := s.calcNewGCSafePoint(0, 100)
require.Equal(t, newSp, uint64(100))

newSp = s.calcNewGCSafePoint(1000, 100)
require.Equal(t, newSp, uint64(100))

newSp = s.calcNewGCSafePoint(1000, 10000)
require.Equal(t, newSp, uint64(1000))

}

func TestCalcGCSafePoint(t *testing.T) {
mockPdClient := NewMockPDClient()
cfg := NewConfig()
cfg.GCLifeTime = typeutil.NewDuration(defaultGCLifeTime)
ctx := context.Background()
s := &Server{
cfg: cfg,
ctx: ctx,
startTimestamp: time.Now().Unix(),
}
defer s.Close()
s.pdClient = mockPdClient
curTs := tsoutil.GenerateTS(tsoutil.GenerateTimestamp(time.Now(), 0))
expectTs := time.Now().Add(-cfg.GCLifeTime.Duration)
expectGcSafePoint := tsoutil.GenerateTS(tsoutil.GenerateTimestamp(expectTs, 0))
gcSafePoint, err := s.getGCWorkerSafePoint(ctx)
require.NoError(t, err)
require.LessOrEqual(t, expectGcSafePoint, gcSafePoint)
require.LessOrEqual(t, gcSafePoint, curTs)
}

func TestUpdateGCSafePoint(t *testing.T) {
mockPdClient := NewMockPDClient()
cfg := NewConfig()
cfg.GCLifeTime = typeutil.NewDuration(defaultGCLifeTime)
ctx := context.Background()
s := &Server{
cfg: cfg,
ctx: ctx,
startTimestamp: time.Now().Unix(),
}
defer s.Close()
s.pdClient = mockPdClient
mockPdClient.UpdateServiceGCSafePoint(ctx, "cdc", math.MaxInt64, 100)
err := s.updateRawGCSafePoint(ctx)
require.NoError(t, err)
require.Equal(t, mockPdClient.gcSafePoint, uint64(100))
}

0 comments on commit 613073d

Please sign in to comment.