Skip to content

Commit

Permalink
zmq4: first stab at XPUB
Browse files Browse the repository at this point in the history
  • Loading branch information
sbinet committed Jan 20, 2020
1 parent 2795d9e commit 018f24d
Show file tree
Hide file tree
Showing 5 changed files with 248 additions and 12 deletions.
60 changes: 60 additions & 0 deletions czmq4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,65 @@ var (
},
}

cxpubsubs = []testCaseXPubSub{
{
name: "tcp-cxpub-sub",
endpoint: must(EndPoint("tcp")),
xpub: zmq4.NewCXPub(bkg),
sub0: zmq4.NewSub(bkg),
sub1: zmq4.NewSub(bkg),
sub2: zmq4.NewSub(bkg),
},
{
name: "tcp-xpub-csub",
endpoint: must(EndPoint("tcp")),
xpub: zmq4.NewXPub(bkg),
sub0: zmq4.NewCSub(bkg),
sub1: zmq4.NewCSub(bkg),
sub2: zmq4.NewCSub(bkg),
},
{
name: "tcp-cxpub-csub",
endpoint: must(EndPoint("tcp")),
xpub: zmq4.NewCXPub(bkg),
sub0: zmq4.NewCSub(bkg),
sub1: zmq4.NewCSub(bkg),
sub2: zmq4.NewCSub(bkg),
},
{
name: "ipc-cxpub-sub",
endpoint: "ipc://ipc-cxpub-sub",
xpub: zmq4.NewCXPub(bkg),
sub0: zmq4.NewSub(bkg),
sub1: zmq4.NewSub(bkg),
sub2: zmq4.NewSub(bkg),
},
{
name: "ipc-xpub-csub",
endpoint: "ipc://ipc-xpub-csub",
xpub: zmq4.NewXPub(bkg),
sub0: zmq4.NewCSub(bkg),
sub1: zmq4.NewCSub(bkg),
sub2: zmq4.NewCSub(bkg),
},
{
name: "ipc-cxpub-csub",
endpoint: "ipc://ipc-cxpub-csub",
xpub: zmq4.NewCXPub(bkg),
sub0: zmq4.NewCSub(bkg),
sub1: zmq4.NewCSub(bkg),
sub2: zmq4.NewCSub(bkg),
},
{
name: "inproc-cxpub-csub",
endpoint: "inproc://inproc-cxpub-csub",
xpub: zmq4.NewCXPub(bkg),
sub0: zmq4.NewCSub(bkg),
sub1: zmq4.NewCSub(bkg),
sub2: zmq4.NewCSub(bkg),
},
}

