Skip to content

Commit

Permalink
refactor: pull column filling logic out of mito worker loop (#5455)
Browse files Browse the repository at this point in the history
* avoid duplicated req catagorisation

Signed-off-by: Ruihang Xia <[email protected]>

* pull column filling up

Signed-off-by: Ruihang Xia <[email protected]>

* fill columns instead of fill column

Signed-off-by: Ruihang Xia <[email protected]>

* add test with metadata

Signed-off-by: Ruihang Xia <[email protected]>

---------

Signed-off-by: Ruihang Xia <[email protected]>
  • Loading branch information
waynexia authored Feb 6, 2025
1 parent 6341fb8 commit 116bdaf
Show file tree
Hide file tree
Showing 5 changed files with 195 additions and 106 deletions.
4 changes: 3 additions & 1 deletion src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,9 @@ impl EngineInner {
region_id: RegionId,
request: RegionRequest,
) -> Result<AffectedRows> {
let (request, receiver) = WorkerRequest::try_from_region_request(region_id, request)?;
let region_metadata = self.get_metadata(region_id).ok();
let (request, receiver) =
WorkerRequest::try_from_region_request(region_id, request, region_metadata)?;
self.workers.submit_to_worker(region_id, request).await?;

receiver.await.context(RecvSnafu)?
Expand Down
165 changes: 119 additions & 46 deletions src/mito2/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,20 @@ pub struct WriteRequest {
has_null: Vec<bool>,
/// Write hint.
pub hint: Option<WriteHint>,
/// Region metadata on the time of this request is created.
pub(crate) region_metadata: Option<RegionMetadataRef>,
}

impl WriteRequest {
/// Creates a new request.
///
/// Returns `Err` if `rows` are invalid.
pub fn new(region_id: RegionId, op_type: OpType, rows: Rows) -> Result<WriteRequest> {
pub fn new(
region_id: RegionId,
op_type: OpType,
rows: Rows,
region_metadata: Option<RegionMetadataRef>,
) -> Result<WriteRequest> {
let mut name_to_index = HashMap::with_capacity(rows.schema.len());
for (index, column) in rows.schema.iter().enumerate() {
ensure!(
Expand Down Expand Up @@ -116,6 +123,7 @@ impl WriteRequest {
name_to_index,
has_null,
hint: None,
region_metadata,
})
}

Expand Down Expand Up @@ -248,46 +256,67 @@ impl WriteRequest {
pub(crate) fn fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> {
debug_assert_eq!(self.region_id, metadata.region_id);

let mut columns_to_fill = vec![];
for column in &metadata.column_metadatas {
if !self.name_to_index.contains_key(&column.column_schema.name) {
self.fill_column(column)?;
columns_to_fill.push(column);
}
}
self.fill_columns(columns_to_fill)?;

Ok(())
}

/// Fills default value for specific `column`.
fn fill_column(&mut self, column: &ColumnMetadata) -> Result<()> {
// Need to add a default value for this column.
let proto_value = self.column_default_value(column)?;
/// Checks the schema and fill missing columns.
pub(crate) fn maybe_fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> {
if let Err(e) = self.check_schema(metadata) {
if e.is_fill_default() {
// TODO(yingwen): Add metrics for this case.
// We need to fill default value. The write request may be a request
// sent before changing the schema.
self.fill_missing_columns(metadata)?;
} else {
return Err(e);
}
}

if proto_value.value_data.is_none() {
return Ok(());
Ok(())
}

/// Fills default value for specific `columns`.
fn fill_columns(&mut self, columns: Vec<&ColumnMetadata>) -> Result<()> {
let mut default_values = Vec::with_capacity(columns.len());
let mut columns_to_fill = Vec::with_capacity(columns.len());
for column in columns {
let default_value = self.column_default_value(column)?;
if default_value.value_data.is_some() {
default_values.push(default_value);
columns_to_fill.push(column);
}
}

// Insert default value to each row.
for row in &mut self.rows.rows {
row.values.push(proto_value.clone());
row.values.extend(default_values.iter().cloned());
}

// Insert column schema.
let (datatype, datatype_ext) =
ColumnDataTypeWrapper::try_from(column.column_schema.data_type.clone())
.with_context(|_| ConvertColumnDataTypeSnafu {
reason: format!(
"no protobuf type for column {} ({:?})",
column.column_schema.name, column.column_schema.data_type
),
})?
.to_parts();
self.rows.schema.push(ColumnSchema {
column_name: column.column_schema.name.clone(),
datatype: datatype as i32,
semantic_type: column.semantic_type as i32,
datatype_extension: datatype_ext,
options: options_from_column_schema(&column.column_schema),
});
for column in columns_to_fill {
let (datatype, datatype_ext) =
ColumnDataTypeWrapper::try_from(column.column_schema.data_type.clone())
.with_context(|_| ConvertColumnDataTypeSnafu {
reason: format!(
"no protobuf type for column {} ({:?})",
column.column_schema.name, column.column_schema.data_type
),
})?
.to_parts();
self.rows.schema.push(ColumnSchema {
column_name: column.column_schema.name.clone(),
datatype: datatype as i32,
semantic_type: column.semantic_type as i32,
datatype_extension: datatype_ext,
options: options_from_column_schema(&column.column_schema),
});
}

Ok(())
}
Expand Down Expand Up @@ -559,19 +588,32 @@ impl WorkerRequest {
pub(crate) fn try_from_region_request(
region_id: RegionId,
value: RegionRequest,
region_metadata: Option<RegionMetadataRef>,
) -> Result<(WorkerRequest, Receiver<Result<AffectedRows>>)> {
let (sender, receiver) = oneshot::channel();
let worker_request = match value {
RegionRequest::Put(v) => {
let write_request =
WriteRequest::new(region_id, OpType::Put, v.rows)?.with_hint(v.hint);
let mut write_request =
WriteRequest::new(region_id, OpType::Put, v.rows, region_metadata.clone())?
.with_hint(v.hint);
if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense
&& let Some(region_metadata) = &region_metadata
{
write_request.maybe_fill_missing_columns(region_metadata)?;
}
WorkerRequest::Write(SenderWriteRequest {
sender: sender.into(),
request: write_request,
})
}
RegionRequest::Delete(v) => {
let write_request = WriteRequest::new(region_id, OpType::Delete, v.rows)?;
let mut write_request =
WriteRequest::new(region_id, OpType::Delete, v.rows, region_metadata.clone())?;
if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense
&& let Some(region_metadata) = &region_metadata
{
write_request.maybe_fill_missing_columns(region_metadata)?;
}
WorkerRequest::Write(SenderWriteRequest {
sender: sender.into(),
request: write_request,
Expand Down Expand Up @@ -875,7 +917,7 @@ mod tests {
rows: vec![],
};

let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap_err();
let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap_err();
check_invalid_request(&err, "duplicate column c0");
}

Expand All @@ -891,7 +933,7 @@ mod tests {
}],
};

let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap();
let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
assert_eq!(0, request.column_index_by_name("c0").unwrap());
assert_eq!(1, request.column_index_by_name("c1").unwrap());
assert_eq!(None, request.column_index_by_name("c2"));
Expand All @@ -909,7 +951,7 @@ mod tests {
}],
};

let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap_err();
let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap_err();
check_invalid_request(&err, "row has 3 columns but schema has 2");
}

Expand Down Expand Up @@ -955,7 +997,7 @@ mod tests {
};
let metadata = new_region_metadata();

let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap();
let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
request.check_schema(&metadata).unwrap();
}

Expand All @@ -972,7 +1014,7 @@ mod tests {
};
let metadata = new_region_metadata();

let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap();
let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
let err = request.check_schema(&metadata).unwrap_err();
check_invalid_request(&err, "column ts expect type Timestamp(Millisecond(TimestampMillisecondType)), given: INT64(4)");
}
Expand All @@ -994,7 +1036,7 @@ mod tests {
};
let metadata = new_region_metadata();

let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap();
let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
let err = request.check_schema(&metadata).unwrap_err();
check_invalid_request(&err, "column ts has semantic type Timestamp, given: TAG(0)");
}
Expand All @@ -1016,7 +1058,7 @@ mod tests {
};
let metadata = new_region_metadata();

let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap();
let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
let err = request.check_schema(&metadata).unwrap_err();
check_invalid_request(&err, "column ts is not null but input has null");
}
Expand All @@ -1035,7 +1077,7 @@ mod tests {
};
let metadata = new_region_metadata();

let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap();
let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
let err = request.check_schema(&metadata).unwrap_err();
check_invalid_request(&err, "missing column ts");
}
Expand All @@ -1058,7 +1100,7 @@ mod tests {
};
let metadata = new_region_metadata();

let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap();
let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
let err = request.check_schema(&metadata).unwrap_err();
check_invalid_request(&err, r#"unknown columns: ["k1"]"#);
}
Expand Down Expand Up @@ -1104,7 +1146,7 @@ mod tests {
builder.build().unwrap()
};

let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap();
let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
let err = request.check_schema(&metadata).unwrap_err();
assert!(err.is_fill_default());
assert!(request
Expand All @@ -1128,7 +1170,7 @@ mod tests {
};
let metadata = new_region_metadata();

let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap();
let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
let err = request.check_schema(&metadata).unwrap_err();
assert!(err.is_fill_default());
request.fill_missing_columns(&metadata).unwrap();
Expand Down Expand Up @@ -1214,7 +1256,8 @@ mod tests {
};
let metadata = region_metadata_two_fields();

let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows).unwrap();
let mut request =
WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap();
let err = request.check_schema(&metadata).unwrap_err();
check_invalid_request(&err, "delete requests need column k0");
let err = request.fill_missing_columns(&metadata).unwrap_err();
Expand All @@ -1233,7 +1276,8 @@ mod tests {
values: vec![i64_value(100), ts_ms_value(1)],
}],
};
let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows).unwrap();
let mut request =
WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap();
let err = request.check_schema(&metadata).unwrap_err();
assert!(err.is_fill_default());
request.fill_missing_columns(&metadata).unwrap();
Expand Down Expand Up @@ -1296,7 +1340,8 @@ mod tests {
values: vec![i64_value(100), ts_ms_value(1)],
}],
};
let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows).unwrap();
let mut request =
WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap();
let err = request.check_schema(&metadata).unwrap_err();
assert!(err.is_fill_default());
request.fill_missing_columns(&metadata).unwrap();
Expand Down Expand Up @@ -1333,7 +1378,7 @@ mod tests {
};
let metadata = new_region_metadata();

let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap();
let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
let err = request.fill_missing_columns(&metadata).unwrap_err();
check_invalid_request(&err, "column ts does not have default value");
}
Expand Down Expand Up @@ -1363,11 +1408,39 @@ mod tests {
};
let metadata = region_metadata_two_fields();

let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap();
let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
let err = request.check_schema(&metadata).unwrap_err();
check_invalid_request(
&err,
"column f1 expect type Int64(Int64Type), given: STRING(12)",
);
}

#[test]
fn test_write_request_metadata() {
let rows = Rows {
schema: vec![
new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
],
rows: vec![Row {
values: vec![i64_value(1), i64_value(2)],
}],
};

let metadata = Arc::new(new_region_metadata());
let request = WriteRequest::new(
RegionId::new(1, 1),
OpType::Put,
rows,
Some(metadata.clone()),
)
.unwrap();

assert!(request.region_metadata.is_some());
assert_eq!(
request.region_metadata.unwrap().region_id,
RegionId::new(1, 1)
);
}
}
Loading

0 comments on commit 116bdaf

Please sign in to comment.