
Partition into 14 streams #10595


Draft · wants to merge 1 commit into main
73 changes: 55 additions & 18 deletions crates/store/re_datafusion/src/dataframe_query_provider.rs
@@ -23,6 +23,7 @@ use re_protos::manifest_registry::v1alpha1::DATASET_MANIFEST_ID_FIELD_NAME;
 use std::any::Any;
 use std::collections::BTreeMap;
 use std::fmt::Debug;
+use std::hash::{DefaultHasher, Hash as _, Hasher as _};
 use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
@@ -33,10 +34,11 @@ use std::task::{Context, Poll};
 /// the pitfall of executing a single row at a time, but we will likely want to consider
 /// at some point moving to a dynamic sizing.
 const DEFAULT_BATCH_SIZE: usize = 2048;
+const NUM_OUTPUT_PARTITIONS: usize = 14;
 
 pub struct DataframeQueryTableProvider {
     pub schema: SchemaRef,
-    query_engines: Vec<(String, QueryEngine<StorageEngine>)>,
+    query_engines: Vec<Vec<(String, QueryEngine<StorageEngine>)>>,
     query_expression: QueryExpression,
     sort_index: Option<Index>,
 }
@@ -51,9 +53,11 @@ impl Debug for DataframeQueryTableProvider {
 }
 
 pub struct DataframePartitionStream {
-    query_handle: QueryHandle<StorageEngine>,
-    partition_id: String,
+    query_engines: Vec<(String, QueryEngine<StorageEngine>)>,
     query_expression: QueryExpression,
     projected_schema: SchemaRef,
+    query_handle: Option<QueryHandle<StorageEngine>>,
+    partition_id: Option<String>,
 }
 
 impl DataframeQueryTableProvider {
@@ -73,11 +77,19 @@ impl DataframeQueryTableProvider {
             DATASET_MANIFEST_ID_FIELD_NAME,
         ));
 
-        let query_engines = query_engines.into_iter().collect();
+        let mut partitioned_query_engines: Vec<Vec<(String, QueryEngine<StorageEngine>)>> =
+            vec![Vec::new(); NUM_OUTPUT_PARTITIONS];
+
+        for (partition_id, engine) in query_engines {
+            let mut hasher = DefaultHasher::new();
+            partition_id.hash(&mut hasher);
+            let hash_value = hasher.finish() as usize % NUM_OUTPUT_PARTITIONS;
+            partitioned_query_engines[hash_value].push((partition_id, engine));
+        }
 
         Ok(Self {
             schema,
-            query_engines,
+            query_engines: partitioned_query_engines,
             query_expression: query_expression.to_owned(),
             sort_index: query_expression.filtered_index,
         })
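
A quick note on the bucketing above: hashing each partition id with `DefaultHasher` and reducing modulo `NUM_OUTPUT_PARTITIONS` deterministically assigns every id to one of 14 buckets within a single process (`DefaultHasher` output is not guaranteed stable across Rust releases, which is fine for buckets that only live for the duration of one query). A minimal, self-contained sketch of that behavior, with made-up partition ids:

```rust
use std::hash::{DefaultHasher, Hash as _, Hasher as _};

const NUM_OUTPUT_PARTITIONS: usize = 14;

/// Mirrors the bucketing in `DataframeQueryTableProvider`: hash the
/// partition id, then reduce modulo the fixed number of output partitions.
fn bucket_for(partition_id: &str) -> usize {
    let mut hasher = DefaultHasher::new();
    partition_id.hash(&mut hasher);
    hasher.finish() as usize % NUM_OUTPUT_PARTITIONS
}

fn main() {
    // Hypothetical partition ids, for illustration only.
    for id in ["recording_001", "recording_002", "recording_003"] {
        println!("{id} -> bucket {}", bucket_for(id));
    }
}
```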
@@ -121,19 +133,43 @@ impl TableProvider for DataframeQueryTableProvider {
     }
 }
 
+impl DataframePartitionStream {
+    fn update_query_handle(&mut self) {
+        if self.query_handle.is_none() {
+            if let Some((partition_id, engine)) = self.query_engines.pop() {
+                self.query_handle = Some(engine.query(self.query_expression.clone()));
+                self.partition_id = Some(partition_id);
+            }
+        }
+    }
+}
+
 impl Stream for DataframePartitionStream {
     type Item = Result<RecordBatch, DataFusionError>;
 
     #[tracing::instrument(level = "info", skip_all)]
     fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
         let this = self.get_mut();
-        let query_handle = &this.query_handle;
 
-        let num_fields = query_handle.schema().fields.len();
+        this.update_query_handle();
 
-        let Some(next_row) = query_handle.next_row() else {
+        if this.query_handle.is_none() {
+            return Poll::Ready(None);
+        }
+
+        let query_schema = Arc::clone(this.query_handle.as_ref().unwrap().schema());
+        let num_fields = query_schema.fields.len();
+
+        let mut maybe_next_row = this.query_handle.as_ref().and_then(|qh| qh.next_row());
+        if maybe_next_row.is_none() {
+            this.update_query_handle();
+            maybe_next_row = this.query_handle.as_ref().and_then(|qh| qh.next_row());
+        }
+
+        let Some(next_row) = maybe_next_row else {
             return Poll::Ready(None);
         };
 
         if next_row.is_empty() {
             // Should not happen
             return Poll::Ready(None);
@@ -154,14 +190,13 @@ impl Stream for DataframePartitionStream {
         arrays.extend(next_row);
 
         let batch_schema = Arc::new(prepend_string_column_schema(
-            query_handle.schema(),
+            &query_schema,
             DATASET_MANIFEST_ID_FIELD_NAME,
         ));
 
         let batch = RecordBatch::try_new(batch_schema, arrays)?;
 
         let output_batch = align_record_batch_to_schema(&batch, &this.projected_schema)?;
-
         Poll::Ready(Some(Ok(output_batch)))
     }
 }
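
With this change, each stream owns a whole bucket of `(partition_id, engine)` pairs and keeps at most one live `QueryHandle`, popping the next engine when the current one is done. Below is a rough standalone model of that pattern, with `Vec<i32>` sources standing in for query engines. One nuance: the model clears an exhausted source before refilling, because a refill guarded by `is_none()` never replaces a handle that is exhausted but still `Some`; in the diff as written, that guard may be worth a second look for buckets holding more than one engine.

```rust
/// A simplified model of `DataframePartitionStream`: several row sources
/// drained one at a time through a single `Option`-wrapped active handle.
struct ChainedRows {
    sources: Vec<Vec<i32>>,                  // stands in for the bucket of query engines
    active: Option<std::vec::IntoIter<i32>>, // stands in for Option<QueryHandle>
}

impl ChainedRows {
    /// Counterpart of `update_query_handle`: only replaces an absent handle.
    fn refill(&mut self) {
        if self.active.is_none() {
            if let Some(rows) = self.sources.pop() {
                self.active = Some(rows.into_iter());
            }
        }
    }
}

impl Iterator for ChainedRows {
    type Item = i32;

    fn next(&mut self) -> Option<i32> {
        loop {
            self.refill();
            // No active source left even after a refill attempt: fully drained.
            let rows = self.active.as_mut()?;
            if let Some(row) = rows.next() {
                return Some(row);
            }
            // Current source exhausted; clear it so the next refill advances.
            self.active = None;
        }
    }
}

fn main() {
    let stream = ChainedRows {
        sources: vec![vec![1, 2], vec![3], vec![4, 5]],
        active: None,
    };
    // `pop` takes sources from the back, so the last entry drains first.
    assert_eq!(stream.collect::<Vec<_>>(), vec![4, 5, 3, 1, 2]);
}
```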
@@ -206,7 +241,7 @@ pub fn align_record_batch_to_schema(
 
 struct PartitionStreamExec {
     props: PlanProperties,
-    query_engines: Vec<(String, QueryEngine<StorageEngine>)>,
+    query_engines: Vec<Vec<(String, QueryEngine<StorageEngine>)>>,
     query_expression: QueryExpression,
     projected_schema: Arc<Schema>,
 }
@@ -226,7 +261,7 @@ impl PartitionStreamExec {
         table_schema: &SchemaRef,
         sort_index: Option<Index>,
         projection: Option<&Vec<usize>>,
-        query_engines: Vec<(String, QueryEngine<StorageEngine>)>,
+        query_engines: Vec<Vec<(String, QueryEngine<StorageEngine>)>>,
         query_expression: QueryExpression,
     ) -> datafusion::common::Result<Self> {
         let projected_schema = match projection {
@@ -261,7 +296,7 @@ impl PartitionStreamExec {
         let output_partitioning = if partition_in_output_schema {
             Partitioning::Hash(
                 vec![Arc::new(Column::new(DATASET_MANIFEST_ID_FIELD_NAME, 0))],
-                query_engines.len(),
+                NUM_OUTPUT_PARTITIONS,
             )
         } else {
             Partitioning::UnknownPartitioning(query_engines.len())
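
For reference, a hedged sketch of what the two declarations above promise (assuming current `datafusion` APIs; the column name is a placeholder for the value of `DATASET_MANIFEST_ID_FIELD_NAME`): `Hash` asserts rows are already distributed across a fixed number of streams by the hash of the listed expressions, while `UnknownPartitioning` only states the stream count. One caveat that may be worth double-checking: DataFusion typically assumes a declared `Hash` partitioning matches its own repartitioning hash, not an arbitrary scheme such as `DefaultHasher`.

```rust
use std::sync::Arc;

use datafusion::physical_expr::expressions::Column;
use datafusion::physical_plan::Partitioning;

/// What `PartitionStreamExec` now advertises: hash-partitioned streams keyed
/// on the first output column when the partition id is projected, otherwise
/// an opaque stream count. `"rerun_partition_id"` is a placeholder name.
fn declared_partitioning(partition_in_output_schema: bool, num_buckets: usize) -> Partitioning {
    if partition_in_output_schema {
        Partitioning::Hash(
            vec![Arc::new(Column::new("rerun_partition_id", 0))],
            num_buckets,
        )
    } else {
        Partitioning::UnknownPartitioning(num_buckets)
    }
}

fn main() {
    println!("{:?}", declared_partitioning(true, 14));
}
```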
@@ -313,16 +348,18 @@ impl ExecutionPlan for PartitionStreamExec {
         partition: usize,
         _context: Arc<TaskContext>,
     ) -> datafusion::common::Result<SendableRecordBatchStream> {
-        let (partition_id, engine) = self
+        let query_engines = self
             .query_engines
             .get(partition)
-            .ok_or(plan_datafusion_err!("Invalid partition index"))?;
-        let query_handle = engine.query(self.query_expression.clone());
+            .ok_or(plan_datafusion_err!("Invalid partition index"))?
+            .clone();
 
         let stream = DataframePartitionStream {
-            query_handle,
-            partition_id: partition_id.clone(),
+            query_engines,
             query_expression: self.query_expression.clone(),
             projected_schema: self.projected_schema.clone(),
+            query_handle: None,
+            partition_id: None,
         };
 
         Ok(Box::pin(stream))
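
Finally, a sketch of how DataFusion consumes this plan (illustrative only; real callers go through a `SessionContext`, and the `futures` crate is assumed for `StreamExt`): `execute` is called once per declared output partition, so this change yields 14 independently pollable streams instead of one per dataset partition.

```rust
use std::sync::Arc;

use datafusion::common::Result;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::ExecutionPlan;
use futures::StreamExt as _;

/// Drains every declared output partition of `plan` in sequence; a real
/// runtime would spawn one task per partition and poll them concurrently.
async fn drain(plan: Arc<dyn ExecutionPlan>) -> Result<usize> {
    let ctx = Arc::new(TaskContext::default());
    let mut total_rows = 0;
    let partition_count = plan.properties().output_partitioning().partition_count();
    for partition in 0..partition_count {
        // One `DataframePartitionStream` per bucket of query engines.
        let mut stream = plan.execute(partition, Arc::clone(&ctx))?;
        while let Some(batch) = stream.next().await {
            total_rows += batch?.num_rows();
        }
    }
    Ok(total_rows)
}
```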