Skip to content

Commit 767e930

Browse files
fixing the shard issue with batch_put
Signed-off-by: limbooverlambda <[email protected]>
1 parent 54fd720 commit 767e930

File tree

4 files changed

+129
-5
lines changed

4 files changed

+129
-5
lines changed

src/kv/kvpair.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use crate::proto::kvrpcpb;
2525
///
2626
/// Many functions which accept a `KvPair` accept an `Into<KvPair>`, which means all of the above
2727
/// types (Like a `(Key, Value)`) can be passed directly to those functions.
28-
#[derive(Default, Clone, Eq, PartialEq)]
28+
#[derive(Default, Clone, Eq, PartialEq, Hash)]
2929
#[cfg_attr(test, derive(Arbitrary))]
3030
pub struct KvPair(pub Key, pub Value);
3131

src/raw/client.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -876,6 +876,36 @@ mod tests {
876876
use crate::proto::kvrpcpb;
877877
use crate::Result;
878878

879+
#[tokio::test]
880+
async fn test_batch_put_with_ttl() -> Result<()> {
881+
let pd_client = Arc::new(MockPdClient::new(MockKvClient::with_dispatch_hook(
882+
move |req: &dyn Any| {
883+
if let Some(_) = req.downcast_ref::<kvrpcpb::RawBatchPutRequest>() {
884+
let resp = kvrpcpb::RawBatchPutResponse {
885+
..Default::default()
886+
};
887+
Ok(Box::new(resp) as Box<dyn Any>)
888+
} else {
889+
unreachable!()
890+
}
891+
},
892+
)));
893+
let client = Client {
894+
rpc: pd_client,
895+
cf: Some(ColumnFamily::Default),
896+
backoff: DEFAULT_REGION_BACKOFF,
897+
atomic: false,
898+
keyspace: Keyspace::Enable { keyspace_id: 0 },
899+
};
900+
let pairs = vec![
901+
KvPair(vec![11].into(), vec![12].into()),
902+
KvPair(vec![11].into(), vec![12].into()),
903+
];
904+
let ttls = vec![0, 0];
905+
assert!(client.batch_put_with_ttl(pairs, ttls).await.is_ok());
906+
Ok(())
907+
}
908+
879909
#[tokio::test]
880910
async fn test_raw_coprocessor() -> Result<()> {
881911
let pd_client = Arc::new(MockPdClient::new(MockKvClient::with_dispatch_hook(

src/raw/requests.rs

Lines changed: 97 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
22

33
use std::any::Any;
4+
use std::collections::HashMap;
45
use std::ops::Range;
56
use std::sync::Arc;
67
use std::time::Duration;
78

89
use async_trait::async_trait;
910
use futures::stream::BoxStream;
11+
use futures::StreamExt;
1012
use tonic::transport::Channel;
1113

1214
use super::RawRpcRequest;
@@ -190,23 +192,44 @@ impl KvRequest for kvrpcpb::RawBatchPutRequest {
190192
}
191193

192194
impl Shardable for kvrpcpb::RawBatchPutRequest {
193-
type Shard = Vec<kvrpcpb::KvPair>;
195+
type Shard = Vec<(kvrpcpb::KvPair, u64)>;
194196

195197
fn shards(
196198
&self,
197199
pd_client: &Arc<impl PdClient>,
198200
) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> {
201+
// Maintain a map of the pair and its associated ttl
202+
let kvs = self.pairs.clone();
203+
let kv_pair = kvs.into_iter().map(Into::<KvPair>::into);
204+
let kv_ttl = kv_pair.zip(self.ttls.clone()).collect::<HashMap<_, _>>();
199205
let mut pairs = self.pairs.clone();
200206
pairs.sort_by(|a, b| a.key.cmp(&b.key));
201207
store_stream_for_keys(
202208
pairs.into_iter().map(Into::<KvPair>::into),
203209
pd_client.clone(),
204210
)
211+
.map(move |r| {
212+
let s = r.map(|(kv, store)| {
213+
let kv_ttls = kv
214+
.into_iter()
215+
.map(|k: KvPair| {
216+
let kv: kvrpcpb::KvPair = k.clone().into();
217+
let ttl = *kv_ttl.get(&k).unwrap();
218+
(kv, ttl)
219+
})
220+
.collect::<Vec<_>>();
221+
(kv_ttls, store)
222+
});
223+
s
224+
})
225+
.boxed()
205226
}
206227

207228
fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
229+
let (pairs, ttls) = shard.into_iter().unzip();
208230
self.set_leader(&store.region_with_leader)?;
209-
self.pairs = shard;
231+
self.pairs = pairs;
232+
self.ttls = ttls;
210233
Ok(())
211234
}
212235
}
@@ -531,21 +554,34 @@ impl_raw_rpc_request!(RawDeleteRangeRequest);
531554
impl_raw_rpc_request!(RawCasRequest);
532555

533556
impl HasLocks for kvrpcpb::RawGetResponse {}
557+
534558
impl HasLocks for kvrpcpb::RawBatchGetResponse {}
559+
535560
impl HasLocks for kvrpcpb::RawGetKeyTtlResponse {}
561+
536562
impl HasLocks for kvrpcpb::RawPutResponse {}
563+
537564
impl HasLocks for kvrpcpb::RawBatchPutResponse {}
565+
538566
impl HasLocks for kvrpcpb::RawDeleteResponse {}
567+
539568
impl HasLocks for kvrpcpb::RawBatchDeleteResponse {}
569+
540570
impl HasLocks for kvrpcpb::RawScanResponse {}
571+
541572
impl HasLocks for kvrpcpb::RawBatchScanResponse {}
573+
542574
impl HasLocks for kvrpcpb::RawDeleteRangeResponse {}
575+
543576
impl HasLocks for kvrpcpb::RawCasResponse {}
577+
544578
impl HasLocks for kvrpcpb::RawCoprocessorResponse {}
545579

546580
#[cfg(test)]
547581
mod test {
548582
use std::any::Any;
583+
use std::ops::Deref;
584+
use std::sync::Mutex;
549585

550586
use super::*;
551587
use crate::backoff::DEFAULT_REGION_BACKOFF;
@@ -555,7 +591,7 @@ mod test {
555591
use crate::proto::kvrpcpb;
556592
use crate::request::Keyspace;
557593
use crate::request::Plan;
558-
use crate::Key;
594+
559595

560596
#[rstest::rstest]
561597
#[case(Keyspace::Disable)]
@@ -600,4 +636,62 @@ mod test {
600636
assert_eq!(scan.len(), 49);
601637
// FIXME test the keys returned.
602638
}
639+
640+
#[tokio::test]
641+
async fn test_raw_batch_put() -> Result<()> {
642+
let region1_kvs = vec![KvPair(vec![9].into(), vec![12].into())];
643+
let region1_ttls = vec![0];
644+
let region2_kvs = vec![
645+
KvPair(vec![11].into(), vec![12].into()),
646+
KvPair(
647+
"FFF".to_string().as_bytes().to_vec().into(),
648+
vec![12].into(),
649+
),
650+
];
651+
let region2_ttls = vec![0, 1];
652+
653+
let expected_map = HashMap::from([
654+
(region1_kvs.clone(), region1_ttls.clone()),
655+
(region2_kvs.clone(), region2_ttls.clone()),
656+
]);
657+
658+
let pairs: Vec<kvrpcpb::KvPair> = [region1_kvs, region2_kvs]
659+
.concat()
660+
.into_iter()
661+
.map(|kv| kv.into())
662+
.collect();
663+
let ttls = [region1_ttls, region2_ttls].concat();
664+
let cf = ColumnFamily::Default;
665+
666+
let actual_map: Arc<Mutex<HashMap<Vec<KvPair>, Vec<u64>>>> =
667+
Arc::new(Mutex::new(HashMap::new()));
668+
let fut_actual_map = actual_map.clone();
669+
let client = Arc::new(MockPdClient::new(MockKvClient::with_dispatch_hook(
670+
move |req: &dyn Any| {
671+
let req: &kvrpcpb::RawBatchPutRequest = req.downcast_ref().unwrap();
672+
let kv_pair = req
673+
.pairs
674+
.clone()
675+
.into_iter()
676+
.map(|p| p.into())
677+
.collect::<Vec<KvPair>>();
678+
let ttls = req.ttls.clone();
679+
fut_actual_map.lock().unwrap().insert(kv_pair, ttls);
680+
let resp = kvrpcpb::RawBatchPutResponse::default();
681+
Ok(Box::new(resp) as Box<dyn Any>)
682+
},
683+
)));
684+
685+
let batch_put_request =
686+
new_raw_batch_put_request(pairs.clone(), ttls.clone(), Some(cf), false);
687+
let keyspace = Keyspace::Enable { keyspace_id: 0 };
688+
let plan = crate::request::PlanBuilder::new(client, keyspace, batch_put_request)
689+
.resolve_lock(OPTIMISTIC_BACKOFF, keyspace)
690+
.retry_multi_region(DEFAULT_REGION_BACKOFF)
691+
.plan();
692+
let _ = plan.execute().await;
693+
assert_eq!(actual_map.lock().unwrap().len(), 2);
694+
assert_eq!(actual_map.lock().unwrap().deref().clone(), expected_map);
695+
Ok(())
696+
}
603697
}

src/store/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ pub struct RegionStore {
3636
pub struct Store {
3737
pub client: Arc<dyn KvClient + Send + Sync>,
3838
}
39-
39+
#[allow(dead_code)]
4040
/// Maps keys to a stream of stores. `key_data` must be sorted in increasing order
4141
pub fn store_stream_for_keys<K, KOut, PdC>(
4242
key_data: impl Iterator<Item = K> + Send + Sync + 'static,

0 commit comments

Comments
 (0)