Skip to content

Commit

Permalink
Merge pull request #1388 from 0chain/feat/wm-chain
Browse files Browse the repository at this point in the history
Chaining WM
  • Loading branch information
dabasov authored Apr 10, 2024
2 parents 20b8614 + 66ea139 commit f1a24fd
Show file tree
Hide file tree
Showing 21 changed files with 399 additions and 182 deletions.
1 change: 1 addition & 0 deletions code/go/0chain.net/blobbercore/allocation/entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type Allocation struct {
BlobberSize int64 `gorm:"column:blobber_size;not null;default:0"`
BlobberSizeUsed int64 `gorm:"column:blobber_size_used;not null;default:0"`
LatestRedeemedWM string `gorm:"column:latest_redeemed_write_marker;size:64"`
LastRedeemedSeq int64 `gorm:"column:last_redeemed_sequence;default:0"`
IsRedeemRequired bool `gorm:"column:is_redeem_required"`
TimeUnit time.Duration `gorm:"column:time_unit;not null;default:172800000000000"`
StartTime common.Timestamp `gorm:"column:start_time;not null"`
Expand Down
5 changes: 4 additions & 1 deletion code/go/0chain.net/blobbercore/allocation/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func (r *Repository) GetAllocationIds(ctx context.Context) []Res {

}

func (r *Repository) UpdateAllocationRedeem(ctx context.Context, allocationID, AllocationRoot string, allocationObj *Allocation) error {
func (r *Repository) UpdateAllocationRedeem(ctx context.Context, allocationID, AllocationRoot string, allocationObj *Allocation, redeemSeq int64) error {
var tx = datastore.GetStore().GetTransaction(ctx)
if tx == nil {
logging.Logger.Panic("no transaction in the context")
Expand All @@ -205,17 +205,20 @@ func (r *Repository) UpdateAllocationRedeem(ctx context.Context, allocationID, A
allocationUpdates := make(map[string]interface{})
allocationUpdates["latest_redeemed_write_marker"] = AllocationRoot
allocationUpdates["is_redeem_required"] = false
allocationUpdates["last_redeemed_sequence"] = redeemSeq
err = tx.Model(allocationObj).Updates(allocationUpdates).Error
if err != nil {
return err
}
allocationObj.LatestRedeemedWM = AllocationRoot
allocationObj.IsRedeemRequired = false
allocationObj.LastRedeemedSeq = redeemSeq
txnCache := cache[allocationID]
txnCache.Allocation = allocationObj
updateAlloc := func(a *Allocation) {
a.LatestRedeemedWM = AllocationRoot
a.IsRedeemRequired = false
a.LastRedeemedSeq = redeemSeq
}
txnCache.AllocationUpdates = append(txnCache.AllocationUpdates, updateAlloc)
cache[allocationID] = txnCache
Expand Down
8 changes: 4 additions & 4 deletions code/go/0chain.net/blobbercore/blobberhttp/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ type ConnectionResult struct {

// swagger:model CommitResult
type CommitResult struct {
AllocationRoot string `json:"allocation_root"`
WriteMarker *writemarker.WriteMarker `json:"write_marker"`
Success bool `json:"success"`
ErrorMessage string `json:"error_msg,omitempty"`
AllocationRoot string `json:"allocation_root"`
WriteMarker *writemarker.WriteMarkerEntity `json:"write_marker"`
Success bool `json:"success"`
ErrorMessage string `json:"error_msg,omitempty"`
//Result []*UploadResult `json:"result"`
}

Expand Down
9 changes: 9 additions & 0 deletions code/go/0chain.net/blobbercore/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ func SetupDefaultConfig() {
viper.SetDefault("openconnection_cleaner.frequency", 30)
viper.SetDefault("writemarker_redeem.frequency", 10)
viper.SetDefault("writemarker_redeem.num_workers", 5)
viper.SetDefault("writemarker_redeem.max_chain_length", 32)
viper.SetDefault("writemarker_redeem.max_timestamp_gap", 1800)
viper.SetDefault("writemarker_redeem.marker_redeem_interval", time.Minute*10)
viper.SetDefault("readmarker_redeem.frequency", 10)
viper.SetDefault("readmarker_redeem.num_workers", 5)
viper.SetDefault("challenge_response.frequency", 10)
Expand Down Expand Up @@ -100,6 +103,9 @@ type Config struct {
OpenConnectionWorkerTolerance int64
WMRedeemFreq int64
WMRedeemNumWorkers int
MaxChainLength int
MaxTimestampGap int64
MarkerRedeemInterval time.Duration
RMRedeemFreq int64
RMRedeemNumWorkers int
ChallengeResolveFreq int64
Expand Down Expand Up @@ -218,6 +224,9 @@ func ReadConfig(deploymentMode int) {

Configuration.WMRedeemFreq = viper.GetInt64("writemarker_redeem.frequency")
Configuration.WMRedeemNumWorkers = viper.GetInt("writemarker_redeem.num_workers")
Configuration.MaxChainLength = viper.GetInt("writemarker_redeem.max_chain_length")
Configuration.MaxTimestampGap = viper.GetInt64("writemarker_redeem.max_timestamp_gap")
Configuration.MarkerRedeemInterval = viper.GetDuration("writemarker_redeem.marker_redeem_interval")

Configuration.RMRedeemFreq = viper.GetInt64("readmarker_redeem.frequency")
Configuration.RMRedeemNumWorkers = viper.GetInt("readmarker_redeem.num_workers")
Expand Down
6 changes: 3 additions & 3 deletions code/go/0chain.net/blobbercore/convert/response_creator.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,9 @@ func CommitWriteResponseCreator(r interface{}) *blobbergrpc.CommitResponse {

return &blobbergrpc.CommitResponse{
AllocationRoot: httpResp.AllocationRoot,
WriteMarker: WriteMarkerToWriteMarkerGRPC(httpResp.WriteMarker),
ErrorMessage: httpResp.ErrorMessage,
Success: httpResp.Success,
// WriteMarker: WriteMarkerToWriteMarkerGRPC(httpResp.WriteMarker),
ErrorMessage: httpResp.ErrorMessage,
Success: httpResp.Success,
}
}

Expand Down
6 changes: 3 additions & 3 deletions code/go/0chain.net/blobbercore/convert/response_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@ func GetObjectTreeResponseHandler(getObjectTreeResponse *blobbergrpc.GetObjectTr
func CommitWriteResponseHandler(resp *blobbergrpc.CommitResponse) *blobberhttp.CommitResult {
return &blobberhttp.CommitResult{
AllocationRoot: resp.AllocationRoot,
WriteMarker: WriteMarkerGRPCToWriteMarker(resp.WriteMarker),
Success: resp.Success,
ErrorMessage: resp.ErrorMessage,
// WriteMarker: WriteMarkerGRPCToWriteMarker(resp.WriteMarker),
Success: resp.Success,
ErrorMessage: resp.ErrorMessage,
}
}

Expand Down
9 changes: 9 additions & 0 deletions code/go/0chain.net/blobbercore/handler/handler_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"time"

"github.com/0chain/blobber/code/go/0chain.net/blobbercore/allocation"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/blobberhttp"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/writemarker"
"github.com/0chain/blobber/code/go/0chain.net/core/build"
"github.com/0chain/blobber/code/go/0chain.net/core/chain"
"github.com/0chain/blobber/code/go/0chain.net/core/common"
Expand Down Expand Up @@ -153,6 +155,13 @@ func WithStatusConnectionForWM(handler common.StatusCodeResponderF) common.Statu
return resp, statusCode, common.NewErrorf("commit_error",
"error committing to meta store: %v", err)
}

if blobberRes, ok := resp.(*blobberhttp.CommitResult); ok {
// Save the write marker data
writemarker.SaveMarkerData(allocationID, blobberRes.WriteMarker.WM.Timestamp, blobberRes.WriteMarker.WM.ChainLength)
} else {
Logger.Error("Invalid response type for commit handler")
}
return
}
}
Expand Down
9 changes: 2 additions & 7 deletions code/go/0chain.net/blobbercore/handler/handler_writemarker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,15 @@ package handler

import (
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/writemarker"
"github.com/0chain/blobber/code/go/0chain.net/core/common"
. "github.com/0chain/blobber/code/go/0chain.net/core/logging"
"go.uber.org/zap"
)

var WriteMarkerMutext = &writemarker.Mutex{
ML: common.GetNewLocker(),
}

// LockWriteMarker try to lock writemarker for specified allocation id, and return latest RefTree
func LockWriteMarker(ctx *Context) (interface{}, error) {
connectionID, _ := ctx.FormValue("connection_id")

result, err := WriteMarkerMutext.Lock(ctx, ctx.AllocationId, connectionID)
result, err := writemarker.WriteMarkerMutext.Lock(ctx, ctx.AllocationId, connectionID)
Logger.Info("Lock write marker result", zap.Any("result", result), zap.Error(err))
if err != nil {
return nil, err
Expand All @@ -28,7 +23,7 @@ func LockWriteMarker(ctx *Context) (interface{}, error) {
func UnlockWriteMarker(ctx *Context) (interface{}, error) {
connectionID := ctx.Vars["connection"]

err := WriteMarkerMutext.Unlock(ctx, ctx.AllocationId, connectionID)
err := writemarker.WriteMarkerMutext.Unlock(ctx.AllocationId, connectionID)
if err != nil {
return nil, err
}
Expand Down
70 changes: 44 additions & 26 deletions code/go/0chain.net/blobbercore/handler/object_operation_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,18 +89,10 @@ func readPreRedeem(
}

func checkPendingMarkers(ctx context.Context, allocationID string) error {

mut := writemarker.GetLock(allocationID)
if mut == nil {
return nil
}
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
err := mut.Acquire(ctx, 1)
if err != nil {
return common.NewError("check_pending_markers", "write marker is still not redeemed")
pending := writemarker.CheckProcessingMarker(allocationID)
if pending {
return common.NewError("pending_markers", "previous marker is still pending to be redeemed")
}
mut.Release(1)
return nil
}

Expand Down Expand Up @@ -507,6 +499,7 @@ func (fsh *StorageHandler) CreateConnection(ctx context.Context, r *http.Request
}

func (fsh *StorageHandler) CommitWrite(ctx context.Context, r *http.Request) (*blobberhttp.CommitResult, error) {
var prevChainHash string
startTime := time.Now()
if r.Method == "GET" {
return nil, common.NewError("invalid_method", "Invalid method used for the upload URL. Use POST instead")
Expand Down Expand Up @@ -602,6 +595,20 @@ func (fsh *StorageHandler) CommitWrite(ctx context.Context, r *http.Request) (*b
return nil, common.NewErrorf("latest_write_marker_read_error",
"Error reading the latest write marker for allocation: %v", err)
}
if latestWriteMarkerEntity.Status == writemarker.Failed {
return nil, common.NewError("latest_write_marker_failed",
"Latest write marker is in failed state")
}

if latestWriteMarkerEntity.WM.ChainSize+connectionObj.Size != writeMarker.ChainSize {
return nil, common.NewErrorf("invalid_chain_size",
"Invalid chain size. expected:%v got %v", latestWriteMarkerEntity.WM.ChainSize+connectionObj.Size, writeMarker.ChainSize)
}

if latestWriteMarkerEntity.Status != writemarker.Committed {
writeMarker.ChainLength = latestWriteMarkerEntity.WM.ChainLength
}
prevChainHash = latestWriteMarkerEntity.WM.ChainHash
}

writemarkerEntity := &writemarker.WriteMarkerEntity{}
Expand All @@ -613,7 +620,7 @@ func (fsh *StorageHandler) CommitWrite(ctx context.Context, r *http.Request) (*b
result.ErrorMessage = "Verification of write marker failed: " + err.Error()
result.Success = false
if latestWriteMarkerEntity != nil {
result.WriteMarker = &latestWriteMarkerEntity.WM
result.WriteMarker = latestWriteMarkerEntity
}
Logger.Error("verify_writemarker_failed", zap.Error(err))
return &result, common.NewError("write_marker_verification_failed", result.ErrorMessage)
Expand Down Expand Up @@ -667,18 +674,23 @@ func (fsh *StorageHandler) CommitWrite(ctx context.Context, r *http.Request) (*b
if allocationRoot != writeMarker.AllocationRoot {
result.AllocationRoot = allocationObj.AllocationRoot
if latestWriteMarkerEntity != nil {
result.WriteMarker = &latestWriteMarkerEntity.WM
result.WriteMarker = latestWriteMarkerEntity
}
result.Success = false
result.ErrorMessage = "Allocation root in the write marker does not match the calculated allocation root." +
" Expected hash: " + allocationRoot
return &result, common.NewError("allocation_root_mismatch", result.ErrorMessage)
}

chainHash := writemarker.CalculateChainHash(prevChainHash, allocationRoot)
if chainHash != writeMarker.ChainHash {
return nil, common.NewError("chain_hash_mismatch", "Chain hash in the write marker does not match the calculated chain hash")
}

if fileMetaRoot != writeMarker.FileMetaRoot {
// result.AllocationRoot = allocationObj.AllocationRoot
if latestWriteMarkerEntity != nil {
result.WriteMarker = &latestWriteMarkerEntity.WM
result.WriteMarker = latestWriteMarkerEntity
}
result.Success = false
result.ErrorMessage = "File meta root in the write marker does not match the calculated file meta root." +
Expand All @@ -688,6 +700,10 @@ func (fsh *StorageHandler) CommitWrite(ctx context.Context, r *http.Request) (*b

writemarkerEntity.ConnectionID = connectionObj.ID
writemarkerEntity.ClientPublicKey = clientKey
writemarkerEntity.WM.ChainLength += 1
if writemarkerEntity.WM.ChainLength > config.Configuration.MaxChainLength {
return nil, common.NewError("chain_length_exceeded", "Chain length exceeded")
}

db := datastore.GetStore().GetTransaction(ctx)
writemarkerEntity.Latest = true
Expand Down Expand Up @@ -734,7 +750,7 @@ func (fsh *StorageHandler) CommitWrite(ctx context.Context, r *http.Request) (*b

db.Model(connectionObj).Updates(allocation.AllocationChangeCollector{Status: allocation.CommittedConnection})
result.AllocationRoot = allocationObj.AllocationRoot
result.WriteMarker = &writeMarker
result.WriteMarker = writemarkerEntity
result.Success = true
result.ErrorMessage = ""
commitOperation := connectionObj.Changes[0].Operation
Expand All @@ -743,10 +759,6 @@ func (fsh *StorageHandler) CommitWrite(ctx context.Context, r *http.Request) (*b
//Delete connection object and its changes

db.Delete(connectionObj)
err = writemarkerEntity.SendToChan(ctx)
if err != nil {
return nil, common.NewError("write_marker_error", "Error redeeming the write marker")
}
go allocation.DeleteConnectionObjEntry(connectionID)
go AddWriteMarkerCount(clientID, connectionObj.Size <= 0)

Expand Down Expand Up @@ -1390,6 +1402,10 @@ func (fsh *StorageHandler) Rollback(ctx context.Context, r *http.Request) (*blob
return nil, common.NewError("write_marker_verification_failed", "Verification of the write marker failed: "+err.Error())
}

if writemarkerEntity.WM.ChainLength > config.Configuration.MaxChainLength {
return nil, common.NewError("chain_length_exceeded", "Chain length exceeded")
}

elapsedVerifyWM := time.Since(startTime) - elapsedAllocation - elapsedGetLock

var clientIDForWriteRedeem = writeMarker.ClientID
Expand Down Expand Up @@ -1425,17 +1441,23 @@ func (fsh *StorageHandler) Rollback(ctx context.Context, r *http.Request) (*blob

if allocationRoot != writeMarker.AllocationRoot {
result.AllocationRoot = allocationObj.AllocationRoot
result.WriteMarker = &latestWriteMarkerEntity.WM
result.WriteMarker = latestWriteMarkerEntity
result.Success = false
result.ErrorMessage = "Allocation root in the write marker does not match the calculated allocation root." +
" Expected hash: " + allocationRoot
txn.Rollback()
return &result, common.NewError("allocation_root_mismatch", result.ErrorMessage)
}

chainHash := writemarker.CalculateChainHash(latestWriteMarkerEntity.WM.ChainHash, allocationRoot)
if chainHash != writeMarker.ChainHash {
txn.Rollback()
return nil, common.NewError("chain_hash_mismatch", "Chain hash in the write marker does not match the calculated chain hash")
}

if fileMetaRoot != writeMarker.FileMetaRoot {
if latestWriteMarkerEntity != nil {
result.WriteMarker = &latestWriteMarkerEntity.WM
result.WriteMarker = latestWriteMarkerEntity
}
result.Success = false
result.ErrorMessage = "File meta root in the write marker does not match the calculated file meta root." +
Expand Down Expand Up @@ -1490,18 +1512,14 @@ func (fsh *StorageHandler) Rollback(ctx context.Context, r *http.Request) (*blob
if err != nil {
return &result, common.NewError("allocation_commit_error", "Error committing the transaction "+err.Error())
}
err = writemarkerEntity.SendToChan(ctx)
if err != nil {
return nil, common.NewError("write_marker_error", "Error redeeming the write marker")
}
err = allocation.CommitRollback(allocationID)
if err != nil {
Logger.Error("Error committing the rollback for allocation", zap.Error(err))
}

elapsedCommitRollback := time.Since(startTime) - elapsedAllocation - elapsedGetLock - elapsedVerifyWM - elapsedWritePreRedeem
result.AllocationRoot = allocationObj.AllocationRoot
result.WriteMarker = &writeMarker
result.WriteMarker = writemarkerEntity
result.Success = true
result.ErrorMessage = ""
commitOperation := "rollback"
Expand Down
9 changes: 9 additions & 0 deletions code/go/0chain.net/blobbercore/handler/storage_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,9 @@ func (fsh *StorageHandler) GetLatestWriteMarker(ctx context.Context, r *http.Req

var result blobberhttp.LatestWriteMarkerResult
if latestWM != nil {
if latestWM.Status == writemarker.Committed {
latestWM.WM.ChainLength = 0 // start a new chain
}
result.LatestWM = &latestWM.WM
}
if prevWM != nil {
Expand Down Expand Up @@ -560,6 +563,9 @@ func (fsh *StorageHandler) getReferencePath(ctx context.Context, r *http.Request
var refPathResult blobberhttp.ReferencePathResult
refPathResult.ReferencePath = refPath
if latestWM != nil {
if latestWM.Status == writemarker.Committed {
latestWM.WM.ChainLength = 0 // start a new chain
}
refPathResult.LatestWM = &latestWM.WM
}

Expand Down Expand Up @@ -628,6 +634,9 @@ func (fsh *StorageHandler) GetObjectTree(ctx context.Context, r *http.Request) (
var refPathResult blobberhttp.ReferencePathResult
refPathResult.ReferencePath = refPath
if latestWM != nil {
if latestWM.Status == writemarker.Committed {
latestWM.WM.ChainLength = 0 // start a new chain
}
refPathResult.LatestWM = &latestWM.WM
}
return &refPathResult, nil
Expand Down
Loading

0 comments on commit f1a24fd

Please sign in to comment.