Skip to content

Commit 0ee4133

Browse files
authored
feat(flow): flow refill state (Part 1) (#5295)
* feat(flow): (Part 1) refill utils * chore: after rebase fix * chore: more rebase * rm refill.rs to reduce pr size * chore: simpler args * refactor: per review * docs: more explain for instant requests * refactor: per review
1 parent 369b59c commit 0ee4133

File tree

17 files changed

+756
-15
lines changed

17 files changed

+756
-15
lines changed

src/common/meta/src/key/table_info.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,13 @@ impl TableInfoManager {
190190
))
191191
}
192192

193+
/// Checks if the table exists.
194+
pub async fn exists(&self, table_id: TableId) -> Result<bool> {
195+
let key = TableInfoKey::new(table_id);
196+
let raw_key = key.to_bytes();
197+
self.kv_backend.exists(&raw_key).await
198+
}
199+
193200
pub async fn get(
194201
&self,
195202
table_id: TableId,

src/common/recordbatch/src/recordbatch.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ use crate::DfRecordBatch;
3535
#[derive(Clone, Debug, PartialEq)]
3636
pub struct RecordBatch {
3737
pub schema: SchemaRef,
38-
columns: Vec<VectorRef>,
38+
pub columns: Vec<VectorRef>,
3939
df_record_batch: DfRecordBatch,
4040
}
4141

src/flow/src/adapter.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -804,6 +804,8 @@ impl FlowWorkerManager {
804804
}
805805
}
806806

807+
node_ctx.add_flow_plan(flow_id, flow_plan.clone());
808+
807809
let _ = comment;
808810
let _ = flow_options;
809811

src/flow/src/adapter/node_context.rs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use std::collections::{BTreeMap, BTreeSet, HashMap};
1818
use std::sync::atomic::{AtomicUsize, Ordering};
1919
use std::sync::Arc;
2020

21+
use common_recordbatch::RecordBatch;
2122
use common_telemetry::trace;
2223
use datatypes::prelude::ConcreteDataType;
2324
use session::context::QueryContext;
@@ -31,6 +32,7 @@ use crate::error::{Error, EvalSnafu, TableNotFoundSnafu};
3132
use crate::expr::error::InternalSnafu;
3233
use crate::expr::{Batch, GlobalId};
3334
use crate::metrics::METRIC_FLOW_INPUT_BUF_SIZE;
35+
use crate::plan::TypedPlan;
3436
use crate::repr::{DiffRow, RelationDesc, BATCH_SIZE, BROADCAST_CAP, SEND_BUF_CAP};
3537

