Skip to content

Commit 86bc5d9

Browse files
WenyXuevenyag
authored andcommitted
feat(metric-engine): introduce RowModifier for MetricEngine (#5380)
* feat(metric-engine): store physical table ColumnIds in `MetricEngineState` * feat(metric-engine): introduce `RowModifier` for MetricEngine * chore: upgrade greptime-proto * feat: introduce `WriteHint` to `RegionPutRequest` * chore: apply suggestions from CR * chore: udpate greptime-proto * chore: apply suggestions from CR * chore: add comments * chore: update proto
1 parent b8e7fb5 commit 86bc5d9

25 files changed

+640
-91
lines changed

Cargo.lock

Lines changed: 2 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,8 +125,7 @@ etcd-client = "0.14"
125125
fst = "0.4.7"
126126
futures = "0.3"
127127
futures-util = "0.3"
128-
# Branch: chore/update-prost
129-
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "8a63580f65453bd688db6662f70616a62a7214f3" }
128+
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "683e9d10ae7f3dfb8aaabd89082fc600c17e3795" }
130129
hex = "0.4"
131130
http = "1"
132131
humantime = "2.1"

src/metric-engine/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ object-store.workspace = true
2929
prometheus.workspace = true
3030
serde.workspace = true
3131
serde_json.workspace = true
32+
smallvec.workspace = true
3233
snafu.workspace = true
3334
store-api.workspace = true
3435
tokio.workspace = true

src/metric-engine/src/engine.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ use crate::config::EngineConfig;
5050
use crate::data_region::DataRegion;
5151
use crate::error::{self, Result, UnsupportedRegionRequestSnafu};
5252
use crate::metadata_region::MetadataRegion;
53+
use crate::row_modifier::RowModifier;
5354
use crate::utils;
5455

5556
#[cfg_attr(doc, aquamarine::aquamarine)]
@@ -267,6 +268,7 @@ impl MetricEngine {
267268
data_region,
268269
state: RwLock::default(),
269270
config,
271+
row_modifier: RowModifier::new(),
270272
}),
271273
}
272274
}
@@ -310,6 +312,7 @@ struct MetricEngineInner {
310312
/// TODO(weny): remove it after the config is used.
311313
#[allow(unused)]
312314
config: EngineConfig,
315+
row_modifier: RowModifier,
313316
}
314317

315318
#[cfg(test)]

src/metric-engine/src/engine/put.rs

Lines changed: 42 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,10 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use std::hash::Hash;
16-
17-
use api::v1::value::ValueData;
18-
use api::v1::{ColumnDataType, ColumnSchema, Row, Rows, SemanticType};
15+
use api::v1::{Rows, WriteHint};
1916
use common_telemetry::{error, info};
2017
use snafu::{ensure, OptionExt};
21-
use store_api::metric_engine_consts::{
22-
DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME,
23-
};
18+
use store_api::codec::PrimaryKeyEncoding;
2419
use store_api::region_request::{AffectedRows, RegionPutRequest};
2520
use store_api::storage::{RegionId, TableId};
2621

@@ -30,11 +25,9 @@ use crate::error::{
3025
PhysicalRegionNotFoundSnafu, Result,
3126
};
3227
use crate::metrics::{FORBIDDEN_OPERATION_COUNT, MITO_OPERATION_ELAPSED};
28+
use crate::row_modifier::RowsIter;
3329
use crate::utils::to_data_region_id;
3430

