Skip to content

Commit

Permalink
pool: track pending cost per payer (#845)
Browse files Browse the repository at this point in the history
  • Loading branch information
libotony authored Sep 25, 2024
1 parent fd5e569 commit 56bd50e
Show file tree
Hide file tree
Showing 8 changed files with 295 additions and 50 deletions.
16 changes: 9 additions & 7 deletions runtime/resolved_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ func (r *ResolvedTransaction) BuyGas(state *state.State, blockTime uint64) (
baseGasPrice *big.Int,
gasPrice *big.Int,
payer thor.Address,
prepaid *big.Int,
returnGas func(uint64) error,
err error,
) {
Expand All @@ -113,19 +114,20 @@ func (r *ResolvedTransaction) BuyGas(state *state.State, blockTime uint64) (
return returnedEnergy, nil
}

prepaid := new(big.Int).Mul(new(big.Int).SetUint64(r.tx.Gas()), gasPrice)
// prepaid is the max total of gas cost available to spend on this transaction
prepaid = new(big.Int).Mul(new(big.Int).SetUint64(r.tx.Gas()), gasPrice)
if r.Delegator != nil {
var sufficient bool
if sufficient, err = energy.Sub(*r.Delegator, prepaid); err != nil {
return
}
if sufficient {
return baseGasPrice, gasPrice, *r.Delegator, func(rgas uint64) error {
return baseGasPrice, gasPrice, *r.Delegator, prepaid, func(rgas uint64) error {
_, err := doReturnGas(rgas)
return err
}, nil
}
return nil, nil, thor.Address{}, nil, errors.New("insufficient energy")
return nil, nil, thor.Address{}, nil, nil, errors.New("insufficient energy")
}

commonTo := r.CommonTo()
Expand Down Expand Up @@ -164,7 +166,7 @@ func (r *ResolvedTransaction) BuyGas(state *state.State, blockTime uint64) (
return
}
if ok {
return baseGasPrice, gasPrice, sponsor, doReturnGasAndSetCredit, nil
return baseGasPrice, gasPrice, sponsor, prepaid, doReturnGasAndSetCredit, nil
}
}
// deduct from To
Expand All @@ -174,7 +176,7 @@ func (r *ResolvedTransaction) BuyGas(state *state.State, blockTime uint64) (
return
}
if sufficient {
return baseGasPrice, gasPrice, *commonTo, doReturnGasAndSetCredit, nil
return baseGasPrice, gasPrice, *commonTo, prepaid, doReturnGasAndSetCredit, nil
}
}
}
Expand All @@ -186,9 +188,9 @@ func (r *ResolvedTransaction) BuyGas(state *state.State, blockTime uint64) (
}

if sufficient {
return baseGasPrice, gasPrice, r.Origin, func(rgas uint64) error { _, err := doReturnGas(rgas); return err }, nil
return baseGasPrice, gasPrice, r.Origin, prepaid, func(rgas uint64) error { _, err := doReturnGas(rgas); return err }, nil
}
return nil, nil, thor.Address{}, nil, errors.New("insufficient energy")
return nil, nil, thor.Address{}, nil, nil, errors.New("insufficient energy")
}

