Skip to content

Commit a3ae702

Browse files
committed
Large refactor of the datastore layer
* introduce a new storage interface that does not perform any validation on the data * add the BlobName type encapsulating blob names; a blob name encodes the blob type and its hash * introduce a blob type handler interface to deal with different blob types; currently only static blobs are supported * simplify the datastore interface - methods no longer return readers or writers; instead they expect an input or output stream to be passed as an argument
1 parent b976671 commit a3ae702

35 files changed

+1913
-2236
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,3 +25,5 @@ _testmain.go
2525

2626
coverage.out
2727
*.coverprofile
28+
29+
*.log

.vscode/settings.json

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,15 @@
11
{
22
"go.autocompleteUnimportedPackages": true,
3-
"go.coverOnSave": true
3+
"go.coverOnSave": true,
4+
"cSpell.words": [
5+
"blobtype",
6+
"blobtypes",
7+
"bmap",
8+
"cinode",
9+
"ciphertext",
10+
"Hasher",
11+
"coverprofile",
12+
"goveralls",
13+
"shogo"
14+
]
415
}

datastore/blob_name.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package datastore
2+
3+
import (
4+
"errors"
5+
6+
base58 "github.com/jbenet/go-base58"
7+
)
8+
9+
var (
10+
// ErrInvalidBlobName is returned when a blob name cannot be constructed:
// either the hash is empty or the string does not decode to any bytes.
ErrInvalidBlobName = errors.New("invalid blob name")
11+
)
12+
13+
// BlobName is used to identify blobs.
14+
// Internally it is a single array of bytes that represents
15+
// both the type of the blob and internal hash used to create that blob.
16+
// The type of the blob is not stored directly. Instead it is mixed
17+
// with the hash of the blob to make sure that all bytes in the blob name
18+
// are randomly distributed.
//
// Layout: byte 0 is the blob type XOR-ed with every byte of the hash,
// and the remaining bytes are the hash itself. XOR-ing all bytes of the
// name therefore yields the blob type again.
19+
type BlobName []byte
20+
21+
// BlobNameFromHashAndType generates the name of a blob from some hash (e.g. sha256 of blob's content)
22+
// and given blob type
23+
func BlobNameFromHashAndType(hash []byte, t BlobType) (BlobName, error) {
24+
if len(hash) == 0 {
25+
return nil, ErrInvalidBlobName
26+
}
27+
28+
ret := make([]byte, len(hash)+1)
29+
30+
copy(ret[1:], hash)
31+
32+
scrambledTypeByte := byte(t)
33+
for _, b := range hash {
34+
scrambledTypeByte ^= b
35+
}
36+
ret[0] = scrambledTypeByte
37+
38+
return BlobName(ret), nil
39+
}
40+
41+
// BlobNameFromString decodes base58-encoded string into blob name
42+
func BlobNameFromString(s string) (BlobName, error) {
43+
decoded := base58.Decode(s)
44+
if len(decoded) == 0 {
45+
return nil, ErrInvalidBlobName
46+
}
47+
return BlobName(decoded), nil
48+
}
49+
50+
// String returns the base58-encoded form of the blob name.
51+
func (b BlobName) String() string {
52+
return base58.Encode(b)
53+
}
54+
55+
// Extracts hash from blob name
56+
func (b BlobName) Hash() []byte {
57+
return b[1:]
58+
}
59+
60+
// Extracts blob type from the name
61+
func (b BlobName) Type() BlobType {
62+
ret := byte(0)
63+
for _, by := range b {
64+
ret ^= by
65+
}
66+
return BlobType(ret)
67+
}

