Skip to content

Commit

Permalink
feat: sys events store added (#25603)
Browse files Browse the repository at this point in the history
This commit introduces basic store for sys events and the backing ring
buffer. Since the buffer needs to hold arbitrary data, it uses `Box<dyn
Any>`

closes: #25581
  • Loading branch information
praveen-influx authored Dec 2, 2024
1 parent 81d1ff1 commit 43755c2
Show file tree
Hide file tree
Showing 9 changed files with 295 additions and 1 deletion.
19 changes: 19 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions influxdb3/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ influxdb3_server = { path = "../influxdb3_server" }
influxdb3_wal = { path = "../influxdb3_wal" }
influxdb3_write = { path = "../influxdb3_write" }
influxdb3_telemetry = { path = "../influxdb3_telemetry" }
influxdb3_sys_events = { path = "../influxdb3_sys_events" }

# Crates.io dependencies
anyhow.workspace = true
Expand Down
5 changes: 4 additions & 1 deletion influxdb3/src/commands/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use influxdb3_server::{
query_executor::{CreateQueryExecutorArgs, QueryExecutorImpl},
serve, CommonServerState,
};
use influxdb3_sys_events::SysEventStore;
use influxdb3_telemetry::store::TelemetryStore;
use influxdb3_wal::{Gen1Duration, WalConfig};
use influxdb3_write::{
Expand Down Expand Up @@ -381,9 +382,10 @@ pub async fn command(config: Config) -> Result<()> {
// Construct a token to trigger clean shutdown
let frontend_shutdown = CancellationToken::new();

let time_provider = Arc::new(SystemProvider::new());
let sys_events_store = Arc::new(SysEventStore::new(Arc::clone(&time_provider) as _));
let object_store: Arc<dyn ObjectStore> =
make_object_store(&config.object_store_config).map_err(Error::ObjectStoreParsing)?;
let time_provider = Arc::new(SystemProvider::new());

let (object_store, parquet_cache) = if !config.disable_parquet_mem_cache {
let (object_store, parquet_cache) = create_cached_obj_store_and_oracle(
Expand Down Expand Up @@ -516,6 +518,7 @@ pub async fn command(config: Config) -> Result<()> {
concurrent_query_limit: 10,
query_log_size: config.query_log_size,
telemetry_store: Arc::clone(&telemetry_store),
sys_events_store: Arc::clone(&sys_events_store),
}));

let listener = TcpListener::bind(*config.http_bind_address)
Expand Down
1 change: 1 addition & 0 deletions influxdb3_server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ influxdb3_process = { path = "../influxdb3_process", default-features = false }
influxdb3_wal = { path = "../influxdb3_wal"}
influxdb3_write = { path = "../influxdb3_write" }
iox_query_influxql_rewrite = { path = "../iox_query_influxql_rewrite" }
influxdb3_sys_events = { path = "../influxdb3_sys_events" }
influxdb3_telemetry = { path = "../influxdb3_telemetry" }

# crates.io Dependencies
Expand Down
5 changes: 5 additions & 0 deletions influxdb3_server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ mod tests {
use influxdb3_cache::meta_cache::MetaCacheProvider;
use influxdb3_catalog::catalog::Catalog;
use influxdb3_id::{DbId, TableId};
use influxdb3_sys_events::SysEventStore;
use influxdb3_telemetry::store::TelemetryStore;
use influxdb3_wal::WalConfig;
use influxdb3_write::parquet_cache::test_cached_obj_store_and_oracle;
Expand Down Expand Up @@ -809,6 +810,9 @@ mod tests {
.unwrap(),
);

let sys_events_store = Arc::new(SysEventStore::new(Arc::<MockProvider>::clone(
&time_provider,
)));
let parquet_metrics_provider: Arc<PersistedFiles> =
Arc::clone(&write_buffer_impl.persisted_files());
let sample_telem_store =
Expand All @@ -830,6 +834,7 @@ mod tests {
concurrent_query_limit: 10,
query_log_size: 10,
telemetry_store: Arc::clone(&sample_telem_store),
sys_events_store: Arc::clone(&sys_events_store),
});

// bind to port 0 will assign a random available port:
Expand Down
13 changes: 13 additions & 0 deletions influxdb3_server/src/query_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use datafusion_util::config::DEFAULT_SCHEMA;
use datafusion_util::MemoryStream;
use influxdb3_cache::meta_cache::{MetaCacheFunction, META_CACHE_UDTF_NAME};
use influxdb3_catalog::catalog::{Catalog, DatabaseSchema};
use influxdb3_sys_events::SysEventStore;
use influxdb3_telemetry::store::TelemetryStore;
use influxdb3_write::last_cache::LastCacheFunction;
use influxdb3_write::WriteBuffer;
Expand Down Expand Up @@ -56,6 +57,7 @@ pub struct QueryExecutorImpl {
query_execution_semaphore: Arc<InstrumentedAsyncSemaphore>,
query_log: Arc<QueryLog>,
telemetry_store: Arc<TelemetryStore>,
sys_events_store: Arc<SysEventStore>,
}

/// Arguments for [`QueryExecutorImpl::new`]
Expand All @@ -69,6 +71,7 @@ pub struct CreateQueryExecutorArgs {
pub concurrent_query_limit: usize,
pub query_log_size: usize,
pub telemetry_store: Arc<TelemetryStore>,
pub sys_events_store: Arc<SysEventStore>,
}

impl QueryExecutorImpl {
Expand All @@ -82,6 +85,7 @@ impl QueryExecutorImpl {
concurrent_query_limit,
query_log_size,
telemetry_store,
sys_events_store,
}: CreateQueryExecutorArgs,
) -> Self {
let semaphore_metrics = Arc::new(AsyncSemaphoreMetrics::new(
Expand All @@ -102,6 +106,7 @@ impl QueryExecutorImpl {
query_execution_semaphore,
query_log,
telemetry_store,
sys_events_store,
}
}
}
Expand Down Expand Up @@ -341,6 +346,7 @@ impl QueryDatabase for QueryExecutorImpl {
Arc::clone(&self.exec),
Arc::clone(&self.datafusion_config),
Arc::clone(&self.query_log),
Arc::clone(&self.sys_events_store),
))))
}