35-
// A random number
36-
const TSID_HASH_SEED: u32 = 846793005;
37-
3831
impl MetricEngineInner {
3932
/// Dispatch region put request
4033
pub async fn put_region(
@@ -82,8 +75,21 @@ impl MetricEngineInner {
8275

8376
// write to data region
8477

78+
// TODO(weny): retrieve the encoding from the metadata region.
79+
let encoding = PrimaryKeyEncoding::Dense;
80+
8581
// TODO: retrieve table name
86-
self.modify_rows(logical_region_id.table_id(), &mut request.rows)?;
82+
self.modify_rows(
83+
physical_region_id,
84+
logical_region_id.table_id(),
85+
&mut request.rows,
86+
encoding,
87+
)?;
88+
if encoding == PrimaryKeyEncoding::Sparse {
89+
request.hint = Some(WriteHint {
90+
primary_key_encoding: api::v1::PrimaryKeyEncoding::Sparse.into(),
91+
});
92+
}
8793
self.data_region.write_data(data_region_id, request).await
8894
}
8995

@@ -133,67 +139,28 @@ impl MetricEngineInner {
133139
/// Perform metric engine specific logic to incoming rows.
134140
/// - Add table_id column
135141
/// - Generate tsid
136-
fn modify_rows(&self, table_id: TableId, rows: &mut Rows) -> Result<()> {
137-
// gather tag column indices
138-
let tag_col_indices = rows
139-
.schema
140-
.iter()
141-
.enumerate()
142-
.filter_map(|(idx, col)| {
143-
if col.semantic_type == SemanticType::Tag as i32 {
144-
Some((idx, col.column_name.clone()))
145-
} else {
146-
None
147-
}
148-
})
149-
.collect::<Vec<_>>();
150-
151-
// add table_name column
152-
rows.schema.push(ColumnSchema {
153-
column_name: DATA_SCHEMA_TABLE_ID_COLUMN_NAME.to_string(),
154-
datatype: ColumnDataType::Uint32 as i32,
155-
semantic_type: SemanticType::Tag as _,
156-
datatype_extension: None,
157-
options: None,
158-
});
159-
// add tsid column
160-
rows.schema.push(ColumnSchema {
161-
column_name: DATA_SCHEMA_TSID_COLUMN_NAME.to_string(),
162-
datatype: ColumnDataType::Uint64 as i32,
163-
semantic_type: SemanticType::Tag as _,
164-
datatype_extension: None,
165-
options: None,
166-
});
167-
168-
// fill internal columns
169-
for row in &mut rows.rows {
170-
Self::fill_internal_columns(table_id, &tag_col_indices, row);
171-
}
172-
173-
Ok(())
174-
}
175-
176-
/// Fills internal columns of a row with table name and a hash of tag values.
177-
fn fill_internal_columns(
142+
fn modify_rows(
143+
&self,
144+
physical_region_id: RegionId,
178145
table_id: TableId,
179-
tag_col_indices: &[(usize, String)],
180-
row: &mut Row,
181-
) {
182-
let mut hasher = mur3::Hasher128::with_seed(TSID_HASH_SEED);
183-
for (idx, name) in tag_col_indices {
184-
let tag = row.values[*idx].clone();
185-
name.hash(&mut hasher);
186-
// The type is checked before. So only null is ignored.
187-
if let Some(ValueData::StringValue(string)) = tag.value_data {
188-
string.hash(&mut hasher);
189-
}
190-
}
191-
// TSID is 64 bits, simply truncate the 128 bits hash
192-
let (hash, _) = hasher.finish128();
193-
194-
// fill table id and tsid
195-
row.values.push(ValueData::U32Value(table_id).into());
196-
row.values.push(ValueData::U64Value(hash).into());
146+
rows: &mut Rows,
147+
encoding: PrimaryKeyEncoding,
148+
) -> Result<()> {
149+
let input = std::mem::take(rows);
150+
let iter = {
151+
let state = self.state.read().unwrap();
152+
let name_to_id = state
153+
.physical_region_states()
154+
.get(&physical_region_id)
155+
.with_context(|| PhysicalRegionNotFoundSnafu {
156+
region_id: physical_region_id,
157+
})?
158+
.physical_columns();
159+
RowsIter::new(input, name_to_id)
160+
};
161+
let output = self.row_modifier.modify_rows(iter, table_id, encoding)?;
162+
*rows = output;
163+
Ok(())
197164
}
198165
}
199166

@@ -217,6 +184,7 @@ mod tests {
217184
let rows = test_util::build_rows(1, 5);
218185
let request = RegionRequest::Put(RegionPutRequest {
219186
rows: Rows { schema, rows },
187+
hint: None,
220188
});
221189

222190
// write data
@@ -290,6 +258,7 @@ mod tests {
290258
let rows = test_util::build_rows(3, 100);
291259
let request = RegionRequest::Put(RegionPutRequest {
292260
rows: Rows { schema, rows },
261+
hint: None,
293262
});
294263

295264
// write data
@@ -311,6 +280,7 @@ mod tests {
311280
let rows = test_util::build_rows(1, 100);
312281
let request = RegionRequest::Put(RegionPutRequest {
313282
rows: Rows { schema, rows },
283+
hint: None,
314284
});
315285

316286
engine
@@ -330,6 +300,7 @@ mod tests {
330300
let rows = test_util::build_rows(1, 100);
331301
let request = RegionRequest::Put(RegionPutRequest {
332302
rows: Rows { schema, rows },
303+
hint: None,
333304
});
334305

335306
engine

src/metric-engine/src/error.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,13 @@ pub enum Error {
105105
location: Location,
106106
},
107107

108+
#[snafu(display("Failed to encode primary key"))]
109+
EncodePrimaryKey {
110+
source: mito2::error::Error,
111+
#[snafu(implicit)]
112+
location: Location,
113+
},
114+
108115
#[snafu(display("Mito write operation fails"))]
109116
MitoWriteOperation {
110117
source: BoxedError,
@@ -283,6 +290,8 @@ impl ErrorExt for Error {
283290
| MitoCatchupOperation { source, .. }
284291
| MitoFlushOperation { source, .. } => source.status_code(),
285292

293+
EncodePrimaryKey { source, .. } => source.status_code(),
294+
286295
CollectRecordBatchStream { source, .. } => source.status_code(),
287296

288297
RegionAlreadyExists { .. } => StatusCode::RegionAlreadyExists,

src/metric-engine/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ pub mod engine;
5858
pub mod error;
5959
mod metadata_region;
6060
mod metrics;
61+
mod row_modifier;
6162
#[cfg(test)]
6263
mod test_util;
6364
mod utils;

src/metric-engine/src/metadata_region.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -512,7 +512,7 @@ impl MetadataRegion {
512512
}],
513513
};
514514

515-
RegionPutRequest { rows }
515+
RegionPutRequest { rows, hint: None }
516516
}
517517

518518
fn build_delete_request(keys: &[String]) -> RegionDeleteRequest {

0 commit comments

Comments
 (0)