Skip to content

Commit

Permalink
Ignore DDL commands that come in after a cancel (#564)
Browse files Browse the repository at this point in the history
  • Loading branch information
purplefox authored Sep 13, 2022
1 parent 85cc6d3 commit 69a4fd9
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 26 deletions.
69 changes: 61 additions & 8 deletions command/ddl_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,9 @@ const (

func NewDDLCommandRunner(ce *Executor) *DDLCommandRunner {
return &DDLCommandRunner{
ce: ce,
idSeq: -1,
ce: ce,
idSeq: -1,
cancellationSequences: make(map[cancelledCommandsKey]int64),
}
}

Expand Down Expand Up @@ -106,6 +107,16 @@ type DDLCommandRunner struct {
ce *Executor
commands sync.Map
idSeq int64

handleCancelLock sync.Mutex
// Cancel can be received on a node during or *before* the command has been received
// so we keep a map of the sequences for each schema and originating node before which commands should be cancelled
cancellationSequences map[cancelledCommandsKey]int64
}

type cancelledCommandsKey struct {
schemaName string
originatingNodeID int64
}

func (d *DDLCommandRunner) generateCommandKey(origNodeID uint64, commandID uint64) string {
Expand Down Expand Up @@ -144,10 +155,12 @@ func (d *DDLCommandRunner) HandleCancelMessage(clusterMsg remoting.ClusterMessag
if !ok {
panic("not a cancel msg")
}
return d.cancelCommandsForSchema(cancelMsg.SchemaName)
return d.cancelCommandsForSchema(cancelMsg.SchemaName, cancelMsg.CommandId, cancelMsg.OriginatingNodeId)
}

func (d *DDLCommandRunner) cancelCommandsForSchema(schemaName string) error {
// cancel commands for the schema up to and including the commandID
func (d *DDLCommandRunner) cancelCommandsForSchema(schemaName string, commandID int64, originatingNodeID int64) error {
found := false
d.commands.Range(func(key, value interface{}) bool {
command, ok := value.(DDLCommand)
if !ok {
Expand All @@ -157,9 +170,21 @@ func (d *DDLCommandRunner) cancelCommandsForSchema(schemaName string) error {
d.commands.Delete(key)
command.Cancel()
command.Cleanup()
found = true
}
return true
})
if !found {
d.handleCancelLock.Lock()
defer d.handleCancelLock.Unlock()
// If we didn't find the command to delete it's possible that the cancel has arrived before the original command
// so we add it to a map so we know to ignore the command if it arrives later
key := cancelledCommandsKey{
schemaName: schemaName,
originatingNodeID: originatingNodeID,
}
d.cancellationSequences[key] = commandID
}
return nil
}

Expand Down Expand Up @@ -219,7 +244,9 @@ func (d *DDLCommandRunner) HandleDdlMessage(ddlMsg remoting.ClusterMessage) erro
}
com = NewDDLCommand(d.ce, DDLCommandType(ddlInfo.CommandType), ddlInfo.GetSchemaName(), ddlInfo.GetSql(),
ddlInfo.GetTableSequences(), ddlInfo.GetExtraData())
d.commands.Store(skey, com)
if !d.storeIfNotCancelled(skey, ddlInfo.CommandId, ddlInfo.GetOriginatingNodeId(), com) {
return nil
}
}
} else if !ok {
// This can happen if ddlMsg comes in after commands are cancelled
Expand All @@ -232,13 +259,32 @@ func (d *DDLCommandRunner) HandleDdlMessage(ddlMsg remoting.ClusterMessage) erro
com.Cleanup()
}
log.Debugf("Running phase %d for DDL message %d %s returned err %v", phase, com.CommandType(), ddlInfo.Sql, err)
if phase == int32(com.NumPhases()-1) {
// Final phase so delete the command
if phase == int32(com.NumPhases()-1) || err != nil {
// Final phase or err so delete the command
d.commands.Delete(skey)
}
return err
}

func (d *DDLCommandRunner) storeIfNotCancelled(skey string, commandID int64, originatingNodeID int64, com DDLCommand) bool {
d.handleCancelLock.Lock()
defer d.handleCancelLock.Unlock()
// We first check if we have already received a cancel for commands up to this id - cancels can come in before the
// original command was fielded
key := cancelledCommandsKey{
schemaName: com.SchemaName(),
originatingNodeID: originatingNodeID,
}
cid, ok := d.cancellationSequences[key]
if ok && cid >= commandID {
log.Debugf("ddl command arrived after cancellation, it will be ignored command id %d cid %d", commandID, cid)
return false
}

d.commands.Store(skey, com)
return true
}

func (d *DDLCommandRunner) RunCommand(ctx context.Context, command DDLCommand) error {
log.Debugf("Attempting to run DDL command %d", command.CommandType())
lockName := getLockName(command.SchemaName())
Expand Down Expand Up @@ -297,6 +343,7 @@ func (d *DDLCommandRunner) RunWithLock(commandKey string, command DDLCommand, dd
}
if err != nil {
log.Debugf("Error return from broadcasting phase %d for DDL command %d %s %v cancel will be broadcast", phase, command.CommandType(), ddlInfo.Sql, err)
d.commands.Delete(commandKey)
// Broadcast a cancel to clean up command state across the cluster
if err2 := d.broadcastCancel(command.SchemaName()); err2 != nil {
// Ignore
Expand All @@ -309,7 +356,11 @@ func (d *DDLCommandRunner) RunWithLock(commandKey string, command DDLCommand, dd
}

func (d *DDLCommandRunner) broadcastCancel(schemaName string) error {
return d.ce.ddlResetClient.Broadcast(&clustermsgs.DDLCancelMessage{SchemaName: schemaName})
return d.ce.ddlResetClient.Broadcast(&clustermsgs.DDLCancelMessage{
SchemaName: schemaName,
CommandId: atomic.LoadInt64(&d.idSeq),
OriginatingNodeId: int64(d.ce.config.NodeID),
})
}

func (d *DDLCommandRunner) broadcastDDL(phase int32, ddlInfo *clustermsgs.DDLStatementInfo) error {
Expand Down Expand Up @@ -340,8 +391,10 @@ func (d *DDLCommandRunner) getLock(lockName string) error {
}

func (d *DDLCommandRunner) empty() bool {
log.Debug("DDLCommand Runner state:")
count := 0
d.commands.Range(func(key, value interface{}) bool {
log.Debugf("DDLCommand runner has command: %s", value.(DDLCommand).SQL()) //nolint:forcetypeassert
count++
return true
})
Expand Down
4 changes: 1 addition & 3 deletions kafkatest/kafka_integration_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
//go:build integration
// +build integration

package kafkatest

import (
Expand Down Expand Up @@ -194,6 +191,7 @@ func stopPranaCluster(t *testing.T, cluster []*server.Server) {
}

func waitUntilRowsInPayments(t *testing.T, numRows int, cli *client.Client) {
t.Helper()
ok, err := commontest.WaitUntilWithError(func() (bool, error) {
ch, err := cli.ExecuteStatement("select * from payments order by payment_id", nil, nil)
require.NoError(t, err)
Expand Down
4 changes: 1 addition & 3 deletions msggen/generators.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@ func (p *PaymentGenerator) GenerateMessage(_ int32, index int64, rnd *rand.Rand)

paymentTypes := []string{"btc", "p2p", "other"}
currencies := []string{"gbp", "usd", "eur", "aud"}
// timestamp needs to be in the future - otherwise, if it's in the past Kafka might start deleting log entries
// thinking they're past log retention time.
timestamp := time.Date(2100, time.Month(4), 12, 9, 0, 0, 0, time.UTC)
timestamp := time.Now()

m := make(map[string]interface{})
paymentID := fmt.Sprintf("payment%06d", index)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@


6squareup/cash/pranadb/clustermsgs/v1/clustermsgs.proto$squareup.cash.pranadb.clustermsgs.v1"�
DDLStatementInfo.
Expand All @@ -13,10 +13,13 @@ schemaName
sql ( Rsql'
table_sequences (RtableSequences

extra_data ( R extraData"3
DDLCancelMessage
schema_name ( R
schemaName"
extra_data ( R extraData"�
DDLCancelMessage.
originating_node_id (RoriginatingNodeId
schema_name ( R
schemaName

command_id (R commandId"
ReloadProtobuf"U
ClusterProposeRequest
shard_id (RshardId!
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ message DDLStatementInfo {
}

message DDLCancelMessage {
string schema_name = 1;
int64 originating_node_id = 1;
string schema_name = 2;
int64 command_id = 3;
}

message ReloadProtobuf {
Expand Down
31 changes: 26 additions & 5 deletions protos/squareup/cash/pranadb/v1/clustermsgs/clustermsgs.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion sqltest/sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@ func TestSQLClusteredThreeNodes(t *testing.T) {
t.Skip("-short: skipped")
}
log.Info("Running TestSQLClusteredThreeNodes")
testSQL(t, false, 3, 3, false, false, tlsKeysInfo)
testSQL(t, false, 3, 3, false, true, tlsKeysInfo)
}

0 comments on commit 69a4fd9

Please sign in to comment.