Skip to content

Commit

Permalink
feat: op-batcher auto switch to economic DA type (#209)
Browse files Browse the repository at this point in the history
  • Loading branch information
bnoieh authored Jun 24, 2024
1 parent 40cf16b commit 760663a
Show file tree
Hide file tree
Showing 9 changed files with 291 additions and 9 deletions.
16 changes: 16 additions & 0 deletions op-batcher/batcher/channel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"
"sync"

"github.com/ethereum-optimism/optimism/op-batcher/flags"
"github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
Expand Down Expand Up @@ -225,6 +226,21 @@ func (s *channelManager) ensureChannelWithSpace(l1Head eth.BlockID) error {
return nil
}

// SwitchDAType rewrites the channel manager's channel config for the requested
// data availability type. The manager's mutex is held for the whole update.
//
//   - flags.BlobsType: MaxFrameSize becomes eth.MaxBlobDataSize - 1 and
//     multi-frame transactions are enabled.
//   - flags.CalldataType: MaxFrameSize becomes CallDataMaxTxSize - 1 and
//     multi-frame transactions are disabled.
//
// Any other type is reported at critical log level and the config is not
// modified.
func (s *channelManager) SwitchDAType(targetDAType flags.DataAvailabilityType) {
	s.mu.Lock()
	defer s.mu.Unlock()

	if targetDAType == flags.BlobsType {
		s.cfg.MaxFrameSize = eth.MaxBlobDataSize - 1
		s.cfg.MultiFrameTxs = true
		return
	}
	if targetDAType == flags.CalldataType {
		s.cfg.MaxFrameSize = CallDataMaxTxSize - 1
		s.cfg.MultiFrameTxs = false
		return
	}

	// Neither blobs nor calldata: nothing sensible to switch to.
	s.log.Crit("channel manager switch to a invalid DA type", "targetDAType", targetDAType.String())
}

// registerL1Block registers the given block at the pending channel.
func (s *channelManager) registerL1Block(l1Head eth.BlockID) {
s.currentChannel.CheckTimeout(l1Head.Number)
Expand Down
2 changes: 1 addition & 1 deletion op-batcher/batcher/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (c *CLIConfig) Check() error {
if c.BatchType > 1 {
return fmt.Errorf("unknown batch type: %v", c.BatchType)
}
if c.DataAvailabilityType == flags.BlobsType && c.TargetNumFrames > 6 {
if (c.DataAvailabilityType == flags.BlobsType || c.DataAvailabilityType == flags.AutoType) && c.TargetNumFrames > 6 {
return errors.New("too many frames for blob transactions, max 6")
}
if !flags.ValidDataAvailabilityType(c.DataAvailabilityType) {
Expand Down
178 changes: 177 additions & 1 deletion op-batcher/batcher/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,39 @@ import (
"io"
"math/big"
_ "net/http/pprof"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/ethereum-optimism/optimism/op-batcher/flags"
"github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
plasma "github.com/ethereum-optimism/optimism/op-plasma"
"github.com/ethereum-optimism/optimism/op-service/dial"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/txmgr"
"github.com/ethereum/go-ethereum/consensus/misc/eip4844"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
)

const LimitLoadBlocksOneTime uint64 = 30

// Auto DA params
const (
	// DATypeSwitchThrehold is the number of consecutive economic-DA
	// evaluations that must disagree with the current DA type before the
	// batcher switches. The auto DA loop multiplies it by 20 when the current
	// type is calldata.
	DATypeSwitchThrehold int = 5

	// CallDataMaxTxSize bounds the frame size used for calldata submissions;
	// the channel config's MaxFrameSize is set to CallDataMaxTxSize - 1.
	CallDataMaxTxSize uint64 = 120000

	// ApproximateGasPerCallDataTx is the gas figure used per calldata tx when
	// estimating calldata cost.
	// NOTE(review): presumably the gas used by a full-size calldata tx — confirm.
	ApproximateGasPerCallDataTx int64 = 1934892

	// MaxBlobsNumberPerTx scales ApproximateGasPerCallDataTx in the calldata
	// cost estimate.
	// NOTE(review): presumably so the estimate covers as much data as a
	// six-blob tx — confirm.
	MaxBlobsNumberPerTx int64 = 6
)

var ErrBatcherNotRunning = errors.New("batcher is not running")

type L1Client interface {
HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error)
SuggestGasTipCap(ctx context.Context) (*big.Int, error)
}

type L2Client interface {
Expand All @@ -47,6 +61,7 @@ type DriverSetup struct {
EndpointProvider dial.L2EndpointProvider
ChannelConfig ChannelConfig
PlasmaDA *plasma.DAClient
AutoSwitchDA bool
}

// BatchSubmitter encapsulates a service responsible for submitting L2 tx
Expand All @@ -68,6 +83,9 @@ type BatchSubmitter struct {
lastStoredBlock eth.BlockID
lastL1Tip eth.L1BlockRef

// addressReservedError is set when the L1 txpool reports an address-already-reserved error, which may occur when switching the DA type
addressReservedError atomic.Bool

state *channelManager
}

Expand Down Expand Up @@ -155,10 +173,15 @@ func (l *BatchSubmitter) loadBlocksIntoState(ctx context.Context) error {
} else if start.Number >= end.Number {
return errors.New("start number is >= end number")
}
// Limit the number of blocks loaded in a single pass
endNumber := end.Number
if endNumber-start.Number > LimitLoadBlocksOneTime {
endNumber = start.Number + LimitLoadBlocksOneTime
}

var latestBlock *types.Block
// Add all blocks to "state"
for i := start.Number + 1; i < end.Number+1; i++ {
for i := start.Number + 1; i < endNumber+1; i++ {
block, err := l.loadBlockIntoState(ctx, i)
if errors.Is(err, ErrReorg) {
l.Log.Warn("Found L2 reorg", "block_number", i)
Expand Down Expand Up @@ -272,6 +295,78 @@ func (l *BatchSubmitter) loop() {
}
}()

economicDATypeCh := make(chan flags.DataAvailabilityType)
waitSwitchDACh := make(chan struct{})
if l.AutoSwitchDA {
// start auto choose economic DA type processing loop
economicDALoopDone := make(chan struct{})
defer close(economicDALoopDone) // shut down auto DA loop
go func() {
economicDAType := flags.BlobsType
l.Metr.RecordAutoChoosedDAType(economicDAType)
switchCount := 0
economicDATicker := time.NewTicker(5 * time.Second)
defer economicDATicker.Stop()
addressReservedErrorTicker := time.NewTicker(time.Second)
defer addressReservedErrorTicker.Stop()
for {
select {
case <-economicDATicker.C:
newEconomicDAType, err := l.getEconomicDAType(l.shutdownCtx)
if err != nil {
l.Log.Error("getEconomicDAType failed: %w", err)
continue
}
if newEconomicDAType != economicDAType {
switchCount++
} else {
switchCount = 0
}
threhold := DATypeSwitchThrehold
if economicDAType == flags.CalldataType {
threhold = 20 * DATypeSwitchThrehold
}
if switchCount >= threhold {
l.Log.Info("start economic switch", "from type", economicDAType.String(), "to type", newEconomicDAType.String())
start := time.Now()
economicDAType = newEconomicDAType
switchCount = 0
economicDATypeCh <- economicDAType
<-waitSwitchDACh
l.Log.Info("finish economic switch", "duration", time.Since(start))
l.Metr.RecordAutoChoosedDAType(economicDAType)
l.Metr.RecordEconomicAutoSwitchCount()
l.Metr.RecordAutoSwitchTimeDuration(time.Since(start))
}
case <-addressReservedErrorTicker.C:
if l.addressReservedError.Load() {
if economicDAType == flags.BlobsType {
economicDAType = flags.CalldataType
l.Log.Info("start resolve addressReservedError switch", "from type", flags.BlobsType.String(), "to type", flags.CalldataType.String())
} else if economicDAType == flags.CalldataType {
economicDAType = flags.BlobsType
l.Log.Info("start resolve addressReservedError switch", "from type", flags.CalldataType.String(), "to type", flags.BlobsType.String())
} else {
l.Log.Crit("invalid DA type in economic switch loop", "invalid type", economicDAType.String())
}
switchCount = 0
start := time.Now()
economicDATypeCh <- economicDAType
<-waitSwitchDACh
l.Log.Info("finish resolve addressReservedError switch", "duration", time.Since(start))
l.Metr.RecordAutoChoosedDAType(economicDAType)
l.Metr.RecordReservedErrorSwitchCount()
l.Metr.RecordAutoSwitchTimeDuration(time.Since(start))
l.addressReservedError.Store(false)
}
case <-economicDALoopDone:
l.Log.Info("auto DA processing loop done")
return
}
}
}()
}

ticker := time.NewTicker(l.Config.PollInterval)
defer ticker.Stop()

Expand Down Expand Up @@ -302,6 +397,26 @@ func (l *BatchSubmitter) loop() {
continue
}
l.publishStateToL1(queue, receiptsCh)
case targetDAType := <-economicDATypeCh:
l.lastStoredBlock = eth.BlockID{}
// close current state to prepare for switch
err := l.state.Close()
if err != nil {
if errors.Is(err, ErrPendingAfterClose) {
l.Log.Warn("Closed channel manager to handle DA type switch with pending channel(s) remaining - submitting")
} else {
l.Log.Error("Error closing the channel manager to handle a DA type switch", "err", err)
}
}
// on DA type switch we want to publish all pending state then wait until each result clears before resetting
// the state.
publishAndWait()
l.clearState(l.shutdownCtx)
// switch action after clear state
l.switchDAType(targetDAType)
time.Sleep(time.Minute) // wait op-node derivation published DA data
waitSwitchDACh <- struct{}{}
continue
case <-l.shutdownCtx.Done():
if l.Txmgr.IsClosed() {
l.Log.Info("Txmgr is closed, remaining channel data won't be sent")
Expand All @@ -324,6 +439,54 @@ func (l *BatchSubmitter) loop() {
}
}

// getEconomicDAType estimates the cost of submitting via calldata and via
// blobs, and returns whichever DA type is cheaper.
//
// The calldata estimate is MaxBlobsNumberPerTx * ApproximateGasPerCallDataTx
// times the L1 client's suggested gas tip cap. The blob estimate is
// params.TxGas times the same tip cap, plus params.MaxBlobGasPerBlock times the
// blob fee derived from the latest header's ExcessBlobGas. Both estimates are
// recorded as metrics. Calldata is chosen only when it is strictly cheaper;
// ties go to blobs.
//
// NOTE(review): only the suggested tip cap is used as the gas price; the base
// fee is not added — confirm this is intended for the target L1.
//
// An error is returned if either L1 request fails or if the latest header has
// no ExcessBlobGas. Each request gets its own l.Config.NetworkTimeout.
func (l *BatchSubmitter) getEconomicDAType(ctx context.Context) (flags.DataAvailabilityType, error) {
	tipCtx, cancelTip := context.WithTimeout(ctx, l.Config.NetworkTimeout)
	defer cancelTip()
	gasPrice, err := l.L1Client.SuggestGasTipCap(tipCtx)
	if err != nil {
		return "", fmt.Errorf("getEconomicDAType failed to fetch the suggested gas tip cap: %w", err)
	}
	calldataGas := big.NewInt(MaxBlobsNumberPerTx * ApproximateGasPerCallDataTx)
	calldataCost := new(big.Int).Mul(calldataGas, gasPrice)

	headCtx, cancelHead := context.WithTimeout(ctx, l.Config.NetworkTimeout)
	defer cancelHead()
	header, err := l.L1Client.HeaderByNumber(headCtx, nil)
	if err != nil {
		return "", fmt.Errorf("getEconomicDAType failed to fetch the latest header: %w", err)
	}
	if header.ExcessBlobGas == nil {
		return "", fmt.Errorf("getEconomicDAType fetched header with nil ExcessBlobGas: %v", header)
	}

	// Blob cost = execution cost of the carrying tx + cost of the blob gas.
	blobGasPrice := eip4844.CalcBlobFee(*header.ExcessBlobGas)
	execCost := new(big.Int).Mul(big.NewInt(int64(params.TxGas)), gasPrice)
	blobDataCost := new(big.Int).Mul(big.NewInt(params.MaxBlobGasPerBlock), blobGasPrice)
	blobCost := new(big.Int).Add(execCost, blobDataCost)

	l.Metr.RecordEstimatedCalldataTypeFee(calldataCost)
	l.Metr.RecordEstimatedBlobTypeFee(blobCost)

	choice, msg := flags.BlobsType, "Economic DA type is blobs"
	if calldataCost.Cmp(blobCost) < 0 {
		choice, msg = flags.CalldataType, "Economic DA type is calldata"
	}
	l.Log.Info(msg, "gas price", gasPrice, "calldata cost", calldataCost, "blob gas price", blobGasPrice, "blob cost", blobCost)
	return choice, nil
}

// switchDAType points the batch submitter at the given DA type: it updates
// l.Config.UseBlobs and the submitter's ChannelConfig frame settings, then
// forwards the switch to the channel manager.
//
// Only flags.BlobsType and flags.CalldataType are accepted. Any other value is
// reported at critical log level and nothing is changed.
func (l *BatchSubmitter) switchDAType(targetDAType flags.DataAvailabilityType) {
	toBlobs := targetDAType == flags.BlobsType
	if !toBlobs && targetDAType != flags.CalldataType {
		l.Log.Crit("batch submitter switch to a invalid DA type", "targetDAType", targetDAType.String())
		return
	}

	l.Config.UseBlobs = toBlobs
	if toBlobs {
		l.ChannelConfig.MaxFrameSize = eth.MaxBlobDataSize - 1
	} else {
		l.ChannelConfig.MaxFrameSize = CallDataMaxTxSize - 1
	}
	// Multi-frame txs are enabled for blobs and disabled for calldata.
	l.ChannelConfig.MultiFrameTxs = toBlobs

	l.state.SwitchDAType(targetDAType)
}

// publishStateToL1 queues up all pending TxData to be published to the L1, returning when there is
// no more data to queue for publishing or if there was an error queueing the data.
func (l *BatchSubmitter) publishStateToL1(queue *txmgr.Queue[txID], receiptsCh chan txmgr.TxReceipt[txID]) {
Expand Down Expand Up @@ -525,6 +688,10 @@ func (l *BatchSubmitter) recordL1Tip(l1tip eth.L1BlockRef) {
// recordFailedTx logs a failed transaction and marks it as failed in the
// channel manager. If the error text matches txmgr.ErrAlreadyReserved and auto
// DA switching is enabled, it also raises the addressReservedError flag, which
// the auto DA loop polls.
func (l *BatchSubmitter) recordFailedTx(id txID, err error) {
	l.Log.Warn("Transaction failed to send", logFields(id, err)...)
	l.state.TxFailed(id)

	if !errStringMatch(err, txmgr.ErrAlreadyReserved) || !l.AutoSwitchDA {
		return
	}
	l.Log.Warn("Encounter ErrAlreadyReserved", "id", id.String())
	l.addressReservedError.Store(true)
}

func (l *BatchSubmitter) recordConfirmedTx(id txID, receipt *types.Receipt) {
Expand Down Expand Up @@ -560,3 +727,12 @@ func logFields(xs ...any) (fs []any) {
}
return fs
}

// errStringMatch reports whether err's message contains target's message.
//
// Matching is done on the error text rather than on identity. Two nil errors
// match each other; a nil error never matches a non-nil one.
func errStringMatch(err, target error) bool {
	if err == nil || target == nil {
		// At least one side is nil: they match only if both are.
		return err == nil && target == nil
	}
	return strings.Contains(err.Error(), target.Error())
}
8 changes: 5 additions & 3 deletions op-batcher/batcher/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (bs *BatcherService) initFromCLIConfig(ctx context.Context, version string,
if err := bs.initPlasmaDA(cfg); err != nil {
return fmt.Errorf("failed to init plasma DA: %w", err)
}
bs.initDriver()
bs.initDriver(cfg)
if err := bs.initRPCServer(cfg); err != nil {
return fmt.Errorf("failed to start RPC server: %w", err)
}
Expand Down Expand Up @@ -198,7 +198,7 @@ func (bs *BatcherService) initChannelConfig(cfg *CLIConfig) error {
}

switch cfg.DataAvailabilityType {
case flags.BlobsType:
case flags.BlobsType, flags.AutoType:
if !cfg.TestUseMaxTxSizeForBlobs {
// account for version byte prefix
cc.MaxFrameSize = eth.MaxBlobDataSize - 1
Expand Down Expand Up @@ -228,6 +228,7 @@ func (bs *BatcherService) initChannelConfig(cfg *CLIConfig) error {
return fmt.Errorf("invalid channel configuration: %w", err)
}
bs.Log.Info("Initialized channel-config",
"da_type", cfg.DataAvailabilityType.String(),
"use_blobs", bs.UseBlobs,
"use_plasma", bs.UsePlasma,
"max_frame_size", cc.MaxFrameSize,
Expand Down Expand Up @@ -286,7 +287,7 @@ func (bs *BatcherService) initMetricsServer(cfg *CLIConfig) error {
return nil
}

func (bs *BatcherService) initDriver() {
func (bs *BatcherService) initDriver(cfg *CLIConfig) {
bs.driver = NewBatchSubmitter(DriverSetup{
Log: bs.Log,
Metr: bs.Metrics,
Expand All @@ -297,6 +298,7 @@ func (bs *BatcherService) initDriver() {
EndpointProvider: bs.EndpointProvider,
ChannelConfig: bs.ChannelConfig,
PlasmaDA: bs.PlasmaDA,
AutoSwitchDA: cfg.DataAvailabilityType == flags.AutoType,
})
}

Expand Down
2 changes: 2 additions & 0 deletions op-batcher/flags/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ const (
// data availability types
CalldataType DataAvailabilityType = "calldata"
BlobsType DataAvailabilityType = "blobs"
AutoType DataAvailabilityType = "auto"
)

var DataAvailabilityTypes = []DataAvailabilityType{
CalldataType,
BlobsType,
AutoType,
}

func (kind DataAvailabilityType) String() string {
Expand Down
Loading

0 comments on commit 760663a

Please sign in to comment.