crouterdealers = []testCaseRouterDealer{
{
name: "tcp-router-cdealer",
Expand Down Expand Up @@ -348,5 +407,6 @@ func init() {
reqreps = append(reqreps, creqreps...)
pairs = append(pairs, cpairs...)
pubsubs = append(pubsubs, cpubsubs...)
xpubsubs = append(xpubsubs, cxpubsubs...)
routerdealers = append(routerdealers, crouterdealers...)
}
11 changes: 0 additions & 11 deletions sub.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,17 +62,6 @@ func (sub *subSocket) Dial(ep string) error {
if err != nil {
return err
}
// send our subscriptions to the remote end...
sub.mu.RLock()
defer sub.mu.RUnlock()
for k := range sub.topics {
topic := append([]byte{1}, k...)
msg := NewMsg(topic)
err := sub.Send(msg)
if err != nil {
return err
}
}
return nil
}

Expand Down
2 changes: 2 additions & 0 deletions xpub.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
// The returned socket value is initially unbound.
func NewXPub(ctx context.Context, opts ...Option) Socket {
xpub := &xpubSocket{newSocket(ctx, XPub, opts...)}
xpub.sck.w = newPubMWriter(xpub.sck.ctx)
xpub.sck.r = newPubQReader(xpub.sck.ctx)
return xpub
}

Expand Down
5 changes: 4 additions & 1 deletion zmq4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,11 @@ func getTCPPort() (string, error) {
}

func cleanUp(ep string) {
if strings.HasPrefix(ep, "ipc://") {
switch {
case strings.HasPrefix(ep, "ipc://"):
os.Remove(ep[len("ipc://"):])
case strings.HasPrefix(ep, "inproc://"):
os.Remove(ep[len("inproc://"):])
}
}

Expand Down
182 changes: 182 additions & 0 deletions zmq4_xpubsub_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
// Copyright 2020 The go-zeromq Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package zmq4_test

import (
"context"
"reflect"
"sync"
"testing"
"time"

"github.com/go-zeromq/zmq4"
"golang.org/x/sync/errgroup"
"golang.org/x/xerrors"
)

var (
xpubsubs = []testCaseXPubSub{
{
name: "tcp-xpub-sub",
endpoint: must(EndPoint("tcp")),
xpub: zmq4.NewXPub(bkg),
sub0: zmq4.NewSub(bkg, zmq4.WithID(zmq4.SocketIdentity("sub0"))),
sub1: zmq4.NewSub(bkg, zmq4.WithID(zmq4.SocketIdentity("sub1"))),
sub2: zmq4.NewSub(bkg, zmq4.WithID(zmq4.SocketIdentity("sub2"))),
},
{
name: "ipc-xpub-sub",
endpoint: "ipc://ipc-xpub-sub",
xpub: zmq4.NewXPub(bkg),
sub0: zmq4.NewSub(bkg, zmq4.WithID(zmq4.SocketIdentity("sub0"))),
sub1: zmq4.NewSub(bkg, zmq4.WithID(zmq4.SocketIdentity("sub1"))),
sub2: zmq4.NewSub(bkg, zmq4.WithID(zmq4.SocketIdentity("sub2"))),
},
{
name: "inproc-xpub-sub",
endpoint: "inproc://inproc-xpub-sub",
xpub: zmq4.NewXPub(bkg),
sub0: zmq4.NewSub(bkg, zmq4.WithID(zmq4.SocketIdentity("sub0"))),
sub1: zmq4.NewSub(bkg, zmq4.WithID(zmq4.SocketIdentity("sub1"))),
sub2: zmq4.NewSub(bkg, zmq4.WithID(zmq4.SocketIdentity("sub2"))),
},
}
)

type testCaseXPubSub struct {
name string
skip bool
endpoint string
xpub zmq4.Socket
sub0 zmq4.Socket
sub1 zmq4.Socket
sub2 zmq4.Socket
}

func TestXPubSub(t *testing.T) {
var (
topics = []string{"", "MSG", "msg"}
wantNumMsgs = []int{3, 1, 1}
msg0 = zmq4.NewMsgString("anything")
msg1 = zmq4.NewMsgString("MSG 1")
msg2 = zmq4.NewMsgString("msg 2")
msgs = [][]zmq4.Msg{
0: {msg0, msg1, msg2},
1: {msg1},
2: {msg2},
}
)

for i := range xpubsubs {
tc := xpubsubs[i]
t.Run(tc.name, func(t *testing.T) {
defer tc.xpub.Close()
defer tc.sub0.Close()
defer tc.sub1.Close()
defer tc.sub2.Close()

if tc.skip {
t.Skipf(tc.name)
}
//t.Parallel()

ep := tc.endpoint
cleanUp(ep)

ctx, timeout := context.WithTimeout(context.Background(), 20*time.Second)
defer timeout()

nmsgs := []int{0, 0, 0}
subs := []zmq4.Socket{tc.sub0, tc.sub1, tc.sub2}

var wg1 sync.WaitGroup
var wg2 sync.WaitGroup
wg1.Add(len(subs))
wg2.Add(len(subs))

grp, ctx := errgroup.WithContext(ctx)
grp.Go(func() error {

err := tc.xpub.Listen(ep)
if err != nil {
return xerrors.Errorf("could not listen: %w", err)
}

if addr := tc.xpub.Addr(); addr == nil {
return xerrors.Errorf("listener with nil Addr")
}

wg1.Wait()
wg2.Wait()

time.Sleep(1 * time.Second)

for _, msg := range msgs[0] {
err = tc.xpub.Send(msg)
if err != nil {
return xerrors.Errorf("could not send message %v: %w", msg, err)
}
}

return err
})

for isub := range subs {
func(isub int, sub zmq4.Socket) {
grp.Go(func() error {
var err error
err = sub.Dial(ep)
if err != nil {
return xerrors.Errorf("could not dial: %w", err)
}

if addr := sub.Addr(); addr != nil {
return xerrors.Errorf("dialer with non-nil Addr")
}

wg1.Done()
wg1.Wait()

err = sub.SetOption(zmq4.OptionSubscribe, topics[isub])
if err != nil {
return xerrors.Errorf("could not subscribe to topic %q: %w", topics[isub], err)
}

wg2.Done()
wg2.Wait()

msgs := msgs[isub]
for imsg, want := range msgs {
msg, err := sub.Recv()
if err != nil {
return xerrors.Errorf("could not recv message %v: %w", want, err)
}
if !reflect.DeepEqual(msg, want) {
return xerrors.Errorf("sub[%d][msg=%d]: got = %v, want= %v", isub, imsg, msg, want)
}
nmsgs[isub]++
}

return err
})
}(isub, subs[isub])
}

if err := grp.Wait(); err != nil {
t.Fatalf("error: %+v", err)
}

if err := ctx.Err(); err != nil && err != context.Canceled {
t.Fatalf("error: %+v", err)
}

for i, want := range wantNumMsgs {
if want != nmsgs[i] {
t.Errorf("xsub[%d]: got %d messages, want %d msgs=%v", i, nmsgs[i], want, nmsgs)
}
}
})
}
}

0 comments on commit 018f24d

Please sign in to comment.