Skip to content

Commit e55206e

Browse files
pingyu, emmanuel-keller
authored and committed
transaction: Handle "commit ts expired" error (tikv#491)
Signed-off-by: Ping Yu <[email protected]> (cherry picked from commit ac95421)
1 parent 08c589f commit e55206e

File tree

6 files changed

+70
-16
lines changed

6 files changed

+70
-16
lines changed

src/common/errors.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -103,7 +103,7 @@ pub enum Error {
103103
#[error("{}", message)]
104104
InternalError { message: String },
105105
#[error("{0}")]
106-
StringError(String),
106+
OtherError(String),
107107
#[error("PessimisticLock error: {:?}", inner)]
108108
PessimisticLockError {
109109
inner: Box<Error>,

src/kv/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -14,7 +14,7 @@ pub use key::Key;
1414
pub use kvpair::KvPair;
1515
pub use value::Value;
1616

17-
struct HexRepr<'a>(pub &'a [u8]);
17+
pub struct HexRepr<'a>(pub &'a [u8]);
1818

1919
impl<'a> fmt::Display for HexRepr<'a> {
2020
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {

src/region_cache.rs

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -117,7 +117,7 @@ impl<C: RetryClientTrait> RegionCache<C> {
117117
return self.read_through_region_by_id(id).await;
118118
}
119119
}
120-
Err(Error::StringError(format!(
120+
Err(Error::OtherError(format!(
121121
"Concurrent PD requests failed for {MAX_RETRY_WAITING_CONCURRENT_REQUEST} times"
122122
)))
123123
}
@@ -311,7 +311,7 @@ mod test {
311311
.filter(|(_, r)| r.contains(&key.clone().into()))
312312
.map(|(_, r)| r.clone())
313313
.next()
314-
.ok_or_else(|| Error::StringError("MockRetryClient: region not found".to_owned()))
314+
.ok_or_else(|| Error::OtherError("MockRetryClient: region not found".to_owned()))
315315
}
316316

317317
async fn get_region_by_id(
@@ -326,7 +326,7 @@ mod test {
326326
.filter(|(id, _)| id == &&region_id)
327327
.map(|(_, r)| r.clone())
328328
.next()
329-
.ok_or_else(|| Error::StringError("MockRetryClient: region not found".to_owned()))
329+
.ok_or_else(|| Error::OtherError("MockRetryClient: region not found".to_owned()))
330330
}
331331

332332
async fn get_store(

src/store/errors.rs

Lines changed: 6 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -1,7 +1,5 @@
11
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
22

3-
use std::fmt::Display;
4-
53
use crate::proto::kvrpcpb;
64
use crate::Error;
75

@@ -164,11 +162,15 @@ impl HasKeyErrors for kvrpcpb::PessimisticRollbackResponse {
164162
}
165163
}
166164

167-
impl<T: HasKeyErrors, E: Display> HasKeyErrors for Result<T, E> {
165+
impl<T: HasKeyErrors> HasKeyErrors for Result<T, Error> {
168166
fn key_errors(&mut self) -> Option<Vec<Error>> {
169167
match self {
170168
Ok(x) => x.key_errors(),
171-
Err(e) => Some(vec![Error::StringError(e.to_string())]),
169+
Err(Error::MultipleKeyErrors(errs)) => Some(std::mem::take(errs)),
170+
Err(e) => Some(vec![std::mem::replace(
171+
e,
172+
Error::OtherError("".to_string()), // placeholder, no use.
173+
)]),
172174
}
173175
}
174176
}

src/transaction/transaction.rs

Lines changed: 53 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -10,11 +10,14 @@ use derive_new::new;
1010
use fail::fail_point;
1111
use futures::prelude::*;
1212
use log::debug;
13+
use log::error;
14+
use log::info;
1315
use log::warn;
1416
use tokio::time::Duration;
1517

1618
use crate::backoff::Backoff;
1719
use crate::backoff::DEFAULT_REGION_BACKOFF;
20+
use crate::kv::HexRepr;
1821
use crate::pd::PdClient;
1922
use crate::pd::PdRpcClient;
2023
use crate::proto::kvrpcpb;
@@ -1274,7 +1277,7 @@ impl<PdC: PdClient> Committer<PdC> {
12741277
let min_commit_ts = self.prewrite().await?;
12751278

12761279
fail_point!("after-prewrite", |_| {
1277-
Err(Error::StringError(
1280+
Err(Error::OtherError(
12781281
"failpoint: after-prewrite return error".to_owned(),
12791282
))
12801283
});
@@ -1288,7 +1291,7 @@ impl<PdC: PdClient> Committer<PdC> {
12881291
// FIXME: min_commit_ts == 0 => fallback to normal 2PC
12891292
min_commit_ts.unwrap()
12901293
} else {
1291-
match self.commit_primary().await {
1294+
match self.commit_primary_with_retry().await {
12921295
Ok(commit_ts) => commit_ts,
12931296
Err(e) => {
12941297
return if self.undetermined {
@@ -1391,6 +1394,11 @@ impl<PdC: PdClient> Committer<PdC> {
13911394
.plan();
13921395
plan.execute()
13931396
.inspect_err(|e| {
1397+
debug!(
1398+
"commit primary error: {:?}, start_ts: {}",
1399+
e,
1400+
self.start_version.version()
1401+
);
13941402
// We don't know whether the transaction is committed or not if we fail to receive
13951403
// the response. Then, we mark the transaction as undetermined and propagate the
13961404
// error to the user.
@@ -1403,6 +1411,48 @@ impl<PdC: PdClient> Committer<PdC> {
14031411
Ok(commit_version)
14041412
}
14051413

1414+
async fn commit_primary_with_retry(&mut self) -> Result<Timestamp> {
1415+
loop {
1416+
match self.commit_primary().await {
1417+
Ok(commit_version) => return Ok(commit_version),
1418+
Err(Error::ExtractedErrors(mut errors)) => match errors.pop() {
1419+
Some(Error::KeyError(key_err)) => {
1420+
if let Some(expired) = key_err.commit_ts_expired {
1421+
// Ref: https://github.com/tikv/client-go/blob/tidb-8.5/txnkv/transaction/commit.go
1422+
info!("2PC commit_ts rejected by TiKV, retry with a newer commit_ts, start_ts: {}",
1423+
self.start_version.version());
1424+
1425+
let primary_key = self.primary_key.as_ref().unwrap();
1426+
if primary_key != expired.key.as_ref() {
1427+
error!("2PC commit_ts rejected by TiKV, but the key is not the primary key, start_ts: {}, key: {}, primary: {:?}",
1428+
self.start_version.version(), HexRepr(&expired.key), primary_key);
1429+
return Err(Error::OtherError("2PC commitTS rejected by TiKV, but the key is not the primary key".to_string()));
1430+
}
1431+
1432+
// Do not retry for a txn which has a too large min_commit_ts.
1433+
// 3600000 << 18 = 943718400000
1434+
if expired
1435+
.min_commit_ts
1436+
.saturating_sub(expired.attempted_commit_ts)
1437+
> 943718400000
1438+
{
1439+
let msg = format!("2PC min_commit_ts is too large, we got min_commit_ts: {}, and attempted_commit_ts: {}",
1440+
expired.min_commit_ts, expired.attempted_commit_ts);
1441+
return Err(Error::OtherError(msg));
1442+
}
1443+
continue;
1444+
} else {
1445+
return Err(Error::KeyError(key_err));
1446+
}
1447+
}
1448+
Some(err) => return Err(err),
1449+
None => unreachable!(),
1450+
},
1451+
Err(err) => return Err(err),
1452+
}
1453+
}
1454+
}
1455+
14061456
async fn commit_secondary(self, commit_version: Timestamp) -> Result<()> {
14071457
debug!("committing secondary");
14081458
let mutations_len = self.mutations.len();
@@ -1420,7 +1470,7 @@ impl<PdC: PdClient> Committer<PdC> {
14201470
let percent = percent.unwrap().parse::<usize>().unwrap();
14211471
new_len = mutations_len * percent / 100;
14221472
if new_len == 0 {
1423-
Err(Error::StringError(
1473+
Err(Error::OtherError(
14241474
"failpoint: before-commit-secondary return error".to_owned(),
14251475
))
14261476
} else {

tests/common/ctl.rs

Lines changed: 6 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -10,14 +10,16 @@ use crate::common::Result;
1010
pub async fn get_region_count() -> Result<u64> {
1111
let res = reqwest::get(format!("http://{}/pd/api/v1/regions", pd_addrs()[0]))
1212
.await
13-
.map_err(|e| Error::StringError(e.to_string()))?;
13+
.map_err(|e| Error::OtherError(e.to_string()))?;
1414

1515
let body = res
1616
.text()
1717
.await
18-
.map_err(|e| Error::StringError(e.to_string()))?;
19-
let value: serde_json::Value = serde_json::from_str(body.as_ref()).unwrap();
18+
.map_err(|e| Error::OtherError(e.to_string()))?;
19+
let value: serde_json::Value = serde_json::from_str(body.as_ref()).unwrap_or_else(|err| {
20+
panic!("invalid body: {:?}, error: {:?}", body, err);
21+
});
2022
value["count"]
2123
.as_u64()
22-
.ok_or_else(|| Error::StringError("pd region count does not return an integer".to_owned()))
24+
.ok_or_else(|| Error::OtherError("pd region count does not return an integer".to_owned()))
2325
}

0 commit comments

Comments
 (0)