diff --git a/Cargo.lock b/Cargo.lock index f0b659d66e14..210d96a8c504 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4450,7 +4450,7 @@ dependencies = [ [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=ec801a91aa22f9666063d02805f1f60f7c93458a#ec801a91aa22f9666063d02805f1f60f7c93458a" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=6cee3db98a552f1dd848dec3eefcce8f26343748#6cee3db98a552f1dd848dec3eefcce8f26343748" dependencies = [ "prost 0.12.6", "serde", @@ -6492,6 +6492,7 @@ dependencies = [ "prometheus", "serde", "serde_json", + "smallvec", "snafu 0.8.5", "store-api", "tokio", diff --git a/Cargo.toml b/Cargo.toml index 461ba7b8db8d..d582054b434f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -124,7 +124,7 @@ etcd-client = "0.13" fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "ec801a91aa22f9666063d02805f1f60f7c93458a" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "6cee3db98a552f1dd848dec3eefcce8f26343748" } hex = "0.4" http = "0.2" humantime = "2.1" diff --git a/src/metric-engine/Cargo.toml b/src/metric-engine/Cargo.toml index 64cbb51b38e8..f8d21c35d400 100644 --- a/src/metric-engine/Cargo.toml +++ b/src/metric-engine/Cargo.toml @@ -29,6 +29,7 @@ object-store.workspace = true prometheus.workspace = true serde.workspace = true serde_json.workspace = true +smallvec.workspace = true snafu.workspace = true store-api.workspace = true tokio.workspace = true diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs index 5b777d34d7e0..b4f22ccff91b 100644 --- a/src/metric-engine/src/engine.rs +++ b/src/metric-engine/src/engine.rs @@ -50,6 +50,7 @@ use crate::config::EngineConfig; use crate::data_region::DataRegion; use crate::error::{self, Result, UnsupportedRegionRequestSnafu}; use crate::metadata_region::MetadataRegion; +use crate::row_modifier::RowModifier; use crate::utils; #[cfg_attr(doc, aquamarine::aquamarine)] @@ -267,6 +268,7 @@ impl MetricEngine { data_region, state: RwLock::default(), config, + row_modifier: RowModifier::new(), }), } } @@ -310,6 +312,7 @@ struct MetricEngineInner { /// TODO(weny): remove it after the config is used. #[allow(unused)] config: EngineConfig, + row_modifier: RowModifier, } #[cfg(test)] diff --git a/src/metric-engine/src/engine/put.rs b/src/metric-engine/src/engine/put.rs index 4aad27234b47..a2d53d087639 100644 --- a/src/metric-engine/src/engine/put.rs +++ b/src/metric-engine/src/engine/put.rs @@ -12,15 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::hash::Hash; - -use api::v1::value::ValueData; -use api::v1::{ColumnDataType, ColumnSchema, Row, Rows, SemanticType}; +use api::v1::{Rows, WriteHint}; use common_telemetry::{error, info}; use snafu::{ensure, OptionExt}; -use store_api::metric_engine_consts::{ - DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME, -}; +use store_api::codec::PrimaryKeyEncoding; use store_api::region_request::{AffectedRows, RegionPutRequest}; use store_api::storage::{RegionId, TableId}; @@ -30,11 +25,9 @@ use crate::error::{ PhysicalRegionNotFoundSnafu, Result, }; use crate::metrics::{FORBIDDEN_OPERATION_COUNT, MITO_OPERATION_ELAPSED}; +use crate::row_modifier::RowsIter; use crate::utils::to_data_region_id; -// A random number -const TSID_HASH_SEED: u32 = 846793005; - impl MetricEngineInner { /// Dispatch region put request pub async fn put_region( @@ -82,8 +75,21 @@ impl MetricEngineInner { // write to data region + // TODO(weny): retrieve the encoding from the metadata region. + let encoding = PrimaryKeyEncoding::Dense; + // TODO: retrieve table name - self.modify_rows(logical_region_id.table_id(), &mut request.rows)?; + self.modify_rows( + physical_region_id, + logical_region_id.table_id(), + &mut request.rows, + encoding, + )?; + if encoding == PrimaryKeyEncoding::Sparse { + request.hint = Some(WriteHint { + primary_key_encoding: api::v1::PrimaryKeyEncoding::Sparse.into(), + }); + } self.data_region.write_data(data_region_id, request).await } @@ -133,67 +139,28 @@ impl MetricEngineInner { /// Perform metric engine specific logic to incoming rows. /// - Add table_id column /// - Generate tsid - fn modify_rows(&self, table_id: TableId, rows: &mut Rows) -> Result<()> { - // gather tag column indices - let tag_col_indices = rows - .schema - .iter() - .enumerate() - .filter_map(|(idx, col)| { - if col.semantic_type == SemanticType::Tag as i32 { - Some((idx, col.column_name.clone())) - } else { - None - } - }) - .collect::>(); - - // add table_name column - rows.schema.push(ColumnSchema { - column_name: DATA_SCHEMA_TABLE_ID_COLUMN_NAME.to_string(), - datatype: ColumnDataType::Uint32 as i32, - semantic_type: SemanticType::Tag as _, - datatype_extension: None, - options: None, - }); - // add tsid column - rows.schema.push(ColumnSchema { - column_name: DATA_SCHEMA_TSID_COLUMN_NAME.to_string(), - datatype: ColumnDataType::Uint64 as i32, - semantic_type: SemanticType::Tag as _, - datatype_extension: None, - options: None, - }); - - // fill internal columns - for row in &mut rows.rows { - Self::fill_internal_columns(table_id, &tag_col_indices, row); - } - - Ok(()) - } - - /// Fills internal columns of a row with table name and a hash of tag values. - fn fill_internal_columns( + fn modify_rows( + &self, + physical_region_id: RegionId, table_id: TableId, - tag_col_indices: &[(usize, String)], - row: &mut Row, - ) { - let mut hasher = mur3::Hasher128::with_seed(TSID_HASH_SEED); - for (idx, name) in tag_col_indices { - let tag = row.values[*idx].clone(); - name.hash(&mut hasher); - // The type is checked before. So only null is ignored. - if let Some(ValueData::StringValue(string)) = tag.value_data { - string.hash(&mut hasher); - } - } - // TSID is 64 bits, simply truncate the 128 bits hash - let (hash, _) = hasher.finish128(); - - // fill table id and tsid - row.values.push(ValueData::U32Value(table_id).into()); - row.values.push(ValueData::U64Value(hash).into()); + rows: &mut Rows, + encoding: PrimaryKeyEncoding, + ) -> Result<()> { + let input = std::mem::take(rows); + let iter = { + let state = self.state.read().unwrap(); + let name_to_id = state + .physical_region_states() + .get(&physical_region_id) + .with_context(|| PhysicalRegionNotFoundSnafu { + region_id: physical_region_id, + })? + .physical_columns(); + RowsIter::new(input, name_to_id) + }; + let output = self.row_modifier.modify_rows(iter, table_id, encoding)?; + *rows = output; + Ok(()) } } @@ -217,6 +184,7 @@ mod tests { let rows = test_util::build_rows(1, 5); let request = RegionRequest::Put(RegionPutRequest { rows: Rows { schema, rows }, + hint: None, }); // write data @@ -290,6 +258,7 @@ mod tests { let rows = test_util::build_rows(3, 100); let request = RegionRequest::Put(RegionPutRequest { rows: Rows { schema, rows }, + hint: None, }); // write data @@ -311,6 +280,7 @@ mod tests { let rows = test_util::build_rows(1, 100); let request = RegionRequest::Put(RegionPutRequest { rows: Rows { schema, rows }, + hint: None, }); engine @@ -330,6 +300,7 @@ mod tests { let rows = test_util::build_rows(1, 100); let request = RegionRequest::Put(RegionPutRequest { rows: Rows { schema, rows }, + hint: None, }); engine diff --git a/src/metric-engine/src/error.rs b/src/metric-engine/src/error.rs index be9655e71842..674989db9f12 100644 --- a/src/metric-engine/src/error.rs +++ b/src/metric-engine/src/error.rs @@ -105,6 +105,13 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to encode primary key"))] + EncodePrimaryKey { + source: mito2::error::Error, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Mito write operation fails"))] MitoWriteOperation { source: BoxedError, @@ -283,6 +290,8 @@ impl ErrorExt for Error { | MitoCatchupOperation { source, .. } | MitoFlushOperation { source, .. } => source.status_code(), + EncodePrimaryKey { source, .. } => source.status_code(), + CollectRecordBatchStream { source, .. } => source.status_code(), RegionAlreadyExists { .. } => StatusCode::RegionAlreadyExists, diff --git a/src/metric-engine/src/lib.rs b/src/metric-engine/src/lib.rs index 89b98351282c..b6acab47a251 100644 --- a/src/metric-engine/src/lib.rs +++ b/src/metric-engine/src/lib.rs @@ -58,6 +58,7 @@ pub mod engine; pub mod error; mod metadata_region; mod metrics; +mod row_modifier; #[cfg(test)] mod test_util; mod utils; diff --git a/src/metric-engine/src/metadata_region.rs b/src/metric-engine/src/metadata_region.rs index 4b924ec3006c..70709a84ae26 100644 --- a/src/metric-engine/src/metadata_region.rs +++ b/src/metric-engine/src/metadata_region.rs @@ -512,7 +512,7 @@ impl MetadataRegion { }], }; - RegionPutRequest { rows } + RegionPutRequest { rows, hint: None } } fn build_delete_request(keys: &[String]) -> RegionDeleteRequest { diff --git a/src/metric-engine/src/row_modifier.rs b/src/metric-engine/src/row_modifier.rs new file mode 100644 index 000000000000..5fcf3c0d23eb --- /dev/null +++ b/src/metric-engine/src/row_modifier.rs @@ -0,0 +1,495 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::{BTreeMap, HashMap}; +use std::hash::Hash; + +use api::v1::value::ValueData; +use api::v1::{ColumnDataType, ColumnSchema, Row, Rows, SemanticType, Value}; +use datatypes::value::ValueRef; +use mito2::row_converter::SparsePrimaryKeyCodec; +use smallvec::SmallVec; +use snafu::ResultExt; +use store_api::codec::PrimaryKeyEncoding; +use store_api::metric_engine_consts::{ + DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME, +}; +use store_api::storage::consts::{ReservedColumnId, PRIMARY_KEY_COLUMN_NAME}; +use store_api::storage::{ColumnId, TableId}; + +use crate::error::{EncodePrimaryKeySnafu, Result}; + +// A random number +const TSID_HASH_SEED: u32 = 846793005; + +/// A row modifier modifies [`Rows`]. +/// +/// - For [`PrimaryKeyEncoding::Sparse`] encoding, +/// it replaces the primary key columns with the encoded primary key column(`__primary_key`). +/// +/// - For [`PrimaryKeyEncoding::Dense`] encoding, +/// it adds two columns(`__table_id`, `__tsid`) to the row. +pub struct RowModifier { + codec: SparsePrimaryKeyCodec, +} + +impl RowModifier { + pub fn new() -> Self { + Self { + codec: SparsePrimaryKeyCodec::schemaless(), + } + } + + /// Modify rows with the given primary key encoding. + pub fn modify_rows( + &self, + iter: RowsIter, + table_id: TableId, + encoding: PrimaryKeyEncoding, + ) -> Result { + match encoding { + PrimaryKeyEncoding::Sparse => self.modify_rows_sparse(iter, table_id), + PrimaryKeyEncoding::Dense => self.modify_rows_dense(iter, table_id), + } + } + + /// Modifies rows with sparse primary key encoding. + /// It replaces the primary key columns with the encoded primary key column(`__primary_key`). + fn modify_rows_sparse(&self, mut iter: RowsIter, table_id: TableId) -> Result { + let num_column = iter.rows.schema.len(); + let num_primary_key_column = iter.index.num_primary_key_column; + // num_output_column = remaining columns(fields columns + timestamp column) + 1 (encoded primary key column) + let num_output_column = num_column - num_primary_key_column + 1; + + let mut buffer = vec![]; + for mut iter in iter.iter_mut() { + let (table_id, tsid) = self.fill_internal_columns(table_id, &iter); + let mut values = Vec::with_capacity(num_output_column); + buffer.clear(); + let internal_columns = [ + ( + ReservedColumnId::table_id(), + api::helper::pb_value_to_value_ref(&table_id, &None), + ), + ( + ReservedColumnId::tsid(), + api::helper::pb_value_to_value_ref(&tsid, &None), + ), + ]; + self.codec + .encode_to_vec(internal_columns.into_iter(), &mut buffer) + .context(EncodePrimaryKeySnafu)?; + self.codec + .encode_to_vec(iter.primary_keys(), &mut buffer) + .context(EncodePrimaryKeySnafu)?; + + values.push(ValueData::BinaryValue(buffer.clone()).into()); + values.extend(iter.remaining()); + // Replace the row with the encoded row + *iter.row = Row { values }; + } + + // Update the schema + let mut schema = Vec::with_capacity(num_output_column); + schema.push(ColumnSchema { + column_name: PRIMARY_KEY_COLUMN_NAME.to_string(), + datatype: ColumnDataType::Binary as i32, + semantic_type: SemanticType::Tag as _, + datatype_extension: None, + options: None, + }); + schema.extend(iter.remaining_columns()); + iter.rows.schema = schema; + + Ok(iter.rows) + } + + /// Modifies rows with dense primary key encoding. + /// It adds two columns(`__table_id`, `__tsid`) to the row. + fn modify_rows_dense(&self, mut iter: RowsIter, table_id: TableId) -> Result { + // add table_name column + iter.rows.schema.push(ColumnSchema { + column_name: DATA_SCHEMA_TABLE_ID_COLUMN_NAME.to_string(), + datatype: ColumnDataType::Uint32 as i32, + semantic_type: SemanticType::Tag as _, + datatype_extension: None, + options: None, + }); + // add tsid column + iter.rows.schema.push(ColumnSchema { + column_name: DATA_SCHEMA_TSID_COLUMN_NAME.to_string(), + datatype: ColumnDataType::Uint64 as i32, + semantic_type: SemanticType::Tag as _, + datatype_extension: None, + options: None, + }); + for iter in iter.iter_mut() { + let (table_id, tsid) = self.fill_internal_columns(table_id, &iter); + iter.row.values.push(table_id); + iter.row.values.push(tsid); + } + + Ok(iter.rows) + } + + /// Fills internal columns of a row with table name and a hash of tag values. + fn fill_internal_columns(&self, table_id: TableId, iter: &RowIter<'_>) -> (Value, Value) { + let mut hasher = mur3::Hasher128::with_seed(TSID_HASH_SEED); + for (name, value) in iter.primary_keys_with_name() { + // The type is checked before. So only null is ignored. + if let Some(ValueData::StringValue(string)) = &value.value_data { + name.hash(&mut hasher); + string.hash(&mut hasher); + } + } + // TSID is 64 bits, simply truncate the 128 bits hash + let (hash, _) = hasher.finish128(); + + ( + ValueData::U32Value(table_id).into(), + ValueData::U64Value(hash).into(), + ) + } +} + +/// Index of a value. +#[derive(Debug, Clone, Copy)] +struct ValueIndex { + column_id: ColumnId, + index: usize, +} + +/// Index of a row. +struct IterIndex { + indices: Vec, + num_primary_key_column: usize, +} + +impl IterIndex { + fn new(row_schema: &[ColumnSchema], name_to_column_id: &HashMap) -> Self { + let mut reserved_indices = SmallVec::<[ValueIndex; 2]>::new(); + // Uses BTreeMap to keep the primary key column name order (lexicographical) + let mut primary_key_indices = BTreeMap::new(); + let mut field_indices = SmallVec::<[ValueIndex; 1]>::new(); + let mut ts_index = None; + for (idx, col) in row_schema.iter().enumerate() { + match col.semantic_type() { + SemanticType::Tag => match col.column_name.as_str() { + DATA_SCHEMA_TABLE_ID_COLUMN_NAME => { + reserved_indices.push(ValueIndex { + column_id: ReservedColumnId::table_id(), + index: idx, + }); + } + DATA_SCHEMA_TSID_COLUMN_NAME => { + reserved_indices.push(ValueIndex { + column_id: ReservedColumnId::tsid(), + index: idx, + }); + } + _ => { + // Inserts primary key column name follower the column name order (lexicographical) + primary_key_indices.insert( + col.column_name.as_str(), + ValueIndex { + column_id: *name_to_column_id.get(&col.column_name).unwrap(), + index: idx, + }, + ); + } + }, + SemanticType::Field => { + field_indices.push(ValueIndex { + column_id: *name_to_column_id.get(&col.column_name).unwrap(), + index: idx, + }); + } + SemanticType::Timestamp => { + ts_index = Some(ValueIndex { + column_id: *name_to_column_id.get(&col.column_name).unwrap(), + index: idx, + }); + } + } + } + let num_primary_key_column = primary_key_indices.len() + reserved_indices.len(); + let indices = reserved_indices + .into_iter() + .chain(primary_key_indices.values().cloned()) + .chain(ts_index) + .chain(field_indices) + .collect(); + IterIndex { + indices, + num_primary_key_column, + } + } +} + +/// Iterator of rows. +pub(crate) struct RowsIter { + rows: Rows, + index: IterIndex, +} + +impl RowsIter { + pub fn new(rows: Rows, name_to_column_id: &HashMap) -> Self { + let index: IterIndex = IterIndex::new(&rows.schema, name_to_column_id); + Self { rows, index } + } + + /// Returns the iterator of rows. + fn iter_mut(&mut self) -> impl Iterator { + self.rows.rows.iter_mut().map(|row| RowIter { + row, + index: &self.index, + schema: &self.rows.schema, + }) + } + + /// Returns the remaining columns. + fn remaining_columns(&mut self) -> impl Iterator + '_ { + self.index.indices[self.index.num_primary_key_column..] + .iter() + .map(|idx| std::mem::take(&mut self.rows.schema[idx.index])) + } +} + +/// Iterator of a row. +struct RowIter<'a> { + row: &'a mut Row, + index: &'a IterIndex, + schema: &'a Vec, +} + +impl RowIter<'_> { + /// Returns the primary keys with their names. + fn primary_keys_with_name(&self) -> impl Iterator { + self.index.indices[..self.index.num_primary_key_column] + .iter() + .map(|idx| { + ( + &self.schema[idx.index].column_name, + &self.row.values[idx.index], + ) + }) + } + + /// Returns the primary keys. + fn primary_keys(&self) -> impl Iterator { + self.index.indices[..self.index.num_primary_key_column] + .iter() + .map(|idx| { + ( + idx.column_id, + api::helper::pb_value_to_value_ref( + &self.row.values[idx.index], + &self.schema[idx.index].datatype_extension, + ), + ) + }) + } + + /// Returns the remaining columns. + fn remaining(&mut self) -> impl Iterator + '_ { + self.index.indices[self.index.num_primary_key_column..] + .iter() + .map(|idx| std::mem::take(&mut self.row.values[idx.index])) + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use api::v1::{Row, Rows}; + + use super::*; + + fn test_schema() -> Vec { + vec![ + ColumnSchema { + column_name: "namespace".to_string(), + datatype: ColumnDataType::String as i32, + semantic_type: SemanticType::Tag as _, + datatype_extension: None, + options: None, + }, + ColumnSchema { + column_name: "host".to_string(), + datatype: ColumnDataType::String as i32, + semantic_type: SemanticType::Tag as _, + datatype_extension: None, + options: None, + }, + ] + } + + fn test_row(v1: &str, v2: &str) -> Row { + Row { + values: vec![ + ValueData::StringValue(v1.to_string()).into(), + ValueData::StringValue(v2.to_string()).into(), + ], + } + } + + fn test_name_to_column_id() -> HashMap { + HashMap::from([("namespace".to_string(), 1), ("host".to_string(), 2)]) + } + + #[test] + fn test_encode_sparse() { + let name_to_column_id = test_name_to_column_id(); + let encoder = RowModifier::new(); + let table_id = 1025; + let schema = test_schema(); + let row = test_row("greptimedb", "127.0.0.1"); + let rows = Rows { + schema, + rows: vec![row], + }; + let rows_iter = RowsIter::new(rows, &name_to_column_id); + let result = encoder.modify_rows_sparse(rows_iter, table_id).unwrap(); + assert_eq!(result.rows[0].values.len(), 1); + let encoded_primary_key = vec![ + 128, 0, 0, 4, 1, 0, 0, 4, 1, 128, 0, 0, 3, 1, 131, 9, 166, 190, 173, 37, 39, 240, 0, 0, + 0, 2, 1, 1, 49, 50, 55, 46, 48, 46, 48, 46, 9, 49, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 1, + 1, 1, 103, 114, 101, 112, 116, 105, 109, 101, 9, 100, 98, 0, 0, 0, 0, 0, 0, 2, + ]; + assert_eq!( + result.rows[0].values[0], + ValueData::BinaryValue(encoded_primary_key).into() + ); + assert_eq!(result.schema, expected_sparse_schema()); + } + + fn expected_sparse_schema() -> Vec { + vec![ColumnSchema { + column_name: PRIMARY_KEY_COLUMN_NAME.to_string(), + datatype: ColumnDataType::Binary as i32, + semantic_type: SemanticType::Tag as _, + datatype_extension: None, + options: None, + }] + } + + fn expected_dense_schema() -> Vec { + vec![ + ColumnSchema { + column_name: "namespace".to_string(), + datatype: ColumnDataType::String as i32, + semantic_type: SemanticType::Tag as _, + datatype_extension: None, + options: None, + }, + ColumnSchema { + column_name: "host".to_string(), + datatype: ColumnDataType::String as i32, + semantic_type: SemanticType::Tag as _, + datatype_extension: None, + options: None, + }, + ColumnSchema { + column_name: DATA_SCHEMA_TABLE_ID_COLUMN_NAME.to_string(), + datatype: ColumnDataType::Uint32 as i32, + semantic_type: SemanticType::Tag as _, + datatype_extension: None, + options: None, + }, + ColumnSchema { + column_name: DATA_SCHEMA_TSID_COLUMN_NAME.to_string(), + datatype: ColumnDataType::Uint64 as i32, + semantic_type: SemanticType::Tag as _, + datatype_extension: None, + options: None, + }, + ] + } + + #[test] + fn test_encode_dense() { + let name_to_column_id = test_name_to_column_id(); + let encoder = RowModifier::new(); + let table_id = 1025; + let schema = test_schema(); + let row = test_row("greptimedb", "127.0.0.1"); + let rows = Rows { + schema, + rows: vec![row], + }; + let rows_iter = RowsIter::new(rows, &name_to_column_id); + let result = encoder.modify_rows_dense(rows_iter, table_id).unwrap(); + assert_eq!( + result.rows[0].values[0], + ValueData::StringValue("greptimedb".to_string()).into() + ); + assert_eq!( + result.rows[0].values[1], + ValueData::StringValue("127.0.0.1".to_string()).into() + ); + assert_eq!(result.rows[0].values[2], ValueData::U32Value(1025).into()); + assert_eq!( + result.rows[0].values[3], + ValueData::U64Value(9442261431637846000).into() + ); + assert_eq!(result.schema, expected_dense_schema()); + } + + #[test] + fn test_fill_internal_columns() { + let name_to_column_id = test_name_to_column_id(); + let encoder = RowModifier::new(); + let table_id = 1025; + let schema = test_schema(); + let row = test_row("greptimedb", "127.0.0.1"); + let rows = Rows { + schema, + rows: vec![row], + }; + let mut rows_iter = RowsIter::new(rows, &name_to_column_id); + let row_iter = rows_iter.iter_mut().next().unwrap(); + let (encoded_table_id, tsid) = encoder.fill_internal_columns(table_id, &row_iter); + assert_eq!(encoded_table_id, ValueData::U32Value(1025).into()); + assert_eq!(tsid, ValueData::U64Value(9442261431637846000).into()); + + // Change the column order + let schema = vec![ + ColumnSchema { + column_name: "host".to_string(), + datatype: ColumnDataType::String as i32, + semantic_type: SemanticType::Tag as _, + datatype_extension: None, + options: None, + }, + ColumnSchema { + column_name: "namespace".to_string(), + datatype: ColumnDataType::String as i32, + semantic_type: SemanticType::Tag as _, + datatype_extension: None, + options: None, + }, + ]; + let row = test_row("127.0.0.1", "greptimedb"); + let rows = Rows { + schema, + rows: vec![row], + }; + let mut rows_iter = RowsIter::new(rows, &name_to_column_id); + let row_iter = rows_iter.iter_mut().next().unwrap(); + let (encoded_table_id, tsid) = encoder.fill_internal_columns(table_id, &row_iter); + assert_eq!(encoded_table_id, ValueData::U32Value(1025).into()); + assert_eq!(tsid, ValueData::U64Value(9442261431637846000).into()); + } +} diff --git a/src/mito2/benches/memtable_bench.rs b/src/mito2/benches/memtable_bench.rs index b0c6a550b2cf..aca8ad4652f5 100644 --- a/src/mito2/benches/memtable_bench.rs +++ b/src/mito2/benches/memtable_bench.rs @@ -275,6 +275,7 @@ impl CpuDataGenerator { schema: self.column_schemas.clone(), rows, }), + write_hint: None, }; KeyValues::new(&self.metadata, mutation).unwrap() diff --git a/src/mito2/src/engine/basic_test.rs b/src/mito2/src/engine/basic_test.rs index 10be1d269fff..43409a2c9caf 100644 --- a/src/mito2/src/engine/basic_test.rs +++ b/src/mito2/src/engine/basic_test.rs @@ -530,7 +530,10 @@ async fn test_absent_and_invalid_columns() { rows, }; let err = engine - .handle_request(region_id, RegionRequest::Put(RegionPutRequest { rows })) + .handle_request( + region_id, + RegionRequest::Put(RegionPutRequest { rows, hint: None }), + ) .await .unwrap_err(); assert_eq!(StatusCode::InvalidArguments, err.status_code()); diff --git a/src/mito2/src/engine/open_test.rs b/src/mito2/src/engine/open_test.rs index 32b963b5067b..93f3963e0bea 100644 --- a/src/mito2/src/engine/open_test.rs +++ b/src/mito2/src/engine/open_test.rs @@ -129,7 +129,10 @@ async fn test_engine_open_readonly() { let err = engine .handle_request( region_id, - RegionRequest::Put(RegionPutRequest { rows: rows.clone() }), + RegionRequest::Put(RegionPutRequest { + rows: rows.clone(), + hint: None, + }), ) .await .unwrap_err(); diff --git a/src/mito2/src/engine/set_role_state_test.rs b/src/mito2/src/engine/set_role_state_test.rs index 1d7a46f43647..2a4cb9f9ca31 100644 --- a/src/mito2/src/engine/set_role_state_test.rs +++ b/src/mito2/src/engine/set_role_state_test.rs @@ -74,7 +74,10 @@ async fn test_set_role_state_gracefully() { let error = engine .handle_request( region_id, - RegionRequest::Put(RegionPutRequest { rows: rows.clone() }), + RegionRequest::Put(RegionPutRequest { + rows: rows.clone(), + hint: None, + }), ) .await .unwrap_err(); @@ -152,7 +155,13 @@ async fn test_write_downgrading_region() { rows: build_rows(0, 42), }; let err = engine - .handle_request(region_id, RegionRequest::Put(RegionPutRequest { rows })) + .handle_request( + region_id, + RegionRequest::Put(RegionPutRequest { + rows: rows.clone(), + hint: None, + }), + ) .await .unwrap_err(); assert_eq!(err.status_code(), StatusCode::RegionNotReady) diff --git a/src/mito2/src/memtable/key_values.rs b/src/mito2/src/memtable/key_values.rs index 73013920e65b..1567a6b6d8da 100644 --- a/src/mito2/src/memtable/key_values.rs +++ b/src/mito2/src/memtable/key_values.rs @@ -323,6 +323,7 @@ mod tests { op_type: OpType::Put as i32, sequence: START_SEQ, rows: Some(rows), + write_hint: None, } } @@ -360,6 +361,7 @@ mod tests { op_type: OpType::Put as i32, sequence: 100, rows: None, + write_hint: None, }; let kvs = KeyValues::new(&meta, mutation); assert!(kvs.is_none()); diff --git a/src/mito2/src/memtable/partition_tree.rs b/src/mito2/src/memtable/partition_tree.rs index 41a5a1351627..4bcd432d414a 100644 --- a/src/mito2/src/memtable/partition_tree.rs +++ b/src/mito2/src/memtable/partition_tree.rs @@ -741,6 +741,7 @@ mod tests { schema: column_schema, rows, }), + write_hint: None, }; KeyValues::new(metadata.as_ref(), mutation).unwrap() } diff --git a/src/mito2/src/memtable/time_series.rs b/src/mito2/src/memtable/time_series.rs index d9bc44815f89..a7c41648f39f 100644 --- a/src/mito2/src/memtable/time_series.rs +++ b/src/mito2/src/memtable/time_series.rs @@ -1165,6 +1165,7 @@ mod tests { schema: column_schema, rows, }), + write_hint: None, }; KeyValues::new(schema.as_ref(), mutation).unwrap() } diff --git a/src/mito2/src/region_write_ctx.rs b/src/mito2/src/region_write_ctx.rs index e86ff77ca2f1..4d1cef23407b 100644 --- a/src/mito2/src/region_write_ctx.rs +++ b/src/mito2/src/region_write_ctx.rs @@ -137,6 +137,7 @@ impl RegionWriteCtx { op_type, sequence: self.next_sequence, rows, + write_hint: None, }); let notify = WriteNotify::new(tx, num_rows); diff --git a/src/mito2/src/row_converter/sparse.rs b/src/mito2/src/row_converter/sparse.rs index d1eaa7a15d25..6beca6412a6b 100644 --- a/src/mito2/src/row_converter/sparse.rs +++ b/src/mito2/src/row_converter/sparse.rs @@ -42,7 +42,9 @@ struct SparsePrimaryKeyCodecInner { // User defined label field label_field: SortField, // Columns in primary key - columns: HashSet, + // + // None means all unknown columns is primary key(`Self::label_field`). + columns: Option>, } /// Sparse values representation. @@ -85,18 +87,37 @@ impl SparsePrimaryKeyCodec { table_id_field: SortField::new(ConcreteDataType::uint32_datatype()), tsid_field: SortField::new(ConcreteDataType::uint64_datatype()), label_field: SortField::new(ConcreteDataType::string_datatype()), - columns: region_metadata - .primary_key_columns() - .map(|c| c.column_id) - .collect(), + columns: Some( + region_metadata + .primary_key_columns() + .map(|c| c.column_id) + .collect(), + ), + }), + } + } + + /// Returns a new [`SparsePrimaryKeyCodec`] instance. + /// + /// It treats all unknown columns as primary key(label field). + pub fn schemaless() -> Self { + Self { + inner: Arc::new(SparsePrimaryKeyCodecInner { + table_id_field: SortField::new(ConcreteDataType::uint32_datatype()), + tsid_field: SortField::new(ConcreteDataType::uint64_datatype()), + label_field: SortField::new(ConcreteDataType::string_datatype()), + columns: None, }), } } /// Returns the field of the given column id. fn get_field(&self, column_id: ColumnId) -> Option<&SortField> { - if !self.inner.columns.contains(&column_id) { - return None; + // if the `columns` is not specified, all unknown columns is primary key(label field). + if let Some(columns) = &self.inner.columns { + if !columns.contains(&column_id) { + return None; + } } match column_id { @@ -107,7 +128,7 @@ impl SparsePrimaryKeyCodec { } /// Encodes the given bytes into a [`SparseValues`]. - pub(crate) fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec) -> Result<()> + pub fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec) -> Result<()> where I: Iterator)>, { diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index 59f81987bc0d..9232f478c2ff 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -1049,7 +1049,10 @@ pub fn delete_rows_schema(request: &RegionCreateRequest) -> Vec Result Result, } #[derive(Debug)]