@@ -10,11 +10,14 @@ use derive_new::new;
1010use fail:: fail_point;
1111use futures:: prelude:: * ;
1212use log:: debug;
13+ use log:: error;
14+ use log:: info;
1315use log:: warn;
1416use tokio:: time:: Duration ;
1517
1618use crate :: backoff:: Backoff ;
1719use crate :: backoff:: DEFAULT_REGION_BACKOFF ;
20+ use crate :: kv:: HexRepr ;
1821use crate :: pd:: PdClient ;
1922use crate :: pd:: PdRpcClient ;
2023use crate :: proto:: kvrpcpb;
@@ -1278,7 +1281,7 @@ impl<PdC: PdClient> Committer<PdC> {
12781281 let min_commit_ts = self . prewrite ( ) . await ?;
12791282
12801283 fail_point ! ( "after-prewrite" , |_| {
1281- Err ( Error :: StringError (
1284+ Err ( Error :: OtherError (
12821285 "failpoint: after-prewrite return error" . to_owned( ) ,
12831286 ) )
12841287 } ) ;
@@ -1292,7 +1295,7 @@ impl<PdC: PdClient> Committer<PdC> {
12921295 // FIXME: min_commit_ts == 0 => fallback to normal 2PC
12931296 min_commit_ts. unwrap ( )
12941297 } else {
1295- match self . commit_primary ( ) . await {
1298+ match self . commit_primary_with_retry ( ) . await {
12961299 Ok ( commit_ts) => commit_ts,
12971300 Err ( e) => {
12981301 return if self . undetermined {
@@ -1395,6 +1398,11 @@ impl<PdC: PdClient> Committer<PdC> {
13951398 . plan ( ) ;
13961399 plan. execute ( )
13971400 . inspect_err ( |e| {
1401+ debug ! (
1402+ "commit primary error: {:?}, start_ts: {}" ,
1403+ e,
1404+ self . start_version. version( )
1405+ ) ;
13981406 // We don't know whether the transaction is committed or not if we fail to receive
13991407 // the response. Then, we mark the transaction as undetermined and propagate the
14001408 // error to the user.
@@ -1407,6 +1415,48 @@ impl<PdC: PdClient> Committer<PdC> {
14071415 Ok ( commit_version)
14081416 }
14091417
1418+ async fn commit_primary_with_retry ( & mut self ) -> Result < Timestamp > {
1419+ loop {
1420+ match self . commit_primary ( ) . await {
1421+ Ok ( commit_version) => return Ok ( commit_version) ,
1422+ Err ( Error :: ExtractedErrors ( mut errors) ) => match errors. pop ( ) {
1423+ Some ( Error :: KeyError ( key_err) ) => {
1424+ if let Some ( expired) = key_err. commit_ts_expired {
1425+ // Ref: https://github.com/tikv/client-go/blob/tidb-8.5/txnkv/transaction/commit.go
1426+ info ! ( "2PC commit_ts rejected by TiKV, retry with a newer commit_ts, start_ts: {}" ,
1427+ self . start_version. version( ) ) ;
1428+
1429+ let primary_key = self . primary_key . as_ref ( ) . unwrap ( ) ;
1430+ if primary_key != expired. key . as_ref ( ) {
1431+ error ! ( "2PC commit_ts rejected by TiKV, but the key is not the primary key, start_ts: {}, key: {}, primary: {:?}" ,
1432+ self . start_version. version( ) , HexRepr ( & expired. key) , primary_key) ;
1433+ return Err ( Error :: OtherError ( "2PC commitTS rejected by TiKV, but the key is not the primary key" . to_string ( ) ) ) ;
1434+ }
1435+
1436+ // Do not retry for a txn which has a too large min_commit_ts.
1437+ // 3600000 << 18 = 943718400000
1438+ if expired
1439+ . min_commit_ts
1440+ . saturating_sub ( expired. attempted_commit_ts )
1441+ > 943718400000
1442+ {
1443+ let msg = format ! ( "2PC min_commit_ts is too large, we got min_commit_ts: {}, and attempted_commit_ts: {}" ,
1444+ expired. min_commit_ts, expired. attempted_commit_ts) ;
1445+ return Err ( Error :: OtherError ( msg) ) ;
1446+ }
1447+ continue ;
1448+ } else {
1449+ return Err ( Error :: KeyError ( key_err) ) ;
1450+ }
1451+ }
1452+ Some ( err) => return Err ( err) ,
1453+ None => unreachable ! ( ) ,
1454+ } ,
1455+ Err ( err) => return Err ( err) ,
1456+ }
1457+ }
1458+ }
1459+
14101460 async fn commit_secondary ( self , commit_version : Timestamp ) -> Result < ( ) > {
14111461 debug ! ( "committing secondary" ) ;
14121462 let mutations_len = self . mutations . len ( ) ;
@@ -1424,7 +1474,7 @@ impl<PdC: PdClient> Committer<PdC> {
14241474 let percent = percent. unwrap( ) . parse:: <usize >( ) . unwrap( ) ;
14251475 new_len = mutations_len * percent / 100 ;
14261476 if new_len == 0 {
1427- Err ( Error :: StringError (
1477+ Err ( Error :: OtherError (
14281478 "failpoint: before-commit-secondary return error" . to_owned( ) ,
14291479 ) )
14301480 } else {
0 commit comments