Skip to content

Commit f33e4b5

Browse files
committed
feat: introduce WriteHint to RegionPutRequest
1 parent 40b474a commit f33e4b5

File tree

10 files changed

+70
-13
lines changed

10 files changed

+70
-13
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/metric-engine/src/engine/put.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use api::v1::Rows;
1616
use common_telemetry::{error, info};
1717
use snafu::{ensure, OptionExt};
1818
use store_api::codec::PrimaryKeyEncoding;
19+
use store_api::region_engine::WriteHint;
1920
use store_api::region_request::{AffectedRows, RegionPutRequest};
2021
use store_api::storage::{RegionId, TableId};
2122

@@ -89,6 +90,9 @@ impl MetricEngineInner {
8990
&mut request.rows,
9091
encoding,
9192
)?;
93+
if encoding == PrimaryKeyEncoding::Sparse {
94+
request.hint = WriteHint::PRIMARY_KEY_ENCODED | WriteHint::SPARSE_KEY_ENCODING;
95+
}
9296
self.data_region.write_data(data_region_id, request).await
9397
}
9498

@@ -165,7 +169,7 @@ impl MetricEngineInner {
165169
#[cfg(test)]
166170
mod tests {
167171
use common_recordbatch::RecordBatches;
168-
use store_api::region_engine::RegionEngine;
172+
use store_api::region_engine::{RegionEngine, WriteHint};
169173
use store_api::region_request::RegionRequest;
170174
use store_api::storage::ScanRequest;
171175

@@ -182,6 +186,7 @@ mod tests {
182186
let rows = test_util::build_rows(1, 5);
183187
let request = RegionRequest::Put(RegionPutRequest {
184188
rows: Rows { schema, rows },
189+
hint: WriteHint::empty(),
185190
});
186191

187192
// write data
@@ -255,6 +260,7 @@ mod tests {
255260
let rows = test_util::build_rows(3, 100);
256261
let request = RegionRequest::Put(RegionPutRequest {
257262
rows: Rows { schema, rows },
263+
hint: WriteHint::empty(),
258264
});
259265

260266
// write data
@@ -276,6 +282,7 @@ mod tests {
276282
let rows = test_util::build_rows(1, 100);
277283
let request = RegionRequest::Put(RegionPutRequest {
278284
rows: Rows { schema, rows },
285+
hint: WriteHint::empty(),
279286
});
280287

281288
engine
@@ -295,6 +302,7 @@ mod tests {
295302
let rows = test_util::build_rows(1, 100);
296303
let request = RegionRequest::Put(RegionPutRequest {
297304
rows: Rows { schema, rows },
305+
hint: WriteHint::empty(),
298306
});
299307

300308
engine

src/metric-engine/src/metadata_region.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use store_api::metric_engine_consts::{
2929
METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME, METADATA_SCHEMA_VALUE_COLUMN_INDEX,
3030
METADATA_SCHEMA_VALUE_COLUMN_NAME,
3131
};
32-
use store_api::region_engine::RegionEngine;
32+
use store_api::region_engine::{RegionEngine, WriteHint};
3333
use store_api::region_request::{RegionDeleteRequest, RegionPutRequest};
3434
use store_api::storage::{RegionId, ScanRequest};
3535
use tokio::sync::{OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock};
@@ -512,7 +512,10 @@ impl MetadataRegion {
512512
}],
513513
};
514514

515-
RegionPutRequest { rows }
515+
RegionPutRequest {
516+
rows,
517+
hint: WriteHint::empty(),
518+
}
516519
}
517520

518521
fn build_delete_request(keys: &[String]) -> RegionDeleteRequest {

src/mito2/src/engine/basic_test.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use datatypes::schema::ColumnSchema;
2828
use rstest::rstest;
2929
use rstest_reuse::{self, apply};
3030
use store_api::metadata::ColumnMetadata;
31+
use store_api::region_engine::WriteHint;
3132
use store_api::region_request::{RegionCreateRequest, RegionOpenRequest, RegionPutRequest};
3233
use store_api::storage::RegionId;
3334

@@ -530,7 +531,13 @@ async fn test_absent_and_invalid_columns() {
530531
rows,
531532
};
532533
let err = engine
533-
.handle_request(region_id, RegionRequest::Put(RegionPutRequest { rows }))
534+
.handle_request(
535+
region_id,
536+
RegionRequest::Put(RegionPutRequest {
537+
rows,
538+
hint: WriteHint::empty(),
539+
}),
540+
)
534541
.await
535542
.unwrap_err();
536543
assert_eq!(StatusCode::InvalidArguments, err.status_code());

src/mito2/src/engine/open_test.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use api::v1::Rows;
1919
use common_error::ext::ErrorExt;
2020
use common_error::status_code::StatusCode;
2121
use common_recordbatch::RecordBatches;
22-
use store_api::region_engine::{RegionEngine, RegionRole};
22+
use store_api::region_engine::{RegionEngine, RegionRole, WriteHint};
2323
use store_api::region_request::{
2424
RegionCloseRequest, RegionOpenRequest, RegionPutRequest, RegionRequest,
2525
};
@@ -129,7 +129,10 @@ async fn test_engine_open_readonly() {
129129
let err = engine
130130
.handle_request(
131131
region_id,
132-
RegionRequest::Put(RegionPutRequest { rows: rows.clone() }),
132+
RegionRequest::Put(RegionPutRequest {
133+
rows: rows.clone(),
134+
hint: WriteHint::empty(),
135+
}),
133136
)
134137
.await
135138
.unwrap_err();

src/mito2/src/engine/set_role_state_test.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use api::v1::Rows;
1616
use common_error::ext::ErrorExt;
1717
use common_error::status_code::StatusCode;
1818
use store_api::region_engine::{
19-
RegionEngine, RegionRole, SetRegionRoleStateResponse, SettableRegionRoleState,
19+
RegionEngine, RegionRole, SetRegionRoleStateResponse, SettableRegionRoleState, WriteHint,
2020
};
2121
use store_api::region_request::{RegionPutRequest, RegionRequest};
2222
use store_api::storage::RegionId;
@@ -74,7 +74,10 @@ async fn test_set_role_state_gracefully() {
7474
let error = engine
7575
.handle_request(
7676
region_id,
77-
RegionRequest::Put(RegionPutRequest { rows: rows.clone() }),
77+
RegionRequest::Put(RegionPutRequest {
78+
rows: rows.clone(),
79+
hint: WriteHint::empty(),
80+
}),
7881
)
7982
.await
8083
.unwrap_err();
@@ -152,7 +155,13 @@ async fn test_write_downgrading_region() {
152155
rows: build_rows(0, 42),
153156
};
154157
let err = engine
155-
.handle_request(region_id, RegionRequest::Put(RegionPutRequest { rows }))
158+
.handle_request(
159+
region_id,
160+
RegionRequest::Put(RegionPutRequest {
161+
rows: rows.clone(),
162+
hint: WriteHint::empty(),
163+
}),
164+
)
156165
.await
157166
.unwrap_err();
158167
assert_eq!(err.status_code(), StatusCode::RegionNotReady)

src/mito2/src/test_util.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ use rskafka::client::{Client, ClientBuilder};
5757
use rskafka::record::Record;
5858
use rstest_reuse::template;
5959
use store_api::metadata::{ColumnMetadata, RegionMetadataRef};
60-
use store_api::region_engine::{RegionEngine, RegionRole};
60+
use store_api::region_engine::{RegionEngine, RegionRole, WriteHint};
6161
use store_api::region_request::{
6262
RegionCloseRequest, RegionCreateRequest, RegionDeleteRequest, RegionFlushRequest,
6363
RegionOpenRequest, RegionPutRequest, RegionRequest,
@@ -1049,7 +1049,13 @@ pub fn delete_rows_schema(request: &RegionCreateRequest) -> Vec<api::v1::ColumnS
10491049
pub async fn put_rows(engine: &MitoEngine, region_id: RegionId, rows: Rows) {
10501050
let num_rows = rows.rows.len();
10511051
let result = engine
1052-
.handle_request(region_id, RegionRequest::Put(RegionPutRequest { rows }))
1052+
.handle_request(
1053+
region_id,
1054+
RegionRequest::Put(RegionPutRequest {
1055+
rows,
1056+
hint: WriteHint::empty(),
1057+
}),
1058+
)
10531059
.await
10541060
.unwrap();
10551061
assert_eq!(num_rows, result.affected_rows);

src/store-api/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ workspace = true
1111
api.workspace = true
1212
aquamarine.workspace = true
1313
async-trait.workspace = true
14+
bitflags.workspace = true
1415
common-base.workspace = true
1516
common-error.workspace = true
1617
common-macro.workspace = true

src/store-api/src/region_engine.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use std::sync::{Arc, Mutex};
2121
use api::greptime_proto::v1::meta::{GrantedRegion as PbGrantedRegion, RegionRole as PbRegionRole};
2222
use api::region::RegionResponse;
2323
use async_trait::async_trait;
24+
use bitflags::bitflags;
2425
use common_error::ext::{BoxedError, PlainError};
2526
use common_error::status_code::StatusCode;
2627
use common_recordbatch::SendableRecordBatchStream;
@@ -36,6 +37,14 @@ use crate::metadata::RegionMetadataRef;
3637
use crate::region_request::{RegionOpenRequest, RegionRequest};
3738
use crate::storage::{RegionId, ScanRequest};
3839

40+
bitflags! {
41+
#[derive(Debug, Clone, Copy)]
42+
pub struct WriteHint: u8 {
43+
const PRIMARY_KEY_ENCODED = 1;
44+
const SPARSE_KEY_ENCODING = 1 << 1;
45+
}
46+
}
47+
3948
/// The settable region role state.
4049
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
4150
pub enum SettableRegionRoleState {

src/store-api/src/region_request.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ use crate::mito_engine_options::{
4545
TWCS_TIME_WINDOW,
4646
};
4747
use crate::path_utils::region_dir;
48+
use crate::region_engine::WriteHint;
4849
use crate::storage::{ColumnId, RegionId, ScanRequest};
4950

5051
#[derive(Debug, IntoStaticStr)]
@@ -95,8 +96,15 @@ fn make_region_puts(inserts: InsertRequests) -> Result<Vec<(RegionId, RegionRequ
9596
.into_iter()
9697
.filter_map(|r| {
9798
let region_id = r.region_id.into();
98-
r.rows
99-
.map(|rows| (region_id, RegionRequest::Put(RegionPutRequest { rows })))
99+
r.rows.map(|rows| {
100+
(
101+
region_id,
102+
RegionRequest::Put(RegionPutRequest {
103+
rows,
104+
hint: WriteHint::empty(),
105+
}),
106+
)
107+
})
100108
})
101109
.collect();
102110
Ok(requests)
@@ -232,6 +240,8 @@ fn make_region_truncate(truncate: TruncateRequest) -> Result<Vec<(RegionId, Regi
232240
pub struct RegionPutRequest {
233241
/// Rows to put.
234242
pub rows: Rows,
243+
/// Write hint.
244+
pub hint: WriteHint,
235245
}
236246

237247
#[derive(Debug)]

0 commit comments

Comments
 (0)