Skip to content

Commit 60e0c54

Browse files
Revert "adding retryable to scan" (#1)
This reverts commit dddf519.
1 parent dddf519 commit 60e0c54

File tree

2 files changed

+147
-198
lines changed

2 files changed

+147
-198
lines changed

src/raw/client.rs

Lines changed: 50 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,30 @@
11
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
22

3-
use async_recursion::async_recursion;
43
use core::ops::Range;
54
use std::str::FromStr;
65
use std::sync::Arc;
76
use std::u32;
87

8+
use futures::StreamExt;
99
use log::debug;
10-
use tokio::sync::Semaphore;
11-
use tokio::time::sleep;
1210

13-
use crate::backoff::{DEFAULT_REGION_BACKOFF, DEFAULT_STORE_BACKOFF};
11+
use crate::backoff::DEFAULT_REGION_BACKOFF;
1412
use crate::common::Error;
1513
use crate::config::Config;
1614
use crate::pd::PdClient;
1715
use crate::pd::PdRpcClient;
18-
use crate::proto::kvrpcpb::{RawScanRequest, RawScanResponse};
1916
use crate::proto::metapb;
2017
use crate::raw::lowering::*;
18+
use crate::request::Collect;
2119
use crate::request::CollectSingle;
2220
use crate::request::EncodeKeyspace;
2321
use crate::request::KeyMode;
2422
use crate::request::Keyspace;
2523
use crate::request::Plan;
2624
use crate::request::TruncateKeyspace;
27-
use crate::request::{plan, Collect};
28-
use crate::store::{HasRegionError, RegionStore};
2925
use crate::Backoff;
3026
use crate::BoundRange;
3127
use crate::ColumnFamily;
32-
use crate::Error::RegionError;
3328
use crate::Key;
3429
use crate::KvPair;
3530
use crate::Result;
@@ -761,42 +756,57 @@ impl<PdC: PdClient> Client<PdC> {
761756
max_limit: MAX_RAW_KV_SCAN_LIMIT,
762757
});
763758
}
764-
let backoff = DEFAULT_STORE_BACKOFF;
765-
let permits = Arc::new(Semaphore::new(16));
766-
let range = range.into().encode_keyspace(self.keyspace, KeyMode::Raw);
759+
760+
let mut cur_range = range.into().encode_keyspace(self.keyspace, KeyMode::Raw);
767761
let mut result = Vec::new();
768-
let mut current_limit = limit;
769-
let (start_key, end_key) = range.clone().into_keys();
770-
let mut current_key: Option<Key> = Some(start_key);
771-
while current_limit > 0 {
772-
let scan_args = ScanInnerArgs {
773-
start_key: current_key.clone(),
774-
range: range.clone(),
775-
limit,
762+
let mut scan_regions = self.rpc.clone().stores_for_range(cur_range.clone()).boxed();
763+
let mut region_store =
764+
scan_regions
765+
.next()
766+
.await
767+
.ok_or(Error::RegionForRangeNotFound {
768+
range: (cur_range.clone()),
769+
})??;
770+
let mut cur_limit = limit;
771+
772+
while cur_limit > 0 {
773+
let request = new_raw_scan_request(
774+
cur_range.clone(),
775+
cur_limit,
776776
key_only,
777777
reverse,
778-
permits: permits.clone(),
779-
backoff: backoff.clone(),
780-
};
781-
let (res, next_key) = self.retryable_scan(scan_args).await?;
782-
783-
let mut kvs = res
784-
.map(|r| r.kvs.into_iter().map(Into::into).collect::<Vec<KvPair>>())
785-
.unwrap_or(Vec::new());
786-
787-
if !kvs.is_empty() {
788-
current_limit -= kvs.len() as u32;
789-
result.append(&mut kvs);
790-
}
791-
if end_key
792-
.as_ref()
793-
.map(|ek| ek <= next_key.as_ref() && !ek.is_empty())
794-
.unwrap_or(false)
795-
|| next_key.is_empty()
796-
{
797-
break;
778+
self.cf.clone(),
779+
);
780+
let resp = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, request)
781+
.single_region_with_store(region_store.clone())
782+
.await?
783+
.plan()
784+
.execute()
785+
.await?;
786+
let mut region_scan_res = resp
787+
.kvs
788+
.into_iter()
789+
.map(Into::into)
790+
.collect::<Vec<KvPair>>();
791+
let res_len = region_scan_res.len();
792+
result.append(&mut region_scan_res);
793+
794+
// if the number of results is less than cur_limit, it means this scan range contains more than one region, so we need to scan next region
795+
if res_len < cur_limit as usize {
796+
region_store = match scan_regions.next().await {
797+
Some(Ok(rs)) => {
798+
cur_range = BoundRange::new(
799+
std::ops::Bound::Included(region_store.region_with_leader.range().1),
800+
cur_range.to,
801+
);
802+
rs
803+
}
804+
Some(Err(e)) => return Err(e),
805+
None => break,
806+
};
807+
cur_limit -= res_len as u32;
798808
} else {
799-
current_key = Some(next_key);
809+
break;
800810
}
801811
}
802812

@@ -809,61 +819,6 @@ impl<PdC: PdClient> Client<PdC> {
809819
Ok(result)
810820
}
811821

812-
#[async_recursion]
813-
async fn retryable_scan(
814-
&self,
815-
mut scan_args: ScanInnerArgs,
816-
) -> Result<(Option<RawScanResponse>, Key)> {
817-
let start_key = match scan_args.start_key {
818-
None => return Ok((None, Key::EMPTY)),
819-
Some(ref sk) => sk,
820-
};
821-
let permit = scan_args.permits.acquire().await.unwrap();
822-
let region = self.rpc.clone().region_for_key(start_key).await?;
823-
let store = self.rpc.clone().store_for_id(region.id()).await?;
824-
let request = new_raw_scan_request(
825-
scan_args.range.clone(),
826-
scan_args.limit,
827-
scan_args.key_only,
828-
scan_args.reverse,
829-
self.cf.clone(),
830-
);
831-
let resp = self.do_store_scan(store.clone(), request).await;
832-
drop(permit);
833-
match resp {
834-
Ok(mut r) => {
835-
if let Some(err) = r.region_error() {
836-
let status =
837-
plan::handle_region_error(self.rpc.clone(), err.clone(), store.clone())
838-
.await?;
839-
return if status {
840-
self.retryable_scan(scan_args.clone()).await
841-
} else if let Some(duration) = scan_args.backoff.next_delay_duration() {
842-
sleep(duration).await;
843-
self.retryable_scan(scan_args.clone()).await
844-
} else {
845-
Err(RegionError(Box::new(err)))
846-
};
847-
}
848-
Ok((Some(r), region.end_key()))
849-
}
850-
Err(err) => Err(err),
851-
}
852-
}
853-
854-
async fn do_store_scan(
855-
&self,
856-
store: RegionStore,
857-
scan_request: RawScanRequest,
858-
) -> Result<RawScanResponse> {
859-
crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, scan_request)
860-
.single_region_with_store(store.clone())
861-
.await?
862-
.plan()
863-
.execute()
864-
.await
865-
}
866-
867822
async fn batch_scan_inner(
868823
&self,
869824
ranges: impl IntoIterator<Item = impl Into<BoundRange>>,
@@ -910,17 +865,6 @@ impl<PdC: PdClient> Client<PdC> {
910865
}
911866
}
912867

913-
#[derive(Clone)]
914-
struct ScanInnerArgs {
915-
start_key: Option<Key>,
916-
range: BoundRange,
917-
limit: u32,
918-
key_only: bool,
919-
reverse: bool,
920-
permits: Arc<Semaphore>,
921-
backoff: Backoff,
922-
}
923-
924868
#[cfg(test)]
925869
mod tests {
926870
use std::any::Any;

0 commit comments

Comments
 (0)