Skip to content

Commit

Permalink
fix consensus err check
Browse files Browse the repository at this point in the history
  • Loading branch information
Hitenjain14 committed Nov 25, 2024
1 parent faee324 commit 0a4662a
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 7 deletions.
4 changes: 2 additions & 2 deletions zboxcore/sdk/chunked_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -711,14 +711,14 @@ func (su *ChunkedUpload) uploadToBlobbers(uploadData UploadData) error {
if strings.Contains(err.Error(), "duplicate") {
su.consensus.Done()
errC := atomic.AddInt32(&su.addConsensus, 1)
if errC >= int32(su.consensus.consensusThresh) {
if errC > int32(su.consensus.fullconsensus-su.consensus.consensusThresh) {
wgErrors <- err
}
return
}
logger.Logger.Error("error during sendUploadRequest", err, " connectionID: ", su.progress.ConnectionID)
errC := atomic.AddInt32(&errCount, 1)
if errC > int32(su.allocationObj.ParityShards-1) { // If atleast data shards + 1 number of blobbers can process the upload, it can be repaired later
if errC > int32(su.consensus.fullconsensus-su.consensus.consensusThresh) { // If atleast data shards + 1 number of blobbers can process the upload, it can be repaired later
wgErrors <- err
}
}
Expand Down
10 changes: 5 additions & 5 deletions zboxcore/sdk/chunked_upload_process_js.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ func (su *ChunkedUpload) listen(allEventChan []eventChanWorker, respChan chan er
eventChan := allEventChan[pos]
if eventChan.C == nil {
errC := atomic.AddInt32(&errCount, 1)
if errC >= int32(su.consensus.consensusThresh) {
if errC > int32(su.consensus.fullconsensus-su.consensus.consensusThresh) {
wgErrors <- thrown.New("upload_failed", "Upload failed. Worker event channel not found")
}
return
Expand All @@ -282,7 +282,7 @@ func (su *ChunkedUpload) listen(allEventChan []eventChanWorker, respChan chan er
if !ok {
logger.Logger.Error("chan closed from: ", blobber.blobber.Baseurl)
errC := atomic.AddInt32(&errCount, 1)
if errC >= int32(su.consensus.consensusThresh) {
if errC > int32(su.consensus.fullconsensus-su.consensus.consensusThresh) {
if su.ctx.Err() != nil {
wgErrors <- context.Cause(su.ctx)
} else {
Expand All @@ -294,7 +294,7 @@ func (su *ChunkedUpload) listen(allEventChan []eventChanWorker, respChan chan er
msgType, data, err := jsbridge.GetMsgType(event)
if err != nil {
errC := atomic.AddInt32(&errCount, 1)
if errC >= int32(su.consensus.consensusThresh) {
if errC > int32(su.consensus.fullconsensus-su.consensus.consensusThresh) {
wgErrors <- errors.Wrap(err, "could not get msgType")
}
return
Expand All @@ -304,7 +304,7 @@ func (su *ChunkedUpload) listen(allEventChan []eventChanWorker, respChan chan er
case "auth":
if err := su.processWebWorkerAuthRequest(data, eventChan); err != nil {
errC := atomic.AddInt32(&errCount, 1)
if errC >= int32(su.consensus.consensusThresh) {
if errC > int32(su.consensus.fullconsensus-su.consensus.consensusThresh) {
wgErrors <- err
}
return
Expand All @@ -316,7 +316,7 @@ func (su *ChunkedUpload) listen(allEventChan []eventChanWorker, respChan chan er
isFinal, err = su.processWebWorkerUpload(data, blobber, pos)
if err != nil {
errC := atomic.AddInt32(&errCount, 1)
if errC >= int32(su.consensus.consensusThresh) {
if errC > int32(su.consensus.fullconsensus-su.consensus.consensusThresh) {
wgErrors <- err
}
} else {
Expand Down

0 comments on commit 0a4662a

Please sign in to comment.