@@ -3755,11 +3755,6 @@ type reorgPartitionWorker struct {
37553755 writeColOffsetMap map [int64 ]int
37563756 maxOffset int
37573757 reorgedTbl table.PartitionedTable
3758- // Only used for non-clustered tables, since we need to re-generate _tidb_rowid,
3759- // and check if the old _tidb_rowid was already written or not.
3760- // If the old _tidb_rowid already exists, then the row is already backfilled (double written)
3761- // and can be skipped. Otherwise, we will insert it and generate index entries.
3762- oldKeys []kv.Key
37633758}
37643759
37653760func newReorgPartitionWorker (i int , t table.PhysicalTable , decodeColMap map [int64 ]decoder.Column , reorgInfo * reorgInfo , jc * ReorgContext ) (* reorgPartitionWorker , error ) {
@@ -3826,46 +3821,88 @@ func (w *reorgPartitionWorker) BackfillData(handleRange reorgBackfillTask) (task
38263821 // i.e. concurrently written by StateWriteOnly or StateWriteReorganization.
38273822 // and if so, skip it.
38283823 var found map [string ][]byte
3829- if len (w .oldKeys ) > 0 {
3824+ lockKey := make ([]byte , 0 , tablecodec .RecordRowKeyLen )
3825+ lockKey = append (lockKey , handleRange .startKey [:tablecodec .TableSplitKeyLen ]... )
3826+ if ! w .table .Meta ().HasClusteredIndex () && len (w .rowRecords ) > 0 {
3827+ failpoint .InjectCall ("PartitionBackfillNonClustered" , w .rowRecords [0 ].vals )
38303828 // we must check if old IDs already been written,
38313829 // i.e. double written by StateWriteOnly or StateWriteReorganization.
3832- // TODO: while waiting for BatchGet to check for duplicate, do another round of reads in parallel?
3833- found , err = txn .BatchGet (ctx , w .oldKeys )
3830+
3831+ // TODO: test how to use PresumeKeyNotExists/NeedConstraintCheckInPrewrite/DO_CONSTRAINT_CHECK
3832+ // to delay the check until commit.
3833+ // And handle commit errors and fall back to this method of checking all keys to see if we need to skip any.
3834+ newKeys := make ([]kv.Key , 0 , len (w .rowRecords ))
3835+ for i := range w .rowRecords {
3836+ newKeys = append (newKeys , w .rowRecords [i ].key )
3837+ }
3838+ found , err = txn .BatchGet (ctx , newKeys )
38343839 if err != nil {
38353840 return errors .Trace (err )
38363841 }
3842+
3843+ // TODO: Add test that kills (like `kill -9`) the currently running
3844+ // ddl owner, to see how it handles re-running this backfill when some batches has
3845+ // committed and reorgInfo has not been updated, so it needs to redo some batches.
38373846 }
3847+ tmpRow := make ([]types.Datum , len (w .reorgedTbl .Cols ()))
38383848
3839- for i , prr := range w .rowRecords {
3849+ for _ , prr := range w .rowRecords {
38403850 taskCtx .scanCount ++
38413851 key := prr .key
3852+ lockKey = lockKey [:tablecodec .TableSplitKeyLen ]
3853+ lockKey = append (lockKey , key [tablecodec .TableSplitKeyLen :]... )
3854+ // Lock the *old* key, since there can still be concurrent update happening on
3855+ // the rows from fetchRowColVals(). If we cannot lock the keys in this
3856+ // transaction and succeed when committing, then another transaction did update
3857+ // the same key, and we will fail and retry. When retrying, this key would be found
3858+ // through BatchGet and skipped.
3859+ // TODO: would it help to accumulate the keys in a slice and then only call this once?
3860+ err = txn .LockKeys (context .Background (), new (kv.LockCtx ), lockKey )
3861+ if err != nil {
3862+ return errors .Trace (err )
3863+ }
38423864
3843- // w.oldKeys is only set for non-clustered tables, in w.fetchRowColVals().
3844- if len (w .oldKeys ) > 0 {
3845- if _ , ok := found [string (w .oldKeys [i ])]; ok {
3846- // Already filled, i.e. double written earlier by concurrent DML
3865+ if vals , ok := found [string (key )]; ok {
3866+ if len (vals ) == len (prr .vals ) && bytes .Equal (vals , prr .vals ) {
3867+ // Already backfilled or double written earlier by concurrent DML
38473868 continue
38483869 }
3849-
3850- // Check if we can lock the old key, since there can still be concurrent update
3851- // happening on the rows from fetchRowColVals(), if we cannot lock the keys in this
3852- // transaction and succeed when committing, then another transaction did update
3853- // the same key, and we will fail and retry. When retrying, this key would be found
3854- // through BatchGet and skipped.
3855- err = txn .LockKeys (context .Background (), new (kv.LockCtx ), w .oldKeys [i ])
3870+ // Not same row, due to earlier EXCHANGE PARTITION.
3871+ // Update the current read row by Remove it and Add it back (which will give it a new _tidb_rowid)
3872+ // which then also will be used as unique id in the new partition.
3873+ var h kv.Handle
3874+ var currPartID int64
3875+ currPartID , h , err = tablecodec .DecodeRecordKey (lockKey )
38563876 if err != nil {
38573877 return errors .Trace (err )
38583878 }
3859-
3860- // Due to EXCHANGE PARTITION, the existing _tidb_rowid may collide between partitions!
3861- // Generate new _tidb_rowid.
3862- recordID , err := tables .AllocHandle (w .ctx , w .tblCtx , w .reorgedTbl )
3879+ _ , err = w .rowDecoder .DecodeTheExistedColumnMap (w .exprCtx , h , prr .vals , w .loc , w .rowMap )
38633880 if err != nil {
38643881 return errors .Trace (err )
38653882 }
3866-
3867- // tablecodec.prefixLen is not exported, but is just TableSplitKeyLen + 2
3868- key = tablecodec .EncodeRecordKey (key [:tablecodec .TableSplitKeyLen + 2 ], recordID )
3883+ for _ , col := range w .table .WritableCols () {
3884+ d , ok := w .rowMap [col .ID ]
3885+ if ! ok {
3886+ return dbterror .ErrUnsupportedReorganizePartition .GenWithStackByArgs ()
3887+ }
3888+ tmpRow [col .Offset ] = d
3889+ }
3890+ // Use RemoveRecord/AddRecord to keep the indexes in-sync!
3891+ pt := w .table .GetPartitionedTable ().GetPartition (currPartID )
3892+ err = pt .RemoveRecord (w .tblCtx , txn , h , tmpRow )
3893+ if err != nil {
3894+ return errors .Trace (err )
3895+ }
3896+ h , err = pt .AddRecord (w .tblCtx , txn , tmpRow )
3897+ if err != nil {
3898+ return errors .Trace (err )
3899+ }
3900+ w .cleanRowMap ()
3901+ // tablecodec.prefixLen is not exported, but is just TableSplitKeyLen + 2 ("_r")
3902+ key = tablecodec .EncodeRecordKey (key [:tablecodec .TableSplitKeyLen + 2 ], h )
3903+ // OK to only do txn.Set() for the new partition, and defer creating the indexes,
3904+ // since any DML changes the record it will also update or create the indexes,
3905+ // by doing RemoveRecord+UpdateRecord
38693906 }
38703907 err = txn .Set (key , prr .vals )
38713908 if err != nil {
@@ -3882,8 +3919,6 @@ func (w *reorgPartitionWorker) BackfillData(handleRange reorgBackfillTask) (task
38823919
38833920func (w * reorgPartitionWorker ) fetchRowColVals (txn kv.Transaction , taskRange reorgBackfillTask ) (kv.Key , bool , error ) {
38843921 w .rowRecords = w .rowRecords [:0 ]
3885- isClustered := w .reorgedTbl .Meta ().IsCommonHandle || w .reorgedTbl .Meta ().PKIsHandle
3886- w .oldKeys = w .oldKeys [:0 ]
38873922 startTime := time .Now ()
38883923
38893924 // taskDone means that the added handle is out of taskRange.endHandle.
@@ -3926,12 +3961,6 @@ func (w *reorgPartitionWorker) fetchRowColVals(txn kv.Transaction, taskRange reo
39263961 newKey = append (newKey , recordKey [tablecodec .TableSplitKeyLen :]... )
39273962 w .rowRecords = append (w .rowRecords , & rowRecord {key : newKey , vals : rawRow })
39283963
3929- if ! isClustered {
3930- oldKey := newKey [:tablecodec .TableSplitKeyLen ]
3931- oldKey = append (oldKey , recordKey [tablecodec .TableSplitKeyLen :]... )
3932- w .oldKeys = append (w .oldKeys , oldKey )
3933- }
3934-
39353964 w .cleanRowMap ()
39363965 lastAccessedHandle = recordKey
39373966 if recordKey .Cmp (taskRange .endKey ) == 0 {
0 commit comments