Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ etcd-client = "0.13"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "ec801a91aa22f9666063d02805f1f60f7c93458a" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "e5b4e08a3fad208c1c4c5a3b7525d2a85b08a6ef" }
hex = "0.4"
http = "0.2"
humantime = "2.1"
Expand Down
1 change: 1 addition & 0 deletions src/metric-engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ object-store.workspace = true
prometheus.workspace = true
serde.workspace = true
serde_json.workspace = true
smallvec.workspace = true
snafu.workspace = true
store-api.workspace = true
tokio.workspace = true
Expand Down
3 changes: 3 additions & 0 deletions src/metric-engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ use crate::config::EngineConfig;
use crate::data_region::DataRegion;
use crate::error::{self, Result, UnsupportedRegionRequestSnafu};
use crate::metadata_region::MetadataRegion;
use crate::row_modifier::RowModifier;
use crate::utils;

#[cfg_attr(doc, aquamarine::aquamarine)]
Expand Down Expand Up @@ -267,6 +268,7 @@ impl MetricEngine {
data_region,
state: RwLock::default(),
config,
row_modifier: RowModifier::new(),
}),
}
}
Expand Down Expand Up @@ -310,6 +312,7 @@ struct MetricEngineInner {
/// TODO(weny): remove it after the config is used.
#[allow(unused)]
config: EngineConfig,
row_modifier: RowModifier,
}

#[cfg(test)]
Expand Down
8 changes: 7 additions & 1 deletion src/metric-engine/src/engine/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use store_api::metric_engine_consts::{
use store_api::mito_engine_options::{APPEND_MODE_KEY, TTL_KEY};
use store_api::region_engine::RegionEngine;
use store_api::region_request::{AffectedRows, RegionCreateRequest, RegionRequest};
use store_api::storage::consts::ReservedColumnId;
use store_api::storage::consts::{ReservedColumnId, PRIMARY_KEY_COLUMN_NAME};
use store_api::storage::RegionId;

use crate::engine::options::{set_data_region_options, IndexOptions, PhysicalRegionOptions};
Expand Down Expand Up @@ -363,6 +363,12 @@ impl MetricEngineInner {
column: DATA_SCHEMA_TSID_COLUMN_NAME,
}
);
ensure!(
!name_to_index.contains_key(PRIMARY_KEY_COLUMN_NAME),
InternalColumnOccupiedSnafu {
column: PRIMARY_KEY_COLUMN_NAME,
}
);

// check if required table option is present
ensure!(
Expand Down
113 changes: 42 additions & 71 deletions src/metric-engine/src/engine/put.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::hash::Hash;

use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnSchema, Row, Rows, SemanticType};
use api::v1::{Rows, WriteHint};
use common_telemetry::{error, info};
use snafu::{ensure, OptionExt};
use store_api::metric_engine_consts::{
DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME,
};
use store_api::codec::PrimaryKeyEncoding;
use store_api::region_request::{AffectedRows, RegionPutRequest};
use store_api::storage::{RegionId, TableId};

Expand All @@ -30,11 +25,9 @@ use crate::error::{
PhysicalRegionNotFoundSnafu, Result,
};
use crate::metrics::{FORBIDDEN_OPERATION_COUNT, MITO_OPERATION_ELAPSED};
use crate::row_modifier::RowsIter;
use crate::utils::to_data_region_id;

// A random number
const TSID_HASH_SEED: u32 = 846793005;

