11// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
22
3- use async_recursion:: async_recursion;
43use core:: ops:: Range ;
54use std:: str:: FromStr ;
65use std:: sync:: Arc ;
76use std:: u32;
87
8+ use futures:: StreamExt ;
99use log:: debug;
10- use tokio:: sync:: Semaphore ;
11- use tokio:: time:: sleep;
1210
13- use crate :: backoff:: { DEFAULT_REGION_BACKOFF , DEFAULT_STORE_BACKOFF } ;
11+ use crate :: backoff:: DEFAULT_REGION_BACKOFF ;
1412use crate :: common:: Error ;
1513use crate :: config:: Config ;
1614use crate :: pd:: PdClient ;
1715use crate :: pd:: PdRpcClient ;
18- use crate :: proto:: kvrpcpb:: { RawScanRequest , RawScanResponse } ;
1916use crate :: proto:: metapb;
2017use crate :: raw:: lowering:: * ;
18+ use crate :: request:: Collect ;
2119use crate :: request:: CollectSingle ;
2220use crate :: request:: EncodeKeyspace ;
2321use crate :: request:: KeyMode ;
2422use crate :: request:: Keyspace ;
2523use crate :: request:: Plan ;
2624use crate :: request:: TruncateKeyspace ;
27- use crate :: request:: { plan, Collect } ;
28- use crate :: store:: { HasRegionError , RegionStore } ;
2925use crate :: Backoff ;
3026use crate :: BoundRange ;
3127use crate :: ColumnFamily ;
32- use crate :: Error :: RegionError ;
3328use crate :: Key ;
3429use crate :: KvPair ;
3530use crate :: Result ;
@@ -761,42 +756,57 @@ impl<PdC: PdClient> Client<PdC> {
761756 max_limit : MAX_RAW_KV_SCAN_LIMIT ,
762757 } ) ;
763758 }
764- let backoff = DEFAULT_STORE_BACKOFF ;
765- let permits = Arc :: new ( Semaphore :: new ( 16 ) ) ;
766- let range = range. into ( ) . encode_keyspace ( self . keyspace , KeyMode :: Raw ) ;
759+
760+ let mut cur_range = range. into ( ) . encode_keyspace ( self . keyspace , KeyMode :: Raw ) ;
767761 let mut result = Vec :: new ( ) ;
768- let mut current_limit = limit;
769- let ( start_key, end_key) = range. clone ( ) . into_keys ( ) ;
770- let mut current_key: Option < Key > = Some ( start_key) ;
771- while current_limit > 0 {
772- let scan_args = ScanInnerArgs {
773- start_key : current_key. clone ( ) ,
774- range : range. clone ( ) ,
775- limit,
762+ let mut scan_regions = self . rpc . clone ( ) . stores_for_range ( cur_range. clone ( ) ) . boxed ( ) ;
763+ let mut region_store =
764+ scan_regions
765+ . next ( )
766+ . await
767+ . ok_or ( Error :: RegionForRangeNotFound {
768+ range : ( cur_range. clone ( ) ) ,
769+ } ) ??;
770+ let mut cur_limit = limit;
771+
772+ while cur_limit > 0 {
773+ let request = new_raw_scan_request (
774+ cur_range. clone ( ) ,
775+ cur_limit,
776776 key_only,
777777 reverse,
778- permits : permits. clone ( ) ,
779- backoff : backoff. clone ( ) ,
780- } ;
781- let ( res, next_key) = self . retryable_scan ( scan_args) . await ?;
782-
783- let mut kvs = res
784- . map ( |r| r. kvs . into_iter ( ) . map ( Into :: into) . collect :: < Vec < KvPair > > ( ) )
785- . unwrap_or ( Vec :: new ( ) ) ;
786-
787- if !kvs. is_empty ( ) {
788- current_limit -= kvs. len ( ) as u32 ;
789- result. append ( & mut kvs) ;
790- }
791- if end_key
792- . as_ref ( )
793- . map ( |ek| ek <= next_key. as_ref ( ) && !ek. is_empty ( ) )
794- . unwrap_or ( false )
795- || next_key. is_empty ( )
796- {
797- break ;
778+ self . cf . clone ( ) ,
779+ ) ;
780+ let resp = crate :: request:: PlanBuilder :: new ( self . rpc . clone ( ) , self . keyspace , request)
781+ . single_region_with_store ( region_store. clone ( ) )
782+ . await ?
783+ . plan ( )
784+ . execute ( )
785+ . await ?;
786+ let mut region_scan_res = resp
787+ . kvs
788+ . into_iter ( )
789+ . map ( Into :: into)
790+ . collect :: < Vec < KvPair > > ( ) ;
791+ let res_len = region_scan_res. len ( ) ;
792+ result. append ( & mut region_scan_res) ;
793+
794+ // if the number of results is less than cur_limit, it means this scan range contains more than one region, so we need to scan next region
795+ if res_len < cur_limit as usize {
796+ region_store = match scan_regions. next ( ) . await {
797+ Some ( Ok ( rs) ) => {
798+ cur_range = BoundRange :: new (
799+ std:: ops:: Bound :: Included ( region_store. region_with_leader . range ( ) . 1 ) ,
800+ cur_range. to ,
801+ ) ;
802+ rs
803+ }
804+ Some ( Err ( e) ) => return Err ( e) ,
805+ None => break ,
806+ } ;
807+ cur_limit -= res_len as u32 ;
798808 } else {
799- current_key = Some ( next_key ) ;
809+ break ;
800810 }
801811 }
802812
@@ -809,61 +819,6 @@ impl<PdC: PdClient> Client<PdC> {
809819 Ok ( result)
810820 }
811821
812- #[ async_recursion]
813- async fn retryable_scan (
814- & self ,
815- mut scan_args : ScanInnerArgs ,
816- ) -> Result < ( Option < RawScanResponse > , Key ) > {
817- let start_key = match scan_args. start_key {
818- None => return Ok ( ( None , Key :: EMPTY ) ) ,
819- Some ( ref sk) => sk,
820- } ;
821- let permit = scan_args. permits . acquire ( ) . await . unwrap ( ) ;
822- let region = self . rpc . clone ( ) . region_for_key ( start_key) . await ?;
823- let store = self . rpc . clone ( ) . store_for_id ( region. id ( ) ) . await ?;
824- let request = new_raw_scan_request (
825- scan_args. range . clone ( ) ,
826- scan_args. limit ,
827- scan_args. key_only ,
828- scan_args. reverse ,
829- self . cf . clone ( ) ,
830- ) ;
831- let resp = self . do_store_scan ( store. clone ( ) , request) . await ;
832- drop ( permit) ;
833- match resp {
834- Ok ( mut r) => {
835- if let Some ( err) = r. region_error ( ) {
836- let status =
837- plan:: handle_region_error ( self . rpc . clone ( ) , err. clone ( ) , store. clone ( ) )
838- . await ?;
839- return if status {
840- self . retryable_scan ( scan_args. clone ( ) ) . await
841- } else if let Some ( duration) = scan_args. backoff . next_delay_duration ( ) {
842- sleep ( duration) . await ;
843- self . retryable_scan ( scan_args. clone ( ) ) . await
844- } else {
845- Err ( RegionError ( Box :: new ( err) ) )
846- } ;
847- }
848- Ok ( ( Some ( r) , region. end_key ( ) ) )
849- }
850- Err ( err) => Err ( err) ,
851- }
852- }
853-
854- async fn do_store_scan (
855- & self ,
856- store : RegionStore ,
857- scan_request : RawScanRequest ,
858- ) -> Result < RawScanResponse > {
859- crate :: request:: PlanBuilder :: new ( self . rpc . clone ( ) , self . keyspace , scan_request)
860- . single_region_with_store ( store. clone ( ) )
861- . await ?
862- . plan ( )
863- . execute ( )
864- . await
865- }
866-
867822 async fn batch_scan_inner (
868823 & self ,
869824 ranges : impl IntoIterator < Item = impl Into < BoundRange > > ,
@@ -910,17 +865,6 @@ impl<PdC: PdClient> Client<PdC> {
910865 }
911866}
912867
913- #[ derive( Clone ) ]
914- struct ScanInnerArgs {
915- start_key : Option < Key > ,
916- range : BoundRange ,
917- limit : u32 ,
918- key_only : bool ,
919- reverse : bool ,
920- permits : Arc < Semaphore > ,
921- backoff : Backoff ,
922- }
923-
924868#[ cfg( test) ]
925869mod tests {
926870 use std:: any:: Any ;
0 commit comments