Skip to content

Commit

Permalink
Merge pull request #7 from 0chain/feat/enterprise-blobber
Browse files Browse the repository at this point in the history
Merge sprint-1.17
  • Loading branch information
dabasov authored Aug 25, 2024
2 parents 2ddc42d + 970cd10 commit c0ae8c0
Show file tree
Hide file tree
Showing 14 changed files with 404 additions and 22 deletions.
9 changes: 9 additions & 0 deletions core/sys/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,15 @@ type FS interface {

// RemoveProgress remove progress
RemoveProgress(progressID string) error

// Create Directory
CreateDirectory(dirID string) error

// GetFileHandler
GetFileHandler(dirID, name string) (File, error)

// Remove all created directories(used in download directory)
RemoveAllDirectories()
}

type File interface {
Expand Down
24 changes: 24 additions & 0 deletions core/sys/fs_disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package sys
import (
"io/fs"
"os"
"path/filepath"
)

// DiskFS implement file system on disk
Expand All @@ -23,6 +24,12 @@ func (dfs *DiskFS) Open(name string) (File, error) {
}

func (dfs *DiskFS) OpenFile(name string, flag int, perm os.FileMode) (File, error) {
dir := filepath.Dir(name)
if _, err := os.Stat(dir); os.IsNotExist(err) {
if err := os.MkdirAll(dir, 0744); err != nil {
return nil, err
}
}
return os.OpenFile(name, flag, perm)
}

Expand Down Expand Up @@ -64,3 +71,20 @@ func (dfs *DiskFS) SaveProgress(progressID string, data []byte, perm fs.FileMode
func (dfs *DiskFS) RemoveProgress(progressID string) error {
return dfs.Remove(progressID)
}

func (dfs *DiskFS) CreateDirectory(_ string) error {
return nil
}

func (dfs *DiskFS) GetFileHandler(_, name string) (File, error) {
dir := filepath.Dir(name)
if _, err := os.Stat(dir); os.IsNotExist(err) {
if err := os.MkdirAll(dir, 0744); err != nil {
return nil, err
}
}
return os.OpenFile(name, os.O_CREATE|os.O_WRONLY, 0644)
}

func (dfs *DiskFS) RemoveAllDirectories() {
}
70 changes: 70 additions & 0 deletions core/sys/fs_mem.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,28 @@
package sys

import (
"errors"
"io/fs"
"os"
"path/filepath"
"strings"
"syscall/js"
"time"

"github.com/0chain/gosdk/wasmsdk/jsbridge"
)

// MemFS implement file system on memory
type MemFS struct {
files map[string]*MemFile
dirs map[string]js.Value
}

// NewMemFS create MemFS instance
func NewMemFS() FS {
return &MemFS{
files: make(map[string]*MemFile),
dirs: make(map[string]js.Value),
}
}

Expand Down Expand Up @@ -120,3 +126,67 @@ func (mfs *MemFS) RemoveProgress(progressID string) error {
js.Global().Get("localStorage").Call("removeItem", key)
return nil
}

func (mfs *MemFS) CreateDirectory(dirID string) error {
if !js.Global().Get("showDirectoryPicker").Truthy() || !js.Global().Get("WritableStream").Truthy() {
return errors.New("dir_picker: not supported")
}
showDirectoryPicker := js.Global().Get("showDirectoryPicker")
dirHandle, err := jsbridge.Await(showDirectoryPicker.Invoke())
if len(err) > 0 && !err[0].IsNull() {
return errors.New("dir_picker: " + err[0].String())
}
mfs.dirs[dirID] = dirHandle[0]
return nil
}

func (mfs *MemFS) GetFileHandler(dirID, path string) (File, error) {
dirHandler, ok := mfs.dirs[dirID]
if !ok {
return nil, errors.New("dir_picker: directory not found")
}
currHandler, err := mfs.mkdir(dirHandler, filepath.Dir(path))
if err != nil {
return nil, err
}
return jsbridge.NewFileWriterFromHandle(currHandler, filepath.Base(path))
}

func (mfs *MemFS) RemoveAllDirectories() {
for k := range mfs.dirs {
delete(mfs.dirs, k)
}
}

func (mfs *MemFS) mkdir(dirHandler js.Value, dirPath string) (js.Value, error) {
if dirPath == "/" {
return dirHandler, nil
}
currHandler, ok := mfs.dirs[dirPath]
if !ok {
currHandler = dirHandler
paths := strings.Split(dirPath, "/")
paths = paths[1:]
currPath := "/"
for _, path := range paths {
currPath = filepath.Join(currPath, path)
handler, ok := mfs.dirs[currPath]
if ok {
currHandler = handler
continue
}
options := js.Global().Get("Object").New()
options.Set("create", true)
currHandlers, err := jsbridge.Await(currHandler.Call("getDirectoryHandle", path, options))
if len(err) > 0 && !err[0].IsNull() {
return js.Value{}, errors.New("dir_picker: " + err[0].String())
}
currHandler = currHandlers[0]
mfs.dirs[currPath] = currHandler
}
if !currHandler.Truthy() {
return js.Value{}, errors.New("dir_picker: failed to create directory")
}
}
return currHandler, nil
}
6 changes: 6 additions & 0 deletions core/util/validation_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,12 @@ func (v *ValidationTree) Finalize() error {
v.isFinalized = true

if v.writeCount > 0 {
if v.leafIndex == len(v.leaves) {
// increase leaves size
leaves := make([][]byte, len(v.leaves)+1)
copy(leaves, v.leaves)
v.leaves = leaves
}
v.leaves[v.leafIndex] = v.h.Sum(nil)
} else {
v.leafIndex--
Expand Down
55 changes: 55 additions & 0 deletions wasmsdk/blobber.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ import (

const FileOperationInsert = "insert"

var (
downloadDirContextMap = make(map[string]context.CancelCauseFunc)
downloadDirLock = sync.Mutex{}
)

// listObjects list allocation objects from its blobbers
// - allocationID is the allocation id
// - remotePath is the remote path of the file
Expand Down Expand Up @@ -1145,6 +1150,56 @@ func createWorkers(allocationID string) error {
return addWebWorkers(alloc)
}

// downloadDirectory download directory to local file system using fs api, will only work in browsers where fs api is available
// - allocationID : allocation ID of the file
// - remotePath : remote path of the directory
// - authticket : auth ticket of the file, if the file is shared
// - callbackFuncName : callback function name to get the progress of the download
func downloadDirectory(allocationID, remotePath, authticket, callbackFuncName string) error {
alloc, err := getAllocation(allocationID)
if err != nil {
return err
}
wg := &sync.WaitGroup{}
wg.Add(1)
statusBar := &StatusBar{wg: wg}
if callbackFuncName != "" {
callback := js.Global().Get(callbackFuncName)
statusBar.callback = func(totalBytes, completedBytes int, filename, objURL, err string) {
callback.Invoke(totalBytes, completedBytes, filename, objURL, err)
}
}
ctx, cancel := context.WithCancelCause(context.Background())
defer cancel(nil)
errChan := make(chan error, 1)
go func() {
errChan <- alloc.DownloadDirectory(ctx, remotePath, "", authticket, statusBar)
}()
downloadDirLock.Lock()
downloadDirContextMap[remotePath] = cancel
downloadDirLock.Unlock()
select {
case err = <-errChan:
if err != nil {
PrintError("Error in download directory: ", err)
}
return err
case <-ctx.Done():
return context.Cause(ctx)
}
}

// cancelDownloadDirectory cancel the download directory operation
// - remotePath : remote path of the directory
func cancelDownloadDirectory(remotePath string) {
downloadDirLock.Lock()
cancel, ok := downloadDirContextMap[remotePath]
if ok {
cancel(errors.New("download directory canceled by user"))
}
downloadDirLock.Unlock()
}

func startListener() error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down
20 changes: 19 additions & 1 deletion wasmsdk/jsbridge/file_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type FileWriter struct {
writeError bool
}

const writeBlocks = 60
const writeBlocks = 10

// len(p) will always be <= 64KB
func (w *FileWriter) Write(p []byte) (int, error) {
Expand Down Expand Up @@ -130,3 +130,21 @@ func NewFileWriter(filename string) (*FileWriter, error) {
fileHandle: fileHandle[0],
}, nil
}

func NewFileWriterFromHandle(dirHandler js.Value, name string) (*FileWriter, error) {
options := js.Global().Get("Object").New()
options.Set("create", true)
fileHandler, err := Await(dirHandler.Call("getFileHandle", name, options))
if len(err) > 0 && !err[0].IsNull() {
return nil, errors.New("dir_picker: " + err[0].String())
}

writableStream, err := Await(fileHandler[0].Call("createWritable"))
if len(err) > 0 && !err[0].IsNull() {
return nil, errors.New("file_writer: " + err[0].String())
}
return &FileWriter{
writableStream: writableStream[0],
fileHandle: fileHandler[0],
}, nil
}
2 changes: 2 additions & 0 deletions wasmsdk/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,8 @@ func main() {
"terminateWorkers": terminateWorkers,
"createWorkers": createWorkers,
"getFileMetaByName": getFileMetaByName,
"downloadDirectory": downloadDirectory,
"cancelDownloadDirectory": cancelDownloadDirectory,

// player
"play": play,
Expand Down
6 changes: 3 additions & 3 deletions zboxcore/fileref/fileref.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ const CHUNK_SIZE = 64 * 1024

const (
// FILE represents a file for fileref
FILE = "f"
FILE = "f"

// DIRECTORY represents a directory for fileref
DIRECTORY = "d"
Expand Down Expand Up @@ -103,8 +103,8 @@ type Ref struct {
}

// GetReferenceLookup returns the lookup hash for a given allocationID and path
// - allocationID: allocation ID
// - path: path of the file
// - allocationID: allocation ID
// - path: path of the file
func GetReferenceLookup(allocationID string, path string) string {
return encryption.Hash(allocationID + ":" + path)
}
Expand Down
Loading

0 comments on commit c0ae8c0

Please sign in to comment.