impl MetricEngineInner {
/// Dispatch region put request
pub async fn put_region(
Expand Down Expand Up @@ -82,8 +75,21 @@ impl MetricEngineInner {

// write to data region

// TODO(weny): retrieve the encoding from the metadata region.
let encoding = PrimaryKeyEncoding::Dense;

// TODO: retrieve table name
self.modify_rows(logical_region_id.table_id(), &mut request.rows)?;
self.modify_rows(
physical_region_id,
logical_region_id.table_id(),
&mut request.rows,
encoding,
)?;
if encoding == PrimaryKeyEncoding::Sparse {
request.hint = Some(WriteHint {
primary_key_encoding: api::v1::PrimaryKeyEncoding::Sparse.into(),
});
}
self.data_region.write_data(data_region_id, request).await
}

Expand Down Expand Up @@ -133,67 +139,28 @@ impl MetricEngineInner {
/// Perform metric engine specific logic to incoming rows.
/// - Add table_id column
/// - Generate tsid
fn modify_rows(&self, table_id: TableId, rows: &mut Rows) -> Result<()> {
// gather tag column indices
let tag_col_indices = rows
.schema
.iter()
.enumerate()
.filter_map(|(idx, col)| {
if col.semantic_type == SemanticType::Tag as i32 {
Some((idx, col.column_name.clone()))
} else {
None
}
})
.collect::<Vec<_>>();

// add table_id column
rows.schema.push(ColumnSchema {
column_name: DATA_SCHEMA_TABLE_ID_COLUMN_NAME.to_string(),
datatype: ColumnDataType::Uint32 as i32,
semantic_type: SemanticType::Tag as _,
datatype_extension: None,
options: None,
});
// add tsid column
rows.schema.push(ColumnSchema {
column_name: DATA_SCHEMA_TSID_COLUMN_NAME.to_string(),
datatype: ColumnDataType::Uint64 as i32,
semantic_type: SemanticType::Tag as _,
datatype_extension: None,
options: None,
});

// fill internal columns
for row in &mut rows.rows {
Self::fill_internal_columns(table_id, &tag_col_indices, row);
}

Ok(())
}

/// Fills internal columns of a row with the table id and a hash of tag values.
fn fill_internal_columns(
fn modify_rows(
&self,
physical_region_id: RegionId,
table_id: TableId,
tag_col_indices: &[(usize, String)],
row: &mut Row,
) {
let mut hasher = mur3::Hasher128::with_seed(TSID_HASH_SEED);
for (idx, name) in tag_col_indices {
let tag = row.values[*idx].clone();
name.hash(&mut hasher);
// The column type has been checked beforehand, so only null values are skipped here.
if let Some(ValueData::StringValue(string)) = tag.value_data {
string.hash(&mut hasher);
}
}
// TSID is 64 bits, simply truncate the 128 bits hash
let (hash, _) = hasher.finish128();

// fill table id and tsid
row.values.push(ValueData::U32Value(table_id).into());
row.values.push(ValueData::U64Value(hash).into());
rows: &mut Rows,
encoding: PrimaryKeyEncoding,
) -> Result<()> {
let input = std::mem::take(rows);
let iter = {
let state = self.state.read().unwrap();
let name_to_id = state
.physical_region_states()
.get(&physical_region_id)
.with_context(|| PhysicalRegionNotFoundSnafu {
region_id: physical_region_id,
})?
.physical_columns();
RowsIter::new(input, name_to_id)
};
let output = self.row_modifier.modify_rows(iter, table_id, encoding)?;
*rows = output;
Ok(())
}
}

Expand All @@ -217,6 +184,7 @@ mod tests {
let rows = test_util::build_rows(1, 5);
let request = RegionRequest::Put(RegionPutRequest {
rows: Rows { schema, rows },
hint: None,
});

// write data
Expand Down Expand Up @@ -290,6 +258,7 @@ mod tests {
let rows = test_util::build_rows(3, 100);
let request = RegionRequest::Put(RegionPutRequest {
rows: Rows { schema, rows },
hint: None,
});

// write data
Expand All @@ -311,6 +280,7 @@ mod tests {
let rows = test_util::build_rows(1, 100);
let request = RegionRequest::Put(RegionPutRequest {
rows: Rows { schema, rows },
hint: None,
});

engine
Expand All @@ -330,6 +300,7 @@ mod tests {
let rows = test_util::build_rows(1, 100);
let request = RegionRequest::Put(RegionPutRequest {
rows: Rows { schema, rows },
hint: None,
});

engine
Expand Down
9 changes: 9 additions & 0 deletions src/metric-engine/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,13 @@ pub enum Error {
location: Location,
},

#[snafu(display("Failed to encode primary key"))]
EncodePrimaryKey {
source: mito2::error::Error,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Mito write operation fails"))]
MitoWriteOperation {
source: BoxedError,
Expand Down Expand Up @@ -283,6 +290,8 @@ impl ErrorExt for Error {
| MitoCatchupOperation { source, .. }
| MitoFlushOperation { source, .. } => source.status_code(),

EncodePrimaryKey { source, .. } => source.status_code(),

CollectRecordBatchStream { source, .. } => source.status_code(),

RegionAlreadyExists { .. } => StatusCode::RegionAlreadyExists,
Expand Down
1 change: 1 addition & 0 deletions src/metric-engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ pub mod engine;
pub mod error;
mod metadata_region;
mod metrics;
mod row_modifier;
#[cfg(test)]
mod test_util;
mod utils;
2 changes: 1 addition & 1 deletion src/metric-engine/src/metadata_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,7 @@ impl MetadataRegion {
}],
};

RegionPutRequest { rows }
RegionPutRequest { rows, hint: None }
}

fn build_delete_request(keys: &[String]) -> RegionDeleteRequest {
Expand Down
Loading
Loading