-
Notifications
You must be signed in to change notification settings - Fork 0
/
grpc.go
119 lines (106 loc) · 3.47 KB
/
grpc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package grpc
import (
"context"
"errors"
"io"
"log"
"strings"
"github.com/igumus/blockstorage"
"github.com/igumus/blockstorage/blockpb"
"github.com/igumus/blockstorage/util"
"github.com/ipfs/go-cid"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// storageGrpc captures/represents the gRPC server endpoint of the block
// storage service. Its methods are the RPC handlers; the work itself is
// delegated to the wrapped BlockStorage instance.
type storageGrpc struct {
	// Embedded generated server type from blockpb.
	// NOTE(review): presumably supplies default implementations for RPCs
	// this type does not override - confirm against the generated code.
	blockpb.UnimplementedBlockStorageGrpcServiceServer

	// storage is the backing block storage the RPC handlers call into.
	storage blockstorage.BlockStorage
}
// NewBlockStorageServiceEndpoint builds the gRPC server endpoint that serves
// requests on top of the given BlockStorage instance.
//
// It returns an error when s is nil. The ctx parameter is currently unused.
func NewBlockStorageServiceEndpoint(ctx context.Context, s blockstorage.BlockStorage) (blockpb.BlockStorageGrpcServiceServer, error) {
	if s == nil {
		err := errors.New("blockstorage: service not defined for endpoint")
		return nil, err
	}

	endpoint := new(storageGrpc)
	endpoint.storage = s
	return endpoint, nil
}
// rpcError converts the given error and gRPC code into a gRPC status error.
// Only the message of err is carried over; err must be non-nil because its
// Error method is called unconditionally.
func (s *storageGrpc) rpcError(code codes.Code, err error) error {
	message := err.Error()
	return status.Error(code, message)
}
// GetBlock is an RPC function defined in the `store.proto` file. It accepts a
// `blockpb.GetBlockRequest` carrying the block cid as a string, decodes that
// string into an actual cid and asks the underlying `BlockStorage` instance
// for the block.
//
//   - On context error: returns the context error with code `codes.Aborted`
//   - On undecodable cid: returns `ErrBlockIdentifierNotValid` with code `codes.InvalidArgument`
//   - Otherwise: returns whatever the underlying storage returns, unmodified
func (s *storageGrpc) GetBlock(ctx context.Context, req *blockpb.GetBlockRequest) (*blockpb.Block, error) {
	if err := util.CheckContext(ctx); err != nil {
		return nil, s.rpcError(codes.Aborted, err)
	}

	// Use a local name that does not shadow the imported cid package.
	blockID, err := cid.Decode(req.GetCid())
	if err != nil {
		return nil, s.rpcError(codes.InvalidArgument, blockstorage.ErrBlockIdentifierNotValid)
	}

	return s.storage.GetBlock(ctx, blockID)
}
// WriteBlock is an RPC function defined in the `store.proto` file. It accepts a
// client stream: the first message supplies the document name, and the chunk
// data of every following message is piped, in order, into `CreateBlock` of
// the underlying `BlockStorage`. Only the name of the first message is read;
// chunk data on that first message is not forwarded.
//
// On success the stream is closed with a `blockpb.WriteBlockResponse` carrying
// the value returned by `CreateBlock`. Otherwise:
//   - On context error: returns the context error with code `codes.Aborted`
//   - On receive error of the first message: returns code `codes.Aborted`
//   - On empty document name: returns `ErrBlockNameEmpty` with code `codes.InvalidArgument`
//   - On other errors (including receive errors of later chunks, which reach
//     `CreateBlock` through the pipe): returns the error with code `codes.Internal`
func (s *storageGrpc) WriteBlock(stream blockpb.BlockStorageGrpcService_WriteBlockServer) error {
	ctx := stream.Context()
	if ctxErr := util.CheckContext(ctx); ctxErr != nil {
		return s.rpcError(codes.Aborted, ctxErr)
	}

	request, requestErr := stream.Recv()
	if requestErr != nil {
		log.Printf("err: receiving request failed: %s\n", requestErr.Error())
		return status.Error(codes.Aborted, "cannot receive request")
	}

	fileName := strings.TrimSpace(request.GetName())
	if fileName == "" {
		return s.rpcError(codes.InvalidArgument, blockstorage.ErrBlockNameEmpty)
	}

	pr, pw := io.Pipe()
	// Closing the read end on return makes any pending or future pw.Write fail
	// with io.ErrClosedPipe. Without it the goroutine below would block forever
	// whenever CreateBlock returns before draining the pipe.
	defer pr.Close()

	// Pump chunks from the stream into the pipe until the client finishes,
	// the context check fails, or receiving/writing fails.
	go func() {
		var pumpErr error
		for {
			// Stop as soon as the context check fails instead of continuing
			// to receive and write after cancellation.
			if pumpErr = util.CheckContext(ctx); pumpErr != nil {
				break
			}

			req, err := stream.Recv()
			if errors.Is(err, io.EOF) {
				// Client finished sending; pumpErr is nil here.
				break
			}
			if err != nil {
				log.Printf("err: receiving chunk failed: %v\n", err)
				pumpErr = err
				break
			}

			if _, err = pw.Write(req.GetChunkData()); err != nil {
				pumpErr = err
				break
			}
		}
		// With a nil error this behaves like Close: the reader sees io.EOF.
		pw.CloseWithError(pumpErr)
	}()

	digest, err := s.storage.CreateBlock(ctx, fileName, pr)
	if err != nil {
		log.Printf("err: writing block failed: %s, %s\n", fileName, err.Error())
		return s.rpcError(codes.Internal, err)
	}

	return stream.SendAndClose(&blockpb.WriteBlockResponse{
		Cid: digest,
	})
}