// ToContext create a tx context object.
Expand Down
2 changes: 1 addition & 1 deletion runtime/resolved_tx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (tr *testResolvedTransaction) TestBuyGas() {
if err != nil {
tr.t.Fatal(err)
}
_, _, payer, returnGas, err := resolve.BuyGas(state, targetTime)
_, _, payer, _, returnGas, err := resolve.BuyGas(state, targetTime)
tr.assert.Nil(err)
returnGas(100)
return payer
Expand Down
2 changes: 1 addition & 1 deletion runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ func (rt *Runtime) PrepareTransaction(tx *tx.Transaction) (*TransactionExecutor,
return nil, err
}

baseGasPrice, gasPrice, payer, returnGas, err := resolvedTx.BuyGas(rt.state, rt.ctx.Time)
baseGasPrice, gasPrice, payer, _, returnGas, err := resolvedTx.BuyGas(rt.state, rt.ctx.Time)
if err != nil {
return nil, err
}
Expand Down
28 changes: 24 additions & 4 deletions txpool/tx_object.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@ type txObject struct {
*tx.Transaction
resolved *runtime.ResolvedTransaction

timeAdded int64
executable bool
timeAdded int64
localSubmitted bool // tx is submitted locally on this node, or synced remotely from p2p.
payer *thor.Address // payer of the tx, either origin, delegator, or on-chain delegation payer
cost *big.Int // total tx cost the payer needs to pay before execution(gas price * gas)

executable bool // don't touch this value, will be updated by the pool
overallGasPrice *big.Int // don't touch this value, it's only be used in pool's housekeeping
localSubmitted bool // tx is submitted locally on this node, or synced remotely from p2p.
}

func resolveTx(tx *tx.Transaction, localSubmitted bool) (*txObject, error) {
Expand All @@ -51,6 +54,14 @@ func (o *txObject) Delegator() *thor.Address {
return o.resolved.Delegator
}

func (o *txObject) Cost() *big.Int {
return o.cost
}

func (o *txObject) Payer() *thor.Address {
return o.payer
}

func (o *txObject) Executable(chain *chain.Chain, state *state.State, headBlock *block.Header) (bool, error) {
switch {
case o.Gas() > headBlock.GasLimit():
Expand Down Expand Up @@ -86,9 +97,18 @@ func (o *txObject) Executable(chain *chain.Chain, state *state.State, headBlock
return false, nil
}

if _, _, _, _, err := o.resolved.BuyGas(state, headBlock.Timestamp()+thor.BlockInterval); err != nil {
checkpoint := state.NewCheckpoint()
defer state.RevertTo(checkpoint)

_, _, payer, prepaid, _, err := o.resolved.BuyGas(state, headBlock.Timestamp()+thor.BlockInterval)
if err != nil {
return false, err
}

if !o.executable {
o.payer = &payer
o.cost = prepaid
}
return true, nil
}

Expand Down
80 changes: 67 additions & 13 deletions txpool/tx_object_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,28 @@ package txpool

import (
"errors"
"math/big"
"sync"

"github.com/vechain/thor/v2/thor"
"github.com/vechain/thor/v2/tx"
)

// txObjectMap to maintain mapping of tx hash to tx object, and account quota.
// txObjectMap to maintain mapping of tx hash to tx object, account quota and pending cost.
type txObjectMap struct {
lock sync.RWMutex
mapByHash map[thor.Bytes32]*txObject
mapByID map[thor.Bytes32]*txObject
quota map[thor.Address]int
cost map[thor.Address]*big.Int
}

func newTxObjectMap() *txObjectMap {
return &txObjectMap{
mapByHash: make(map[thor.Bytes32]*txObject),
mapByID: make(map[thor.Bytes32]*txObject),
quota: make(map[thor.Address]int),
cost: make(map[thor.Address]*big.Int),
}
}

Expand All @@ -36,7 +39,7 @@ func (m *txObjectMap) ContainsHash(txHash thor.Bytes32) bool {
return found
}

func (m *txObjectMap) Add(txObj *txObject, limitPerAccount int) error {
func (m *txObjectMap) Add(txObj *txObject, limitPerAccount int, validatePayer func(payer thor.Address, needs *big.Int) error) error {
m.lock.Lock()
defer m.lock.Unlock()

Expand All @@ -49,22 +52,50 @@ func (m *txObjectMap) Add(txObj *txObject, limitPerAccount int) error {
return errors.New("account quota exceeded")
}

if d := txObj.Delegator(); d != nil {
if m.quota[*d] >= limitPerAccount {
delegator := txObj.Delegator()
if delegator != nil {
if m.quota[*delegator] >= limitPerAccount {
return errors.New("delegator quota exceeded")
}
m.quota[*d]++
}

var (
cost *big.Int
payer thor.Address
)

if txObj.Cost() != nil {
payer = *txObj.Payer()
pending := m.cost[payer]

if pending == nil {
cost = new(big.Int).Set(txObj.Cost())
} else {
cost = new(big.Int).Add(pending, txObj.Cost())
}

if err := validatePayer(payer, cost); err != nil {
return err
}
}

m.quota[txObj.Origin()]++
if delegator != nil {
m.quota[*delegator]++
}

if cost != nil {
m.cost[payer] = cost
}

m.mapByHash[hash] = txObj
m.mapByID[txObj.ID()] = txObj
return nil
}

func (m *txObjectMap) GetByID(id thor.Bytes32) *txObject {
m.lock.Lock()
defer m.lock.Unlock()
m.lock.RLock()
defer m.lock.RUnlock()
return m.mapByID[id]
}

Expand All @@ -79,11 +110,22 @@ func (m *txObjectMap) RemoveByHash(txHash thor.Bytes32) bool {
delete(m.quota, txObj.Origin())
}

if d := txObj.Delegator(); d != nil {
if m.quota[*d] > 1 {
m.quota[*d]--
if delegator := txObj.Delegator(); delegator != nil {
if m.quota[*delegator] > 1 {
m.quota[*delegator]--
} else {
delete(m.quota, *d)
delete(m.quota, *delegator)
}
}

// update the pending cost of payers
if payer := txObj.Payer(); payer != nil {
if pending := m.cost[*payer]; pending != nil {
if pending.Cmp(txObj.Cost()) <= 0 {
delete(m.cost, *payer)
} else {
m.cost[*payer] = new(big.Int).Sub(pending, txObj.Cost())
}
}
}

Expand All @@ -94,6 +136,17 @@ func (m *txObjectMap) RemoveByHash(txHash thor.Bytes32) bool {
return false
}

func (m *txObjectMap) UpdatePendingCost(txObj *txObject) {
m.lock.Lock()
defer m.lock.Unlock()

if pending := m.cost[*txObj.Payer()]; pending != nil {
m.cost[*txObj.Payer()] = new(big.Int).Add(pending, txObj.Cost())
} else {
m.cost[*txObj.Payer()] = new(big.Int).Set(txObj.Cost())
}
}

func (m *txObjectMap) ToTxObjects() []*txObject {
m.lock.RLock()
defer m.lock.RUnlock()
Expand Down Expand Up @@ -125,11 +178,12 @@ func (m *txObjectMap) Fill(txObjs []*txObject) {
}
// skip account limit check
m.quota[txObj.Origin()]++
if d := txObj.Delegator(); d != nil {
m.quota[*d]++
if delegator := txObj.Delegator(); delegator != nil {
m.quota[*delegator]++
}
m.mapByHash[txObj.Hash()] = txObj
m.mapByID[txObj.ID()] = txObj
// skip cost check and accumulation
}
}

Expand Down
Loading

0 comments on commit 56bd50e

Please sign in to comment.