Skip to content

Commit 1067ed0

Browse files
committed
Add use cached client in fe master redirect && Make all log with
filename Signed-off-by: Jack Drogon <[email protected]>
1 parent e349a7f commit 1067ed0

File tree

7 files changed

+219
-27
lines changed

7 files changed

+219
-27
lines changed

build.sh

100644 → 100755
File mode changed.

doc/start_syncer.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ bash bin/start_syncer.sh --log_dir /path/to/ccr_syncer.log
5454
```bash
5555
bash bin/start_syncer.sh --log_level info
5656
```
57-
日志的格式如下,其中hook只会在`log_level > info`的时候打印:
57+
5858
```
5959
# time level msg hooks
6060
[2023-07-18 16:30:18] TRACE This is trace type. ccrName=xxx line=xxx

pkg/ccr/base/spec.go

+5
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,11 @@ type Frontend struct {
9898
Host string `json:"host"`
9999
Port string `json:"port"`
100100
ThriftPort string `json:"thrift_port"`
101+
IsMaster bool `json:"is_master"`
102+
}
103+
104+
func (f *Frontend) String() string {
105+
return fmt.Sprintf("host: %s, port: %s, thrift_port: %s, is_master: %v", f.Host, f.Port, f.ThriftPort, f.IsMaster)
101106
}
102107

103108
// TODO(Drogon): timeout config

pkg/ccr/job.go

+13-2
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,8 @@ func (j *Job) fullSync() error {
317317
}
318318

319319
if snapshotResp.Status.GetStatusCode() != tstatus.TStatusCode_OK {
320-
log.Errorf("get snapshot failed, status: %v", snapshotResp.Status)
320+
err = xerror.Errorf(xerror.FE, "get snapshot failed, status: %v", snapshotResp.Status)
321+
return err
321322
}
322323

323324
log.Debugf("job: %s", string(snapshotResp.GetJobInfo()))
@@ -1159,7 +1160,7 @@ func (j *Job) incrementalSync() error {
11591160

11601161
getBinlogResp, err := srcRpc.GetBinlog(src, commitSeq)
11611162
if err != nil {
1162-
return nil
1163+
return err
11631164
}
11641165
log.Debugf("resp: %v", getBinlogResp)
11651166

@@ -1435,6 +1436,16 @@ func (j *Job) Stop() {
14351436
func (j *Job) FirstRun() error {
14361437
log.Info("first run check job", zap.String("src", j.Src.String()), zap.String("dest", j.Dest.String()))
14371438

1439+
// Step 0: get all frontends
1440+
if frontends, err := j.srcMeta.GetFrontends(); err != nil {
1441+
return err
1442+
} else {
1443+
for _, frontend := range frontends {
1444+
j.Src.Frontends = append(j.Src.Frontends, *frontend)
1445+
}
1446+
}
1447+
log.Debugf("src frontends %+v", j.Src.Frontends)
1448+
14381449
// Step 1: check fe and be binlog feature is enabled
14391450
if err := j.srcMeta.CheckBinlogFeature(); err != nil {
14401451
return err

pkg/ccr/meta.go

+47
Original file line numberDiff line numberDiff line change
@@ -511,6 +511,53 @@ func (m *Meta) UpdateBackends() error {
511511
return nil
512512
}
513513

514+
func (m *Meta) GetFrontends() ([]*base.Frontend, error) {
515+
db, err := m.Connect()
516+
if err != nil {
517+
return nil, err
518+
}
519+
520+
query := "select Host, QueryPort, RpcPort, IsMaster from frontends();"
521+
log.Debug(query)
522+
rows, err := db.Query(query)
523+
if err != nil {
524+
return nil, xerror.Wrap(err, xerror.Normal, query)
525+
}
526+
527+
frontends := make([]*base.Frontend, 0)
528+
defer rows.Close()
529+
for rows.Next() {
530+
rowParser := utils.NewRowParser()
531+
if err := rowParser.Parse(rows); err != nil {
532+
return nil, xerror.Wrapf(err, xerror.Normal, query)
533+
}
534+
535+
var fe base.Frontend
536+
fe.Host, err = rowParser.GetString("Host")
537+
if err != nil {
538+
return nil, xerror.Wrapf(err, xerror.Normal, query)
539+
}
540+
541+
fe.Port, err = rowParser.GetString("QueryPort")
542+
if err != nil {
543+
return nil, xerror.Wrapf(err, xerror.Normal, query)
544+
}
545+
546+
fe.ThriftPort, err = rowParser.GetString("RpcPort")
547+
if err != nil {
548+
return nil, xerror.Wrapf(err, xerror.Normal, query)
549+
}
550+
551+
fe.IsMaster, err = rowParser.GetBool("IsMaster")
552+
if err != nil {
553+
return nil, xerror.Wrapf(err, xerror.Normal, query)
554+
}
555+
556+
frontends = append(frontends, &fe)
557+
}
558+
return frontends, nil
559+
}
560+
514561
func (m *Meta) GetBackends() ([]*base.Backend, error) {
515562
if len(m.Backends) > 0 {
516563
backends := make([]*base.Backend, 0, len(m.Backends))

pkg/ccr/metaer.go

+1
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ type Metaer interface {
9494
GetPartitionRange(tableId int64, partitionId int64) (string, error)
9595
GetPartitionIdByName(tableId int64, partitionName string) (int64, error)
9696

97+
GetFrontends() ([]*base.Frontend, error)
9798
UpdateBackends() error
9899
GetBackends() ([]*base.Backend, error)
99100
GetBackendId(host, portStr string) (int64, error)

pkg/rpc/fe.go

+152-24
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,21 @@ package rpc
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
7+
"strings"
68
"sync"
79

8-
"github.com/cloudwego/kitex/client"
9-
"github.com/selectdb/ccr_syncer/pkg/ccr/base"
1010
festruct "github.com/selectdb/ccr_syncer/pkg/rpc/kitex_gen/frontendservice"
1111
feservice "github.com/selectdb/ccr_syncer/pkg/rpc/kitex_gen/frontendservice/frontendservice"
1212
tstatus "github.com/selectdb/ccr_syncer/pkg/rpc/kitex_gen/status"
1313
festruct_types "github.com/selectdb/ccr_syncer/pkg/rpc/kitex_gen/types"
1414
"github.com/selectdb/ccr_syncer/pkg/utils"
1515
"github.com/selectdb/ccr_syncer/pkg/xerror"
1616

17+
"github.com/cloudwego/kitex/client"
18+
"github.com/cloudwego/kitex/pkg/kerrors"
19+
"github.com/selectdb/ccr_syncer/pkg/ccr/base"
1720
log "github.com/sirupsen/logrus"
1821
)
1922

@@ -25,6 +28,32 @@ var (
2528
ErrFeNotMasterCompatible = xerror.NewWithoutStack(xerror.FE, "not master compatible")
2629
)
2730

31+
// canUseNextAddr means can try next addr, err is a connection error, not a method not found or other error
32+
func canUseNextAddr(err error) bool {
33+
if errors.Is(err, kerrors.ErrNoConnection) {
34+
return true
35+
}
36+
if errors.Is(err, kerrors.ErrNoResolver) {
37+
return true
38+
}
39+
if errors.Is(err, kerrors.ErrNoDestAddress) {
40+
return true
41+
}
42+
43+
errMsg := err.Error()
44+
if strings.Contains(errMsg, "connection has been closed by peer") {
45+
return true
46+
}
47+
if strings.Contains(errMsg, "closed network connection") {
48+
return true
49+
}
50+
if strings.Contains(errMsg, "connection reset by peer") {
51+
return true
52+
}
53+
54+
return false
55+
}
56+
2857
type IFeRpc interface {
2958
BeginTransaction(*base.Spec, string, []int64) (*festruct.TBeginTxnResult_, error)
3059
CommitTransaction(*base.Spec, int64, []*festruct_types.TTabletCommitInfo) (*festruct.TCommitTxnResult_, error)
@@ -62,6 +91,17 @@ func NewFeRpc(spec *base.Spec) (*FeRpc, error) {
6291
cachedFeAddrs := make(map[string]bool)
6392
for _, fe := range spec.Frontends {
6493
addr := fmt.Sprintf("%s:%s", fe.Host, fe.ThriftPort)
94+
95+
if _, ok := cachedFeAddrs[addr]; ok {
96+
continue
97+
}
98+
99+
// for cached all spec clients
100+
if client, err := newSingleFeClient(addr); err != nil {
101+
log.Warnf("new fe client error: %v", err)
102+
} else {
103+
clients[client.Address()] = client
104+
}
65105
cachedFeAddrs[addr] = true
66106
}
67107

@@ -124,41 +164,129 @@ func (rpc *FeRpc) getCacheFeAddrs() map[string]bool {
124164
return utils.CopyMap(rpc.cachedFeAddrs)
125165
}
126166

127-
func (rpc *FeRpc) callWithMasterRedirect(caller callerType) (resultType, error) {
128-
masterClient := rpc.getMasterClient()
167+
type retryWithMasterRedirectAndCachedClientsRpc struct {
168+
rpc *FeRpc
169+
caller callerType
170+
notriedClients map[string]*singleFeClient
171+
}
172+
173+
type call0Result struct {
174+
canUseNextAddr bool
175+
resp resultType
176+
err error
177+
masterAddr string
178+
}
129179

130-
result, err := caller(masterClient)
180+
func (r *retryWithMasterRedirectAndCachedClientsRpc) call0(masterClient *singleFeClient) *call0Result {
181+
caller := r.caller
182+
resp, err := caller(masterClient)
183+
log.Tracef("call resp: %+v, error: %+v", resp, err)
184+
185+
// Step 1: check error
131186
if err != nil {
132-
return result, err
187+
if !canUseNextAddr(err) {
188+
return &call0Result{
189+
canUseNextAddr: false,
190+
err: xerror.Wrap(err, xerror.FE, "thrift error"),
191+
}
192+
} else {
193+
log.Warnf("call error: %v, try next addr", err)
194+
return &call0Result{
195+
canUseNextAddr: true,
196+
err: xerror.Wrap(err, xerror.FE, "thrift error"),
197+
}
198+
}
133199
}
134200

135-
if result.GetStatus().GetStatusCode() != tstatus.TStatusCode_NOT_MASTER {
136-
return result, err
201+
// Step 2: check need redirect
202+
if resp.GetStatus().GetStatusCode() != tstatus.TStatusCode_NOT_MASTER {
203+
return &call0Result{
204+
canUseNextAddr: false,
205+
resp: resp,
206+
err: nil,
207+
}
137208
}
138209

139210
// no compatible for master
140-
if !result.IsSetMasterAddress() {
141-
return result, xerror.XPanicWrapf(ErrFeNotMasterCompatible, "fe addr [%s]", masterClient.Address())
211+
if !resp.IsSetMasterAddress() {
212+
err = xerror.XPanicWrapf(ErrFeNotMasterCompatible, "fe addr [%s]", masterClient.Address())
213+
return &call0Result{
214+
canUseNextAddr: true,
215+
err: err, // not nil
216+
}
142217
}
143218

144219
// switch to master
145-
masterAddr := result.GetMasterAddress()
146-
log.Infof("switch to master %s", masterAddr)
147-
addr := fmt.Sprintf("%s:%d", masterAddr.Hostname, masterAddr.Port)
220+
masterAddr := resp.GetMasterAddress()
221+
err = xerror.Errorf(xerror.FE, "addr [%s] is not master", masterAddr)
222+
return &call0Result{
223+
canUseNextAddr: true,
224+
resp: resp,
225+
masterAddr: fmt.Sprintf("%s:%d", masterAddr.Hostname, masterAddr.Port),
226+
err: err, // not nil
227+
}
228+
}
148229

149-
client, ok := rpc.getClient(addr)
150-
if ok {
151-
masterClient = client
152-
} else {
153-
masterClient, err = newSingleFeClient(addr)
154-
if err != nil {
155-
return nil, xerror.Wrapf(err, xerror.RPC, "NewFeClient error: %v", err)
230+
func (r *retryWithMasterRedirectAndCachedClientsRpc) call() (resultType, error) {
231+
rpc := r.rpc
232+
masterClient := rpc.masterClient
233+
234+
// Step 1: try master
235+
result := r.call0(masterClient)
236+
log.Tracef("call0 result: %+v", result)
237+
if result.err == nil {
238+
return result.resp, nil
239+
}
240+
241+
// Step 2: check error, if can't use next addr, return error
242+
// canUseNextAddr means can try next addr, contains ErrNoConnection, ErrNoResolver, ErrNoDestAddress => (feredirect && use next cached addr)
243+
if !result.canUseNextAddr {
244+
return nil, result.err
245+
}
246+
247+
// Step 3: if set master addr, redirect to master
248+
// redirect to master
249+
if result.masterAddr != "" {
250+
masterAddr := result.masterAddr
251+
log.Infof("switch to master %s", masterAddr)
252+
253+
var err error
254+
client, ok := rpc.getClient(masterAddr)
255+
if ok {
256+
masterClient = client
257+
} else {
258+
masterClient, err = newSingleFeClient(masterAddr)
259+
if err != nil {
260+
return nil, xerror.Wrapf(err, xerror.RPC, "NewFeClient [%s] error: %v", masterAddr, err)
261+
}
156262
}
263+
rpc.updateMasterClient(masterClient)
264+
return r.call()
265+
}
266+
267+
// Step 4: try all cached fe clients
268+
if r.notriedClients == nil {
269+
r.notriedClients = rpc.getClients()
270+
}
271+
delete(r.notriedClients, masterClient.Address())
272+
if len(r.notriedClients) == 0 {
273+
return nil, result.err
274+
}
275+
// get first notried client
276+
var client *singleFeClient
277+
for _, client = range r.notriedClients {
278+
break
157279
}
158-
rpc.updateMasterClient(masterClient)
280+
// because call0 failed, so original masterClient is not master now, set client as masterClient for retry
281+
rpc.updateMasterClient(client)
282+
return r.call()
283+
}
159284

160-
// retry
161-
return caller(masterClient)
285+
func (rpc *FeRpc) callWithMasterRedirect(caller callerType) (resultType, error) {
286+
r := &retryWithMasterRedirectAndCachedClientsRpc{rpc: rpc,
287+
caller: caller,
288+
}
289+
return r.call()
162290
}
163291

164292
type retryCallerType func(client *singleFeClient) (any, error)
@@ -194,7 +322,7 @@ func (rpc *FeRpc) callWithRetryAllClients(caller retryCallerType) (result any, e
194322

195323
usedClientAddrs[addr] = true
196324
if client, err := newSingleFeClient(addr); err != nil {
197-
log.Errorf("new fe client error: %v", err)
325+
log.Warnf("new fe client error: %v", err)
198326
} else {
199327
rpc.addClient(client)
200328
if result, err = caller(client); err == nil {

0 commit comments

Comments
 (0)