-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Added the ability to add/receive batch data from storage, as well as upload/download data to file storage using de-serialization algorithms
- Loading branch information
Showing
12 changed files
with
1,610 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
// Package jellystore | ||
/* | ||
Copyright 2022 Jellydb in-memory database | ||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
package jellystore | ||
|
||
import "github.com/pkg/errors" | ||
|
||
type Config struct { | ||
Path string | ||
} | ||
|
||
func (c Config) validate() error { | ||
if c.Path == "" { | ||
return errors.New("config: path has not be empty") | ||
} | ||
|
||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,147 @@ | ||
// Package jellystore | ||
/* | ||
Copyright 2022 Jellydb in-memory database | ||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
package jellystore | ||
|
||
import ( | ||
"context" | ||
"encoding/binary" | ||
"fmt" | ||
"io" | ||
"os" | ||
|
||
"github.com/pkg/errors" | ||
"go.uber.org/multierr" | ||
"golang.org/x/sync/errgroup" | ||
) | ||
|
||
// Load - loading all parameters/data from storage. | ||
// Loading data is necessary for fault-tolerant operation of in-memory storage. | ||
// Loading occurs through the directory specified in the config, | ||
// upon completion of the context, there is a possibility that | ||
// not all data can be included in the storage. | ||
// For example: | ||
// err := storage.Load(ctx) | ||
// if err != nil { | ||
// log.Fatal(err) | ||
// } | ||
// bb, err := storage.Get("some-key", 1) | ||
// if err != nil { | ||
// log.Fatal(err) | ||
// } | ||
// fmt.Println(bb) // bytes by key: "some-key" | ||
func (s *Store) Load(ctx context.Context) error { | ||
return s.load(ctx) | ||
} | ||
|
||
func (s *Store) load(ctx context.Context) error { | ||
entities, err := os.ReadDir(s.config.Path) | ||
if err != nil { | ||
return errors.Wrapf(err, "read dir by path - %s", s.config.Path) | ||
} | ||
if len(entities) == 0 { | ||
return nil | ||
} | ||
|
||
left, right := entities[:len(entities)/2], entities[len(entities)/2:] | ||
|
||
eg := errgroup.Group{} | ||
eg.Go(func() error { | ||
return s.loadEntities(ctx, left) | ||
}) | ||
|
||
eg.Go(func() error { | ||
return s.loadEntities(ctx, right) | ||
}) | ||
|
||
return errors.Wrap(eg.Wait(), "load files by entities") | ||
} | ||
|
||
const ( | ||
logFileName = "log.jelly.db" | ||
metaFileName = "meta.jelly.format" | ||
) | ||
|
||
func (s *Store) loadEntities(ctx context.Context, entities []os.DirEntry) error { | ||
for _, e := range entities { | ||
select { | ||
case <-ctx.Done(): | ||
return errors.New("failed to load all file data") | ||
default: | ||
if err := s.loadByFile(e.Name()); err != nil { | ||
return err | ||
} | ||
} | ||
} | ||
return nil | ||
} | ||
|
||
const ( | ||
messageLen = 4 | ||
) | ||
|
||
func (s *Store) loadByFile(key string) (err error) { | ||
if s.mpstate == nil { | ||
return nil | ||
} | ||
|
||
pdata := fmt.Sprintf("%s/%s/%s", s.config.Path, key, logFileName) | ||
|
||
metaInfo, err := openMeta(fmt.Sprintf("%s/%s/%s", s.config.Path, key, metaFileName)) | ||
if err != nil { | ||
return err | ||
} | ||
defer multierr.AppendInvoke(&err, multierr.Close(metaInfo)) | ||
|
||
writtenOffset, err := metaInfo.written.offset() | ||
if err != nil { | ||
return err | ||
} | ||
|
||
committedOffset, err := metaInfo.committed.offset() | ||
if err != nil { | ||
return err | ||
} | ||
|
||
s.setWrittenOffset(key, writtenOffset.int64(), committedOffset.int64()) | ||
if committedOffset.uint32() == writtenOffset.uint32() { | ||
return nil | ||
} | ||
|
||
dataFile, err := os.OpenFile(pdata, os.O_RDONLY, os.ModePerm) | ||
if err != nil { | ||
return errors.Wrap(err, "open data file") | ||
} | ||
|
||
iteration := int64(committedOffset) | ||
for { | ||
bb := make([]byte, maxMessageSize+messageLen) | ||
_, err = dataFile.ReadAt(bb, iteration) | ||
if err != nil { | ||
if errors.Is(err, io.EOF) { | ||
break | ||
} | ||
|
||
return errors.Wrapf(err, "read messages by key %s from path %s", key, pdata) | ||
} | ||
|
||
length := binary.LittleEndian.Uint32(bb[:messageLen]) | ||
err = s.Set(key, bb[messageLen:messageLen+length]) | ||
if err != nil { | ||
return errors.Wrapf(err, "set memorry by key %s from path %s", key, pdata) | ||
} | ||
iteration += messageLen + maxMessageSize | ||
} | ||
|
||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,134 @@ | ||
// Package store | ||
/* | ||
Copyright 2022 Jellydb in-memory database | ||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
package jellystore | ||
|
||
import ( | ||
"context" | ||
"os" | ||
"testing" | ||
|
||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
func TestStore_Load(t *testing.T) { | ||
tests := []struct { | ||
Name string | ||
Key string | ||
Get int64 | ||
Commit int64 | ||
Batch [][]byte | ||
Want int | ||
}{ | ||
{ | ||
Name: "simple", | ||
Key: "simple-load", | ||
Batch: [][]byte{ | ||
[]byte("message1"), | ||
[]byte("message2"), | ||
[]byte("message3"), | ||
[]byte("message4"), | ||
[]byte("message5"), | ||
}, | ||
Get: 5, | ||
Want: 5, | ||
}, | ||
{ | ||
Name: "all-committed", | ||
Key: "all-committed-load", | ||
Batch: [][]byte{ | ||
[]byte("message1"), | ||
[]byte("message2"), | ||
[]byte("message3"), | ||
[]byte("message4"), | ||
[]byte("message5"), | ||
}, | ||
Get: 5, | ||
Commit: 5, | ||
Want: 0, | ||
}, | ||
{ | ||
Name: "one-committed", | ||
Key: "one-committed-load", | ||
Batch: [][]byte{ | ||
[]byte("message1"), | ||
[]byte("message2"), | ||
[]byte("message3"), | ||
[]byte("message4"), | ||
[]byte("message5"), | ||
}, | ||
Get: 5, | ||
Commit: 1, | ||
Want: 4, | ||
}, | ||
{ | ||
Name: "two-committed", | ||
Key: "two-committed-load", | ||
Batch: [][]byte{ | ||
[]byte("message1"), | ||
[]byte("message2"), | ||
[]byte("message3"), | ||
[]byte("message4"), | ||
[]byte("message5"), | ||
}, | ||
Get: 5, | ||
Commit: 2, | ||
Want: 3, | ||
}, | ||
{ | ||
Name: "99-to-1", | ||
Key: "99-to-1-load", | ||
Batch: [][]byte{ | ||
[]byte("message1"), | ||
[]byte("message2"), | ||
[]byte("message3"), | ||
[]byte("message4"), | ||
[]byte("message5"), | ||
}, | ||
Get: 5, | ||
Commit: 4, | ||
Want: 1, | ||
}, | ||
} | ||
|
||
for _, tt := range tests { | ||
t.Run(tt.Name, func(t *testing.T) { | ||
err := os.RemoveAll(testPath + "/" + tt.Key) | ||
require.NoError(t, err) | ||
|
||
unloadStore, err := New(testConfig) | ||
require.NoError(t, err) | ||
for _, bb := range tt.Batch { | ||
err := unloadStore.Set(tt.Key, bb) | ||
require.NoError(t, err) | ||
} | ||
|
||
err = unloadStore.Commit(tt.Key, tt.Commit) | ||
require.NoError(t, err) | ||
|
||
err = unloadStore.Unload(context.Background()) | ||
require.NoError(t, err) | ||
|
||
loadStore, err := New(testConfig) | ||
require.NoError(t, err) | ||
|
||
err = loadStore.Load(context.Background()) | ||
require.NoError(t, err) | ||
|
||
bb, err := loadStore.Get(tt.Key, tt.Get) | ||
require.NoError(t, err) | ||
|
||
require.Equal(t, tt.Want, len(bb)) | ||
}) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
// Package jellystore | ||
/* | ||
Copyright 2022 Jellydb in-memory database | ||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
package jellystore | ||
|
||
import ( | ||
"os" | ||
|
||
"github.com/pkg/errors" | ||
|
||
"github.com/baibikov/jellydb/pkg/utils" | ||
) | ||
|
||
type log struct { | ||
file *os.File | ||
} | ||
|
||
func openLog(path string) (*log, error) { | ||
file, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_RDWR, os.ModePerm) | ||
if err != nil { | ||
return nil, errors.Wrapf(err, "open logfile by path - %s", path) | ||
} | ||
|
||
return &log{ | ||
file: file, | ||
}, nil | ||
} | ||
|
||
func (l *log) Close() error { | ||
return l.file.Close() | ||
} | ||
|
||
func (l *log) write(bb []byte) error { | ||
err := utils.Uint32ToWriter(l.file, messageLen, uint32(len(bb))) | ||
if err != nil { | ||
return errors.Wrap(err, "write message-len") | ||
} | ||
|
||
mb := make([]byte, maxMessageSize) | ||
copy(mb, bb) | ||
_, err = l.file.Write(mb) | ||
return errors.Wrap(err, "write message") | ||
} |
Oops, something went wrong.