Skip to content

Commit 04587eb

Browse files
committed
Gossipsub v2.0: Handle INEED and send the message
1 parent 5768259 commit 04587eb

File tree

2 files changed

+202
-1
lines changed

2 files changed

+202
-1
lines changed

gossipsub.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -787,6 +787,7 @@ func (gs *GossipSubRouter) HandleRPC(rpc *RPC) {
787787

788788
iwant := gs.handleIHave(rpc.from, ctl)
789789
ihave := gs.handleIWant(rpc.from, ctl)
790+
ihave = append(ihave, gs.handleINeed(rpc.from, ctl)...)
790791
prune := gs.handleGraft(rpc.from, ctl)
791792
gs.handleIAnnounce(rpc.from, ctl)
792793
gs.handlePrune(rpc.from, ctl)
@@ -922,6 +923,37 @@ func (gs *GossipSubRouter) handleIWant(p peer.ID, ctl *pb.ControlMessage) []*pb.
922923
return msgs
923924
}
924925

926+
func (gs *GossipSubRouter) handleINeed(p peer.ID, ctl *pb.ControlMessage) []*pb.Message {
927+
ihave := make(map[string]*pb.Message)
928+
for _, ineed := range ctl.GetIneed() {
929+
mid := ineed.GetMessageID()
930+
// Check if that peer has sent IDONTWANT before, if so don't send them the message
931+
if _, ok := gs.unwanted[p][computeChecksum(mid)]; ok {
932+
continue
933+
}
934+
935+
msg, _, ok := gs.mcache.GetForPeer(mid, p)
936+
if !ok {
937+
continue
938+
}
939+
if !gs.p.peerFilter(p, msg.GetTopic()) {
940+
continue
941+
}
942+
ihave[mid] = msg.Message
943+
}
944+
945+
if len(ihave) == 0 {
946+
return nil
947+
}
948+
949+
msgs := make([]*pb.Message, 0, len(ihave))
950+
for _, msg := range ihave {
951+
msgs = append(msgs, msg)
952+
}
953+
954+
return msgs
955+
}
956+
925957
func (gs *GossipSubRouter) handleGraft(p peer.ID, ctl *pb.ControlMessage) []*pb.ControlPrune {
926958
var prune []string
927959

gossipsub_test.go

Lines changed: 170 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3338,7 +3338,7 @@ func TestGossipsubPruneMeshCorrectly(t *testing.T) {
33383338
}
33393339
}
33403340