datastore/blob_name_test.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package datastore
2+
3+
import (
4+
"crypto/sha256"
5+
"fmt"
6+
"testing"
7+
8+
"github.com/stretchr/testify/assert"
9+
"github.com/stretchr/testify/require"
10+
)
11+
12+
func TestBlobName(t *testing.T) {
13+
for _, h := range [][]byte{
14+
{0}, {1}, {2}, {0xFE}, {0xFF},
15+
{0, 0, 0, 0},
16+
{1, 2, 3, 4, 5, 6},
17+
sha256.New().Sum(nil),
18+
} {
19+
for _, bt := range []BlobType{0, 1, 2, 0xFE, 0xFF} {
20+
t.Run(fmt.Sprintf("%v:%v", bt, h), func(t *testing.T) {
21+
bn, err := BlobNameFromHashAndType(h, bt)
22+
assert.NoError(t, err)
23+
assert.NotEmpty(t, bn)
24+
assert.Greater(t, len(bn), len(h))
25+
assert.Equal(t, h, bn.Hash())
26+
assert.Equal(t, bt, bn.Type())
27+
28+
s := bn.String()
29+
bn2, err := BlobNameFromString(s)
30+
require.NoError(t, err)
31+
require.Equal(t, bn, bn2)
32+
})
33+
}
34+
}
35+
36+
_, err := BlobNameFromString("!@#")
37+
require.ErrorIs(t, err, ErrInvalidBlobName)
38+
39+
_, err = BlobNameFromString("")
40+
require.ErrorIs(t, err, ErrInvalidBlobName)
41+
42+
_, err = BlobNameFromHashAndType(nil, 0x00)
43+
require.ErrorIs(t, err, ErrInvalidBlobName)
44+
}

datastore/blob_type_handler.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package datastore
2+
3+
import "io"
4+
5+
// BlobType identifies the kind of a blob. Each BlobTypeHandler reports the
// single BlobType it supports through its Type method.
type BlobType byte
6+
7+
// BlobTypeHandler groups the operations that depend on the type of a blob.
type BlobTypeHandler interface {
8+
// Type returns the type of blobs supported by this handler
9+
Type() BlobType
10+
11+
// Ingest processes an incoming update stream for the blob identified by
// hash and writes the outcome to result.
// NOTE(review): current presumably carries the blob content already
// stored locally and may be nil when there is none — confirm against
// the datastore callers.
Ingest(hash []byte, current, update io.Reader, result io.Writer) error
12+
13+
// Validate reads data for the blob identified by hash and copies it to
// validated, returning an error when the data is not acceptable.
// NOTE(review): whether partial data may already have been written to
// validated when an error is returned depends on the implementation —
// confirm before relying on it.
Validate(hash []byte, data io.Reader, validated io.Writer) error
14+
}

datastore/blob_type_handlers_list.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package datastore
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
)
7+
8+
// handlers lists every registered blob type handler; handlerForType scans
// it to find the handler matching a given blob type.
var handlers = []BlobTypeHandler{
9+
NewStaticBlobHandlerSha256(),
10+
}
11+
12+
var (
13+
// ErrUnknownBlobType is wrapped by the error handlerForType returns when
// no registered handler supports the requested blob type.
ErrUnknownBlobType = errors.New("unknown blob type")
14+
)
15+
16+
func handlerForType(t BlobType) (BlobTypeHandler, error) {
17+
for _, h := range handlers {
18+
if h.Type() == t {
19+
return h, nil
20+
}
21+
}
22+
23+
return nil, fmt.Errorf("%w (%d)", ErrUnknownBlobType, t)
24+
}

