Skip to content

transaction: Handle "commit ts expired" error #491

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
May 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/common/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ pub enum Error {
#[error("{}", message)]
InternalError { message: String },
#[error("{0}")]
StringError(String),
OtherError(String),
#[error("PessimisticLock error: {:?}", inner)]
PessimisticLockError {
inner: Box<Error>,
Expand Down
2 changes: 1 addition & 1 deletion src/kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub use key::Key;
pub use kvpair::KvPair;
pub use value::Value;

struct HexRepr<'a>(pub &'a [u8]);
pub struct HexRepr<'a>(pub &'a [u8]);

impl fmt::Display for HexRepr<'_> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
Expand Down
6 changes: 3 additions & 3 deletions src/region_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ impl<C: RetryClientTrait> RegionCache<C> {
return self.read_through_region_by_id(id).await;
}
}
Err(Error::StringError(format!(
Err(Error::OtherError(format!(
"Concurrent PD requests failed for {MAX_RETRY_WAITING_CONCURRENT_REQUEST} times"
)))
}
Expand Down Expand Up @@ -315,7 +315,7 @@ mod test {
.filter(|(_, r)| r.contains(&key.clone().into()))
.map(|(_, r)| r.clone())
.next()
.ok_or_else(|| Error::StringError("MockRetryClient: region not found".to_owned()))
.ok_or_else(|| Error::OtherError("MockRetryClient: region not found".to_owned()))
}

async fn get_region_by_id(
Expand All @@ -330,7 +330,7 @@ mod test {
.filter(|(id, _)| id == &&region_id)
.map(|(_, r)| r.clone())
.next()
.ok_or_else(|| Error::StringError("MockRetryClient: region not found".to_owned()))
.ok_or_else(|| Error::OtherError("MockRetryClient: region not found".to_owned()))
}

async fn get_store(
Expand Down
10 changes: 6 additions & 4 deletions src/store/errors.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

use std::fmt::Display;

use crate::proto::kvrpcpb;
use crate::Error;

Expand Down Expand Up @@ -162,11 +160,15 @@ impl HasKeyErrors for kvrpcpb::PessimisticRollbackResponse {
}
}

impl<T: HasKeyErrors, E: Display> HasKeyErrors for Result<T, E> {
impl<T: HasKeyErrors> HasKeyErrors for Result<T, Error> {
fn key_errors(&mut self) -> Option<Vec<Error>> {
match self {
Ok(x) => x.key_errors(),
Err(e) => Some(vec![Error::StringError(e.to_string())]),
Err(Error::MultipleKeyErrors(errs)) => Some(std::mem::take(errs)),
Err(e) => Some(vec![std::mem::replace(
e,
Error::OtherError("".to_string()), // placeholder, no use.
)]),
}
}
}
Expand Down
56 changes: 53 additions & 3 deletions src/transaction/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,15 @@ use derive_new::new;
use fail::fail_point;
use futures::prelude::*;
use log::debug;
use log::error;
use log::info;
use log::warn;
use tokio::time::Duration;

use crate::backoff::Backoff;
use crate::backoff::DEFAULT_REGION_BACKOFF;
use crate::codec::ApiV1TxnCodec;
use crate::kv::HexRepr;
use crate::pd::PdClient;
use crate::pd::PdRpcClient;
use crate::proto::kvrpcpb;
Expand Down Expand Up @@ -1246,7 +1249,7 @@ impl<PdC: PdClient> Committer<PdC> {
let min_commit_ts = self.prewrite().await?;

fail_point!("after-prewrite", |_| {
Err(Error::StringError(
Err(Error::OtherError(
"failpoint: after-prewrite return error".to_owned(),
))
});
Expand All @@ -1260,7 +1263,7 @@ impl<PdC: PdClient> Committer<PdC> {
// FIXME: min_commit_ts == 0 => fallback to normal 2PC
min_commit_ts.unwrap()
} else {
match self.commit_primary().await {
match self.commit_primary_with_retry().await {
Ok(commit_ts) => commit_ts,
Err(e) => {
return if self.undetermined {
Expand Down Expand Up @@ -1365,6 +1368,11 @@ impl<PdC: PdClient> Committer<PdC> {
.plan();
plan.execute()
.inspect_err(|e| {
debug!(
"commit primary error: {:?}, start_ts: {}",
e,
self.start_version.version()
);
// We don't know whether the transaction is committed or not if we fail to receive
// the response. Then, we mark the transaction as undetermined and propagate the
// error to the user.
Expand All @@ -1377,6 +1385,48 @@ impl<PdC: PdClient> Committer<PdC> {
Ok(commit_version)
}

async fn commit_primary_with_retry(&mut self) -> Result<Timestamp> {
loop {
match self.commit_primary().await {
Ok(commit_version) => return Ok(commit_version),
Err(Error::ExtractedErrors(mut errors)) => match errors.pop() {
Some(Error::KeyError(key_err)) => {
if let Some(expired) = key_err.commit_ts_expired {
// Ref: https://github.com/tikv/client-go/blob/tidb-8.5/txnkv/transaction/commit.go
info!("2PC commit_ts rejected by TiKV, retry with a newer commit_ts, start_ts: {}",
self.start_version.version());

let primary_key = self.primary_key.as_ref().unwrap();
if primary_key != expired.key.as_ref() {
error!("2PC commit_ts rejected by TiKV, but the key is not the primary key, start_ts: {}, key: {}, primary: {:?}",
self.start_version.version(), HexRepr(&expired.key), primary_key);
return Err(Error::OtherError("2PC commitTS rejected by TiKV, but the key is not the primary key".to_string()));
}

// Do not retry for a txn which has a too large min_commit_ts.
// 3600000 << 18 = 943718400000
if expired
.min_commit_ts
.saturating_sub(expired.attempted_commit_ts)
> 943718400000
{
let msg = format!("2PC min_commit_ts is too large, we got min_commit_ts: {}, and attempted_commit_ts: {}",
expired.min_commit_ts, expired.attempted_commit_ts);
return Err(Error::OtherError(msg));
}
continue;
} else {
return Err(Error::KeyError(key_err));
}
}
Some(err) => return Err(err),
None => unreachable!(),
},
Err(err) => return Err(err),
}
}
}

async fn commit_secondary(self, commit_version: Timestamp) -> Result<()> {
debug!("committing secondary");
let mutations_len = self.mutations.len();
Expand All @@ -1394,7 +1444,7 @@ impl<PdC: PdClient> Committer<PdC> {
let percent = percent.unwrap().parse::<usize>().unwrap();
new_len = mutations_len * percent / 100;
if new_len == 0 {
Err(Error::StringError(
Err(Error::OtherError(
"failpoint: before-commit-secondary return error".to_owned(),
))
} else {
Expand Down
6 changes: 3 additions & 3 deletions tests/common/ctl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,16 @@ use crate::common::Result;
pub async fn get_region_count() -> Result<u64> {
let res = reqwest::get(format!("http://{}/pd/api/v1/regions", pd_addrs()[0]))
.await
.map_err(|e| Error::StringError(e.to_string()))?;
.map_err(|e| Error::OtherError(e.to_string()))?;

let body = res
.text()
.await
.map_err(|e| Error::StringError(e.to_string()))?;
.map_err(|e| Error::OtherError(e.to_string()))?;
let value: serde_json::Value = serde_json::from_str(body.as_ref()).unwrap_or_else(|err| {
panic!("invalid body: {:?}, error: {:?}", body, err);
});
value["count"]
.as_u64()
.ok_or_else(|| Error::StringError("pd region count does not return an integer".to_owned()))
.ok_or_else(|| Error::OtherError("pd region count does not return an integer".to_owned()))
}