@@ -3773,11 +3773,6 @@ type reorgPartitionWorker struct {
37733773 writeColOffsetMap map [int64 ]int
37743774 maxOffset int
37753775 reorgedTbl table.PartitionedTable
3776- // Only used for non-clustered tables, since we need to re-generate _tidb_rowid,
3777- // and check if the old _tidb_rowid was already written or not.
3778- // If the old _tidb_rowid already exists, then the row is already backfilled (double written)
3779- // and can be skipped. Otherwise, we will insert it and generate index entries.
3780- oldKeys []kv.Key
37813776}
37823777
37833778func newReorgPartitionWorker (i int , t table.PhysicalTable , decodeColMap map [int64 ]decoder.Column , reorgInfo * reorgInfo , jc * ReorgContext ) (* reorgPartitionWorker , error ) {
@@ -3844,46 +3839,88 @@ func (w *reorgPartitionWorker) BackfillData(handleRange reorgBackfillTask) (task
38443839 // i.e. concurrently written by StateWriteOnly or StateWriteReorganization.
38453840 // and if so, skip it.
38463841 var found map [string ][]byte
3847- if len (w .oldKeys ) > 0 {
3842+ lockKey := make ([]byte , 0 , tablecodec .RecordRowKeyLen )
3843+ lockKey = append (lockKey , handleRange .startKey [:tablecodec .TableSplitKeyLen ]... )
3844+ if ! w .table .Meta ().HasClusteredIndex () && len (w .rowRecords ) > 0 {
3845+ failpoint .InjectCall ("PartitionBackfillNonClustered" , w .rowRecords [0 ].vals )
38483846 // we must check if old IDs have already been written,
38493847 // i.e. double written by StateWriteOnly or StateWriteReorganization.
3850- // TODO: while waiting for BatchGet to check for duplicate, do another round of reads in parallel?
3851- found , err = txn .BatchGet (ctx , w .oldKeys )
3848+
3849+ // TODO: test how to use PresumeKeyNotExists/NeedConstraintCheckInPrewrite/DO_CONSTRAINT_CHECK
3850+ // to delay the check until commit.
3851+ // And handle commit errors and fall back to this method of checking all keys to see if we need to skip any.
3852+ newKeys := make ([]kv.Key , 0 , len (w .rowRecords ))
3853+ for i := range w .rowRecords {
3854+ newKeys = append (newKeys , w .rowRecords [i ].key )
3855+ }
3856+ found , err = txn .BatchGet (ctx , newKeys )
38523857 if err != nil {
38533858 return errors .Trace (err )
38543859 }
3860+
3861+ // TODO: Add test that kills (like `kill -9`) the currently running
3862+ // ddl owner, to see how it handles re-running this backfill when some batches have
3863+ // committed and reorgInfo has not been updated, so it needs to redo some batches.
38553864 }
3865+ tmpRow := make ([]types.Datum , len (w .reorgedTbl .Cols ()))
38563866
3857- for i , prr := range w .rowRecords {
3867+ for _ , prr := range w .rowRecords {
38583868 taskCtx .scanCount ++
38593869 key := prr .key
3870+ lockKey = lockKey [:tablecodec .TableSplitKeyLen ]
3871+ lockKey = append (lockKey , key [tablecodec .TableSplitKeyLen :]... )
3872+ // Lock the *old* key, since there can still be concurrent update happening on
3873+ // the rows from fetchRowColVals(). If we cannot lock the keys in this
3874+ // transaction and succeed when committing, then another transaction did update
3875+ // the same key, and we will fail and retry. When retrying, this key would be found
3876+ // through BatchGet and skipped.
3877+ // TODO: would it help to accumulate the keys in a slice and then only call this once?
3878+ err = txn .LockKeys (context .Background (), new (kv.LockCtx ), lockKey )
3879+ if err != nil {
3880+ return errors .Trace (err )
3881+ }
38603882
3861- // w.oldKeys is only set for non-clustered tables, in w.fetchRowColVals().
3862- if len (w .oldKeys ) > 0 {
3863- if _ , ok := found [string (w .oldKeys [i ])]; ok {
3864- // Already filled, i.e. double written earlier by concurrent DML
3883+ if vals , ok := found [string (key )]; ok {
3884+ if len (vals ) == len (prr .vals ) && bytes .Equal (vals , prr .vals ) {
3885+ // Already backfilled or double written earlier by concurrent DML
38653886 continue
38663887 }
3867-
3868- // Check if we can lock the old key, since there can still be concurrent update
3869- // happening on the rows from fetchRowColVals(), if we cannot lock the keys in this
3870- // transaction and succeed when committing, then another transaction did update
3871- // the same key, and we will fail and retry. When retrying, this key would be found
3872- // through BatchGet and skipped.
3873- err = txn .LockKeys (context .Background (), new (kv.LockCtx ), w .oldKeys [i ])
3888+ // Not same row, due to earlier EXCHANGE PARTITION.
3889+ // Update the currently read row by removing it and adding it back (which will give it a new _tidb_rowid)
3890+ // which then also will be used as unique id in the new partition.
3891+ var h kv.Handle
3892+ var currPartID int64
3893+ currPartID , h , err = tablecodec .DecodeRecordKey (lockKey )
38743894 if err != nil {
38753895 return errors .Trace (err )
38763896 }
3877-
3878- // Due to EXCHANGE PARTITION, the existing _tidb_rowid may collide between partitions!
3879- // Generate new _tidb_rowid.
3880- recordID , err := tables .AllocHandle (w .ctx , w .tblCtx , w .reorgedTbl )
3897+ _ , err = w .rowDecoder .DecodeTheExistedColumnMap (w .exprCtx , h , prr .vals , w .loc , w .rowMap )
38813898 if err != nil {
38823899 return errors .Trace (err )
38833900 }
3884-
3885- // tablecodec.prefixLen is not exported, but is just TableSplitKeyLen + 2
3886- key = tablecodec .EncodeRecordKey (key [:tablecodec .TableSplitKeyLen + 2 ], recordID )
3901+ for _ , col := range w .table .WritableCols () {
3902+ d , ok := w .rowMap [col .ID ]
3903+ if ! ok {
3904+ return dbterror .ErrUnsupportedReorganizePartition .GenWithStackByArgs ()
3905+ }
3906+ tmpRow [col .Offset ] = d
3907+ }
3908+ // Use RemoveRecord/AddRecord to keep the indexes in-sync!
3909+ pt := w .table .GetPartitionedTable ().GetPartition (currPartID )
3910+ err = pt .RemoveRecord (w .tblCtx , txn , h , tmpRow )
3911+ if err != nil {
3912+ return errors .Trace (err )
3913+ }
3914+ h , err = pt .AddRecord (w .tblCtx , txn , tmpRow )
3915+ if err != nil {
3916+ return errors .Trace (err )
3917+ }
3918+ w .cleanRowMap ()
3919+ // tablecodec.prefixLen is not exported, but is just TableSplitKeyLen + 2 ("_r")
3920+ key = tablecodec .EncodeRecordKey (key [:tablecodec .TableSplitKeyLen + 2 ], h )
3921+ // OK to only do txn.Set() for the new partition, and defer creating the indexes,
3922+ // since any DML that changes the record will also update or create the indexes,
3923+ // by doing RemoveRecord+UpdateRecord
38873924 }
38883925 err = txn .Set (key , prr .vals )
38893926 if err != nil {
@@ -3900,8 +3937,6 @@ func (w *reorgPartitionWorker) BackfillData(handleRange reorgBackfillTask) (task
39003937
39013938func (w * reorgPartitionWorker ) fetchRowColVals (txn kv.Transaction , taskRange reorgBackfillTask ) (kv.Key , bool , error ) {
39023939 w .rowRecords = w .rowRecords [:0 ]
3903- isClustered := w .reorgedTbl .Meta ().IsCommonHandle || w .reorgedTbl .Meta ().PKIsHandle
3904- w .oldKeys = w .oldKeys [:0 ]
39053940 startTime := time .Now ()
39063941
39073942 // taskDone means that the added handle is out of taskRange.endHandle.
@@ -3944,12 +3979,6 @@ func (w *reorgPartitionWorker) fetchRowColVals(txn kv.Transaction, taskRange reo
39443979 newKey = append (newKey , recordKey [tablecodec .TableSplitKeyLen :]... )
39453980 w .rowRecords = append (w .rowRecords , & rowRecord {key : newKey , vals : rawRow })
39463981
3947- if ! isClustered {
3948- oldKey := newKey [:tablecodec .TableSplitKeyLen ]
3949- oldKey = append (oldKey , recordKey [tablecodec .TableSplitKeyLen :]... )
3950- w .oldKeys = append (w .oldKeys , oldKey )
3951- }
3952-
39533982 w .cleanRowMap ()
39543983 lastAccessedHandle = recordKey
39553984 if recordKey .Cmp (taskRange .endKey ) == 0 {
0 commit comments