Skip to content

Commit

Permalink
Merge pull request #1731 from 0chain/fix/rollback-v2
Browse files Browse the repository at this point in the history
Fix rollback for v2
  • Loading branch information
dabasov authored Jan 11, 2025
2 parents ae439a7 + f72c706 commit 3d07369
Showing 1 changed file with 81 additions and 0 deletions.
81 changes: 81 additions & 0 deletions zboxcore/sdk/rollback.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,10 @@ func (a *Allocation) CheckAllocStatus() (AllocStatus, []BlobberStatus, error) {
return Broken, blobberRes, common.NewError("check_alloc_status_failed", markerError.Error())
}

if a.StorageVersion == StorageV2 {
return a.checkStatusV2(markerChan, blobberRes)
}

versionMap := make(map[string][]*RollbackBlobber)

var (
Expand Down Expand Up @@ -472,3 +476,80 @@ func (a *Allocation) RollbackWithMask(mask zboxutil.Uint128) {

wg.Wait()
}

func (a *Allocation) checkStatusV2(markerChan chan *RollbackBlobber, blobStatus []BlobberStatus) (AllocStatus, []BlobberStatus, error) {
var (
latestVersionMap = make(map[string][]*RollbackBlobber)
allVersionMap = make(map[string][]*RollbackBlobber)
consensusVersion string
allVersionConensus string
)

for rb := range markerChan {
if rb == nil || rb.lpm.LatestWM == nil {
continue
}
version := rb.lpm.LatestWM.AllocationRoot
latestVersionMap[version] = append(latestVersionMap[version], rb)
if len(latestVersionMap[version]) > a.DataShards {
consensusVersion = version
}
allVersionMap[version] = append(allVersionMap[version], rb)
if rb.lpm.PrevWM != nil && rb.lpm.PrevWM.AllocationRoot != version {
allVersionMap[rb.lpm.PrevWM.AllocationRoot] = append(allVersionMap[rb.lpm.PrevWM.AllocationRoot], rb)
if len(allVersionMap[rb.lpm.PrevWM.AllocationRoot]) >= a.DataShards {
allVersionConensus = rb.lpm.PrevWM.AllocationRoot
}
}
if len(allVersionMap[version]) >= a.DataShards {
allVersionConensus = version
}
}

if consensusVersion != "" {
a.allocationRoot = consensusVersion
return Commit, blobStatus, nil
}

if allVersionConensus == "" {
return Broken, blobStatus, nil
}

if len(latestVersionMap[allVersionConensus]) >= a.DataShards {
a.allocationRoot = allVersionConensus
return Repair, blobStatus, nil
}
l.Logger.Info("Rolling back to previous version")
fullConsensus := len(allVersionMap[allVersionConensus]) - len(latestVersionMap[allVersionConensus])
consensusThresh := a.DataShards - len(latestVersionMap[allVersionConensus])
var successCnt int32
wg := &sync.WaitGroup{}

for _, rb := range allVersionMap[allVersionConensus] {
if rb.lpm.LatestWM.AllocationRoot == allVersionConensus {
continue
}
wg.Add(1)
go func(rb *RollbackBlobber) {
defer wg.Done()
err := rb.processRollback(context.TODO(), a.Tx)
if err != nil {
rb.commitResult = ErrorCommitResult(err.Error())
l.Logger.Error("error during rollback", zap.Error(err))
} else {
atomic.AddInt32(&successCnt, 1)
rb.commitResult = SuccessCommitResult()
}
}(rb)
}
wg.Wait()
if successCnt < int32(consensusThresh) {
return Broken, blobStatus, common.NewError("rollback_failed", "Rollback failed")
}
a.allocationRoot = allVersionConensus
if successCnt == int32(fullConsensus) {
return Repair, blobStatus, nil
}

return Commit, blobStatus, nil
}

0 comments on commit 3d07369

Please sign in to comment.