3638
/// A context that holds the information of the dataflow
@@ -40,6 +42,7 @@ pub struct FlownodeContext {
4042
pub source_to_tasks: BTreeMap<TableId, BTreeSet<FlowId>>,
4143
/// mapping from task to sink table, useful for sending data back to the client when a task is done running
4244
pub flow_to_sink: BTreeMap<FlowId, TableName>,
45+
pub flow_plans: BTreeMap<FlowId, TypedPlan>,
4346
pub sink_to_flow: BTreeMap<TableName, FlowId>,
4447
/// broadcast sender for source table, any incoming write request will be sent to the source table's corresponding sender
4548
///
@@ -63,6 +66,7 @@ impl FlownodeContext {
6366
Self {
6467
source_to_tasks: Default::default(),
6568
flow_to_sink: Default::default(),
69+
flow_plans: Default::default(),
6670
sink_to_flow: Default::default(),
6771
source_sender: Default::default(),
6872
sink_receiver: Default::default(),
@@ -179,6 +183,22 @@ impl SourceSender {
179183

180184
Ok(0)
181185
}
186+
187+
/// send record batch
188+
pub async fn send_record_batch(&self, batch: RecordBatch) -> Result<usize, Error> {
189+
let row_cnt = batch.num_rows();
190+
let batch = Batch::from(batch);
191+
192+
self.send_buf_row_cnt.fetch_add(row_cnt, Ordering::SeqCst);
193+
194+
self.send_buf_tx.send(batch).await.map_err(|e| {
195+
crate::error::InternalSnafu {
196+
reason: format!("Failed to send batch, error = {:?}", e),
197+
}
198+
.build()
199+
})?;
200+
Ok(row_cnt)
201+
}
182202
}
183203

184204
impl FlownodeContext {
@@ -200,6 +220,16 @@ impl FlownodeContext {
200220
sender.send_rows(rows, batch_datatypes).await
201221
}
202222

223+
pub async fn send_rb(&self, table_id: TableId, batch: RecordBatch) -> Result<usize, Error> {
224+
let sender = self
225+
.source_sender
226+
.get(&table_id)
227+
.with_context(|| TableNotFoundSnafu {
228+
name: table_id.to_string(),
229+
})?;
230+
sender.send_record_batch(batch).await
231+
}
232+
203233
/// flush all sender's buf
204234
///
205235
/// return numbers being sent
@@ -235,6 +265,15 @@ impl FlownodeContext {
235265
self.sink_to_flow.insert(sink_table_name, task_id);
236266
}
237267

268+
/// add flow plan to worker context
269+
pub fn add_flow_plan(&mut self, task_id: FlowId, plan: TypedPlan) {
270+
self.flow_plans.insert(task_id, plan);
271+
}
272+
273+
pub fn get_flow_plan(&self, task_id: &FlowId) -> Option<TypedPlan> {
274+
self.flow_plans.get(task_id).cloned()
275+
}
276+
238277
/// remove flow from worker context
239278
pub fn remove_flow(&mut self, task_id: FlowId) {
240279
if let Some(sink_table_name) = self.flow_to_sink.remove(&task_id) {
@@ -246,6 +285,7 @@ impl FlownodeContext {
246285
self.source_sender.remove(source_table_id);
247286
}
248287
}
288+
self.flow_plans.remove(&task_id);
249289
}
250290

251291
/// try add source sender, if already exist, do nothing

src/flow/src/adapter/table_source.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,31 @@ impl ManagedTableSource {
8282
}
8383
}
8484

85+
/// Get the time index column from table id
86+
pub async fn get_time_index_column_from_table_id(
87+
&self,
88+
table_id: TableId,
89+
) -> Result<(usize, datatypes::schema::ColumnSchema), Error> {
90+
let info = self
91+
.table_info_manager
92+
.get(table_id)
93+
.await
94+
.map_err(BoxedError::new)
95+
.context(ExternalSnafu)?
96+
.context(UnexpectedSnafu {
97+
reason: format!("Table id = {:?}, couldn't found table info", table_id),
98+
})?;
99+
let raw_schema = &info.table_info.meta.schema;
100+
let Some(ts_index) = raw_schema.timestamp_index else {
101+
UnexpectedSnafu {
102+
reason: format!("Table id = {:?}, couldn't found timestamp index", table_id),
103+
}
104+
.fail()?
105+
};
106+
let col_schema = raw_schema.column_schemas[ts_index].clone();
107+
Ok((ts_index, col_schema))
108+
}
109+
85110
pub async fn get_table_id_from_proto_name(
86111
&self,
87112
name: &greptime_proto::v1::TableName,
@@ -168,6 +193,14 @@ impl ManagedTableSource {
168193
let desc = table_info_value_to_relation_desc(table_info_value)?;
169194
Ok((table_name, desc))
170195
}
196+
197+
pub async fn check_table_exist(&self, table_id: &TableId) -> Result<bool, Error> {
198+
self.table_info_manager
199+
.exists(*table_id)
200+
.await
201+
.map_err(BoxedError::new)
202+
.context(ExternalSnafu)
203+
}
171204
}
172205

173206
impl std::fmt::Debug for ManagedTableSource {

src/flow/src/error.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use common_error::{define_into_tonic_status, from_err_code_msg_to_header};
2121
use common_macro::stack_trace_debug;
2222
use common_telemetry::common_error::ext::ErrorExt;
2323
use common_telemetry::common_error::status_code::StatusCode;
24-
use snafu::{Location, Snafu};
24+
use snafu::{Location, ResultExt, Snafu};
2525
use tonic::metadata::MetadataMap;
2626

2727
use crate::adapter::FlowId;
@@ -259,3 +259,9 @@ impl ErrorExt for Error {
259259
}
260260

261261
define_into_tonic_status!(Error);
262+
263+
impl From<EvalError> for Error {
264+
fn from(e: EvalError) -> Self {
265+
Err::<(), _>(e).context(EvalSnafu).unwrap_err()
266+
}
267+
}

src/flow/src/expr.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ mod linear;
2222
pub(crate) mod relation;
2323
mod scalar;
2424
mod signature;
25+
pub(crate) mod utils;
2526

2627
use arrow::compute::FilterBuilder;
2728
use datatypes::prelude::{ConcreteDataType, DataType};
@@ -54,6 +55,16 @@ pub struct Batch {
5455
diffs: Option<VectorRef>,
5556
}
5657

58+
impl From<common_recordbatch::RecordBatch> for Batch {
59+
fn from(value: common_recordbatch::RecordBatch) -> Self {
60+
Self {
61+
row_count: value.num_rows(),
62+
batch: value.columns,
63+
diffs: None,
64+
}
65+
}
66+
}
67+
5768
impl PartialEq for Batch {
5869
fn eq(&self, other: &Self) -> bool {
5970
let mut batch_eq = true;

src/flow/src/expr/linear.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,30 @@ impl MapFilterProject {
9494
}
9595
}
9696

97+
pub fn get_nth_expr(&self, n: usize) -> Option<ScalarExpr> {
98+
let idx = *self.projection.get(n)?;
99+
if idx < self.input_arity {
100+
Some(ScalarExpr::Column(idx))
101+
} else {
102+
// find direct ref to input's expr
103+
104+
let mut expr = self.expressions.get(idx - self.input_arity)?;
105+
loop {
106+
match expr {
107+
ScalarExpr::Column(prev) => {
108+
if *prev < self.input_arity {
109+
return Some(ScalarExpr::Column(*prev));
110+
} else {
111+
expr = self.expressions.get(*prev - self.input_arity)?;
112+
continue;
113+
}
114+
}
115+
_ => return Some(expr.clone()),
116+
}
117+
}
118+
}
119+
}
120+
97121
/// The number of columns expected in the output row.
98122
pub fn output_arity(&self) -> usize {
99123
self.projection.len()

src/flow/src/expr/scalar.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,9 @@ impl ScalarExpr {
311311
}
312312

313313
/// Eval this expression with the given values.
314+
///
315+
/// TODO(discord9): add tests to make sure `eval_batch` is the same as `eval` in
316+
/// most cases
314317
pub fn eval(&self, values: &[Value]) -> Result<Value, EvalError> {
315318
match self {
316319
ScalarExpr::Column(index) => Ok(values[*index].clone()),

0 commit comments

Comments
 (0)