Allow processor to run actions and run fill on processor (#560)
Allow the processor to run actions and run fill on the processor, to prevent starvation under load
purplefox authored Sep 8, 2022
1 parent d299443 commit 85cc6d3
Showing 17 changed files with 427 additions and 167 deletions.
2 changes: 1 addition & 1 deletion cfg/example.conf
@@ -102,4 +102,4 @@ locks-compaction-overhead = 250 // After a snapshot is taken how many wr
raft-rtt-ms = 50 // The size of a Raft RTT unit in ms
raft-heartbeat-rtt = 30 // The Raft heartbeat period in units of raft-rtt-ms
raft-election-rtt = 300 // The Raft election period in units of raft-rtt-ms
screen-dragon-log-spam = true
screen-dragon-log-spam = true
10 changes: 6 additions & 4 deletions cluster/dragon/dragon.go
@@ -517,7 +517,7 @@ func (d *Dragon) WriteForwardBatch(batch *cluster.WriteBatch, direct bool) error
batch.ShardID)
}
} else {
// For a non direct write forward batch we queued them on the processor
// For a non direct write forward batch we queue them on the processor
start := time.Now()
for {
leaderNodeID, ok := d.procMgr.getLeaderNode(batch.ShardID)
@@ -873,20 +873,22 @@ func (d *Dragon) executeRaftOpWithRetry(f func() (interface{}, error)) (interfac
if !errors.Is(err, dragonboat.ErrClusterNotReady) && !errors.Is(err, dragonboat.ErrTimeout) &&
!errors.Is(err, dragonboat.ErrClusterNotFound) && !errors.Is(err, dragonboat.ErrClusterClosed) &&
!errors.Is(err, dragonboat.ErrClusterNotInitialized) && !errors.Is(err, dragonboat.ErrClusterNotBootstrapped) &&
!errors.Is(err, dragonboat.ErrInvalidDeadline) {
!errors.Is(err, dragonboat.ErrSystemBusy) && !errors.Is(err, dragonboat.ErrInvalidDeadline) &&
!errors.Is(err, dragonboat.ErrClosed) {
return nil, errors.WithStack(err)
}
if time.Now().Sub(start) >= operationTimeout {
// If we timeout, then something is seriously wrong
log.Errorf("error in making dragonboat calls %+v", err)
return nil, err
}
log.Debugf("Received retryable error in raft operation %v, will retry", err)
if d.stopSignaller.Get() {
return nil, errors.New("server is closing")
}
var delay time.Duration
if err == dragonboat.ErrTimeout {
// For Raft timeouts we backoff longer before retrying
if err == dragonboat.ErrTimeout || err == dragonboat.ErrSystemBusy {
// For Raft timeouts or system busy errors we back off longer before retrying
errTimeoutCount++
delay = d.cnf.RaftCallTimeout
if errTimeoutCount > 5 {
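For context: the retry hunk above adds dragonboat's ErrSystemBusy and ErrClosed to the set of errors the loop is allowed to retry, and applies the longer timeout backoff to busy errors as well as timeouts. Below is a minimal, self-contained sketch of that retry shape (not the project's actual code); the sentinel errors, package name, delays and overall timeout are stand-ins for illustration.

package retry

import (
	"errors"
	"time"
)

// Stand-ins for dragonboat.ErrTimeout and dragonboat.ErrSystemBusy.
var (
	errTimeout    = errors.New("timeout")
	errSystemBusy = errors.New("system busy")
)

// callWithRetry retries f on timeout/busy errors, backing off by callTimeout
// between attempts and giving up once operationTimeout has elapsed.
func callWithRetry(f func() (interface{}, error), callTimeout, operationTimeout time.Duration) (interface{}, error) {
	start := time.Now()
	for {
		res, err := f()
		if err == nil {
			return res, nil
		}
		if !errors.Is(err, errTimeout) && !errors.Is(err, errSystemBusy) {
			// Not a retryable error - fail fast
			return nil, err
		}
		if time.Since(start) >= operationTimeout {
			// Retried for too long - something is seriously wrong
			return nil, err
		}
		// Back off before retrying the Raft call
		time.Sleep(callTimeout)
	}
}
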
1 change: 1 addition & 0 deletions command/create_mv_command.go
@@ -210,6 +210,7 @@ func (c *CreateMVCommand) onPhase2() error {
if err := c.e.pushEngine.RegisterMV(c.mv); err != nil {
return errors.WithStack(err)
}

if err := c.e.metaController.RegisterMaterializedView(c.mv.Info, c.mv.InternalTables); err != nil {
return err
}
10 changes: 10 additions & 0 deletions dragonboat/node.go
@@ -410,6 +410,16 @@ func (n *node) propose(session *client.Session,
return n.pendingProposals.propose(session, cmd, timeout)
}

func (n *node) numPendingProposals() (uint64, error) {
if !n.initialized() {
return 0, ErrClusterNotReady
}
if n.isWitness() {
return 0, ErrInvalidOperation
}
return n.pendingProposals.numPendingProposals(), nil
}

func (n *node) read(timeout uint64) (*RequestState, error) {
if !n.initialized() {
return nil, ErrClusterNotReady
11 changes: 11 additions & 0 deletions dragonboat/nodehost.go
@@ -1461,6 +1461,17 @@ func (nh *NodeHost) propose(s *client.Session,
return req, err
}

func (nh *NodeHost) NumPendingProposals(clusterID uint64) (uint64, error) {
if atomic.LoadInt32(&nh.closed) != 0 {
return 0, ErrClosed
}
v, ok := nh.getCluster(clusterID)
if !ok {
return 0, ErrClusterNotFound
}
return v.numPendingProposals()
}

func (nh *NodeHost) readIndex(clusterID uint64,
timeout time.Duration) (*RequestState, *node, error) {
if atomic.LoadInt32(&nh.closed) != 0 {
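NumPendingProposals gives callers a way to see how many proposals are queued but not yet processed for a Raft group (summed across the pending-proposal shards shown further down in this diff), which is what lets the processor apply backpressure instead of starving under load. A hedged caller-side sketch follows; the interface and threshold are illustrative, with *NodeHost satisfying the interface through the method added above.

// Sketch (not in this commit): throttle ingest when the proposal queue is deep.
// proposalCounter is satisfied by *NodeHost via the new method above;
// maxPending is an arbitrary example value.
type proposalCounter interface {
	NumPendingProposals(clusterID uint64) (uint64, error)
}

func shouldThrottle(nh proposalCounter, clusterID uint64) (bool, error) {
	pending, err := nh.NumPendingProposals(clusterID)
	if err != nil {
		return false, err
	}
	const maxPending = 1000 // assumed limit - tune per deployment
	return pending >= maxPending, nil
}
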
6 changes: 6 additions & 0 deletions dragonboat/queue.go
@@ -101,6 +101,12 @@ func (q *entryQueue) get(paused bool) []pb.Entry {
return t[:sz]
}

func (q *entryQueue) pendingSize() uint64 {
q.mu.Lock()
defer q.mu.Unlock()
return q.idx
}

type readIndexQueue struct {
size uint64
left []*RequestState
8 changes: 8 additions & 0 deletions dragonboat/request.go
@@ -996,6 +996,14 @@ func (p *pendingProposal) propose(session *client.Session,
return pp.propose(session, cmd, key, timeoutTick)
}

func (p *pendingProposal) numPendingProposals() uint64 {
var num uint64
for _, shard := range p.shards {
num += shard.proposals.pendingSize()
}
return num
}

func (p *pendingProposal) close() {
for _, pp := range p.shards {
pp.close()
45 changes: 44 additions & 1 deletion kafka/load/generators.go
@@ -13,6 +13,9 @@ type simpleGenerator struct {
uniqueIDsPerPartition int64
}

func (s *simpleGenerator) Init() {
}

func (s *simpleGenerator) GenerateMessage(partitionID int32, offset int64, rnd *rand.Rand) (*kafka.Message, error) {
m := make(map[string]interface{})
customerToken := fmt.Sprintf("customer-token-%d-%d", partitionID, offset%s.uniqueIDsPerPartition)
@@ -38,6 +41,46 @@ func (s *simpleGenerator) GenerateMessage(partitionID int32, offset int64, rnd *
return msg, nil
}

func (s simpleGenerator) Name() string {
func (s *simpleGenerator) Name() string {
return "simple"
}

type paymentsGenerator struct {
uniqueIDsPerPartition int64
paymentTypes []string
currencies []string
}

func (p *paymentsGenerator) Init() {
p.paymentTypes = []string{"btc", "p2p", "other"}
p.currencies = []string{"gbp", "usd", "eur", "aud"}
}

func (p *paymentsGenerator) GenerateMessage(partitionID int32, offset int64, rnd *rand.Rand) (*kafka.Message, error) {
m := make(map[string]interface{})
// Payment id must be globally unique - so we include partition id and offset in it
paymentID := fmt.Sprintf("payment-%010d-%019d", partitionID, offset)
customerID := fmt.Sprintf("customer-token-%010d-%019d", partitionID, offset%p.uniqueIDsPerPartition)
m["customer_token"] = customerID
m["amount"] = fmt.Sprintf("%.2f", float64(rnd.Int31n(1000000))/10)
m["payment_type"] = p.paymentTypes[int(offset)%len(p.paymentTypes)]
m["currency"] = p.currencies[int(offset)%len(p.currencies)]
json, err := json2.Marshal(&m)
if err != nil {
return nil, errors.WithStack(err)
}
msg := &kafka.Message{
Key: []byte(paymentID),
Value: json,
TimeStamp: time.Now(),
PartInfo: kafka.PartInfo{
PartitionID: partitionID,
Offset: offset,
},
}
return msg, nil
}

func (p *paymentsGenerator) Name() string {
return "payments"
}
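The payments generator builds a globally unique payment id from the partition id and offset, and picks the payment type and currency deterministically from the offset. A hypothetical test (not part of this commit) that could sit in the kafka/load package, assuming imports of encoding/json, math/rand and testing; the seed and expected values are derived from the lists initialised above.

func TestPaymentsGenerator(t *testing.T) {
	gen := &paymentsGenerator{uniqueIDsPerPartition: 1000}
	gen.Init() // populates the payment type and currency lists
	rnd := rand.New(rand.NewSource(42))
	msg, err := gen.GenerateMessage(0, 17, rnd)
	if err != nil {
		t.Fatal(err)
	}
	var m map[string]interface{}
	if err := json.Unmarshal(msg.Value, &m); err != nil {
		t.Fatalf("value should be JSON: %v", err)
	}
	// offset 17: 17 % 3 = 2 -> "other", 17 % 4 = 1 -> "usd"
	if m["payment_type"] != "other" || m["currency"] != "usd" {
		t.Fatalf("unexpected fields: %v", m)
	}
}
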
8 changes: 8 additions & 0 deletions kafka/load/load_client.go
@@ -101,6 +101,7 @@ func (l *LoadClientMessageProviderFactory) NewMessageProvider() (kafka.MessagePr
if err != nil {
return nil, err
}
msgGen.Init()
return &LoadClientMessageProvider{
factory: l,
msgs: msgs,
@@ -119,6 +120,8 @@ func (l *LoadClientMessageProviderFactory) getMessageGenerator(name string) (msg
switch name {
case "simple":
return &simpleGenerator{uniqueIDsPerPartition: l.uniqueIDsPerPartition}, nil
case "payments":
return &paymentsGenerator{uniqueIDsPerPartition: l.uniqueIDsPerPartition}, nil
default:
return nil, errors.Errorf("unknown message generator name %s", name)
}
@@ -151,6 +154,11 @@ type LoadClientMessageProvider struct
func (l *LoadClientMessageProvider) GetMessage(pollTimeout time.Duration) (*kafka.Message, error) {
select {
case msg := <-l.msgs:
if msg == nil {
// Messages channel was closed - probably the configured maximum number of messages was exceeded
// In this case we don't want to busy loop, so we introduce a delay
time.Sleep(pollTimeout)
}
return msg, nil
case <-time.After(pollTimeout):
return nil, nil
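The nil check added above matters because a receive from a closed channel completes immediately with the zero value: once the configured message limit is reached and the channel is closed, the select would otherwise spin. A standalone illustration of that behaviour (generic types, not the project's):

package main

import (
	"fmt"
	"time"
)

func main() {
	ch := make(chan *string)
	close(ch)
	for i := 0; i < 3; i++ {
		select {
		case m := <-ch:
			// Fires immediately with m == nil on every iteration
			fmt.Println("received:", m)
		case <-time.After(100 * time.Millisecond):
			// Never reached once the channel is closed
			fmt.Println("timed out")
		}
		time.Sleep(50 * time.Millisecond) // the commit adds an equivalent delay
	}
}
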
1 change: 1 addition & 0 deletions msggen/generator.go
@@ -21,6 +21,7 @@ import (
type MessageGenerator interface {
GenerateMessage(scope int32, index int64, rnd *rand.Rand) (*kafka.Message, error)
Name() string
Init()
}

type GenManager struct {
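MessageGenerator implementations now need an Init method, called once before messages are generated (the existing generators add empty ones; the new payments generator uses it to set up its lookup lists). A hypothetical implementation sketch (not in this commit) showing the shape the extended interface takes, assuming imports of fmt, math/rand, time and the project's kafka package; the kafka.Message fields used are the ones visible elsewhere in this diff.

// staticGenerator is a made-up example implementing msggen.MessageGenerator.
type staticGenerator struct {
	payloads [][]byte
}

// Init runs once before the first GenerateMessage call.
func (g *staticGenerator) Init() {
	g.payloads = [][]byte{[]byte(`{"n":1}`), []byte(`{"n":2}`)}
}

func (g *staticGenerator) Name() string { return "static" }

func (g *staticGenerator) GenerateMessage(scope int32, index int64, _ *rand.Rand) (*kafka.Message, error) {
	return &kafka.Message{
		Key:       []byte(fmt.Sprintf("key-%d-%d", scope, index)),
		Value:     g.payloads[index%int64(len(g.payloads))],
		TimeStamp: time.Now(),
	}, nil
}
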
3 changes: 3 additions & 0 deletions msggen/generators.go
@@ -19,6 +19,9 @@ func (p *PaymentGenerator) Name() string {
return "payments"
}

func (p *PaymentGenerator) Init() {
}

func (p *PaymentGenerator) GenerateMessage(_ int32, index int64, rnd *rand.Rand) (*kafka.Message, error) {

paymentTypes := []string{"btc", "p2p", "other"}
48 changes: 28 additions & 20 deletions push/engine.go
@@ -267,8 +267,18 @@ func (p *Engine) CreateIndex(indexInfo *common.IndexInfo, fill bool, shardIDs []

consumerName := fmt.Sprintf("%s.%s", te.TableInfo.Name, indexInfo.Name)
if fill {

schedulers := make(map[uint64]*sched.ShardScheduler)
for _, shardID := range shardIDs {
sched := p.schedulers[shardID]
if sched == nil {
return errors.NewPranaErrorf(errors.DdlRetry, "not all shards have leaders")
}
schedulers[shardID] = sched
}

// And fill it with the data from the table - this creates the index
if err := te.FillTo(indexExec, consumerName, indexInfo.ID, shardIDs, p.failInject, interruptor); err != nil {
if err := te.FillTo(indexExec, consumerName, indexInfo.ID, schedulers, p.failInject, interruptor); err != nil {
return err
}
} else {
@@ -447,19 +457,10 @@ func (p *Engine) processReceiveBatch(batch *receiveBatch) error {

rcVal, ok := p.remoteConsumers.Load(entityID)
if !ok {
// Does the entity exist in storage?
rows, err := p.queryExec.ExecuteQuery("sys", fmt.Sprintf("select id from tables where id=%d", entityID))
if err != nil {
return errors.WithStack(err)
}
if rows.RowCount() == 1 {
// The entity is in storage but not deployed - should not happen - we must throw an error to avoid losing
// data
return errors.Errorf("entity with id %d not registered", entityID)
}
// This could correspond to a source or mv which failed during fill and was never fully created
// but left rows in the receiver table. in this case we can just ignore them
log.Warnf("remote consumer %d not loaded", entityID)
// It can also occur if an MV is dropped while the system is ingesting
log.Debugf("remote consumer %d not loaded - batch will be dropped. this is usually because data is being processed for a dropped entity", entityID)
return nil
}
remoteConsumer := rcVal.(*RemoteConsumer) //nolint:forcetypeassert
@@ -798,19 +799,26 @@ func (l *loadClientSetRateHandler) HandleMessage(clusterMsg remoting.ClusterMess
if !ok {
panic("not a ConsumerSetRate")
}
l.p.lock.Lock()
defer l.p.lock.Unlock()
sourceInfo, ok := l.p.meta.GetSource(setRate.SchemaName, setRate.SourceName)
source, err := l.p.getSourceFromName(setRate.SchemaName, setRate.SourceName)
if err != nil {
return nil, err
}
return nil, source.SetMaxIngestRate(int(setRate.Rate))
}

func (p *Engine) getSourceFromName(schemaName string, sourceName string) (*source.Source, error) {
p.lock.RLock()
defer p.lock.RUnlock()
sourceInfo, ok := p.meta.GetSource(schemaName, sourceName)
if !ok {
return nil, errors.NewPranaErrorf(errors.UnknownSource, "Unknown source %s.%s", setRate.SchemaName, setRate.SourceName)
return nil, errors.NewPranaErrorf(errors.UnknownSource, "Unknown source %s.%s", schemaName, sourceName)
}
source, ok := l.p.sources[sourceInfo.ID]
source, ok := p.sources[sourceInfo.ID]
if !ok {
// Internal error
return nil, errors.Errorf("can't find source %s.%s", setRate.SchemaName, setRate.SourceName)
return nil, errors.Errorf("can't find source %s.%s", schemaName, sourceName)
}
source.SetMaxIngestRate(int(setRate.Rate))
return nil, nil
return source, nil
}

func (p *Engine) GetLoadClientSetRateHandler() remoting.ClusterMessageHandler {