Skip to content

Commit 0cce3f7

Browse files
committed
nominate
0 parents  commit 0cce3f7

File tree

12 files changed

+2258
-0
lines changed

12 files changed

+2258
-0
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
.idea

broadcaster.go

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
package scp
2+
3+
import (
4+
"fmt"
5+
"net"
6+
"time"
7+
8+
"github.com/golang/protobuf/proto"
9+
"github.com/sirupsen/logrus"
10+
"github.com/xdarksome/scp/pkg/key"
11+
"github.com/xdarksome/scp/pkg/network"
12+
"google.golang.org/grpc"
13+
)
14+
15+
const optimalMessageSize = 1 << 15
16+
17+
type stream struct {
18+
messages chan *network.Message
19+
done chan struct{}
20+
}
21+
22+
func newStream() stream {
23+
return stream{
24+
make(chan *network.Message, 100),
25+
make(chan struct{}),
26+
}
27+
}
28+
29+
type broadcaster struct {
30+
nodeID key.Public
31+
port uint
32+
input chan *network.Message
33+
streams map[chan struct{}]stream
34+
queue chan stream
35+
}
36+
37+
func newBroadcaster(nodeID key.Public, port uint, input chan *network.Message) broadcaster {
38+
return broadcaster{
39+
nodeID: nodeID,
40+
input: input,
41+
port: port,
42+
streams: make(map[chan struct{}]stream),
43+
queue: make(chan stream),
44+
}
45+
}
46+
47+
func (b *broadcaster) Run() {
48+
go func() {
49+
for {
50+
l, err := net.Listen("tcp", fmt.Sprintf(":%d", b.port))
51+
if err != nil {
52+
logrus.WithError(err).Error("failed to start tcp listener")
53+
time.Sleep(time.Second)
54+
continue
55+
}
56+
server := grpc.NewServer()
57+
network.RegisterTenvisServer(server, b)
58+
if err := server.Serve(l); err != nil {
59+
logrus.WithError(err).Error("grpc server failed")
60+
time.Sleep(time.Second)
61+
continue
62+
}
63+
}
64+
}()
65+
66+
for {
67+
select {
68+
case newStream := <-b.queue:
69+
b.streams[newStream.done] = newStream
70+
case m := <-b.input:
71+
for done, stream := range b.streams {
72+
select {
73+
case <-done:
74+
delete(b.streams, done)
75+
default:
76+
m.NodeID = b.nodeID.String()
77+
stream.messages <- m
78+
}
79+
}
80+
}
81+
}
82+
}
83+
84+
func (b *broadcaster) newStream() stream {
85+
s := newStream()
86+
b.queue <- s
87+
return s
88+
}
89+
90+
func (b *broadcaster) Streaming(req *network.StreamingRequest, stream network.Tenvis_StreamingServer) error {
91+
s := b.newStream()
92+
tick := time.Tick(100 * time.Millisecond)
93+
var buffer *network.Message
94+
send := func() error {
95+
if err := stream.Send(buffer); err != nil {
96+
logrus.WithError(err).Error("failed to send message")
97+
return err
98+
}
99+
stream.Context().Done()
100+
//fmt.Println("send", buffer)
101+
102+
buffer = nil
103+
return nil
104+
}
105+
106+
var err error
107+
loop:
108+
for {
109+
select {
110+
case <-tick:
111+
if buffer != nil {
112+
if err = send(); err != nil {
113+
break loop
114+
}
115+
}
116+
case message := <-s.messages:
117+
if buffer == nil {
118+
buffer = message
119+
break
120+
}
121+
proto.Merge(buffer, message)
122+
if proto.Size(buffer) >= optimalMessageSize {
123+
if err = send(); err != nil {
124+
break loop
125+
}
126+
}
127+
}
128+
}
129+
130+
s.done <- struct{}{}
131+
return err
132+
}

0 commit comments

Comments
 (0)