Skip to content

Commit bb6f794

Browse files
committed
feat. add server context
1 parent eb5c56c commit bb6f794

File tree

8 files changed

+88
-90
lines changed

8 files changed

+88
-90
lines changed

README.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@ _Inspired by the supercomputer (Red Queen) in "Resident Evil", the distributed k
1616
This is a reliable distributed key-value store based on the raft algorithm, and internal provides advanced functions such as distributed-lock...
1717

1818
## Client call
19-
`# go get github.com/RealFax/RedQueen@latest`
19+
```
20+
go get github.com/RealFax/RedQueen@latest
21+
```
2022

2123
[Code example](https://github.com/RealFax/RedQueen/tree/master/client/example)
2224

README_zh.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@ _灵感来源于《生化危机》中的超级计算机(Red Queen), 分布式key
1616
这是一个基于raft算法实现的可靠分布式key-value存储, 并在内部提供了诸如 分布式锁...之类的高级功能
1717

1818
## 客户端调用
19-
`# go get github.com/RealFax/RedQueen@latest`
19+
```
20+
go get github.com/RealFax/RedQueen@latest
21+
```
2022

2123
[代码示例](https://github.com/RealFax/RedQueen/tree/master/client/example)
2224

internal/rqd/pprof.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"net"
77
"net/http"
88
"net/http/pprof"
9+
"time"
910
)
1011

1112
type pprofServer struct {
@@ -18,12 +19,13 @@ func (s *pprofServer) Run() error {
1819
}
1920

2021
func (s *pprofServer) Close() error {
21-
defer s.listener.Close()
22-
return s.server.Shutdown(context.Background())
22+
ctx, cancel := context.WithTimeout(context.TODO(), 15*time.Second)
23+
defer cancel()
24+
return s.server.Shutdown(ctx)
2325
}
2426

2527
func newPprofServer() (*pprofServer, error) {
26-
listener, err := net.Listen("tcp", "0.0.0.0:0")
28+
listener, err := net.Listen("tcp", "127.0.0.1:")
2729
if err != nil {
2830
return nil, err
2931
}

internal/rqd/raft.go

Lines changed: 0 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@ import (
44
"github.com/RealFax/RedQueen/internal/rqd/store"
55
"io"
66
"net"
7-
"os"
8-
"path/filepath"
97
"sync/atomic"
108
"time"
119

@@ -14,14 +12,6 @@ import (
1412
"github.com/pkg/errors"
1513
)
1614

17-
type RaftConfig struct {
18-
Bootstrap bool
19-
MaxSnapshots int
20-
ServerID, Addr, DataDir string
21-
Store store.Store
22-
Clusters []raft.Server
23-
}
24-
2515
type Raft struct {
2616
bootstrap bool
2717
term uint64 // [ATOMIC]
@@ -149,26 +139,3 @@ func NewRaftWithOptions(opts ...RaftServerOption) (*Raft, error) {
149139

150140
return r, nil
151141
}
152-
153-
func NewRaft(cfg RaftConfig) (*Raft, error) {
154-
return NewRaftWithOptions(
155-
RaftWithClusters(cfg.Clusters),
156-
RaftWithConfig(func() *raft.Config {
157-
raftCfg := raft.DefaultConfig()
158-
raftCfg.LocalID = raft.ServerID(cfg.ServerID)
159-
raftCfg.LogLevel = "INFO"
160-
return raftCfg
161-
}()),
162-
RaftWithStdFSM(cfg.Store),
163-
RaftWithBoltLogStore(filepath.Join(cfg.DataDir, "raft-log.db")),
164-
RaftWithStdStableStore(cfg.Store),
165-
RaftWithFileSnapshotStore(cfg.DataDir, cfg.MaxSnapshots, os.Stderr),
166-
RaftWithTCPTransport(cfg.Addr, 32, time.Second*3, os.Stderr),
167-
func() RaftServerOption {
168-
if cfg.Bootstrap {
169-
return RaftWithBootstrap()
170-
}
171-
return RaftWithEmpty()
172-
}(),
173-
)
174-
}

internal/rqd/rqd.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,5 @@ package rqd
22

33
const (
44
StoreSuffix = "data"
5+
RaftLog = "raft-log.db"
56
)

internal/rqd/server.go

Lines changed: 44 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,14 @@ import (
55
"context"
66
"github.com/RealFax/RedQueen/internal/rqd/store"
77
"github.com/RealFax/RedQueen/pkg/dlocker"
8+
"github.com/hashicorp/raft"
89
"github.com/pkg/errors"
910
"net"
11+
"os"
12+
"path/filepath"
1013
"sync"
1114
"time"
1215

13-
"github.com/hashicorp/raft"
1416
"google.golang.org/grpc"
1517

1618
"github.com/RealFax/RedQueen/api/serverpb"
@@ -24,7 +26,8 @@ var (
2426
)
2527

2628
type Server struct {
27-
clusterID string
29+
ctx context.Context
30+
cancel context.CancelCauseFunc
2831

2932
cfg *config.Config
3033

@@ -38,6 +41,8 @@ type Server struct {
3841

3942
stateNotify sync.Map // map[string]chan bool
4043

44+
clusterID string
45+
4146
serverpb.UnimplementedKVServer
4247
serverpb.UnimplementedLockerServer
4348
serverpb.UnimplementedRedQueenServer
@@ -77,6 +82,8 @@ func (s *Server) applyLog(ctx context.Context, p *serverpb.RaftLogPayload, timeo
7782
func (s *Server) _stateUpdater() {
7883
for {
7984
select {
85+
case <-s.ctx.Done():
86+
return
8087
case state := <-s.raft.LeaderCh():
8188
s.stateNotify.Range(func(_, val any) bool {
8289
val.(chan bool) <- state
@@ -95,6 +102,8 @@ func (s *Server) ListenAndServe() error {
95102
}
96103

97104
func (s *Server) Close() (err error) {
105+
s.cancel(errors.New("server close"))
106+
98107
if err = s.raft.Shutdown().Error(); err != nil {
99108
return
100109
}
@@ -108,12 +117,13 @@ func (s *Server) Close() (err error) {
108117

109118
func NewServer(cfg *config.Config) (*Server, error) {
110119
var (
111-
server = Server{
120+
err error
121+
server = &Server{
112122
clusterID: cfg.Node.ID,
113123
cfg: cfg,
114124
}
115-
err error
116125
)
126+
server.ctx, server.cancel = context.WithCancelCause(context.Background())
117127

118128
if cfg.Misc.PPROF {
119129
server.pprofServer, err = newPprofServer()
@@ -130,33 +140,44 @@ func NewServer(cfg *config.Config) (*Server, error) {
130140

131141
// init server grpc
132142
server.grpcServer = grpc.NewServer()
133-
serverpb.RegisterKVServer(server.grpcServer, &server)
134-
serverpb.RegisterLockerServer(server.grpcServer, &server)
135-
serverpb.RegisterRedQueenServer(server.grpcServer, &server)
143+
serverpb.RegisterKVServer(server.grpcServer, server)
144+
serverpb.RegisterLockerServer(server.grpcServer, server)
145+
serverpb.RegisterRedQueenServer(server.grpcServer, server)
136146

137147
// init server raft
138-
if server.raft, err = NewRaft(RaftConfig{
139-
Bootstrap: cfg.Env().FirstRun(),
140-
MaxSnapshots: int(cfg.Node.MaxSnapshots),
141-
ServerID: cfg.Node.ID,
142-
Addr: cfg.Node.ListenPeerAddr,
143-
DataDir: cfg.Node.DataDir,
144-
Store: server.store,
145-
Clusters: func() []raft.Server {
148+
if server.raft, err = NewRaftWithOptions(
149+
RaftWithStdFSM(server.store),
150+
RaftWithBoltLogStore(filepath.Join(cfg.Node.DataDir, RaftLog)),
151+
RaftWithStdStableStore(server.store),
152+
RaftWithFileSnapshotStore(cfg.Node.DataDir, int(cfg.Node.MaxSnapshots), os.Stderr),
153+
RaftWithTCPTransport(cfg.Node.ListenPeerAddr, 32, 3*time.Second, os.Stderr),
154+
RaftWithConfig(func() *raft.Config {
155+
c := raft.DefaultConfig()
156+
c.LocalID = raft.ServerID(cfg.Node.ID)
157+
c.LogLevel = "INFO"
158+
return c
159+
}()),
160+
func() RaftServerOption {
161+
if cfg.Env().FirstRun() {
162+
return RaftWithBootstrap()
163+
}
164+
return RaftWithEmpty()
165+
}(),
166+
RaftWithClusters(func() []raft.Server {
146167
if !cfg.Env().FirstRun() {
147168
return nil
148169
}
149-
clusters := make([]raft.Server, 0, len(cfg.Cluster.Bootstrap))
170+
cluster := make([]raft.Server, 0, len(cfg.Cluster.Bootstrap))
150171
for _, node := range cfg.Cluster.Bootstrap {
151-
clusters = append(clusters, raft.Server{
172+
cluster = append(cluster, raft.Server{
152173
Suffrage: raft.Voter,
153174
ID: raft.ServerID(node.Name),
154175
Address: raft.ServerAddress(node.PeerAddr),
155176
})
156177
}
157-
return clusters
158-
}(),
159-
}); err != nil {
178+
return cluster
179+
}()),
180+
); err != nil {
160181
return nil, errors.Wrap(err, "NewServer")
161182
}
162183

@@ -170,8 +191,8 @@ func NewServer(cfg *config.Config) (*Server, error) {
170191
server.logApplyer = NewRaftMultipleLogApply(
171192
context.Background(),
172193
64,
173-
time.Millisecond*300,
174-
time.Second*3,
194+
300*time.Millisecond,
195+
3*time.Second,
175196
server.raft.Apply,
176197
)
177198
} else {
@@ -181,5 +202,5 @@ func NewServer(cfg *config.Config) (*Server, error) {
181202
// start daemon service
182203
go server._stateUpdater()
183204

184-
return &server, nil
205+
return server, nil
185206
}

pkg/dlocker/backend.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,14 +65,14 @@ func (w LockerBackend) Watch(key []byte) (store.Watcher, error) {
6565

6666
func NewLockerBackend(
6767
s store.Store,
68-
raftApplyFunc func(context.Context, *serverpb.RaftLogPayload, time.Duration) error,
68+
apply func(context.Context, *serverpb.RaftLogPayload, time.Duration) error,
6969
) (Backend, error) {
7070
current, err := s.Swap(Namespace)
7171
if err != nil {
7272
return nil, err
7373
}
7474
return &LockerBackend{
7575
actions: current,
76-
apply: raftApplyFunc,
76+
apply: apply,
7777
}, nil
7878
}

scripts/protobuf.sh

100644100755
Lines changed: 30 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,35 @@
1-
#!/usr/bin/env bash
1+
#!/bin/bash
22

33
function walk() {
44
local dir="$1"
5-
local walkDir="$2"
6-
7-
# shellcheck disable=SC2045
8-
for f in `ls $1`; do
9-
10-
if [ -f "$dir/$f" ] && [[ $walkDir != "dir" ]] && [[ "$dir/$f" == *".proto" ]]; then
11-
echo "building protobuf: $dir/$f"
12-
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative "$dir/$f"
13-
fi
14-
15-
if [ -d "$dir/$f" ]
16-
then
17-
if [[ `ls $dir/$f` == *".proto" ]]; then
18-
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative $dir/$f/*.proto
19-
else
20-
walk "$dir/$f" "dir"
21-
fi
22-
echo "building protobuf: $dir/$f"
23-
fi
24-
done
5+
local walk_dir="$2"
6+
7+
for f in $(ls "$1"); do
8+
9+
if [ -f "$dir/$f" ] && [[ $walk_dir != "dir" ]] && [[ "$dir/$f" == *".proto" ]]; then
10+
echo "building protobuf: $dir/$f"
11+
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative "$dir/$f"
12+
fi
13+
14+
if [ -d "$dir/$f" ]; then
15+
if [[ $(ls $dir/$f) == *".proto" ]]; then
16+
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative $dir/$f/*.proto
17+
else
18+
walk "$dir/$f" "dir"
19+
fi
20+
echo "building protobuf: $dir/$f"
21+
fi
22+
done
23+
}
24+
25+
function main() {
26+
local api_dir='./api'
27+
28+
if [ "$1" != "" ]; then
29+
api_dir=$1
30+
fi
31+
32+
walk "$api_dir" "file"
2533
}
2634

27-
if [ "$1" == "" ]; then
28-
$1 = "./api"
29-
echo "invalid api dir"
30-
else
31-
walk "$1" "file"
32-
fi
35+
main "$@"

0 commit comments

Comments
 (0)