-
Notifications
You must be signed in to change notification settings - Fork 24
/
Copy pathmessage_test.go
111 lines (92 loc) · 2.17 KB
/
message_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package messaging
import (
"context"
"fmt"
"sync"
"testing"
"time"
)
////////////////////////////////
//通常意义上是,连接消息队列之后就可以发送消息
//当订阅著之后才会收到相关Topic消息的推送
//当前,省略连接队列的步骤和操作代码
////////////////////////////////
func TestMessageSubAndPubWithTopic(t *testing.T) {
var wg sync.WaitGroup
topicName := "seeking passengers"
//假设消息队列已经收到数据,加下来由topic处理
topic := Topic{
Name: topicName,
UserQueueSize: 5,
}
ctx, cancel := context.WithCancel(context.Background())
wg.Add(1)
//user 1
//用户tom订阅拼车消息,订阅的是车主发布的拼车消息
if subScriberTom, ok := topic.Subscribe(123, topicName); ok {
go func() {
defer wg.Done()
EXIT:
for {
select {
case <-ctx.Done():
fmt.Println("tom receive cancel, exit")
break EXIT
default:
msg := Message{}
err := subScriberTom.Receive(&msg)
if err == nil {
fmt.Println("tom receive subscribed msg:", msg)
}
}
time.Sleep(200)
}
}()
}
wg.Add(1)
//订阅成功了
//发送一个消息
//用户Lily订阅拼车消息,订阅的是车主发布的拼车消息
if subSCriptionLily, ok := topic.Subscribe(456, topicName); ok {
go func() {
defer wg.Done()
EXIT:
for {
select {
case <-ctx.Done():
fmt.Println("lily receive cancel, exit")
break EXIT
default:
msg := Message{}
err := subSCriptionLily.Receive(&msg)
if err == nil {
fmt.Println("lily receive subscribed msg:", msg)
}
}
time.Sleep(200)
}
}()
}
go func() {
//模拟发送消息
msg := Message{
Text: "i am looking for 1 passenger",
From: Session{User{123, "lily"}, time.Now()},
}
topic.Publish(msg)
msg = Message{
Text: "i am looking for 2 passenger",
From: Session{User{123, "lucy"}, time.Now()},
}
topic.Publish(msg)
msg = Message{
Text: "i am looking for passenger as many as i can",
From: Session{User{123, "rose"}, time.Now()},
}
topic.Publish(msg)
time.Sleep(time.Second)
cancel()
}()
wg.Wait()
fmt.Println("all message done,exit it")
}