Skip to content

Commit

Permalink
Mempool async processing (#109)
Browse files Browse the repository at this point in the history
* Backport cometbft#3211

* Fix Race

* bp cometbft#3157

* Speedup tests that were hitting timeouts

* bp cometbft#3161

* Fix data race

* Mempool async processing

* Forgot to commit important part

* Add changelog
  • Loading branch information
ValarDragon authored and czarcas7ic committed Jun 19, 2024
1 parent aafecec commit d62c5ef
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 13 deletions.
9 changes: 8 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
# CHANGELOG

## v0.37.4-v25-osmo-6
## v0.37.4-v25-osmo-8

* [#108](https://github.com/osmosis-labs/cometbft/pull/108) perf(consensus): Make some consensus reactor messages take RLock's not WLock's (#3211)
* [#108](https://github.com/osmosis-labs/cometbft/pull/108) perf(consensus): Make late votes outside of last block commits not get to peerMsgQueue (#3157)
* [#108](https://github.com/osmosis-labs/cometbft/pull/108) perf(consensus): Make reactor check for duplicate/old block parts (#3161)
* [#109](https://github.com/osmosis-labs/cometbft/pull/109) perf(p2p,mempool): Make mempool reactor receive not block. (Fixed by either #3209, #3230)

## v0.37.4-v25-osmo-7

* [#83](https://github.com/osmosis-labs/cometbft/pull/83) perf(types): 3x speedup MakePartSet (#3117)
* [#85](https://github.com/osmosis-labs/cometbft/pull/85) perf(flowrate): Speedup flowrate.Clock (#3016)
Expand Down
51 changes: 39 additions & 12 deletions mempool/v0/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ type Reactor struct {
mempool *CListMempool
ids *mempoolIDs

peerTxProcesserChan chan *peerIncomingTx

// Semaphores to keep track of how many connections to peers are active for broadcasting
// transactions. Each semaphore has a capacity that puts an upper bound on the number of
// connections for different groups of peers.
Expand Down Expand Up @@ -106,6 +108,7 @@ func NewReactor(config *cfg.MempoolConfig, mempool *CListMempool) *Reactor {
memR.BaseReactor = *p2p.NewBaseReactor("Mempool", memR)
memR.activePersistentPeersSemaphore = semaphore.NewWeighted(int64(memR.config.ExperimentalMaxGossipConnectionsToPersistentPeers))
memR.activeNonPersistentPeersSemaphore = semaphore.NewWeighted(int64(memR.config.ExperimentalMaxGossipConnectionsToNonPersistentPeers))
memR.peerTxProcesserChan = make(chan *peerIncomingTx, 10000)

return memR
}
Expand All @@ -127,9 +130,15 @@ func (memR *Reactor) OnStart() error {
if !memR.config.Broadcast {
memR.Logger.Info("Tx broadcasting is disabled")
}

go memR.incomingPacketProcessor()
return nil
}

func (memR *Reactor) OnStop() {
close(memR.peerTxProcesserChan)
}

// GetChannels implements Reactor by returning the list of channels for this
// reactor.
func (memR *Reactor) GetChannels() []*p2p.ChannelDescriptor {
Expand Down Expand Up @@ -199,20 +208,44 @@ func (memR *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {
// broadcast routine checks if peer is gone and returns
}

type peerIncomingTx struct {
tx *protomem.Txs
peer p2p.Peer
}

// Receive implements Reactor.
// It adds any received transactions to the mempool.
func (memR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
memR.Logger.Debug("Receive", "src", e.Src, "chId", e.ChannelID, "msg", e.Message)
switch msg := e.Message.(type) {
case *protomem.Txs:
protoTxs := msg.GetTxs()
pit := &peerIncomingTx{
tx: msg,
peer: e.Src,
}
memR.peerTxProcesserChan <- pit
default:
memR.Logger.Error("unknown message type", "src", e.Src, "chId", e.ChannelID, "msg", e.Message)
memR.Switch.StopPeerForError(e.Src, fmt.Errorf("mempool cannot handle message of type: %T", e.Message))
return
}
}

func (memR *Reactor) incomingPacketProcessor() {
for {
pit, chanOpen := <-memR.peerTxProcesserChan
if !chanOpen {
break
}

protoTxs := pit.tx.GetTxs()
if len(protoTxs) == 0 {
memR.Logger.Error("received empty txs from peer", "src", e.Src)
return
memR.Logger.Error("received empty txs from peer", "src", pit.peer)
continue
}
txInfo := mempool.TxInfo{SenderID: memR.ids.GetForPeer(e.Src)}
if e.Src != nil {
txInfo.SenderP2PID = e.Src.ID()
txInfo := mempool.TxInfo{SenderID: memR.ids.GetForPeer(pit.peer)}
if pit.peer != nil {
txInfo.SenderP2PID = pit.peer.ID()
}

var err error
Expand All @@ -225,13 +258,7 @@ func (memR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
memR.Logger.Info("Could not check tx", "tx", ntx.String(), "err", err)
}
}
default:
memR.Logger.Error("unknown message type", "src", e.Src, "chId", e.ChannelID, "msg", e.Message)
memR.Switch.StopPeerForError(e.Src, fmt.Errorf("mempool cannot handle message of type: %T", e.Message))
return
}

// broadcasting happens from go routines per peer
}

// PeerState describes the state of a peer.
Expand Down

0 comments on commit d62c5ef

Please sign in to comment.