From f23aa0d4601225cd1f0e95521cbcd894fd75ceb1 Mon Sep 17 00:00:00 2001
From: praveen-influx
Date: Tue, 3 Dec 2024 15:16:28 +0000
Subject: [PATCH] feat: add method to create RecordBatch in SysEventStore
 (#25610)

- This commit allows a `RecordBatch` to be created directly from the event
  store, which avoids cloning events and building an intermediate vec. To
  achieve that, a new method `as_record_batch` has been added, with a trait
  bound `ToRecordBatch` that events are expected to implement.

- Minor tidy-ups (renamed methods) and an added test

closes: https://github.com/influxdata/influxdb/issues/25609
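A rough usage sketch (`QueryRunEvent` is a hypothetical event type that
implements `ToRecordBatch` and derives the required bounds; see the test
added below for a complete implementation):

    let store = SysEventStore::new(Arc::new(time_provider));
    store.record(QueryRunEvent { time_taken: 10 });

    // clones each event into an intermediate Vec
    let events = store.as_vec::<QueryRunEvent>();

    // builds a RecordBatch straight from the ring buffer, without cloning
    let batch: Option<Result<RecordBatch, ArrowError>> =
        store.as_record_batch::<QueryRunEvent>();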
---
 influxdb3_sys_events/src/lib.rs | 189 ++++++++++++++++++++++++++++----
 1 file changed, 165 insertions(+), 24 deletions(-)

diff --git a/influxdb3_sys_events/src/lib.rs b/influxdb3_sys_events/src/lib.rs
index 60d525710e9..bb8459fd4d4 100644
--- a/influxdb3_sys_events/src/lib.rs
+++ b/influxdb3_sys_events/src/lib.rs
@@ -5,15 +5,33 @@ use std::{
     sync::Arc,
 };
 
+use arrow::{datatypes::Schema, error::ArrowError};
+use arrow_array::RecordBatch;
 use dashmap::DashMap;
 use iox_time::TimeProvider;
 
 const MAX_CAPACITY: usize = 1000;
 
+/// This trait is not dyn compatible
+pub trait ToRecordBatch<E> {
+    /// set the schema for the event
+    fn schema() -> Schema;
+    /// takes a reference to `RingBuffer` and creates `RecordBatch` for the
+    /// events in the buffer.
+    fn to_record_batch(
+        items: Option<&RingBuffer<Event<E>>>,
+    ) -> Option<Result<RecordBatch, ArrowError>>;
+}
+
 /// This store captures the events for different types of instrumentation.
 /// It is backed by a ring buffer per event type. Every new event type that
-/// is added can call [`SysEventStore::add`] directly. And in order to find
-/// all the events per event type [`SysEventStore::query`] method can be used.
+/// is added can call [`SysEventStore::record`] directly. And in order to find
+/// all the events per event type, the [`SysEventStore::as_vec`] method can
+/// be used. This returns a `Vec<Event<E>>`, which internally clones to get
+/// values out of the `Ref` guard. There is a convenience method,
+/// [`SysEventStore::as_record_batch`], to get a record batch directly,
+/// avoiding clones.
+///
 /// Every time a new event is introduced, the system table had to be setup
 /// following the same pattern as in `influxdb3_server::system_tables`
 #[derive(Debug)]
@@ -30,7 +48,8 @@ impl SysEventStore {
         }
     }
 
-    pub fn add<E>(&self, val: E)
+    /// records an event by adding it to this event store
+    pub fn record<E>(&self, val: E)
     where
         E: 'static + Debug + Sync + Send,
     {
@@ -43,37 +62,57 @@
             .entry(TypeId::of::<RingBuffer<Event<E>>>())
             .or_insert_with(|| Box::new(RingBuffer::<Event<E>>::new(MAX_CAPACITY)));
 
+        // unwrap here is fine, we just used the same type above for
+        // get or insert
         buf.downcast_mut::<RingBuffer<Event<E>>>()
             .unwrap()
             .push(wrapped);
     }
 
-    pub fn query<E>(&self) -> Vec<Event<E>>
+    /// Creates an intermediate `Vec` by cloning events. To
+    /// create a record batch instead, use [`Self::as_record_batch`]
+    pub fn as_vec<E>(&self) -> Vec<Event<E>>
     where
         E: 'static + Clone + Debug + Sync + Send,
     {
-        let mut vec = vec![];
-        if let Some(buf) = self.events.get(&TypeId::of::<RingBuffer<Event<E>>>()) {
-            let iter = buf
-                .downcast_ref::<RingBuffer<Event<E>>>()
-                .unwrap()
-                .in_order();
-            for i in iter {
-                vec.push(i.clone());
-            }
-        };
-        vec
+        self.events
+            .get(&TypeId::of::<RingBuffer<Event<E>>>())
+            .map(|buf| {
+                // unwrap here is fine, we just used the same type above to
+                // get
+                buf.downcast_ref::<RingBuffer<Event<E>>>()
+                    .unwrap()
+                    .in_order()
+                    .cloned()
+                    .collect()
+            })
+            .unwrap_or_default()
+    }
+
+    /// Creates a record batch for the given event type `E`. This avoids
+    /// any unnecessary allocation, but events need to implement the
+    /// [`ToRecordBatch`] trait
+    pub fn as_record_batch<E>(&self) -> Option<Result<RecordBatch, ArrowError>>
+    where
+        E: 'static + Clone + Debug + Sync + Send + ToRecordBatch<E>,
+    {
+        let map_ref = self.events.get(&TypeId::of::<RingBuffer<Event<E>>>());
+        let buf_ref = map_ref
+            .as_ref()
+            // unwrap here is fine, we just used the same type above to get
+            .map(|buf| buf.downcast_ref::<RingBuffer<Event<E>>>().unwrap());
+        E::to_record_batch(buf_ref)
     }
 }
 
-struct RingBuffer<T> {
+pub struct RingBuffer<T> {
     buf: Vec<T>,
     max: usize,
     write_index: usize,
 }
 
 impl<T> RingBuffer<T> {
-    pub fn new(capacity: usize) -> Self {
+    fn new(capacity: usize) -> Self {
         Self {
             buf: Vec::with_capacity(capacity),
             max: capacity,
@@ -81,7 +120,7 @@
         }
     }
 
-    pub fn push(&mut self, val: T) {
+    fn push(&mut self, val: T) {
         if !self.reached_max() {
             self.buf.push(val);
         } else {
@@ -118,10 +157,16 @@
 mod tests {
     use std::sync::Arc;
 
+    use arrow::{
+        array::{StringViewBuilder, StructBuilder, UInt64Builder},
+        datatypes::{DataType, Field, Fields, Schema},
+        error::ArrowError,
+    };
+    use arrow_array::{ArrayRef, RecordBatch};
     use iox_time::{MockProvider, Time};
     use observability_deps::tracing::debug;
 
-    use crate::{RingBuffer, SysEventStore};
+    use crate::{Event, RingBuffer, SysEventStore, ToRecordBatch, MAX_CAPACITY};
 
     #[allow(dead_code)]
     #[derive(Default, Clone, Debug)]
@@ -132,6 +177,57 @@ mod tests {
         pub random_name: String,
     }
 
+    impl ToRecordBatch<SampleEvent1> for SampleEvent1 {
+        fn to_record_batch(
+            items: Option<&RingBuffer<Event<SampleEvent1>>>,
+        ) -> Option<Result<RecordBatch, ArrowError>> {
+            items.map(|buf| {
+                let iter = buf.in_order();
+                let mut event_time_arr = StringViewBuilder::with_capacity(MAX_CAPACITY);
+                let mut struct_builder = StructBuilder::from_fields(
+                    vec![
+                        Field::new("time_taken", DataType::UInt64, false),
+                        Field::new("total_fetched", DataType::UInt64, false),
+                    ],
+                    MAX_CAPACITY,
+                );
+                for event in iter {
+                    event_time_arr.append_value("2024-12-01T23:59:59.000Z");
+                    let time_taken_builder =
+                        struct_builder.field_builder::<UInt64Builder>(0).unwrap();
+                    time_taken_builder.append_value(event.data.time_taken);
+
+                    let num_files_fetched_builder =
+                        struct_builder.field_builder::<UInt64Builder>(1).unwrap();
+                    num_files_fetched_builder.append_value(event.data.total_fetched);
+
+                    struct_builder.append(true);
+                }
+
+                let columns: Vec<ArrayRef> = vec![
+                    Arc::new(event_time_arr.finish()),
+                    Arc::new(struct_builder.finish()),
+                ];
+                RecordBatch::try_new(Arc::new(Self::schema()), columns)
+            })
+        }
+
+        fn schema() -> Schema {
+            let columns = vec![
+                Field::new("event_time", DataType::Utf8View, false),
+                Field::new(
+                    "event_data",
+                    DataType::Struct(Fields::from(vec![
+                        Field::new("time_taken", DataType::UInt64, false),
+                        Field::new("total_fetched", DataType::UInt64, false),
+                    ])),
+                    false,
+                ),
+            ];
+            Schema::new(columns)
+        }
+    }
+
     #[allow(dead_code)]
     #[derive(Default, Clone, Debug)]
     struct SampleEvent2 {
@@ -208,18 +304,63 @@ mod tests {
         let time_provider = MockProvider::new(Time::from_timestamp_nanos(100));
 
         let event_store = SysEventStore::new(Arc::new(time_provider));
-        event_store.add(event_data);
+        event_store.record(event_data);
 
-        event_store.add(event_data2);
-        event_store.add(event_data3);
+        event_store.record(event_data2);
+        event_store.record(event_data3);
         assert_eq!(2, event_store.events.len());
 
-        let all_events = event_store.query::<SampleEvent1>();
+        let all_events = event_store.as_vec::<SampleEvent1>();
         assert_eq!(2, all_events.len());
         debug!(all_events = ?all_events, "all events in sys events for type SampleEvent1");
 
-        let all_events = event_store.query::<SampleEvent2>();
+        let all_events = event_store.as_vec::<SampleEvent2>();
         assert_eq!(1, all_events.len());
         debug!(all_events = ?all_events, "all events in sys events for type SampleEvent2");
     }
+
+    #[test_log::test(test)]
+    fn test_event_store_2() {
+        let event_data = SampleEvent1 {
+            start_time: 0,
+            time_taken: 10,
+            total_fetched: 10,
+            random_name: "foo".to_owned(),
+        };
+
+        let event_data2 = SampleEvent2 {
+            start_time: 0,
+            time_taken: 10,
+            generation_id: 100,
+        };
+
+        let event_data3 = SampleEvent1 {
+            start_time: 0,
+            time_taken: 10,
+            total_fetched: 10,
+            random_name: "boo".to_owned(),
+        };
+
+        let time_provider = MockProvider::new(Time::from_timestamp_nanos(100));
+
+        let event_store = SysEventStore::new(Arc::new(time_provider));
+        event_store.record(event_data);
+
+        event_store.record(event_data2);
+        event_store.record(event_data3);
+        assert_eq!(2, event_store.events.len());
+
+        let all_events = event_store.as_record_batch::<SampleEvent1>();
+        assert_eq!(
+            2,
+            all_events
+                .as_ref()
+                .unwrap()
+                .as_ref()
+                .unwrap()
+                .columns()
+                .len()
+        );
+        debug!(all_events = ?all_events, "all SampleEvent1 events as record batch");
+    }
 }