-
Notifications
You must be signed in to change notification settings - Fork 24
/
Copy pathmessage.go
158 lines (131 loc) · 3.72 KB
/
message.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
package messaging
import (
"context"
"errors"
"fmt"
"time"
)
//Message for msg in Message bus
type Message struct {
Seq int
Text string
From Session //消息来源
}
//User for user
type User struct {
ID uint64
Name string
}
//Session inherit user
type Session struct {
User
Timestamp time.Time
}
//Subscription for user
//Subscription is a session
type Subscription struct {
Session
topicName string
ctx context.Context
cancel context.CancelFunc
Ch chan<- Message //发送队列
Inbox chan Message //接收消息的队列
}
func newSubscription(uid uint64, topicName string, UserQueueSize int) Subscription {
ctx, cancel := context.WithCancel(context.Background())
return Subscription{
Session: Session{User{ID: uid}, time.Now()},
ctx: ctx,
cancel: cancel,
Ch: make(chan<- Message, UserQueueSize), //用于跟单个用户通信的消息队列,用户发送消息
Inbox: make(chan Message, UserQueueSize), //用于跟单个用户通信的消息队列,用户接收消息
}
}
//Cancel Message
func (s *Subscription) Cancel() {
s.cancel()
}
//Publish 这个表示用户订阅到感兴趣的主题的时候,同时也可以发送消息,
//Publish 但是,示例中不演示这个用途
//Publish 只有当channel无数据,且channel被close了,才会返回ok=false
//Publish a message to subscription queue
func (s *Subscription) Publish(msg Message) error {
select {
case <-s.ctx.Done():
return errors.New("Topic has been closed")
default:
s.Ch <- msg
}
return nil
}
//Receive message
func (s *Subscription) Receive(out *Message) error {
select {
case <-s.ctx.Done():
return errors.New("Topic has been closed")
case <-time.After(time.Millisecond * 100):
return errors.New("time out error")
case *out = <-s.Inbox:
return nil
}
}
//Topic that user is interested in
//Topic should locate in MQ
type Topic struct {
UserQueueSize int
Name string
Subscribers map[uint64]Subscription //user list
MessageHistory []Message //当前主题的消息历史,实际项目中可能需要限定大小并设置过期时间
}
//Publish 只有当channel无数据,且channel被close了,才会返回ok=false
//Publish a message to subscription queue
func (t *Topic) Publish(msg Message) error {
//将消息发布给当前Topic的所有人
for usersID, subscription := range t.Subscribers {
if subscription.ID == usersID {
subscription.Inbox <- msg
}
}
//save message history
t.MessageHistory = append(t.MessageHistory, msg)
fmt.Println("current histroy message count: ", len(t.MessageHistory))
return nil
}
func (t *Topic) findUserSubscription(uid uint64, topicName string) (Subscription, bool) {
// Get session or create one if it's the first
if topicName != t.Name || t.Subscribers == nil || len(t.Subscribers) == 0 {
return Subscription{}, false
}
if subscription, found := t.Subscribers[uid]; found {
return subscription, true
}
return Subscription{}, false
}
//Subscribe a spec topic
func (t *Topic) Subscribe(uid uint64, topicName string) (Subscription, bool) {
if t.Name != topicName {
return Subscription{}, false
}
// Get session or create one if it's the first
if _, found := t.findUserSubscription(uid, topicName); !found {
if t.Subscribers == nil {
t.Subscribers = make(map[uint64]Subscription)
}
t.Subscribers[uid] = newSubscription(uid, topicName, t.UserQueueSize)
}
return t.Subscribers[uid], true
}
//Unsubscribe remove Subscription
func (t *Topic) Unsubscribe(s Subscription) error {
if _, found := t.findUserSubscription(s.ID, s.topicName); found {
delete(t.Subscribers, s.ID)
}
return nil
}
//Delete topic
func (t *Topic) Delete() error {
t.Subscribers = nil
t.Name = ""
t.MessageHistory = nil
return nil
}