Skip to content

Commit

Permalink
Merge pull request #1706 from 0chain/feat/streaming-blobbers
Browse files Browse the repository at this point in the history
Break when enoughs shards for reconstruction
  • Loading branch information
dabasov authored Dec 14, 2024
2 parents 8d49768 + 0f3599c commit 14d6ccc
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 9 deletions.
6 changes: 3 additions & 3 deletions core/conf/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func TestLoadConfig(t *testing.T) {
return mockDefaultReader()
},
run: func(r *require.Assertions, cfg Config) {
r.Equal(10, cfg.MinSubmit)
r.Equal(20, cfg.MinSubmit)
},
},
{
Expand Down Expand Up @@ -147,7 +147,7 @@ func TestLoadConfig(t *testing.T) {
return mockDefaultReader()
},
run: func(r *require.Assertions, cfg Config) {
r.Equal(5, cfg.QuerySleepTime)
r.Equal(1, cfg.QuerySleepTime)
},
}, {
name: "Test_Config_Max_Txn_Query_Less_Than_1",
Expand All @@ -157,7 +157,7 @@ func TestLoadConfig(t *testing.T) {
return mockDefaultReader()
},
run: func(r *require.Assertions, cfg Config) {
r.Equal(5, cfg.MaxTxnQuery)
r.Equal(10, cfg.MaxTxnQuery)
},
}, {
name: "Test_Config_Confirmation_Chain_Length_Less_Than_1",
Expand Down
8 changes: 8 additions & 0 deletions wasmsdk/blobber.go
Original file line number Diff line number Diff line change
Expand Up @@ -1248,6 +1248,14 @@ func cancelDownloadDirectory(remotePath string) {
downloadDirLock.Unlock()
}

func cancelDownloadBlocks(allocationID, remotePath string, start, end int64) error {
alloc, err := getAllocation(allocationID)
if err != nil {
return err
}
return alloc.CancelDownloadBlocks(remotePath, start, end)
}

func startListener(respChan chan string) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down
1 change: 1 addition & 0 deletions wasmsdk/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ func main() {
"getFileMetaByName": getFileMetaByName,
"downloadDirectory": downloadDirectory,
"cancelDownloadDirectory": cancelDownloadDirectory,
"cancelDownloadBlocks": cancelDownloadBlocks,

// player
"play": play,
Expand Down
20 changes: 17 additions & 3 deletions zboxcore/sdk/allocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

"github.com/0chain/gosdk/core/client"
"github.com/0chain/gosdk/core/encryption"
"github.com/0chain/gosdk/core/transaction"

"github.com/0chain/common/core/currency"
Expand Down Expand Up @@ -1451,7 +1452,8 @@ func (a *Allocation) addAndGenerateDownloadRequest(
opt(downloadReq)
}
downloadReq.workdir = filepath.Join(downloadReq.workdir, ".zcn")
a.downloadProgressMap[remotePath] = downloadReq
hash := encryption.Hash(fmt.Sprintf("%s:%d:%d", remotePath, startBlock, endBlock))
a.downloadProgressMap[hash] = downloadReq
a.downloadRequests = append(a.downloadRequests, downloadReq)
if isFinal {
downloadOps := a.downloadRequests
Expand Down Expand Up @@ -2465,7 +2467,18 @@ func (a *Allocation) UploadAuthTicketToBlobber(authTicket string, clientEncPubKe
// It cancels the download operation and removes the download request from the download progress map.
// - remotepath: The remote path of the file to cancel the download operation.
func (a *Allocation) CancelDownload(remotepath string) error {
if downloadReq, ok := a.downloadProgressMap[remotepath]; ok {
hash := encryption.Hash(fmt.Sprintf("%s:%d:%d", remotepath, 1, 0))
if downloadReq, ok := a.downloadProgressMap[hash]; ok {
downloadReq.isDownloadCanceled = true
downloadReq.ctxCncl()
return nil
}
return errors.New("remote_path_not_found", "Invalid path. No download in progress for the path "+remotepath)
}

func (a *Allocation) CancelDownloadBlocks(remotepath string, start, end int64) error {
hash := encryption.Hash(fmt.Sprintf("%s:%d:%d", remotepath, start, end))
if downloadReq, ok := a.downloadProgressMap[hash]; ok {
downloadReq.isDownloadCanceled = true
downloadReq.ctxCncl()
return nil
Expand Down Expand Up @@ -2865,7 +2878,8 @@ func (a *Allocation) downloadFromAuthTicket(fileHandler sys.File, authTicket str
opt(downloadReq)
}
a.mutex.Lock()
a.downloadProgressMap[remoteLookupHash] = downloadReq
hash := encryption.Hash(fmt.Sprintf("%s:%d:%d", remoteLookupHash, startBlock, endBlock))
a.downloadProgressMap[hash] = downloadReq
if len(a.downloadRequests) > 0 {
downloadReq.connectionID = a.downloadRequests[0].connectionID
}
Expand Down
5 changes: 4 additions & 1 deletion zboxcore/sdk/allocation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"io/fs"
"log"
Expand All @@ -18,6 +19,7 @@ import (

"github.com/0chain/gosdk/zboxcore/mocks"

encrypt "github.com/0chain/gosdk/core/encryption"
"github.com/0chain/gosdk/dev/blobber"
"github.com/0chain/gosdk/dev/blobber/model"
"github.com/0chain/gosdk/zboxcore/encryption"
Expand Down Expand Up @@ -1452,7 +1454,8 @@ func TestAllocation_CancelDownload(t *testing.T) {
setup: func(t *testing.T, a *Allocation) (teardown func(t *testing.T)) {
req := &DownloadRequest{}
req.ctx, req.ctxCncl = context.WithCancel(context.TODO())
a.downloadProgressMap[remotePath] = req
hash := encrypt.Hash(fmt.Sprintf("%s:%d:%d", remotePath, 1, 0))
a.downloadProgressMap[hash] = req
return nil
},
},
Expand Down
3 changes: 3 additions & 0 deletions zboxcore/sdk/chunked_upload_process_js.go
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,9 @@ func parseEventData(data safejs.Value) (*FileMeta, *ChunkedUploadFormInfo, [][]b
buf := make([]byte, fileShardLen)
safejs.CopyBytesToGo(buf, fileShardUint8)
fileShards := splitData(buf, int(chunkSize))
fileShardUint8.Set("buffer", js.Null())
formInfoUint8.Set("buffer", js.Null())
fileMetaUint8.Set("buffer", js.Null())

thumbnailChunkDataUint8, err := data.Get("thumbnailChunkData")
if err != nil {
Expand Down
12 changes: 10 additions & 2 deletions zboxcore/sdk/downloadworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ func (req *DownloadRequest) downloadBlock(
fmt.Sprintf("Required downloads %d, remaining active blobber %d",
req.consensusThresh, activeBlobbers))
}
actualRequiredDownloads := requiredDownloads
if timeRequest {
requiredDownloads = activeBlobbers
}
Expand Down Expand Up @@ -290,10 +291,16 @@ func (req *DownloadRequest) downloadBlock(
c++
}

var failed int32
var (
failed int32
success int32
)
downloadErrors := make([]string, requiredDownloads)
wg := &sync.WaitGroup{}
for i := 0; i < requiredDownloads; i++ {
if atomic.LoadInt32(&success) >= int32(actualRequiredDownloads) {
break
}
result := <-rspCh
wg.Add(1)
go func(i int) {
Expand All @@ -314,6 +321,7 @@ func (req *DownloadRequest) downloadBlock(
req.bufferMap[result.idx].ReleaseChunk(int(req.startBlock))
}
} else if timeRequest {
atomic.AddInt32(&success, 1)
req.downloadQueue[result.maskIdx].timeTaken = result.timeTaken
}
wg.Done()
Expand Down Expand Up @@ -729,7 +737,7 @@ func (req *DownloadRequest) processDownload() {
if startBlock+int64(j)*numBlocks+numBlocks > endBlock {
blocksToDownload = endBlock - (startBlock + int64(j)*numBlocks)
}
data, err := req.getBlocksData(startBlock+int64(j)*numBlocks, blocksToDownload, j == 0 && n > 1)
data, err := req.getBlocksData(startBlock+int64(j)*numBlocks, blocksToDownload, j == 0)
if req.isDownloadCanceled {
return errors.New("download_abort", "Download aborted by user")
}
Expand Down

0 comments on commit 14d6ccc

Please sign in to comment.