datastore/blob_type_static.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package datastore
2+
3+
import (
4+
"crypto/sha256"
5+
"fmt"
6+
"hash"
7+
"io"
8+
)
9+
10+
var (
11+
// ErrInvalidStaticBlobHash is reported when the content of a static blob
// does not match the expected hash. It wraps ErrValidationFailed, so
// errors.Is matches either of the two.
ErrInvalidStaticBlobHash = fmt.Errorf("%w: invalid static blob hash", ErrValidationFailed)
12+
)
13+
14+
const (
15+
// blobTypeStatic is the blob type value reported by the static blob handler.
blobTypeStatic BlobType = 0x00
16+
)
17+
18+
type staticBlobHandler struct {
19+
blobType BlobType
20+
newHasher func() hash.Hash
21+
}
22+
23+
func (h staticBlobHandler) Type() BlobType {
24+
return h.blobType
25+
}
26+
27+
func (h staticBlobHandler) Ingest(hash []byte, current, update io.Reader, w io.Writer) error {
28+
// TODO: Optimize - at this point we are not required to ingest the update stream
29+
// so if the current stream is valid (hash correct hash), we should cancel the
30+
// ingestion (but don't return an error, instead say that update is not needed)
31+
return h.Validate(hash, update, w)
32+
}
33+
34+
func (h staticBlobHandler) Validate(hash []byte, data io.Reader, validated io.Writer) error {
35+
_, err := io.Copy(
36+
validated,
37+
NewChecksumReader(
38+
data,
39+
h.newHasher(),
40+
hash,
41+
ErrInvalidStaticBlobHash,
42+
),
43+
)
44+
return err
45+
}
46+
47+
func NewStaticBlobHandlerSha256() BlobTypeHandler {
48+
return staticBlobHandler{
49+
blobType: blobTypeStatic,
50+
newHasher: sha256.New,
51+
}
52+
}

datastore/blob_type_static_test.go

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
package datastore
2+
3+
import (
4+
"bytes"
5+
"crypto/sha256"
6+
"errors"
7+
"io"
8+
"testing"
9+
"testing/iotest"
10+
11+
"github.com/stretchr/testify/require"
12+
)
13+
14+
func getUpdatedBlob(bt BlobTypeHandler, hash []byte, current, update []byte) ([]byte, error) {
15+
var currentReader io.Reader
16+
if current != nil {
17+
currentReader = bytes.NewReader(current)
18+
}
19+
20+
return getUpdatedBlobWithCurrentReader(bt, hash, currentReader, update)
21+
}
22+
23+
func getUpdatedBlobWithCurrentReader(bt BlobTypeHandler, hash []byte, currentReader io.Reader, update []byte) ([]byte, error) {
24+
25+
output := bytes.NewBuffer(nil)
26+
27+
err := bt.Ingest(hash[:], currentReader, bytes.NewReader(update), output)
28+
if err != nil {
29+
return nil, err
30+
}
31+
32+
return output.Bytes(), nil
33+
}
34+
35+
func TestStaticBlobHandler(t *testing.T) {
36+
bt := NewStaticBlobHandlerSha256()
37+
38+
require.EqualValues(t, bt.Type(), 0x00)
39+
40+
t.Run("ingest a new correct blob", func(t *testing.T) {
41+
data := []byte("hello world!")
42+
hash := sha256.Sum256(data)
43+
44+
dataBack, err := getUpdatedBlob(bt, hash[:], nil, data)
45+
require.NoError(t, err)
46+
require.Equal(t, data, dataBack)
47+
})
48+
49+
t.Run("ingest a new incorrect blob - hash of wrong data", func(t *testing.T) {
50+
data := []byte("hello world!")
51+
hash := sha256.Sum256(append(data, 1))
52+
53+
dataBack, err := getUpdatedBlob(bt, hash[:], nil, data)
54+
require.ErrorIs(t, err, ErrInvalidStaticBlobHash)
55+
require.ErrorIs(t, err, ErrValidationFailed)
56+
require.Nil(t, dataBack)
57+
})
58+
59+
t.Run("ingest a new incorrect blob - hash size mismatch", func(t *testing.T) {
60+
data := []byte("hello world!")
61+
hash := sha256.Sum256(data)
62+
63+
dataBack, err := getUpdatedBlob(bt, hash[:len(hash)-1], nil, data)
64+
require.ErrorIs(t, err, ErrInvalidStaticBlobHash)
65+
require.ErrorIs(t, err, ErrValidationFailed)
66+
require.Nil(t, dataBack)
67+
})
68+
69+
t.Run("ingest a correct update", func(t *testing.T) {
70+
data := []byte("hello world!")
71+
hash := sha256.Sum256(data)
72+
73+
dataBack, err := getUpdatedBlob(bt, hash[:], data, data)
74+
require.NoError(t, err)
75+
require.Equal(t, data, dataBack)
76+
})
77+
78+
t.Run("ingest an incorrect update - hash of wrong data", func(t *testing.T) {
79+
data := []byte("hello world!")
80+
hash := sha256.Sum256(data)
81+
82+
_, err := getUpdatedBlob(bt, hash[:], data, append(data, 1))
83+
require.ErrorIs(t, err, ErrInvalidStaticBlobHash)
84+
require.ErrorIs(t, err, ErrValidationFailed)
85+
})
86+
87+
t.Run("ingest a new incorrect blob - hash size mismatch", func(t *testing.T) {
88+
data := []byte("hello world!")
89+
hash := sha256.Sum256(data)
90+
91+
dataBack, err := getUpdatedBlob(bt, hash[:len(hash)-1], data, data)
92+
require.ErrorIs(t, err, ErrInvalidStaticBlobHash)
93+
require.ErrorIs(t, err, ErrValidationFailed)
94+
require.Nil(t, dataBack)
95+
})
96+
97+
t.Run("recover from a broken local data - hash mismatch", func(t *testing.T) {
98+
data := []byte("hello world!")
99+
hash := sha256.Sum256(data)
100+
101+
dataBack, err := getUpdatedBlob(bt, hash[:], append(data, 1), data)
102+
require.NoError(t, err)
103+
require.Equal(t, data, dataBack)
104+
})
105+
106+
t.Run("recover from a broken local data - read error", func(t *testing.T) {
107+
data := []byte("hello world!")
108+
hash := sha256.Sum256(data)
109+
110+
dataBack, err := getUpdatedBlobWithCurrentReader(
111+
bt, hash[:], iotest.ErrReader(errors.New("err")), data,
112+
)
113+
require.NoError(t, err)
114+
require.Equal(t, data, dataBack)
115+
})
116+
117+
}

