This repository was archived by the owner on Jul 18, 2025. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 143
Expand file tree
/
Copy pathinmem.go
More file actions
83 lines (72 loc) · 1.35 KB
/
inmem.go
File metadata and controls
83 lines (72 loc) · 1.35 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package libchan
import (
"fmt"
"io"
"reflect"
)
type pSender chan<- interface{}
type pReceiver <-chan interface{}
func (s pSender) Close() error {
close(s)
return nil
}
func (s pSender) Send(msg interface{}) error {
s <- msg
return nil
}
type messageDecoder interface {
Decode(v ...interface{}) error
}
func (r pReceiver) Receive(msg interface{}) error {
rmsg, ok := <-r
if !ok {
return io.EOF
}
// check type
v := reflect.ValueOf(msg)
rv := reflect.ValueOf(rmsg)
if v.Type() == rv.Type() {
if v.Kind() == reflect.Ptr {
v.Elem().Set(rv.Elem())
} else {
v.Set(rv)
}
} else {
switch msg.(type) {
case *interface{}:
v.Elem().Set(rv)
default:
switch rval := rmsg.(type) {
case messageDecoder:
return rval.Decode(msg)
default:
return fmt.Errorf("Cannot receive %T into %T", rmsg, msg)
}
}
}
return nil
}
func (r pReceiver) SendTo(dst Sender) (int, error) {
var n int
for {
pmsg, ok := <-r
if !ok {
break
}
if err := dst.Send(pmsg); err != nil {
return n, err
}
n++
}
return n, nil
}
// Pipe returns an inmemory Sender/Receiver pair.
func Pipe() (Receiver, Sender) {
c := make(chan interface{})
return pReceiver(c), pSender(c)
}
// BufferedPipe returns an inmemory buffered pipe.
func BufferedPipe(n int) (Receiver, Sender) {
c := make(chan interface{}, n)
return pReceiver(c), pSender(c)
}