Skip to content

Commit 36e2a6b

Browse files
committed
catch up
1 parent f026b07 commit 36e2a6b

File tree

4 files changed

+131
-43
lines changed

4 files changed

+131
-43
lines changed

ballot_protocol.go

Lines changed: 63 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ import (
77
"github.com/sirupsen/logrus"
88
)
99

10-
type Catchuper interface {
11-
GetSlot(slotIndex uint64, nodeID string) (Slot, error)
10+
type SlotsLoader interface {
11+
LoadSlots(from uint64, to Slot) []Slot
1212
}
1313

1414
type Ledger interface {
@@ -19,6 +19,7 @@ type ballotProtocol struct {
1919
slotIndex uint64
2020
id string
2121

22+
loader SlotsLoader
2223
ledger Ledger
2324

2425
quorumSlices
@@ -36,17 +37,44 @@ type ballotProtocol struct {
3637
nominationProtocol chan protocolMessage
3738
lastCandidate Value
3839
candidates chan Value
40+
41+
catchUpDone chan struct{}
42+
persistLater []Slot
43+
}
44+
45+
func (b *ballotProtocol) init(slotIndex uint64) {
46+
b.slotIndex = slotIndex
47+
b.ballots = newBallots()
48+
b.currentBallot = &ballot{slotIndex: slotIndex, counter: 1}
49+
b.counters = make(ballotCounters)
50+
b.timer = time.NewTimer(time.Hour)
51+
}
52+
53+
func (b *ballotProtocol) reinit(index uint64) {
54+
b.init(index)
55+
56+
b.candidates = make(chan Value, 1000)
57+
b.highestAcceptedPrepared = nil
58+
b.highestConfirmedPrepared = nil
59+
b.lastCandidate = nil
60+
61+
b.nominationProtocol <- protocolMessage{
62+
slotIndex: index,
63+
candidates: b.candidates,
64+
}
3965
}
4066

4167
func (b *ballotProtocol) run() {
4268
for {
4369
select {
44-
case <-b.timer.C:
45-
b.updateCurrentBallotCounter(b.currentBallot.counter + 1)
4670
case c := <-b.candidates:
4771
b.newCandidate(c)
4872
case m := <-b.inputMessages:
4973
b.receive(m)
74+
case <-b.timer.C:
75+
b.updateCurrentBallotCounter(b.currentBallot.counter + 1)
76+
case <-b.catchUpDone:
77+
b.finishCatchUp()
5078
}
5179
}
5280
}
@@ -80,6 +108,12 @@ func (b *ballotProtocol) receive(m *Message) {
80108
}
81109
}
82110

111+
func (b *ballotProtocol) updateCurrentBallotCounter(c uint32) {
112+
b.currentBallot.counter = c
113+
b.recomputeCurrentBallotValue()
114+
b.votePrepare()
115+
}
116+
83117
func (b *ballotProtocol) broadcast(m *Message) {
84118
if m.SlotIndex != b.slotIndex {
85119
return
@@ -235,37 +269,41 @@ func (b *ballotProtocol) confirmCommit(ballot *ballot) {
235269
}
236270

237271
func (b *ballotProtocol) externalize(s Slot) {
272+
if s.Index != b.slotIndex {
273+
b.catchUp(s)
274+
b.reinit(s.Index)
275+
return
276+
}
277+
238278
b.ledger.PersistSlot(s)
239279
fmt.Println("\n", b.id, "externalized", s.Index, string(s.Value))
240280
b.reinit(s.Index + 1)
241281
}
242282

243-
func (b *ballotProtocol) init(slotIndex uint64) {
244-
b.slotIndex = slotIndex
245-
b.ballots = newBallots()
246-
b.currentBallot = &ballot{slotIndex: slotIndex, counter: 1}
247-
b.counters = make(ballotCounters)
248-
b.timer = time.NewTimer(time.Hour)
249-
}
283+
func (b *ballotProtocol) catchUp(slot Slot) {
284+
inProgress := b.persistLater != nil
285+
b.persistLater = append(b.persistLater, slot)
250286

251-
func (b *ballotProtocol) reinit(index uint64) {
252-
b.init(index)
287+
if inProgress {
288+
return
289+
}
253290

254-
b.candidates = make(chan Value, 1000)
255-
b.highestAcceptedPrepared = nil
256-
b.highestConfirmedPrepared = nil
257-
b.lastCandidate = nil
291+
go catchuper{
292+
loader: b.loader,
293+
ledger: b.ledger,
294+
from: b.slotIndex,
295+
to: slot,
296+
done: b.catchUpDone,
297+
}.catchUp()
298+
}
258299

259-
b.nominationProtocol <- protocolMessage{
260-
slotIndex: index,
261-
candidates: b.candidates,
300+
func (b *ballotProtocol) finishCatchUp() {
301+
for _, slot := range b.persistLater {
302+
b.ledger.PersistSlot(slot)
262303
}
263-
}
264304

265-
func (b *ballotProtocol) updateCurrentBallotCounter(c uint32) {
266-
b.currentBallot.counter = c
267-
b.recomputeCurrentBallotValue()
268-
b.votePrepare()
305+
b.slotIndex = b.persistLater[len(b.persistLater)-1].Index + 1
306+
b.persistLater = nil
269307
}
270308

271309
func (b *ballotProtocol) recomputeCurrentBallotValue() {

catchup.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package scp
2+
3+
type catchuper struct {
4+
loader SlotsLoader
5+
ledger Ledger
6+
from uint64
7+
to Slot
8+
done chan struct{}
9+
}
10+
11+
func (c catchuper) catchUp() {
12+
slots := c.loader.LoadSlots(c.from, c.to)
13+
for _, s := range slots {
14+
c.ledger.PersistSlot(s)
15+
}
16+
c.done <- struct{}{}
17+
}

main.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ type Config struct {
1515
Validator Validator
1616
Combiner Combiner
1717
Ledger Ledger
18+
SlotsLoader SlotsLoader
1819
QuorumSlices []*QuorumSlice
1920
}
2021

@@ -47,11 +48,13 @@ func New(cfg Config) *Consensus {
4748
slotIndex: cfg.CurrentSlot,
4849
id: cfg.NodeID,
4950
ledger: cfg.Ledger,
51+
loader: cfg.SlotsLoader,
5052
quorumSlices: cfg.QuorumSlices,
5153
inputMessages: make(chan *Message, 1000000),
5254
outputMessages: outputMessages,
5355
candidates: candidates,
5456
nominationProtocol: link,
57+
catchUpDone: make(chan struct{}),
5558
}
5659
ballotProtocol.init(cfg.CurrentSlot)
5760

main_test.go

Lines changed: 48 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,21 @@ import (
1010
"github.com/xdarksome/scp"
1111
)
1212

13-
type app struct{}
13+
type app struct {
14+
slots map[uint64]scp.Slot
15+
}
16+
17+
func newApp() app {
18+
return app{
19+
slots: map[uint64]scp.Slot{
20+
1: {Index: 1, Value: []byte{1}},
21+
2: {Index: 2, Value: []byte{2}},
22+
3: {Index: 3, Value: []byte{3}},
23+
4: {Index: 4, Value: []byte{4}},
24+
},
25+
}
26+
}
27+
1428
type badApp struct{}
1529

1630
func (v app) ValidateValue(scp.Value) bool {
@@ -29,7 +43,21 @@ func (v app) CombineValues(values ...scp.Value) (composite scp.Value) {
2943
return composite
3044
}
3145

32-
func (v app) PersistSlot(slot scp.Slot) {}
46+
func (v app) PersistSlot(s scp.Slot) {
47+
v.slots[s.Index] = s
48+
}
49+
50+
type catchup struct {
51+
app
52+
}
53+
54+
func (c catchup) LoadSlots(from uint64, to scp.Slot) (loaded []scp.Slot) {
55+
time.Sleep(5 * time.Second)
56+
for i := from; i < to.Index; i++ {
57+
loaded = append(loaded, c.slots[i])
58+
}
59+
return loaded
60+
}
3361

3462
func (v badApp) ValidateValue(segment scp.Value) bool {
3563
return false
@@ -49,10 +77,10 @@ func Test(t *testing.T) {
4977

5078
node1 := scp.New(scp.Config{
5179
NodeID: node1key,
52-
CurrentSlot: 10,
53-
Validator: app{},
54-
Combiner: app{},
55-
Ledger: app{},
80+
CurrentSlot: 5,
81+
Validator: newApp(),
82+
Combiner: newApp(),
83+
Ledger: newApp(),
5684
QuorumSlices: []*scp.QuorumSlice{
5785
{
5886
Threshold: 2,
@@ -67,10 +95,10 @@ func Test(t *testing.T) {
6795

6896
node2 := scp.New(scp.Config{
6997
NodeID: node2key,
70-
CurrentSlot: 10,
71-
Validator: app{},
72-
Combiner: app{},
73-
Ledger: app{},
98+
CurrentSlot: 5,
99+
Validator: newApp(),
100+
Combiner: newApp(),
101+
Ledger: newApp(),
74102
QuorumSlices: []*scp.QuorumSlice{
75103
{
76104
Threshold: 2,
@@ -83,12 +111,13 @@ func Test(t *testing.T) {
83111
},
84112
})
85113

114+
node3app := newApp()
86115
node3 := scp.New(scp.Config{
87116
NodeID: node3key,
88-
CurrentSlot: 10,
89-
Validator: app{},
90-
Combiner: app{},
91-
Ledger: app{},
117+
CurrentSlot: 5,
118+
Validator: node3app,
119+
Combiner: node3app,
120+
Ledger: node3app,
92121
QuorumSlices: []*scp.QuorumSlice{
93122
{
94123
Threshold: 2,
@@ -103,10 +132,11 @@ func Test(t *testing.T) {
103132

104133
node4 := scp.New(scp.Config{
105134
NodeID: node4key,
106-
CurrentSlot: 10,
107-
Validator: badApp{},
108-
Combiner: badApp{},
109-
Ledger: badApp{},
135+
CurrentSlot: 1,
136+
Validator: newApp(),
137+
Combiner: newApp(),
138+
Ledger: newApp(),
139+
SlotsLoader: &catchup{node3app},
110140
QuorumSlices: []*scp.QuorumSlice{
111141
{
112142
Threshold: 2,

0 commit comments

Comments
 (0)