/
impl.go
139 lines (123 loc) · 3.89 KB
/
impl.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package blockstorage
import (
"bytes"
"context"
"io"
"log"
"strings"
"github.com/igumus/blockstorage/blockpb"
"github.com/ipfs/go-cid"
)
// GetBlock - reads block with given cid (aka content identifier) from underlying object store.
//
// Flow:
// 1. Finds store that contains block with given cid
// 1.1. checks permanent object store already has block with given cid, If exists reads from permanent object store.
// 1.2. checks temporary object store already has block with given cid, If exists reads from temporary object store.
// 1.3. asks p2p network to provide block with given cid, If founds any provider, stores block to temporary object store.
// 2. Decodes/Unmarshals binary form of block to proto object instance.
// 3. Returns proto instance (`blockpb.Block`) without error
//
// Error:
// When any of the flow operations fail, returns `nil` with error cause
func (s *storage) GetBlock(ctx context.Context, cid cid.Cid) (*blockpb.Block, error) {
var data []byte
var err error
if s.localStore.HasObject(ctx, cid) {
data, err = s.localStore.ReadObject(ctx, cid)
} else {
data, err = s.peer.GetRemoteBlock(ctx, cid)
}
if err != nil {
return nil, err
}
return blockpb.Decode(data)
}
// persistBlock - is a helper function that persists given block instance to permanent store.
//
// Flow:
// 1. Encodes/Marshals block proto object instance to binary
// 2. Persists binary content to permanent store.
// 3. Announces block ownership to p2p network.
// 4. Returns proto object instance reference (`blockpb.Link`)
//
// Error:
// When any of the flow operations fail, returns `nil` with error cause
func (s *storage) persistBlock(ctx context.Context, block *blockpb.Block) (*blockpb.Link, error) {
blockBin, encodeErr := blockpb.Encode(block)
if encodeErr != nil {
return nil, encodeErr
}
digest, persistErr := s.localStore.CreateObject(ctx, bytes.NewReader(blockBin))
if persistErr != nil {
return nil, persistErr
}
if s.debug {
log.Printf("debug: wrote block with digest: %s, %d\n", digest.String(), len(block.Data))
}
s.peer.AnnounceBlock(ctx, digest)
return &blockpb.Link{
Hash: digest.String(),
Tsize: uint64(len(block.Data)),
}, nil
}
// persistBlockWithData - creates and persists block which only have `Data` field with given byte slice.
func (s *storage) persistBlockWithData(ctx context.Context, data []byte) (*blockpb.Link, error) {
block := &blockpb.Block{
Data: data,
}
return s.persistBlock(ctx, block)
}
// CreateBlock - creates block with given `name` in underlying objectstore.
//
// Flow:
// 1. Validates file name
// 2. Reads `chunkSize` (default: 512KB) of data from `reader`
// 2.1 On each reading step persists DAG (Directed Acyclic Graph) leaf nodes to permanent store.
// 3. Creates root node to associate with leaf nodes.
// 4. Persists root of DAG to permanent store.
//
// Error:
// - When `fname` is not valid returns `"", ErrBlockNameEmpty`
// - When reading from `reader` fails returns `"", <Reader Failure Error>`
// - When reader not contains any data, returns `"",ErrBlockDataEmpty`
func (s *storage) CreateBlock(ctx context.Context, fname string, reader io.Reader) (string, error) {
name := strings.TrimSpace(fname)
if name == "" {
return "", ErrBlockNameEmpty
}
root := &blockpb.Block{
Name: name,
}
links := make([]*blockpb.Link, 0)
totalSize := uint64(0)
var buf []byte
for {
if ctx.Err() != nil {
return "", ctx.Err()
}
buf = make([]byte, s.chunkSize)
n, err := reader.Read(buf)
if err != nil {
if err != io.EOF {
return "", err
}
break
}
link, linkErr := s.persistBlockWithData(ctx, buf[:n])
if linkErr != nil {
return "", linkErr
}
links = append(links, link)
totalSize += uint64(n)
}
if len(links) < 1 {
return "", ErrBlockDataEmpty
}
root.Links = append(root.Links, links...)
rootLink, rootLinkErr := s.persistBlock(ctx, root)
if rootLinkErr != nil {
return "", rootLinkErr
}
return rootLink.Hash, nil
}