-
Notifications
You must be signed in to change notification settings - Fork 151
Fix batch_put ttl issue #457
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
767e930
d6bd8a4
463024c
390e7c7
9c907c9
9d76994
74133f9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -7,10 +7,13 @@ use std::time::Duration; | |||||
|
|
||||||
| use async_trait::async_trait; | ||||||
| use futures::stream::BoxStream; | ||||||
| use futures::StreamExt; | ||||||
|
|
||||||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done. |
||||||
| use tonic::transport::Channel; | ||||||
|
|
||||||
| use super::RawRpcRequest; | ||||||
| use crate::collect_single; | ||||||
| use crate::kv::KvPairTTL; | ||||||
| use crate::pd::PdClient; | ||||||
| use crate::proto::kvrpcpb; | ||||||
| use crate::proto::metapb; | ||||||
|
|
@@ -190,23 +193,28 @@ impl KvRequest for kvrpcpb::RawBatchPutRequest { | |||||
| } | ||||||
|
|
||||||
| impl Shardable for kvrpcpb::RawBatchPutRequest { | ||||||
| type Shard = Vec<kvrpcpb::KvPair>; | ||||||
| type Shard = Vec<(kvrpcpb::KvPair, u64)>; | ||||||
|
|
||||||
| fn shards( | ||||||
| &self, | ||||||
| pd_client: &Arc<impl PdClient>, | ||||||
| ) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> { | ||||||
| let mut pairs = self.pairs.clone(); | ||||||
| pairs.sort_by(|a, b| a.key.cmp(&b.key)); | ||||||
| store_stream_for_keys( | ||||||
| pairs.into_iter().map(Into::<KvPair>::into), | ||||||
| pd_client.clone(), | ||||||
| ) | ||||||
| let kvs = self.pairs.clone(); | ||||||
| let ttls = self.ttls.clone(); | ||||||
| let mut kv_ttl: Vec<KvPairTTL> = kvs | ||||||
| .iter() | ||||||
|
pingyu marked this conversation as resolved.
Outdated
|
||||||
| .zip(ttls) | ||||||
| .map(|(kv, ttl)| KvPairTTL(kv.clone(), ttl)) | ||||||
|
pingyu marked this conversation as resolved.
Outdated
|
||||||
| .collect(); | ||||||
| kv_ttl.sort_by(|a, b| a.0.key.clone().cmp(&b.0.key)); | ||||||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Removed. |
||||||
| store_stream_for_keys(kv_ttl.into_iter(), pd_client.clone()).boxed() | ||||||
|
pingyu marked this conversation as resolved.
Outdated
|
||||||
| } | ||||||
|
|
||||||
| fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> { | ||||||
| let (pairs, ttls) = shard.into_iter().unzip(); | ||||||
| self.set_leader(&store.region_with_leader)?; | ||||||
| self.pairs = shard; | ||||||
| self.pairs = pairs; | ||||||
| self.ttls = ttls; | ||||||
| Ok(()) | ||||||
| } | ||||||
| } | ||||||
|
|
@@ -531,21 +539,35 @@ impl_raw_rpc_request!(RawDeleteRangeRequest); | |||||
| impl_raw_rpc_request!(RawCasRequest); | ||||||
|
|
||||||
| impl HasLocks for kvrpcpb::RawGetResponse {} | ||||||
|
|
||||||
| impl HasLocks for kvrpcpb::RawBatchGetResponse {} | ||||||
|
|
||||||
| impl HasLocks for kvrpcpb::RawGetKeyTtlResponse {} | ||||||
|
|
||||||
| impl HasLocks for kvrpcpb::RawPutResponse {} | ||||||
|
|
||||||
| impl HasLocks for kvrpcpb::RawBatchPutResponse {} | ||||||
|
|
||||||
| impl HasLocks for kvrpcpb::RawDeleteResponse {} | ||||||
|
|
||||||
| impl HasLocks for kvrpcpb::RawBatchDeleteResponse {} | ||||||
|
|
||||||
| impl HasLocks for kvrpcpb::RawScanResponse {} | ||||||
|
|
||||||
| impl HasLocks for kvrpcpb::RawBatchScanResponse {} | ||||||
|
|
||||||
| impl HasLocks for kvrpcpb::RawDeleteRangeResponse {} | ||||||
|
|
||||||
| impl HasLocks for kvrpcpb::RawCasResponse {} | ||||||
|
|
||||||
| impl HasLocks for kvrpcpb::RawCoprocessorResponse {} | ||||||
|
|
||||||
| #[cfg(test)] | ||||||
| mod test { | ||||||
| use std::any::Any; | ||||||
| use std::collections::HashMap; | ||||||
| use std::ops::Deref; | ||||||
| use std::sync::Mutex; | ||||||
|
|
||||||
| use super::*; | ||||||
| use crate::backoff::DEFAULT_REGION_BACKOFF; | ||||||
|
|
@@ -555,7 +577,6 @@ mod test { | |||||
| use crate::proto::kvrpcpb; | ||||||
| use crate::request::Keyspace; | ||||||
| use crate::request::Plan; | ||||||
| use crate::Key; | ||||||
|
|
||||||
| #[rstest::rstest] | ||||||
| #[case(Keyspace::Disable)] | ||||||
|
|
@@ -600,4 +621,59 @@ mod test { | |||||
| assert_eq!(scan.len(), 49); | ||||||
| // FIXME test the keys returned. | ||||||
| } | ||||||
|
|
||||||
| #[tokio::test] | ||||||
| async fn test_raw_batch_put() -> Result<()> { | ||||||
| let region1_kvs = vec![KvPair(vec![9].into(), vec![12])]; | ||||||
| let region1_ttls = vec![0]; | ||||||
| let region2_kvs = vec![ | ||||||
| KvPair(vec![11].into(), vec![12]), | ||||||
| KvPair("FFF".to_string().as_bytes().to_vec().into(), vec![12]), | ||||||
| ]; | ||||||
| let region2_ttls = vec![0, 1]; | ||||||
|
|
||||||
| let expected_map = HashMap::from([ | ||||||
| (region1_kvs.clone(), region1_ttls.clone()), | ||||||
| (region2_kvs.clone(), region2_ttls.clone()), | ||||||
| ]); | ||||||
|
|
||||||
| let pairs: Vec<kvrpcpb::KvPair> = [region1_kvs, region2_kvs] | ||||||
| .concat() | ||||||
| .into_iter() | ||||||
| .map(|kv| kv.into()) | ||||||
| .collect(); | ||||||
| let ttls = [region1_ttls, region2_ttls].concat(); | ||||||
| let cf = ColumnFamily::Default; | ||||||
|
|
||||||
| let actual_map: Arc<Mutex<HashMap<Vec<KvPair>, Vec<u64>>>> = | ||||||
| Arc::new(Mutex::new(HashMap::new())); | ||||||
| let fut_actual_map = actual_map.clone(); | ||||||
| let client = Arc::new(MockPdClient::new(MockKvClient::with_dispatch_hook( | ||||||
| move |req: &dyn Any| { | ||||||
| let req: &kvrpcpb::RawBatchPutRequest = req.downcast_ref().unwrap(); | ||||||
| let kv_pair = req | ||||||
| .pairs | ||||||
| .clone() | ||||||
| .into_iter() | ||||||
| .map(|p| p.into()) | ||||||
| .collect::<Vec<KvPair>>(); | ||||||
| let ttls = req.ttls.clone(); | ||||||
| fut_actual_map.lock().unwrap().insert(kv_pair, ttls); | ||||||
| let resp = kvrpcpb::RawBatchPutResponse::default(); | ||||||
| Ok(Box::new(resp) as Box<dyn Any>) | ||||||
| }, | ||||||
| ))); | ||||||
|
|
||||||
| let batch_put_request = | ||||||
| new_raw_batch_put_request(pairs.clone(), ttls.clone(), Some(cf), false); | ||||||
| let keyspace = Keyspace::Enable { keyspace_id: 0 }; | ||||||
| let plan = crate::request::PlanBuilder::new(client, keyspace, batch_put_request) | ||||||
| .resolve_lock(OPTIMISTIC_BACKOFF, keyspace) | ||||||
| .retry_multi_region(DEFAULT_REGION_BACKOFF) | ||||||
| .plan(); | ||||||
| let _ = plan.execute().await; | ||||||
| assert_eq!(actual_map.lock().unwrap().len(), 2); | ||||||
|
pingyu marked this conversation as resolved.
Outdated
|
||||||
| assert_eq!(actual_map.lock().unwrap().deref().clone(), expected_map); | ||||||
|
pingyu marked this conversation as resolved.
Outdated
|
||||||
| Ok(()) | ||||||
| } | ||||||
| } | ||||||
Uh oh!
There was an error while loading. Please reload this page.