diff --git a/src/common/errors.rs b/src/common/errors.rs index 1798be70..c1ed78bb 100644 --- a/src/common/errors.rs +++ b/src/common/errors.rs @@ -103,7 +103,7 @@ pub enum Error { #[error("{}", message)] InternalError { message: String }, #[error("{0}")] - StringError(String), + OtherError(String), #[error("PessimisticLock error: {:?}", inner)] PessimisticLockError { inner: Box, diff --git a/src/kv/mod.rs b/src/kv/mod.rs index a10b6fed..60c2a064 100644 --- a/src/kv/mod.rs +++ b/src/kv/mod.rs @@ -13,7 +13,7 @@ pub use key::Key; pub use kvpair::KvPair; pub use value::Value; -struct HexRepr<'a>(pub &'a [u8]); +pub struct HexRepr<'a>(pub &'a [u8]); impl fmt::Display for HexRepr<'_> { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { diff --git a/src/region_cache.rs b/src/region_cache.rs index cdf30c99..a76864ba 100644 --- a/src/region_cache.rs +++ b/src/region_cache.rs @@ -117,7 +117,7 @@ impl RegionCache { return self.read_through_region_by_id(id).await; } } - Err(Error::StringError(format!( + Err(Error::OtherError(format!( "Concurrent PD requests failed for {MAX_RETRY_WAITING_CONCURRENT_REQUEST} times" ))) } @@ -315,7 +315,7 @@ mod test { .filter(|(_, r)| r.contains(&key.clone().into())) .map(|(_, r)| r.clone()) .next() - .ok_or_else(|| Error::StringError("MockRetryClient: region not found".to_owned())) + .ok_or_else(|| Error::OtherError("MockRetryClient: region not found".to_owned())) } async fn get_region_by_id( @@ -330,7 +330,7 @@ mod test { .filter(|(id, _)| id == &®ion_id) .map(|(_, r)| r.clone()) .next() - .ok_or_else(|| Error::StringError("MockRetryClient: region not found".to_owned())) + .ok_or_else(|| Error::OtherError("MockRetryClient: region not found".to_owned())) } async fn get_store( diff --git a/src/store/errors.rs b/src/store/errors.rs index c9d6c774..e9fc56a9 100644 --- a/src/store/errors.rs +++ b/src/store/errors.rs @@ -1,7 +1,5 @@ // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. -use std::fmt::Display; - use crate::proto::kvrpcpb; use crate::Error; @@ -162,11 +160,15 @@ impl HasKeyErrors for kvrpcpb::PessimisticRollbackResponse { } } -impl HasKeyErrors for Result { +impl HasKeyErrors for Result { fn key_errors(&mut self) -> Option> { match self { Ok(x) => x.key_errors(), - Err(e) => Some(vec![Error::StringError(e.to_string())]), + Err(Error::MultipleKeyErrors(errs)) => Some(std::mem::take(errs)), + Err(e) => Some(vec![std::mem::replace( + e, + Error::OtherError("".to_string()), // placeholder, no use. + )]), } } } diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs index e153889a..ddc32bf9 100644 --- a/src/transaction/transaction.rs +++ b/src/transaction/transaction.rs @@ -11,12 +11,15 @@ use derive_new::new; use fail::fail_point; use futures::prelude::*; use log::debug; +use log::error; +use log::info; use log::warn; use tokio::time::Duration; use crate::backoff::Backoff; use crate::backoff::DEFAULT_REGION_BACKOFF; use crate::codec::ApiV1TxnCodec; +use crate::kv::HexRepr; use crate::pd::PdClient; use crate::pd::PdRpcClient; use crate::proto::kvrpcpb; @@ -1246,7 +1249,7 @@ impl Committer { let min_commit_ts = self.prewrite().await?; fail_point!("after-prewrite", |_| { - Err(Error::StringError( + Err(Error::OtherError( "failpoint: after-prewrite return error".to_owned(), )) }); @@ -1260,7 +1263,7 @@ impl Committer { // FIXME: min_commit_ts == 0 => fallback to normal 2PC min_commit_ts.unwrap() } else { - match self.commit_primary().await { + match self.commit_primary_with_retry().await { Ok(commit_ts) => commit_ts, Err(e) => { return if self.undetermined { @@ -1365,6 +1368,11 @@ impl Committer { .plan(); plan.execute() .inspect_err(|e| { + debug!( + "commit primary error: {:?}, start_ts: {}", + e, + self.start_version.version() + ); // We don't know whether the transaction is committed or not if we fail to receive // the response. Then, we mark the transaction as undetermined and propagate the // error to the user. @@ -1377,6 +1385,48 @@ impl Committer { Ok(commit_version) } + async fn commit_primary_with_retry(&mut self) -> Result { + loop { + match self.commit_primary().await { + Ok(commit_version) => return Ok(commit_version), + Err(Error::ExtractedErrors(mut errors)) => match errors.pop() { + Some(Error::KeyError(key_err)) => { + if let Some(expired) = key_err.commit_ts_expired { + // Ref: https://github.com/tikv/client-go/blob/tidb-8.5/txnkv/transaction/commit.go + info!("2PC commit_ts rejected by TiKV, retry with a newer commit_ts, start_ts: {}", + self.start_version.version()); + + let primary_key = self.primary_key.as_ref().unwrap(); + if primary_key != expired.key.as_ref() { + error!("2PC commit_ts rejected by TiKV, but the key is not the primary key, start_ts: {}, key: {}, primary: {:?}", + self.start_version.version(), HexRepr(&expired.key), primary_key); + return Err(Error::OtherError("2PC commitTS rejected by TiKV, but the key is not the primary key".to_string())); + } + + // Do not retry for a txn which has a too large min_commit_ts. + // 3600000 << 18 = 943718400000 + if expired + .min_commit_ts + .saturating_sub(expired.attempted_commit_ts) + > 943718400000 + { + let msg = format!("2PC min_commit_ts is too large, we got min_commit_ts: {}, and attempted_commit_ts: {}", + expired.min_commit_ts, expired.attempted_commit_ts); + return Err(Error::OtherError(msg)); + } + continue; + } else { + return Err(Error::KeyError(key_err)); + } + } + Some(err) => return Err(err), + None => unreachable!(), + }, + Err(err) => return Err(err), + } + } + } + async fn commit_secondary(self, commit_version: Timestamp) -> Result<()> { debug!("committing secondary"); let mutations_len = self.mutations.len(); @@ -1394,7 +1444,7 @@ impl Committer { let percent = percent.unwrap().parse::().unwrap(); new_len = mutations_len * percent / 100; if new_len == 0 { - Err(Error::StringError( + Err(Error::OtherError( "failpoint: before-commit-secondary return error".to_owned(), )) } else { diff --git a/tests/common/ctl.rs b/tests/common/ctl.rs index 092c32bb..32781405 100644 --- a/tests/common/ctl.rs +++ b/tests/common/ctl.rs @@ -10,16 +10,16 @@ use crate::common::Result; pub async fn get_region_count() -> Result { let res = reqwest::get(format!("http://{}/pd/api/v1/regions", pd_addrs()[0])) .await - .map_err(|e| Error::StringError(e.to_string()))?; + .map_err(|e| Error::OtherError(e.to_string()))?; let body = res .text() .await - .map_err(|e| Error::StringError(e.to_string()))?; + .map_err(|e| Error::OtherError(e.to_string()))?; let value: serde_json::Value = serde_json::from_str(body.as_ref()).unwrap_or_else(|err| { panic!("invalid body: {:?}, error: {:?}", body, err); }); value["count"] .as_u64() - .ok_or_else(|| Error::StringError("pd region count does not return an integer".to_owned())) + .ok_or_else(|| Error::OtherError("pd region count does not return an integer".to_owned())) }