datastore/checksumreader.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package datastore
2+
3+
import (
4+
"crypto/subtle"
5+
"errors"
6+
"hash"
7+
"io"
8+
)
9+
10+
// errChecksumMismatch is the fallback error reported on a hash mismatch when
// the caller of NewChecksumReader did not supply one.
var errChecksumMismatch = errors.New("checksum mismatch")

// checksumReader passes data from src through unchanged while feeding it
// into hasher. When src reports io.EOF, the digest is compared with
// expectedHash and hashMismatchError is returned in place of io.EOF if the
// two differ.
type checksumReader struct {
	src               io.Reader
	hasher            hash.Hash
	expectedHash      []byte
	hashMismatchError error
}

// Read implements io.Reader. Bytes are handed to the caller as they arrive,
// so data read before the final call has not been verified yet; only the
// error of the final call tells whether the whole stream was valid.
func (cr *checksumReader) Read(b []byte) (int, error) {
	n, err := cr.src.Read(b)
	// hash.Hash documents that Write never returns an error.
	cr.hasher.Write(b[:n])

	if !errors.Is(err, io.EOF) {
		return n, err
	}

	// End of stream: the digest is final. ConstantTimeCompare also reports
	// a mismatch when the lengths differ.
	if subtle.ConstantTimeCompare(cr.expectedHash, cr.hasher.Sum(nil)) != 1 {
		return n, cr.hashMismatchError
	}

	return n, err
}

// NewChecksumReader wraps src in a reader that verifies, at end of stream,
// that hasher's digest of everything read equals expectedHash.
//
// On a mismatch the final Read returns hashMismatchError instead of io.EOF.
// A nil hashMismatchError is replaced with a generic error: otherwise a
// mismatch would surface as (n, nil), hiding both the failure and the end of
// the stream, and a caller such as io.Copy would never terminate.
func NewChecksumReader(src io.Reader, hasher hash.Hash, expectedHash []byte, hashMismatchError error) io.Reader {
	if hashMismatchError == nil {
		hashMismatchError = errChecksumMismatch
	}

	return &checksumReader{
		src:               src,
		hasher:            hasher,
		expectedHash:      expectedHash,
		hashMismatchError: hashMismatchError,
	}
}

0 commit comments

Comments
 (0)