diff --git a/core/sys/fs.go b/core/sys/fs.go
index 50a888c2f..b2b77c2d8 100644
--- a/core/sys/fs.go
+++ b/core/sys/fs.go
@@ -40,6 +40,15 @@ type FS interface {
 
 	// RemoveProgress remove progress
 	RemoveProgress(progressID string) error
+
+	// CreateDirectory creates a destination directory handle for the given dirID
+	CreateDirectory(dirID string) error
+
+	// GetFileHandler returns a writable file handle for name under the directory dirID
+	GetFileHandler(dirID, name string) (File, error)
+
+	// RemoveAllDirectories removes all created directories (used by directory download)
+	RemoveAllDirectories()
 }
 
 type File interface {
diff --git a/core/sys/fs_disk.go b/core/sys/fs_disk.go
index 11a0ce729..338d47f90 100644
--- a/core/sys/fs_disk.go
+++ b/core/sys/fs_disk.go
@@ -3,6 +3,7 @@ package sys
 import (
 	"io/fs"
 	"os"
+	"path/filepath"
 )
 
 // DiskFS implement file system on disk
@@ -23,6 +24,12 @@ func (dfs *DiskFS) Open(name string) (File, error) {
 }
 
 func (dfs *DiskFS) OpenFile(name string, flag int, perm os.FileMode) (File, error) {
+	dir := filepath.Dir(name)
+	if _, err := os.Stat(dir); os.IsNotExist(err) {
+		if err := os.MkdirAll(dir, 0744); err != nil {
+			return nil, err
+		}
+	}
 	return os.OpenFile(name, flag, perm)
 }
 
@@ -64,3 +71,20 @@ func (dfs *DiskFS) SaveProgress(progressID string, data []byte, perm fs.FileMode
 func (dfs *DiskFS) RemoveProgress(progressID string) error {
 	return dfs.Remove(progressID)
 }
+
+func (dfs *DiskFS) CreateDirectory(_ string) error {
+	return nil
+}
+
+func (dfs *DiskFS) GetFileHandler(_, name string) (File, error) {
+	dir := filepath.Dir(name)
+	if _, err := os.Stat(dir); os.IsNotExist(err) {
+		if err := os.MkdirAll(dir, 0744); err != nil {
+			return nil, err
+		}
+	}
+	return os.OpenFile(name, os.O_CREATE|os.O_WRONLY, 0644)
+}
+
+func (dfs *DiskFS) RemoveAllDirectories() {
+}
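The new FS methods above give every backend a common way to materialize a download target: CreateDirectory registers a destination, GetFileHandler hands back a writable File for a path beneath it, and RemoveAllDirectories drops whatever was cached afterwards. On DiskFS, CreateDirectory is a no-op and GetFileHandler simply creates missing parent directories; the wasm-side MemFS implementation that follows backs the same calls with the browser directory picker. The sketch below shows the intended call order only — it assumes sys.Files is the package-level FS (as the allocation code later in this patch uses it) and that sys.File exposes the usual Write/Close; the dirID and paths are illustrative, not gosdk conventions.

package main

import (
	"log"

	"github.com/0chain/gosdk/core/sys"
)

// downloadOneFile sketches the intended call order for the new directory API.
func downloadOneFile(files sys.FS) error {
	dirID := "download-session-1" // illustrative; the SDK itself uses zboxutil.NewConnectionId()
	if err := files.CreateDirectory(dirID); err != nil {
		return err
	}
	// Drop all cached directory handles once the whole download is done.
	defer files.RemoveAllDirectories()

	fh, err := files.GetFileHandler(dirID, "downloads/photos/cat.png")
	if err != nil {
		return err
	}
	defer fh.Close() //nolint: errcheck

	// Assumption: sys.File implements io.Writer, as the downloaders stream into it.
	_, err = fh.Write([]byte("file contents would be streamed here"))
	return err
}

func main() {
	if err := downloadOneFile(sys.Files); err != nil {
		log.Fatal(err)
	}
}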
diff --git a/core/sys/fs_mem.go b/core/sys/fs_mem.go
index b054b6601..463584f8f 100644
--- a/core/sys/fs_mem.go
+++ b/core/sys/fs_mem.go
@@ -4,22 +4,28 @@
 package sys
 
 import (
+	"errors"
 	"io/fs"
 	"os"
 	"path/filepath"
+	"strings"
 	"syscall/js"
 	"time"
+
+	"github.com/0chain/gosdk/wasmsdk/jsbridge"
 )
 
 // MemFS implement file system on memory
 type MemFS struct {
 	files map[string]*MemFile
+	dirs  map[string]js.Value
 }
 
 // NewMemFS create MemFS instance
 func NewMemFS() FS {
 	return &MemFS{
 		files: make(map[string]*MemFile),
+		dirs:  make(map[string]js.Value),
 	}
 }
 
@@ -120,3 +126,67 @@ func (mfs *MemFS) RemoveProgress(progressID string) error {
 	js.Global().Get("localStorage").Call("removeItem", key)
 	return nil
 }
+
+func (mfs *MemFS) CreateDirectory(dirID string) error {
+	if !js.Global().Get("showDirectoryPicker").Truthy() || !js.Global().Get("WritableStream").Truthy() {
+		return errors.New("dir_picker: not supported")
+	}
+	showDirectoryPicker := js.Global().Get("showDirectoryPicker")
+	dirHandle, err := jsbridge.Await(showDirectoryPicker.Invoke())
+	if len(err) > 0 && !err[0].IsNull() {
+		return errors.New("dir_picker: " + err[0].String())
+	}
+	mfs.dirs[dirID] = dirHandle[0]
+	return nil
+}
+
+func (mfs *MemFS) GetFileHandler(dirID, path string) (File, error) {
+	dirHandler, ok := mfs.dirs[dirID]
+	if !ok {
+		return nil, errors.New("dir_picker: directory not found")
+	}
+	currHandler, err := mfs.mkdir(dirHandler, filepath.Dir(path))
+	if err != nil {
+		return nil, err
+	}
+	return jsbridge.NewFileWriterFromHandle(currHandler, filepath.Base(path))
+}
+
+func (mfs *MemFS) RemoveAllDirectories() {
+	for k := range mfs.dirs {
+		delete(mfs.dirs, k)
+	}
+}
+
+func (mfs *MemFS) mkdir(dirHandler js.Value, dirPath string) (js.Value, error) {
+	if dirPath == "/" {
+		return dirHandler, nil
+	}
+	currHandler, ok := mfs.dirs[dirPath]
+	if !ok {
+		currHandler = dirHandler
+		paths := strings.Split(dirPath, "/")
+		paths = paths[1:]
+		currPath := "/"
+		for _, path := range paths {
+			currPath = filepath.Join(currPath, path)
+			handler, ok := mfs.dirs[currPath]
+			if ok {
+				currHandler = handler
+				continue
+			}
+			options := js.Global().Get("Object").New()
+			options.Set("create", true)
+			currHandlers, err := jsbridge.Await(currHandler.Call("getDirectoryHandle", path, options))
+			if len(err) > 0 && !err[0].IsNull() {
+				return js.Value{}, errors.New("dir_picker: " + err[0].String())
+			}
+			currHandler = currHandlers[0]
+			mfs.dirs[currPath] = currHandler
+		}
+		if !currHandler.Truthy() {
+			return js.Value{}, errors.New("dir_picker: failed to create directory")
+		}
+	}
+	return currHandler, nil
+}
diff --git a/core/util/validation_tree.go b/core/util/validation_tree.go
index 08e70356f..db97880a9 100644
--- a/core/util/validation_tree.go
+++ b/core/util/validation_tree.go
@@ -167,6 +167,12 @@ func (v *ValidationTree) Finalize() error {
 
 	v.isFinalized = true
 	if v.writeCount > 0 {
+		if v.leafIndex == len(v.leaves) {
+			// grow the leaves slice to make room for the final leaf
+			leaves := make([][]byte, len(v.leaves)+1)
+			copy(leaves, v.leaves)
+			v.leaves = leaves
+		}
 		v.leaves[v.leafIndex] = v.h.Sum(nil)
 	} else {
 		v.leafIndex--
diff --git a/wasmsdk/blobber.go b/wasmsdk/blobber.go
index cf7a6b097..4de6909f8 100644
--- a/wasmsdk/blobber.go
+++ b/wasmsdk/blobber.go
@@ -30,6 +30,11 @@ import (
 
 const FileOperationInsert = "insert"
 
+var (
+	downloadDirContextMap = make(map[string]context.CancelCauseFunc)
+	downloadDirLock       = sync.Mutex{}
+)
+
 // listObjects list allocation objects from its blobbers
 // - allocationID is the allocation id
 // - remotePath is the remote path of the file
@@ -1145,6 +1150,56 @@ func createWorkers(allocationID string) error {
 	return addWebWorkers(alloc)
 }
 
+// downloadDirectory downloads a directory to the local file system using the File System API; it only works in browsers where that API is available
+// - allocationID : allocation ID of the file
+// - remotePath : remote path of the directory
+// - authticket : auth ticket of the file, if the file is shared
+// - callbackFuncName : callback function name to get the progress of the download
+func downloadDirectory(allocationID, remotePath, authticket, callbackFuncName string) error {
+	alloc, err := getAllocation(allocationID)
+	if err != nil {
+		return err
+	}
+	wg := &sync.WaitGroup{}
+	wg.Add(1)
+	statusBar := &StatusBar{wg: wg}
+	if callbackFuncName != "" {
+		callback := js.Global().Get(callbackFuncName)
+		statusBar.callback = func(totalBytes, completedBytes int, filename, objURL, err string) {
+			callback.Invoke(totalBytes, completedBytes, filename, objURL, err)
+		}
+	}
+	ctx, cancel := context.WithCancelCause(context.Background())
+	defer cancel(nil)
+	errChan := make(chan error, 1)
+	go func() {
+		errChan <- alloc.DownloadDirectory(ctx, remotePath, "", authticket, statusBar)
+	}()
+	downloadDirLock.Lock()
+	downloadDirContextMap[remotePath] = cancel
+	downloadDirLock.Unlock()
+	select {
+	case err = <-errChan:
+		if err != nil {
+			PrintError("Error in download directory: ", err)
+		}
+		return err
+	case <-ctx.Done():
+		return context.Cause(ctx)
+	}
+}
+
+// cancelDownloadDirectory cancels an in-progress downloadDirectory operation
+// - remotePath : remote path of the directory
+func cancelDownloadDirectory(remotePath string) {
+	downloadDirLock.Lock()
+	cancel, ok := downloadDirContextMap[remotePath]
+	if ok {
+		cancel(errors.New("download directory canceled by user"))
+	}
+	downloadDirLock.Unlock()
+}
+
 func startListener() error {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
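downloadDirectory and cancelDownloadDirectory coordinate through a remotePath-keyed map of cancel functions, so the page can abort a long-running directory download and the worker goroutine learns why. A stripped-down sketch of that pattern, using only the Go 1.20+ standard library (context.WithCancelCause / context.Cause); the names here are chosen for illustration, not taken from gosdk:

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

var (
	cancelByKey = map[string]context.CancelCauseFunc{} // illustrative registry, keyed like downloadDirContextMap
	registryMu  sync.Mutex
)

// run registers a cancellable operation under key and blocks until it finishes
// or someone calls cancelRun(key).
func run(key string, op func(ctx context.Context) error) error {
	ctx, cancel := context.WithCancelCause(context.Background())
	defer cancel(nil)

	registryMu.Lock()
	cancelByKey[key] = cancel
	registryMu.Unlock()

	errChan := make(chan error, 1)
	go func() { errChan <- op(ctx) }()

	select {
	case err := <-errChan:
		return err
	case <-ctx.Done():
		// context.Cause reports the error passed to cancel, not just context.Canceled.
		return context.Cause(ctx)
	}
}

func cancelRun(key string) {
	registryMu.Lock()
	if cancel, ok := cancelByKey[key]; ok {
		cancel(errors.New("canceled by user"))
	}
	registryMu.Unlock()
}

func main() {
	go func() {
		time.Sleep(100 * time.Millisecond)
		cancelRun("/photos")
	}()
	err := run("/photos", func(ctx context.Context) error {
		<-ctx.Done() // stand-in for a long download that honours ctx
		return ctx.Err()
	})
	fmt.Println(err) // the cause ("canceled by user"), or context.Canceled if the errChan branch wins the select
}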
diff --git a/wasmsdk/jsbridge/file_writer.go b/wasmsdk/jsbridge/file_writer.go
index df26d1ad9..21b0e17b2 100644
--- a/wasmsdk/jsbridge/file_writer.go
+++ b/wasmsdk/jsbridge/file_writer.go
@@ -20,7 +20,7 @@ type FileWriter struct {
 	writeError bool
 }
 
-const writeBlocks = 60
+const writeBlocks = 10
 
 // len(p) will always be <= 64KB
 func (w *FileWriter) Write(p []byte) (int, error) {
@@ -130,3 +130,21 @@
 		fileHandle: fileHandle[0],
 	}, nil
 }
+
+func NewFileWriterFromHandle(dirHandler js.Value, name string) (*FileWriter, error) {
+	options := js.Global().Get("Object").New()
+	options.Set("create", true)
+	fileHandler, err := Await(dirHandler.Call("getFileHandle", name, options))
+	if len(err) > 0 && !err[0].IsNull() {
+		return nil, errors.New("dir_picker: " + err[0].String())
+	}
+
+	writableStream, err := Await(fileHandler[0].Call("createWritable"))
+	if len(err) > 0 && !err[0].IsNull() {
+		return nil, errors.New("file_writer: " + err[0].String())
+	}
+	return &FileWriter{
+		writableStream: writableStream[0],
+		fileHandle:     fileHandler[0],
+	}, nil
+}
diff --git a/wasmsdk/proxy.go b/wasmsdk/proxy.go
index fac087a50..d38dfc789 100644
--- a/wasmsdk/proxy.go
+++ b/wasmsdk/proxy.go
@@ -202,6 +202,8 @@
 			"terminateWorkers":  terminateWorkers,
 			"createWorkers":     createWorkers,
 			"getFileMetaByName": getFileMetaByName,
+			"downloadDirectory":       downloadDirectory,
+			"cancelDownloadDirectory": cancelDownloadDirectory,
 
 			// player
 			"play": play,
diff --git a/zboxcore/fileref/fileref.go b/zboxcore/fileref/fileref.go
index ae8581468..c4a60fdfa 100644
--- a/zboxcore/fileref/fileref.go
+++ b/zboxcore/fileref/fileref.go
@@ -17,7 +17,7 @@ const CHUNK_SIZE = 64 * 1024
 
 const (
 	// FILE represents a file for fileref
-	FILE = "f" 
+	FILE = "f"
 
 	// DIRECTORY represents a directory for fileref
 	DIRECTORY = "d"
@@ -103,8 +103,8 @@ type Ref struct {
 }
 
 // GetReferenceLookup returns the lookup hash for a given allocationID and path
-// - allocationID: allocation ID
-// - path: path of the file
+//   - allocationID: allocation ID
+//   - path: path of the file
 func GetReferenceLookup(allocationID string, path string) string {
 	return encryption.Hash(allocationID + ":" + path)
 }
diff --git a/zboxcore/sdk/allocation.go b/zboxcore/sdk/allocation.go
index cfc4b736b..e42e76429 100644
--- a/zboxcore/sdk/allocation.go
+++ b/zboxcore/sdk/allocation.go
@@ -1780,6 +1780,54 @@ func (a *Allocation) GetRefs(path, offsetPath, updatedDate, offsetDate, fileType
 
 	return a.getRefs(path, "", "", offsetPath, updatedDate, offsetDate, fileType, refType, level, pageLimit, opts...)
 }
+
+func (a *Allocation) ListObjects(ctx context.Context, path, offsetPath, updatedDate, offsetDate, fileType, refType string, level, pageLimit int) <-chan ORef {
+	oRefChan := make(chan ORef, 1)
+	sendObjectRef := func(ref ORef) {
+		select {
+		case oRefChan <- ref:
+		case <-ctx.Done():
+		}
+	}
+	go func(oRefChan chan<- ORef) {
+		defer func() {
+			if contextCanceled(ctx) {
+				oRefChan <- ORef{
+					Err: ctx.Err(),
+				}
+			}
+			close(oRefChan)
+		}()
+		continuationPath := offsetPath
+		for {
+			oRefs, err := a.GetRefs(path, continuationPath, updatedDate, offsetDate, fileType, refType, level, pageLimit)
+			if err != nil {
+				sendObjectRef(ORef{
+					Err: err,
+				})
+				return
+			}
+			for _, ref := range oRefs.Refs {
+				select {
+				// Send the object ref to the caller.
+				case oRefChan <- ref:
+				// If the caller signals done, return here.
+				case <-ctx.Done():
+					return
+				}
+			}
+			if len(oRefs.Refs) < pageLimit {
+				return
+			}
+			if oRefs.OffsetPath == "" || oRefs.OffsetPath == continuationPath {
+				return
+			}
+			continuationPath = oRefs.OffsetPath
+		}
+
+	}(oRefChan)
+	return oRefChan
+}
+
 func (a *Allocation) GetRefsFromLookupHash(pathHash, offsetPath, updatedDate, offsetDate, fileType, refType string, level, pageLimit int) (*ObjectTreeResult, error) {
 	if pathHash == "" {
 		return nil, errors.New("invalid_lookup_hash", "lookup hash cannot be empty")
@@ -1867,6 +1915,7 @@
 
 // GetFileMetaByName retrieve consolidated file metadata given its name (its full path starting from root "/").
 // - fileName: full file path starting from the allocation root.
+// - fileName: full file path starting from the allocation root.
 func (a *Allocation) GetFileMetaByName(fileName string) ([]*ConsolidatedFileMetaByName, error) {
 	if !a.isInitialized() {
 		return nil, notInitialized
@@ -3123,3 +3172,135 @@ repair:
 
 	return alloc, hash, isRepairRequired, nil
 }
+
+func (a *Allocation) DownloadDirectory(ctx context.Context, remotePath, localPath, authTicket string, sb StatusCallback) error {
+	if len(a.Blobbers) == 0 {
+		return noBLOBBERS
+	}
+	localPath = filepath.Clean(localPath)
+	dirID := zboxutil.NewConnectionId()
+	err := sys.Files.CreateDirectory(dirID)
+	if err != nil {
+		if sb != nil {
+			sb.Error(a.ID, remotePath, OpDownload, err)
+		}
+		return err
+	}
+	defer sys.Files.RemoveAllDirectories()
+
+	oRefChan := a.ListObjects(ctx, remotePath, "", "", "", fileref.FILE, fileref.REGULAR, 0, getRefPageLimit)
+	refSlice := make([]ORef, BatchSize)
+	refIndex := 0
+	wg := &sync.WaitGroup{}
+	dirPath := path.Dir(remotePath)
+	var totalSize int
+	for oRef := range oRefChan {
+		if contextCanceled(ctx) {
+			if sb != nil {
+				sb.Error(a.ID, remotePath, OpDownload, ctx.Err())
+			}
+			return ctx.Err()
+		}
+		if oRef.Err != nil {
+			if sb != nil {
+				sb.Error(a.ID, remotePath, OpDownload, oRef.Err)
+			}
+			return oRef.Err
+		}
+		refSlice[refIndex] = oRef
+		refIndex++
+		if refIndex == BatchSize {
+			wg.Add(refIndex)
+			downloadStatusBar := &StatusBar{
+				wg: wg,
+				sb: sb,
+			}
+			for ind, ref := range refSlice {
+				fPath := ref.Path
+				if dirPath != "/" {
+					fPath = strings.TrimPrefix(ref.Path, dirPath)
+				}
+				if localPath != "" {
+					fPath = filepath.Join(localPath, fPath)
+				}
+				fh, err := sys.Files.GetFileHandler(dirID, fPath)
+				if err != nil {
+					if sb != nil {
+						sb.Error(a.ID, remotePath, OpDownload, err)
+					}
+					return err
+				}
+				if authTicket == "" {
+					_ = a.DownloadFileToFileHandler(fh, ref.Path, false, downloadStatusBar, ind == BatchSize-1, WithFileCallback(func() {
+						fh.Close() //nolint: errcheck
+					})) //nolint: errcheck
+				} else {
+					_ = a.DownloadFileToFileHandlerFromAuthTicket(fh, authTicket, ref.LookupHash, ref.Path, false, downloadStatusBar, ind == BatchSize-1, WithFileCallback(func() {
+						fh.Close() //nolint: errcheck
+					})) //nolint: errcheck
+				}
+				totalSize += int(ref.ActualFileSize)
+			}
+			wg.Wait()
+			if downloadStatusBar.err != nil {
+				return downloadStatusBar.err
+			}
+			refIndex = 0
+		}
+	}
+	if refIndex > 0 {
+		wg.Add(refIndex)
+		downloadStatusBar := &StatusBar{
+			wg: wg,
+			sb: sb,
+		}
+		for ind, ref := range refSlice[:refIndex] {
+			fPath := ref.Path
+			if dirPath != "/" {
+				fPath = strings.TrimPrefix(ref.Path, dirPath)
+			}
+			if localPath != "" {
+				fPath = filepath.Join(localPath, fPath)
+			}
+			fh, err := sys.Files.GetFileHandler(dirID, fPath)
+			if err != nil {
+				if sb != nil {
+					sb.Error(a.ID, remotePath, OpDownload, err)
+				}
+				return err
+			}
+			if authTicket == "" {
+				_ = a.DownloadFileToFileHandler(fh, ref.Path, false, downloadStatusBar, ind == refIndex-1, WithFileCallback(func() {
+					fh.Close() //nolint: errcheck
+				})) //nolint: errcheck
+			} else {
+				_ = a.DownloadFileToFileHandlerFromAuthTicket(fh, authTicket, ref.LookupHash, ref.Path, false, downloadStatusBar, ind == refIndex-1, WithFileCallback(func() {
+					fh.Close() //nolint: errcheck
+				})) //nolint: errcheck
+			}
+			totalSize += int(ref.ActualFileSize)
+		}
+		wg.Wait()
+		if downloadStatusBar.err != nil {
+			if sb != nil {
+				sb.Error(a.ID, remotePath, OpDownload, downloadStatusBar.err)
+			}
+			return downloadStatusBar.err
+		}
+	}
+	refSlice = nil
+	if sb != nil {
+		sb.Completed(a.ID, remotePath, filepath.Base(remotePath), "", totalSize, OpDownload)
+	}
+	return nil
+}
+
+// contextCanceled returns whether a context is canceled.
+func contextCanceled(ctx context.Context) bool {
+	select {
+	case <-ctx.Done():
+		return true
+	default:
+		return false
+	}
+}
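DownloadDirectory above is driven by the new Allocation.ListObjects channel API, and the same API can be consumed directly: refs arrive on the channel page by page, errors are delivered in-band through ORef.Err, and cancelling the context stops the producer goroutine. A minimal consumer sketch (the page limit is picked arbitrarily here — the SDK's own caller uses its internal getRefPageLimit — and the package name is only for illustration):

package example

import (
	"context"
	"fmt"

	"github.com/0chain/gosdk/zboxcore/fileref"
	"github.com/0chain/gosdk/zboxcore/sdk"
)

// listAllFiles streams every regular file ref under remotePath and prints it.
// Cancelling ctx stops the goroutine started by ListObjects; errors come back
// as refs whose Err field is set.
func listAllFiles(ctx context.Context, alloc *sdk.Allocation, remotePath string) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	const pageLimit = 100 // illustrative page size
	for ref := range alloc.ListObjects(ctx, remotePath, "", "", "", fileref.FILE, fileref.REGULAR, 0, pageLimit) {
		if ref.Err != nil {
			return ref.Err
		}
		fmt.Println(ref.Path, ref.ActualFileSize)
	}
	return nil
}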
diff --git a/zboxcore/sdk/chunked_upload.go b/zboxcore/sdk/chunked_upload.go
index c1d1496d8..a899b2d9d 100644
--- a/zboxcore/sdk/chunked_upload.go
+++ b/zboxcore/sdk/chunked_upload.go
@@ -33,7 +33,7 @@
 )
 
 const (
-	DefaultUploadTimeOut = 120 * time.Second
+	DefaultUploadTimeOut = 180 * time.Second
 )
 
 var (
@@ -302,11 +302,6 @@ func CreateChunkedUpload(
 }
 
 func calculateWorkersAndRequests(dataShards, totalShards, chunknumber int) (uploadWorkers int, uploadRequests int) {
-	if IsWasm {
-		uploadWorkers = 1
-		uploadRequests = 2
-		return
-	}
 	if totalShards < 4 {
 		uploadWorkers = 4
 	} else {
@@ -320,7 +315,7 @@ func calculateWorkersAndRequests(dataShards, totalShards, chunknumber int) (uplo
 		}
 	}
 
-	if chunknumber*dataShards < 640 {
+	if chunknumber*dataShards < 640 && !IsWasm {
 		uploadRequests = 4
 	} else {
 		uploadRequests = 2
diff --git a/zboxcore/sdk/chunked_upload_process_js.go b/zboxcore/sdk/chunked_upload_process_js.go
index 5bd99a95d..2f9bf35a1 100644
--- a/zboxcore/sdk/chunked_upload_process_js.go
+++ b/zboxcore/sdk/chunked_upload_process_js.go
@@ -615,6 +615,12 @@ func sendUploadRequest(dataBuffers []*bytes.Buffer, contentSlice []string, blobb
 		}()
 
 		if shouldContinue {
+			if i == 2 {
+				if err != nil {
+					logger.Logger.Error("Retry limit exceeded for upload: ", err)
+				}
+				return errors.Throw(constants.ErrBadRequest, "Retry limit exceeded for upload")
+			}
 			continue
 		}
 
diff --git a/zboxcore/sdk/filerefsworker.go b/zboxcore/sdk/filerefsworker.go
index 10c9edad0..63bc5221b 100644
--- a/zboxcore/sdk/filerefsworker.go
+++ b/zboxcore/sdk/filerefsworker.go
@@ -143,7 +143,6 @@ func (o *ObjectTreeRequest) GetRefs() (*ObjectTreeResult, error) {
 		case oTreeResponse := <-respChan:
 			oTreeResponseErrors[oTreeResponse.idx] = oTreeResponse.err
 			if oTreeResponse.err != nil {
-				l.Logger.Error(oTreeResponse.err)
 				if code, _ := zboxutil.GetErrorMessageCode(oTreeResponse.err.Error()); code != INVALID_PATH {
 					l.Logger.Error("Error while getting file refs from blobber:", oTreeResponse.err)
 				}
@@ -304,6 +303,7 @@ type ORef struct {
 	ID        int64            `json:"id"`
 	CreatedAt common.Timestamp `json:"created_at"`
 	UpdatedAt common.Timestamp `json:"updated_at"`
+	Err       error            `json:"-"`
 }
 
 type SimilarField struct {
diff --git a/zboxcore/sdk/repairCallback.go b/zboxcore/sdk/repairCallback.go
index 3a62d3308..1e122e848 100644
--- a/zboxcore/sdk/repairCallback.go
+++ b/zboxcore/sdk/repairCallback.go
@@ -12,21 +12,34 @@ var (
 )
 
 func (s *StatusBar) Started(allocationId, filePath string, op int, totalBytes int) {
+	if s.sb != nil {
+		s.sb.Started(allocationId, filePath, op, totalBytes)
+	}
 }
 
 func (s *StatusBar) InProgress(allocationId, filePath string, op int, completedBytes int, data []byte) {
-
+	if s.sb != nil {
+		s.sb.InProgress(allocationId, filePath, op, completedBytes, data)
+	}
 }
 
 func (s *StatusBar) Completed(allocationId, filePath string, filename string, mimetype string, size int, op int) {
 	s.success = true
-	l.Logger.Info("Repair for file completed. File = ", filePath)
+	if s.isRepair {
+		l.Logger.Info("Repair for file completed. File = ", filePath)
+	} else {
+		l.Logger.Info("Operation completed. File = ", filePath)
+		s.wg.Done()
+	}
 }
 
 func (s *StatusBar) Error(allocationID string, filePath string, op int, err error) {
 	s.success = false
 	s.err = err
 	l.Logger.Error("Error in status callback. Error = ", err.Error())
+	if !s.isRepair {
+		s.wg.Done()
+	}
 }
 
 func (s *StatusBar) RepairCompleted(filesRepaired int) {
@@ -40,10 +53,12 @@
 }
 
 type StatusBar struct {
-	wg      *sync.WaitGroup
-	allocID string
-	success bool
-	err     error
+	wg       *sync.WaitGroup
+	allocID  string
+	success  bool
+	err      error
+	isRepair bool
+	sb       StatusCallback
 }
 
 func NewRepairBar(allocID string) *StatusBar {
@@ -58,8 +73,9 @@
 	wg := &sync.WaitGroup{}
 	wg.Add(1)
 	return &StatusBar{
-		wg:      wg,
-		allocID: allocID,
+		wg:       wg,
+		allocID:  allocID,
+		isRepair: true,
 	}
 }
 
diff --git a/zboxcore/zboxutil/http.go b/zboxcore/zboxutil/http.go
index 63a8d748a..7d29f6223 100644
--- a/zboxcore/zboxutil/http.go
+++ b/zboxcore/zboxutil/http.go
@@ -177,8 +177,8 @@
 			Concurrency:      4096,
 			DNSCacheDuration: time.Hour,
 		}).Dial,
-		ReadTimeout:         120 * time.Second,
-		WriteTimeout:        120 * time.Second,
+		ReadTimeout:         180 * time.Second,
+		WriteTimeout:        180 * time.Second,
 		MaxConnDuration:     45 * time.Second,
 		MaxResponseBodySize: 1024 * 1024 * 64, //64MB
 		MaxConnsPerHost:     1024,
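The repairCallback.go changes turn the repair StatusBar into a general-purpose forwarder: progress events (Started/InProgress) are passed to an optional outer StatusCallback, and for non-repair operations every terminal event (Completed or Error) releases the WaitGroup so DownloadDirectory can block until a whole batch finishes. Because the sdk.StatusBar fields are unexported, the sketch below re-creates the pattern with a local type and a deliberately tiny stand-in interface; the names and the minimal callback interface are assumptions for illustration, not gosdk API.

package example

import "sync"

// progressSink is a tiny stand-in for gosdk's StatusCallback.
type progressSink interface {
	InProgress(path string, completedBytes int)
}

// forwardingBar mirrors the reworked sdk.StatusBar: progress is forwarded to an
// optional inner sink, and when not repairing, each terminal event releases the
// WaitGroup so a batch driver can wg.Add(N) once and wg.Wait() for N files.
type forwardingBar struct {
	wg       *sync.WaitGroup
	sink     progressSink // may be nil
	isRepair bool
	err      error
}

func (b *forwardingBar) InProgress(path string, completedBytes int) {
	if b.sink != nil {
		b.sink.InProgress(path, completedBytes)
	}
}

func (b *forwardingBar) Completed(path string) {
	if !b.isRepair {
		b.wg.Done()
	}
}

func (b *forwardingBar) Error(path string, err error) {
	b.err = err
	if !b.isRepair {
		b.wg.Done()
	}
}

// Usage, as DownloadDirectory does it: wg.Add(len(batch)), hand the same bar to
// every download in the batch, then wg.Wait() and inspect bar.err afterwards.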