3341-
// Test that IANNOUNCE is sent to mesh peers
3341+
// Test that IANNOUNCE is sent to mesh peers and no message is sent if it doesn't send INEED
33423342
func TestGossipsubIannounceMeshPeer(t *testing.T) {
33433343
ctx, cancel := context.WithCancel(context.Background())
33443344
defer cancel()
@@ -3421,6 +3421,89 @@ func TestGossipsubIannounceMeshPeer(t *testing.T) {
34213421
<-ctx.Done()
34223422
}
34233423

3424+
// Test that IANNOUNCE is sent to mesh peers and the message is sent after sending INEED
3425+
func TestGossipsubIannounceIneedMeshPeer(t *testing.T) {
3426+
ctx, cancel := context.WithCancel(context.Background())
3427+
defer cancel()
3428+
hosts := getDefaultHosts(t, 2)
3429+
3430+
msgID := func(pmsg *pb.Message) string {
3431+
// silly content-based test message-ID: just use the data as whole
3432+
return base64.URLEncoding.EncodeToString(pmsg.Data)
3433+
}
3434+
3435+
params := DefaultGossipSubParams()
3436+
params.Dannounce = params.D
3437+
psub := getGossipsub(ctx, hosts[0], WithGossipSubParams(params), WithMessageIdFn(msgID))
3438+
_, err := psub.Subscribe("foobar")
3439+
if err != nil {
3440+
t.Fatal(err)
3441+
}
3442+
3443+
// Wait a bit after the last message before checking we got the right messages
3444+
msgTimer := time.NewTimer(1 * time.Second)
3445+
3446+
// Checks we received the right messages
3447+
msgCount := 0
3448+
checkMsgs := func() {
3449+
if msgCount != 1 {
3450+
t.Fatalf("Expected one message received, got %d", msgCount)
3451+
}
3452+
}
3453+
3454+
// Wait for the timer to expire
3455+
go func() {
3456+
select {
3457+
case <-msgTimer.C:
3458+
checkMsgs()
3459+
cancel()
3460+
return
3461+
case <-ctx.Done():
3462+
checkMsgs()
3463+
}
3464+
}()
3465+
3466+
newMockGS(ctx, t, hosts[1], func(writeMsg func(*pb.RPC), irpc *pb.RPC) {
3467+
// When the first peer connects it will send us its subscriptions
3468+
for _, sub := range irpc.GetSubscriptions() {
3469+
if sub.GetSubscribe() {
3470+
// Reply by subcribing to the topic and grafting to the first peer
3471+
writeMsg(&pb.RPC{
3472+
Subscriptions: []*pb.RPC_SubOpts{{Subscribe: sub.Subscribe, Topicid: sub.Topicid}},
3473+
Control: &pb.ControlMessage{Graft: []*pb.ControlGraft{{TopicID: sub.Topicid}}},
3474+
})
3475+
3476+
go func() {
3477+
// Wait for a short interval to make sure the first peer
3478+
// received and processed the subscribe + graft
3479+
time.Sleep(100 * time.Millisecond)
3480+
// Publish messages from the first peer
3481+
data := []byte("mymessage")
3482+
psub.Publish("foobar", data)
3483+
}()
3484+
}
3485+
}
3486+
if len(irpc.GetControl().GetIannounce()) > 0 {
3487+
var ineeds []*pb.ControlINeed
3488+
for _, iannounce := range irpc.GetControl().GetIannounce() {
3489+
mid := iannounce.GetMessageID()
3490+
ineed := &pb.ControlINeed{
3491+
MessageID: &mid,
3492+
}
3493+
ineeds = append(ineeds, ineed)
3494+
}
3495+
writeMsg(&pb.RPC{
3496+
Control: &pb.ControlMessage{Ineed: ineeds},
3497+
})
3498+
}
3499+
msgCount += len(irpc.GetPublish())
3500+
})
3501+
3502+
connect(t, hosts[0], hosts[1])
3503+
3504+
<-ctx.Done()
3505+
}
3506+
34243507
// Test that IANNOUNCE is sent to direct peers
34253508
func TestGossipsubIannounceDirectPeer(t *testing.T) {
34263509
ctx, cancel := context.WithCancel(context.Background())
@@ -4018,6 +4101,92 @@ func TestGossipsubIneedIndirectNonmeshPeers(t *testing.T) {
40184101
<-ctx.Done()
40194102
}
40204103

4104+
func TestSparseGossipsubV2(t *testing.T) {
4105+
ctx, cancel := context.WithCancel(context.Background())
4106+
defer cancel()
4107+
hosts := getDefaultHosts(t, 20)
4108+
4109+
params := DefaultGossipSubParams()
4110+
params.Dannounce = params.D
4111+
psubs := getGossipsubs(ctx, hosts, WithGossipSubParams(params))
4112+
4113+
var msgs []*Subscription
4114+
for _, ps := range psubs {
4115+
subch, err := ps.Subscribe("foobar")
4116+
if err != nil {
4117+
t.Fatal(err)
4118+
}
4119+
4120+
msgs = append(msgs, subch)
4121+
}
4122+
4123+
sparseConnect(t, hosts)
4124+
4125+
// wait for heartbeats to build mesh
4126+
time.Sleep(time.Second * 2)
4127+
4128+
for i := 0; i < 100; i++ {
4129+
msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i))
4130+
4131+
owner := mrand.Intn(len(psubs))
4132+
4133+
psubs[owner].Publish("foobar", msg)
4134+
4135+
for _, sub := range msgs {
4136+
got, err := sub.Next(ctx)
4137+
if err != nil {
4138+
t.Fatal(sub.err)
4139+
}
4140+
if !bytes.Equal(msg, got.Data) {
4141+
t.Fatal("got wrong message!")
4142+
}
4143+
}
4144+
}
4145+
}
4146+
4147+
func TestDenseGossipsubV2(t *testing.T) {
4148+
ctx, cancel := context.WithCancel(context.Background())
4149+
defer cancel()
4150+
hosts := getDefaultHosts(t, 20)
4151+
4152+
params := DefaultGossipSubParams()
4153+
params.Dannounce = params.D
4154+
psubs := getGossipsubs(ctx, hosts, WithGossipSubParams(params))
4155+
4156+
var msgs []*Subscription
4157+
for _, ps := range psubs {
4158+
subch, err := ps.Subscribe("foobar")
4159+
if err != nil {
4160+
t.Fatal(err)
4161+
}
4162+
4163+
msgs = append(msgs, subch)
4164+
}
4165+
4166+
denseConnect(t, hosts)
4167+
4168+
// wait for heartbeats to build mesh
4169+
time.Sleep(time.Second * 2)
4170+
4171+
for i := 0; i < 100; i++ {
4172+
msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i))
4173+
4174+
owner := mrand.Intn(len(psubs))
4175+
4176+
psubs[owner].Publish("foobar", msg)
4177+
4178+
for _, sub := range msgs {
4179+
got, err := sub.Next(ctx)
4180+
if err != nil {
4181+
t.Fatal(sub.err)
4182+
}
4183+
if !bytes.Equal(msg, got.Data) {
4184+
t.Fatal("got wrong message!")
4185+
}
4186+
}
4187+
}
4188+
}
4189+
40214190
func BenchmarkAllocDoDropRPC(b *testing.B) {
40224191
gs := GossipSubRouter{tracer: &pubsubTracer{}}
40234192

0 commit comments

Comments
 (0)