Skip to content

Commit

Permalink
Updated error flow in upload worker
Browse files Browse the repository at this point in the history
  • Loading branch information
KrishnaDeqode committed Jul 21, 2020
1 parent 1960869 commit d6c6ef5
Showing 1 changed file with 15 additions and 8 deletions.
23 changes: 15 additions & 8 deletions zboxcore/sdk/uploadworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"os"
"sync"

"github.com/0chain/gosdk/core/common"
"github.com/0chain/gosdk/core/util"
"github.com/0chain/gosdk/zboxcore/allocationchange"
"github.com/0chain/gosdk/zboxcore/blockchain"
Expand Down Expand Up @@ -92,6 +93,7 @@ type UploadRequest struct {
encscheme encryption.EncryptionScheme
isUploadCanceled bool
completedCallback func(filepath string)
err error
Consensus
}

Expand Down Expand Up @@ -252,29 +254,34 @@ func (req *UploadRequest) prepareUpload(a *Allocation, blobber *blockchain.Stora
_ = zboxutil.HttpDo(a.ctx, a.ctxCancelF, httpreq, func(resp *http.Response, err error) error {
if err != nil {
Logger.Error("Upload : ", err)
req.err = err
return err
}
defer resp.Body.Close()

respbody, err := ioutil.ReadAll(resp.Body)
if err != nil {
Logger.Error("Error: Resp ", err)
req.err = err
return err
}
if resp.StatusCode != http.StatusOK {
Logger.Error(blobber.Baseurl, " Upload error response: ", resp.StatusCode, string(respbody))
req.err = fmt.Errorf(string(respbody))
return err
}
var r uploadResult
err = json.Unmarshal(respbody, &r)
if err != nil {
Logger.Error(blobber.Baseurl, " Upload response parse error: ", err)
req.err = err
return err
}
if r.Filename != formData.Filename || r.ShardSize != shardSize ||
r.Hash != formData.Hash || r.MerkleRoot != formData.MerkleRoot {
err = fmt.Errorf(blobber.Baseurl, "Unexpected upload response data", string(respbody))
Logger.Error(err)
req.err = err
return err
}
req.consensus++
Expand Down Expand Up @@ -406,19 +413,19 @@ func (req *UploadRequest) processUpload(ctx context.Context, a *Allocation) {
var inFile *os.File
inFile, err := os.Open(req.filepath)
if err != nil && req.statusCallback != nil {
req.statusCallback.Error(a.ID, req.filepath, OpUpload, fmt.Errorf("Open file failed: %s", err.Error()))
req.statusCallback.Error(a.ID, req.filepath, OpUpload, common.NewError("open_file_failed", err.Error()))
return
}
defer inFile.Close()
mimetype, err := zboxutil.GetFileContentType(inFile)
if err != nil && req.statusCallback != nil {
req.statusCallback.Error(a.ID, req.filepath, OpUpload, fmt.Errorf("Error detecting the mimetype: %s", err.Error()))
req.statusCallback.Error(a.ID, req.filepath, OpUpload, common.NewError("mime_type_error", err.Error()))
return
}
req.filemeta.MimeType = mimetype
err = req.setupUpload(a)
if err != nil && req.statusCallback != nil {
req.statusCallback.Error(a.ID, req.filepath, OpUpload, fmt.Errorf("setting up of upload failed : %s", err.Error()))
req.statusCallback.Error(a.ID, req.filepath, OpUpload, common.NewError("setup_upload_failed", err.Error()))
return
}
size := req.filemeta.Size
Expand Down Expand Up @@ -452,7 +459,7 @@ func (req *UploadRequest) processUpload(ctx context.Context, a *Allocation) {
b1 := make([]byte, remaining*int64(a.DataShards))
_, err = dataReader.Read(b1)
if err != nil && req.statusCallback != nil {
req.statusCallback.Error(a.ID, req.filepath, OpUpload, fmt.Errorf("Read failed: %s", err.Error()))
req.statusCallback.Error(a.ID, req.filepath, OpUpload, common.NewError("read_failed", err.Error()))
return
}
if req.isUploadCanceled {
Expand All @@ -461,20 +468,20 @@ func (req *UploadRequest) processUpload(ctx context.Context, a *Allocation) {
go a.DeleteFile(req.remotefilepath)
}
if req.statusCallback != nil {
req.statusCallback.Error(a.ID, req.filepath, OpUpload, fmt.Errorf("Upload aborted by user"))
req.statusCallback.Error(a.ID, req.filepath, OpUpload, common.NewError("user_aborted", "Upload aborted by user"))
}
return
}
err = req.pushData(b1)
if err != nil {
req.statusCallback.Error(a.ID, req.filepath, OpUpload, fmt.Errorf("Push error: %s", err.Error()))
req.statusCallback.Error(a.ID, req.filepath, OpUpload, common.NewError("push_error", err.Error()))
return
}

}
err = req.completePush()
if err != nil && req.statusCallback != nil {
req.statusCallback.Error(a.ID, req.remotefilepath, OpUpload, fmt.Errorf("Upload failed: %s", err.Error()))
req.statusCallback.Error(a.ID, req.remotefilepath, OpUpload, req.err)
return
}
}()
Expand Down Expand Up @@ -574,7 +581,7 @@ func (req *UploadRequest) processUpload(ctx context.Context, a *Allocation) {
a.deleteFile(req.remotefilepath, req.consensus, req.consensus)
}
if req.statusCallback != nil {
req.statusCallback.Error(a.ID, req.remotefilepath, OpUpload, fmt.Errorf("Upload failed: Commit consensus failed"))
req.statusCallback.Error(a.ID, req.remotefilepath, OpUpload, common.NewError("commit_consensus_failed", "Upload failed as there was no commit consensus"))
return
}
}
Expand Down

0 comments on commit d6c6ef5

Please sign in to comment.