Expand Down Expand Up @@ -373,11 +379,13 @@ impl Database {
exec: Arc<Executor>,
datafusion_config: Arc<HashMap<String, String>>,
query_log: Arc<QueryLog>,
sys_events_store: Arc<SysEventStore>,
) -> Self {
let system_schema_provider = Arc::new(SystemSchemaProvider::new(
Arc::clone(&db_schema),
Arc::clone(&query_log),
Arc::clone(&write_buffer),
Arc::clone(&sys_events_store),
));
Self {
db_schema,
Expand Down Expand Up @@ -617,6 +625,7 @@ mod tests {
use futures::TryStreamExt;
use influxdb3_cache::meta_cache::MetaCacheProvider;
use influxdb3_catalog::catalog::Catalog;
use influxdb3_sys_events::SysEventStore;
use influxdb3_telemetry::store::TelemetryStore;
use influxdb3_wal::{Gen1Duration, WalConfig};
use influxdb3_write::{
Expand Down Expand Up @@ -698,6 +707,9 @@ mod tests {

let persisted_files: Arc<PersistedFiles> = Arc::clone(&write_buffer_impl.persisted_files());
let telemetry_store = TelemetryStore::new_without_background_runners(persisted_files);
let sys_events_store = Arc::new(SysEventStore::new(Arc::<MockProvider>::clone(
&time_provider,
)));
let write_buffer: Arc<dyn WriteBuffer> = write_buffer_impl;
let metrics = Arc::new(Registry::new());
let datafusion_config = Arc::new(Default::default());
Expand All @@ -710,6 +722,7 @@ mod tests {
concurrent_query_limit: 10,
query_log_size: 10,
telemetry_store,
sys_events_store,
});

(write_buffer, query_executor, time_provider)
Expand Down
2 changes: 2 additions & 0 deletions influxdb3_server/src/system_tables/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::{any::Any, collections::HashMap, sync::Arc};

use datafusion::{catalog::SchemaProvider, datasource::TableProvider, error::DataFusionError};
use influxdb3_catalog::catalog::DatabaseSchema;
use influxdb3_sys_events::SysEventStore;
use influxdb3_write::WriteBuffer;
use iox_query::query_log::QueryLog;
use iox_system_tables::SystemTableProvider;
Expand Down Expand Up @@ -45,6 +46,7 @@ impl SystemSchemaProvider {
db_schema: Arc<DatabaseSchema>,
query_log: Arc<QueryLog>,
buffer: Arc<dyn WriteBuffer>,
_sys_events_store: Arc<SysEventStore>,
) -> Self {
let mut tables = HashMap::<&'static str, Arc<dyn TableProvider>>::new();
let queries = Arc::new(SystemTableProvider::new(Arc::new(QueriesTable::new(
Expand Down
25 changes: 25 additions & 0 deletions influxdb3_sys_events/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
[package]
name = "influxdb3_sys_events"
version.workspace = true
authors.workspace = true
edition.workspace = true
license.workspace = true


[dependencies]
# core crates
iox_time.workspace = true
iox_system_tables.workspace = true
observability_deps.workspace = true

# crates.io deps
arrow.workspace = true
arrow-array.workspace = true
async-trait.workspace = true
chrono.workspace = true
dashmap.workspace = true
datafusion.workspace = true

[dev-dependencies]
test-log.workspace = true
proptest.workspace = true
Loading

0 comments on commit 43755c2

Please sign in to comment.