feat: Implement index for buffer #24954
Conversation
```diff
@@ -319,6 +319,19 @@ impl TableDefinition {
         self.schema = schema;
     }

     pub(crate) fn index_columns(&self) -> Vec<String> {
```
I'm not sure that we'll want to continue to default to having all tag columns be indexed. We'll need a new API & CLI to set the columns explicitly that a user wants indexed. We may find that by default we don't want to index anything.
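The diff only shows the signature, but a minimal self-contained sketch of what a helper like `index_columns` might do under the PR's stated default (index every tag column) could look like this. All names here are stand-ins, not the actual `TableDefinition` from the PR:

```rust
use std::collections::BTreeMap;

// Stand-in column type; the real schema types in the PR differ.
#[derive(Clone, Copy, PartialEq)]
enum ColumnType {
    Tag,
    Field,
    Timestamp,
}

struct TableDefinition {
    // BTreeMap keeps the returned column order deterministic.
    columns: BTreeMap<String, ColumnType>,
}

impl TableDefinition {
    /// Default behavior described in the PR: index every tag column.
    fn index_columns(&self) -> Vec<String> {
        self.columns
            .iter()
            .filter(|(_, t)| **t == ColumnType::Tag)
            .map(|(name, _)| name.clone())
            .collect()
    }
}

fn main() {
    let mut columns = BTreeMap::new();
    columns.insert("host".to_string(), ColumnType::Tag);
    columns.insert("region".to_string(), ColumnType::Tag);
    columns.insert("usage".to_string(), ColumnType::Field);
    columns.insert("time".to_string(), ColumnType::Timestamp);
    let table = TableDefinition { columns };
    assert_eq!(table.index_columns(), vec!["host", "region"]);
}
```

Making the set of indexed columns an explicit field (set via the API/CLI the reviewer mentions) rather than derived from column types would be the natural next step.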
```diff
@@ -303,10 +329,22 @@ impl DatabaseBuffer {
         segment_key: &PartitionKey,
         table_batch: TableBatch,
     ) {
         if !self.table_buffers.contains_key(&table_name) {
             // TODO: this check shouldn't be necessary. If the table doesn't exist in the catalog
```
I created #24955 to follow up and check what's going on here.
Does this happen without your changes at all?
Without my changes, the limit test passes, but that's because the buffer doesn't look for the table in the db_schema. So it looks to me like the limit check is hit, but it still calls to buffer the write, which passes on the old code path because it doesn't look into the catalog. That would be a bug because the buffered write would make it into the WAL and into the in-memory buffer without a corresponding catalog update. At least, that's what I think is going on here.
This implements an index for the data in the table buffers. For now, by default, it indexes all tags, keeping a mapping of each tag key/value pair to the row ids it has in the buffer. When queries ask for record batches from the table buffer, the filter expression is evaluated to determine if a record batch can be built on the fly using only the row ids that match the index. If we don't have it in the index, the entire record batch from the buffer will be returned.

This also updates the logic in segment state to only request a record batch with the projection. The query executor was updated so that it pushes the filter and projection down to the request to get table chunks.

While implementing this, I believe I uncovered a bug where, when limits are hit, a write still attempts to get buffered. I'll log a follow-up to look at that.
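A minimal sketch of the kind of tag index the description outlines — a map from tag key/value pairs to row ids, with a lookup that signals a fallback to the full record batch when the predicate isn't covered. The names and types here are illustrative, not the PR's actual ones:

```rust
use std::collections::HashMap;

/// Illustrative tag index: tag key -> tag value -> row ids in the buffer.
#[derive(Default)]
struct BufferIndex {
    columns: HashMap<String, HashMap<String, Vec<usize>>>,
}

impl BufferIndex {
    /// Record the tag key/value pairs of one buffered row.
    fn add_row(&mut self, row_id: usize, tags: &[(&str, &str)]) {
        for (key, value) in tags {
            self.columns
                .entry(key.to_string())
                .or_default()
                .entry(value.to_string())
                .or_default()
                .push(row_id);
        }
    }

    /// Some(rows) when a `key = value` predicate is covered by the index;
    /// None means the caller must fall back to the whole record batch.
    fn rows_for(&self, key: &str, value: &str) -> Option<&Vec<usize>> {
        self.columns.get(key).and_then(|m| m.get(value))
    }
}

fn main() {
    let mut index = BufferIndex::default();
    index.add_row(0, &[("host", "a"), ("region", "us")]);
    index.add_row(1, &[("host", "b"), ("region", "us")]);
    assert_eq!(index.rows_for("region", "us"), Some(&vec![0, 1]));
    assert_eq!(index.rows_for("host", "b"), Some(&vec![1]));
    // An unindexed key: caller returns the entire batch instead.
    assert!(index.rows_for("dc", "x").is_none());
}
```

The real implementation builds an Arrow record batch from the matching row ids; this sketch only shows the key-to-rows bookkeeping.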
I have quite a few suggestions to make the code easier to read and understand. I'll need to probably do another pass to make sure I'm fully understanding all of these changes.
```rust
filters
    .iter()
    .map(|_f| Ok(TableProviderFilterPushDown::Inexact))
    .collect()
```
You're not using the exprs here at all, nor does it need to be a `Result`, since this will always be `Ok`. I think you can just do this:

```diff
-filters
-    .iter()
-    .map(|_f| Ok(TableProviderFilterPushDown::Inexact))
-    .collect()
+vec![TableProviderFilterPushDown::Inexact; filters.len()]
```
It'll have to be a `Result` because that's the trait signature, but good call on the `vec`.
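Combining both points — the trait requires a `Result`, but the per-filter closure is unnecessary — the final form would presumably wrap the `vec!` once in `Ok`. Sketched here with stand-in types rather than the real DataFusion trait:

```rust
// Stand-ins for DataFusion's types, just to show the shape of the fix.
#[derive(Clone, Debug, PartialEq)]
enum TableProviderFilterPushDown {
    Inexact,
}

struct Expr;

fn supports_filters_pushdown(
    filters: &[Expr],
) -> Result<Vec<TableProviderFilterPushDown>, String> {
    // One `Inexact` per filter, wrapped in a single Ok to satisfy the
    // trait signature, instead of mapping each filter to an Ok value.
    Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
}

fn main() {
    let filters = [Expr, Expr, Expr];
    let pushdown = supports_filters_pushdown(&filters).unwrap();
    assert_eq!(pushdown.len(), 3);
    assert!(pushdown
        .iter()
        .all(|p| *p == TableProviderFilterPushDown::Inexact));
}
```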
```rust
if !buffered_data.database_buffers.contains_key(db_name) {
    let db_schema = catalog.db_schema(db_name).expect("db schema should exist");
    buffered_data.database_buffers.insert(
        db_name.clone(),
        DatabaseBuffer {
            table_buffers: HashMap::new(),
            db_schema,
        },
    );
}
let db_buffer = buffered_data.database_buffers.get_mut(db_name).unwrap();
```
I think you can keep the initial form by using the entry API like so, sans borrow-check issues:

```rust
let db_buffer = buffered_data
    .database_buffers
    .entry(db_name)
    .or_insert_with(|| DatabaseBuffer {
        table_buffers: HashMap::new(),
        db_schema: catalog
            .db_schema(db_name)
            .expect("db schema should exist"),
    });
```
Looking at the PR it seems you had done something similar earlier. Was there a reason you didn't do so here?
That doesn't work because `db_name` is a borrowed string, which doesn't work with the entry API. So I did it this way to avoid a string clone rather than use the entry API.
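The tradeoff being described can be shown with a plain `HashMap<String, _>` keyed by a borrowed `&str` (names here are illustrative): `entry` needs an owned `String` before it knows whether the key exists, so it allocates on every call, while the `contains_key` form only clones on a miss.

```rust
use std::collections::HashMap;

#[derive(Default)]
struct DatabaseBuffer {
    tables: usize, // placeholder for the real table_buffers map
}

/// The pattern from the PR: clone the key only when inserting a new entry.
fn get_or_create<'a>(
    buffers: &'a mut HashMap<String, DatabaseBuffer>,
    db_name: &str,
) -> &'a mut DatabaseBuffer {
    if !buffers.contains_key(db_name) {
        // Only a miss pays for the String allocation.
        buffers.insert(db_name.to_string(), DatabaseBuffer::default());
    }
    buffers.get_mut(db_name).expect("just inserted above")
}

/// The entry-API version must own the key up front, so every call
/// allocates a String for `db_name`, hit or miss.
fn get_or_create_entry<'a>(
    buffers: &'a mut HashMap<String, DatabaseBuffer>,
    db_name: &str,
) -> &'a mut DatabaseBuffer {
    buffers
        .entry(db_name.to_string())
        .or_insert_with(DatabaseBuffer::default)
}

fn main() {
    let mut buffers = HashMap::new();
    get_or_create(&mut buffers, "db1").tables += 1;
    get_or_create(&mut buffers, "db1").tables += 1;
    get_or_create_entry(&mut buffers, "db2").tables += 1;
    assert_eq!(buffers["db1"].tables, 2);
    assert_eq!(buffers["db2"].tables, 1);
}
```

Both forms behave identically; the difference is purely the allocation on the hot (already-present) path.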
```diff
@@ -77,6 +77,9 @@ pub enum Error {

     #[error("walop in file {0} contained data for more than one segment, which is invalid")]
     WalOpForMultipleSegments(String),

     #[error("error: {0}")]
     Other(#[from] anyhow::Error),
```
Is there a reason for using just a catch-all here rather than specific variants?
I can change them into specific variants; I wasn't sure what to do here. These errors are ones that actually shouldn't happen, so the receiver can't do anything about them. I was just tossing them into a catch-all so that they would at least get logged and get some sort of message back to the user. Previously they were doing a panic, which I wanted to remove. I can add specific thiserror variants if you think this is gross 😄
Refactored to have a specific error, incoming...
```rust
if !self.data.contains_key(&f.name) {
    let mut tag_builder = StringDictionaryBuilder::new();
    // append nulls for all previous rows
    for _ in 0..(row_index + self.row_count) {
        tag_builder.append_null();
    }
    self.data.insert(f.name.clone(), Builder::Tag(tag_builder));
}
let b = self
    .data
    .get_mut(&f.name)
    .expect("tag builder should exist");
```
This could be simplified to this, I believe:

```rust
let b = self.data.entry(&f.name).or_insert_with(|| {
    let mut tag_builder = StringDictionaryBuilder::new();
    // append nulls for all previous rows
    for _ in 0..(row_index + self.row_count) {
        tag_builder.append_null();
    }
    Builder::Tag(tag_builder)
});
```
entry API won't take a borrow
ah dang. That's unfortunate.
One day we might want to consider this option, but it feels like a lot of arm-twisting to get the compiler to accept it:
https://idubrov.name/rust/2018/06/01/tricking-the-hashmap.html
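The null backfill the original snippet performs can be sketched independently of Arrow: when a tag column first appears after some rows have already been buffered, the new builder has to be padded with that many nulls so every column stays the same length. Simplified here with `Vec<Option<String>>` standing in for `StringDictionaryBuilder`:

```rust
use std::collections::HashMap;

/// Simplified stand-in for a per-column Arrow builder.
#[derive(Default)]
struct TagColumn {
    values: Vec<Option<String>>,
}

#[derive(Default)]
struct TableBuffer {
    row_count: usize,
    columns: HashMap<String, TagColumn>,
}

impl TableBuffer {
    fn push_row(&mut self, tags: &[(&str, &str)]) {
        for (name, value) in tags {
            if !self.columns.contains_key(*name) {
                let mut col = TagColumn::default();
                // Backfill nulls for every row written before this
                // column was first seen.
                col.values.resize(self.row_count, None);
                self.columns.insert(name.to_string(), col);
            }
            let col = self.columns.get_mut(*name).expect("just inserted");
            col.values.push(Some(value.to_string()));
        }
        self.row_count += 1;
        // Pad columns absent from this row so lengths stay aligned.
        for col in self.columns.values_mut() {
            col.values.resize(self.row_count, None);
        }
    }
}

fn main() {
    let mut buf = TableBuffer::default();
    buf.push_row(&[("host", "a")]);
    buf.push_row(&[("host", "b"), ("region", "us")]);
    // "region" arrived late, so row 0 is backfilled with a null.
    assert_eq!(
        buf.columns["region"].values,
        vec![None, Some("us".to_string())]
    );
    assert_eq!(buf.columns["host"].values.len(), 2);
}
```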
```rust
    &self,
    schema: SchemaRef,
    filter: &[Expr],
) -> Result<Vec<RecordBatch>, anyhow::Error> {
```
We only ever return one RecordBatch. Why return a Vec here?
In theory it was going to potentially return many, as other things were added to the table buffer, like splitting up the writes based on time. I can change it to a single one and then add it back later if that's how things progress?
I think that makes more sense. Better to do what we have now and change the type sig later if need be.
```rust
if let Expr::BinaryExpr(BinaryExpr { left, op, right }) = expr {
    if *op == datafusion::logical_expr::Operator::Eq {
        if let Expr::Column(c) = left.as_ref() {
            if let Expr::Literal(datafusion::scalar::ScalarValue::Utf8(Some(v))) =
                right.as_ref()
            {
                return self.columns.get(c.name.as_str()).and_then(|m| m.get(v));
            }
        }
    }
}
```
I think you can simplify the code like so, but maybe not:

```rust
if let Expr::BinaryExpr(BinaryExpr {
    left: Expr::Column(c),
    op: datafusion::logical_expr::Operator::Eq,
    right: Expr::Literal(datafusion::scalar::ScalarValue::Utf8(Some(v))),
}) = expr
{
    return self.columns.get(c.name.as_str()).and_then(|m| m.get(v));
}
```
Doesn't seem like I can, because of the `Box`.
Ah dang, box patterns would have solved this problem, and they've been unstable since 1.0 basically 😭
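Without box patterns, one stable way to flatten the nesting is to dereference the boxes first and match on a tuple. Sketched here with a toy `Expr` that has the same `Box` shape, not DataFusion's actual types:

```rust
// Toy expression tree with the same Box shape as the real Expr.
enum Expr {
    Column(String),
    Literal(Option<String>),
    BinaryExpr {
        left: Box<Expr>,
        op: Operator,
        right: Box<Expr>,
    },
}

enum Operator {
    Eq,
}

/// Matching on `(&**left, op, &**right)` flattens the nested if-lets
/// into a single pattern, even though box patterns are unstable.
fn eq_column_literal(expr: &Expr) -> Option<(&str, &str)> {
    if let Expr::BinaryExpr { left, op, right } = expr {
        if let (Expr::Column(c), Operator::Eq, Expr::Literal(Some(v))) =
            (&**left, op, &**right)
        {
            return Some((c.as_str(), v.as_str()));
        }
    }
    None
}

fn main() {
    let expr = Expr::BinaryExpr {
        left: Box::new(Expr::Column("host".to_string())),
        op: Operator::Eq,
        right: Box::new(Expr::Literal(Some("a".to_string()))),
    };
    assert_eq!(eq_column_literal(&expr), Some(("host", "a")));
}
```

This still needs one outer `if let` to get at the boxed fields, but it collapses the three inner levels into one pattern.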
Alternate Title: The DB Schema only ever has one table

This is a story of subtle bugs, gnashing of teeth, and hair pulling. Gather round as I tell you the tale of an Arc that pointed to an outdated schema.

In #24954 we introduced an index for the database, as this will allow us to perform faster queries. When we added that code, this check was added:

```rust
if !self.table_buffers.contains_key(&table_name) {
    // TODO: this check shouldn't be necessary. If the table doesn't exist in the catalog
    // and we've gotten here, it means we're dropping a write.
    if let Some(table) = self.db_schema.get_table(&table_name) {
        self.table_buffers.insert(
            table_name.clone(),
            TableBuffer::new(segment_key.clone(), &table.index_columns()),
        );
    } else {
        return;
    }
}
```

Adding the return there let us continue on with our day and make the tests pass. However, just because the tests passed didn't mean the code was correct, as I would soon find out. With follow-up ticket #24955 created, we merged the changes and I began to debug the issue.

Note that we had assumed a single write was being dropped due to limits, because the limits test is what failed. What followed was a chase of a few days to prove that the limits weren't what was failing. It took a while, but the conclusion was that the limits weren't causing it. It did, however, expose the fact that a database only ever had one table, which was weird.

I then began to dig into this a bit more. Why would there be only one table? We weren't dropping just one write; we were dropping all but *one* write, or so it seemed. Many printlns and hours later, it became clear that we actually *were* updating the schema! It existed in the Catalog, but not in the pointer to the schema in the DatabaseBuffer struct. So what gives? Well, we need to look at [another piece of code](https://github.com/influxdata/influxdb/blob/8f72bf06e13e2269db973b9c2852e092239c701e/influxdb3_write/src/write_buffer/mod.rs#L540-L541).

In the `validate_or_insert_schema_and_partitions` function for the WriteBuffer we have this bit of code:

```rust
// The (potentially updated) DatabaseSchema to return to the caller.
let mut schema = Cow::Borrowed(schema);
```

We pass in a reference to the schema in the catalog. However, when we [go a bit further down](https://github.com/influxdata/influxdb/blob/8f72bf06e13e2269db973b9c2852e092239c701e/influxdb3_write/src/write_buffer/mod.rs#L565-L568) we see this code:

```rust
let schema = match schema {
    Cow::Owned(s) => Some(s),
    Cow::Borrowed(_) => None,
};
```

What this means is that if we make a change, we clone the original and update the clone. We *aren't* making a change to the original schema. When we go back up the call stack we get to [this bit of code](https://github.com/influxdata/influxdb/blob/8f72bf06e13e2269db973b9c2852e092239c701e/influxdb3_write/src/write_buffer/mod.rs#L456-L460):

```rust
if let Some(schema) = result.schema.take() {
    debug!("replacing schema for {:?}", schema);
    catalog.replace_database(sequence, Arc::new(schema))?;
}
```

We are updating the catalog with the new schema, but how does that work?

```rust
inner.databases.insert(db.name.clone(), db);
```

Oh. Oh no. We're just overwriting it. Which means that the DatabaseBuffer has an Arc to the *old* schema, not the *new* one. Which means the buffer will get the first copy of the schema containing the first new table, but *none* of the later ones.

The solution is to make sure that the buffer has a pointer to the Catalog instead, which means the DatabaseBuffer will always see the most up-to-date schema and only the Catalog handles the schema itself. This commit makes those changes. This was a very subtle mutability/pointer bug, given the intersection of valid borrow checking and some writes making it in, but luckily we caught it.

It does mean, though, that until this fix is in, changes between the Index PR and now should be considered subtly broken and shouldn't be used for anything beyond writing to a single table per DB.

TL;DR: We should ask the Catalog what the schema is, as it contains the up-to-date version.

Closes #24955
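The core of the bug can be reproduced with nothing but `Arc` and a `HashMap`: replacing a map entry with a new `Arc` does not update clones of the old one, which is exactly the stale `db_schema` the buffer was holding. A minimal stand-alone demonstration (with a `Vec<String>` of table names standing in for `DatabaseSchema`):

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Stand-in for DatabaseSchema: just a list of table names.
type DatabaseSchema = Vec<String>;

fn main() {
    let mut catalog: HashMap<String, Arc<DatabaseSchema>> = HashMap::new();
    catalog.insert("db".to_string(), Arc::new(vec!["cpu".to_string()]));

    // The DatabaseBuffer holds its own clone of the Arc...
    let buffer_schema = Arc::clone(&catalog["db"]);

    // ...and the catalog later "replaces" the schema with an updated
    // copy, as replace_database does via inner.databases.insert(...).
    let updated = Arc::new(vec!["cpu".to_string(), "mem".to_string()]);
    catalog.insert("db".to_string(), updated);

    // The catalog sees the new table, but the buffer's Arc still points
    // at the old allocation: it will never observe "mem".
    assert_eq!(catalog["db"].len(), 2);
    assert_eq!(buffer_schema.len(), 1);
}
```

This is why the fix looks up the schema through the Catalog at use time instead of caching an `Arc<DatabaseSchema>` in the buffer.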