Skip to content

Commit 60e2ee3

Browse files
committed
Add an ability to run kvgod in a distributed way on multiple servers at the same time using RAFT consensus algorithm
1 parent 3535dac commit 60e2ee3

File tree

6 files changed

+436
-25
lines changed

6 files changed

+436
-25
lines changed

Gopkg.lock

Lines changed: 43 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cmd/kvgod/main.go

Lines changed: 47 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,27 @@
11
package main
22

33
import (
4+
"context"
45
"flag"
5-
"fmt"
6+
"path/filepath"
7+
"time"
68

9+
pb "github.com/kgantsov/kvgo/pkg/server"
710
server "github.com/kgantsov/kvgo/pkg/server"
811
log "github.com/sirupsen/logrus"
12+
"google.golang.org/grpc"
913
)
1014

1115
const dbPath = "./data.db"
1216
const indexPath = "./indexes.idx"
1317

1418
func main() {
15-
port := flag.String("port", "56379", "DB port")
16-
rpcPort := flag.String("rpc_port", "50051", "RPC DB port")
19+
// addr := flag.String("addr", ":56379", "Redis bind address")
20+
rpcAddr := flag.String("rpc_addr", ":50051", "RPC bind address")
21+
raftDir := flag.String("raft_dir", "", "RPC DB port")
22+
raftAddr := flag.String("raft_addr", ":12000", "Raft bind address")
23+
joinAddr := flag.String("join_addr", "", "Join address")
24+
nodeID := flag.String("node_id", "", "Node ID")
1725
logLevel := flag.String("log_level", "info", "Log level")
1826
flag.Parse()
1927

@@ -26,9 +34,42 @@ func main() {
2634
}
2735
log.SetLevel(level)
2836

37+
if *raftDir == "" {
38+
log.Fatal("No Raft storage directory specified\n")
39+
}
40+
if *nodeID == "" {
41+
log.Fatal("No nodeID storage directory specified\n")
42+
}
43+
2944
log.Info("Creating storage...")
30-
store := server.NewStore(dbPath, indexPath, 1000, 10000)
45+
store := server.NewStore(
46+
filepath.Join(*raftDir, dbPath), filepath.Join(*raftDir, indexPath), 1000, 10000,
47+
)
48+
store.RaftDir = *raftDir
49+
store.RaftBind = *raftAddr
50+
51+
if err := store.Open(*joinAddr == "", *nodeID); err != nil {
52+
log.Fatalf("failed to open store: %s", err.Error())
53+
}
54+
55+
if *joinAddr != "" {
56+
conn, err := grpc.Dial(*joinAddr, grpc.WithInsecure())
57+
if err != nil {
58+
log.Fatalf("did not connect: %v", err)
59+
}
60+
defer conn.Close()
61+
c := pb.NewKVClient(conn)
62+
63+
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
64+
defer cancel()
65+
66+
log.Info("TRYING TO CONNECT OT REMOTE NODE remote node %s at %s", *nodeID, *raftAddr)
67+
_, err = c.Join(ctx, &pb.JoinRequest{Addr: *raftAddr, NodeID: *nodeID})
68+
if err != nil {
69+
log.Fatalf("could not greet: %v", err)
70+
}
71+
}
3172

32-
server.ListenAndServ(fmt.Sprintf(":%s", *port), store)
33-
server.ListenAndServGrpc(fmt.Sprintf(":%s", *rpcPort), store)
73+
server.ListenAndServGrpc(*rpcAddr, store)
74+
// server.ListenAndServ(*addr, store)
3475
}

pkg/server/grpc.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,3 +52,8 @@ func (s *server) Del(ctx context.Context, in *DelRequest) (*DelResponse, error)
5252
s.store.Delete(in.Key)
5353
return &DelResponse{Exist: false}, nil
5454
}
55+
56+
func (s *server) Join(ctx context.Context, in *JoinRequest) (*JoinResponse, error) {
57+
s.store.Join(in.NodeID, in.Addr)
58+
return &JoinResponse{Joined: true}, nil
59+
}

pkg/server/kv.pb.go

Lines changed: 95 additions & 16 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/server/kv.proto

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,19 @@ message DelResponse {
2828
bool exist = 1;
2929
}
3030

31+
message JoinRequest {
32+
string addr = 1;
33+
string nodeID = 2;
34+
}
35+
36+
message JoinResponse {
37+
bool joined = 1;
38+
}
39+
3140

3241
service KV {
3342
rpc Set (SetRequest) returns (SetResponse) {}
3443
rpc Get (GetRequest) returns (GetResponse) {}
3544
rpc Del (DelRequest) returns (DelResponse) {}
45+
rpc Join (JoinRequest) returns (JoinResponse) {}
3646
}

0 commit comments

Comments
 (0)