Skip to content

Commit a8a73ec

Browse files
author
Jackson Newhouse
committed
feat(processing_engine): respond to PR feedback, put python code behind system-py feature flag
1 parent db46fca commit a8a73ec

File tree

20 files changed

+289
-428
lines changed

20 files changed

+289
-428
lines changed

Cargo.lock

Lines changed: 0 additions & 17 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ members = [
88
"influxdb3_id",
99
"influxdb3_load_generator",
1010
"influxdb3_process",
11-
"influxdb3_processing_engine",
1211
"influxdb3_py_api",
1312
"influxdb3_server",
1413
"influxdb3_telemetry",
@@ -40,7 +39,7 @@ license = "MIT OR Apache-2.0"
4039

4140
[workspace.dependencies]
4241
anyhow = "1.0"
43-
arrow = { version = "53.0.0", features = ["prettyprint", "chrono-tz", "pyarrow"] }
42+
arrow = { version = "53.0.0", features = ["prettyprint", "chrono-tz"] }
4443
arrow-array = "53.0.0"
4544
arrow-buffer = "53.0.0"
4645
arrow-csv = "53.0.0"

influxdb3/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ tokio_console = ["console-subscriber", "tokio/tracing", "observability_deps/rele
7373

7474
# Use jemalloc as the default allocator.
7575
jemalloc_replacing_malloc = ["influxdb3_process/jemalloc_replacing_malloc"]
76+
system-py = ["influxdb3_write/system-py"]
7677

7778
[dev-dependencies]
7879
# Core Crates

influxdb3_catalog/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ iox_time.workspace = true
1515
# Local deps
1616
influxdb3_id = { path = "../influxdb3_id" }
1717
influxdb3_wal = { path = "../influxdb3_wal" }
18-
influxdb3_processing_engine = { path = "../influxdb3_processing_engine" }
1918

2019
# crates.io dependencies
2120
arrow.workspace = true

influxdb3_catalog/src/catalog.rs

Lines changed: 10 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,12 @@
11
//! Implementation of the Catalog that sits entirely in memory.
22
33
use crate::catalog::Error::{
4-
ProcessingEngineCallExists, ProcessingEnginePluginNotFound, ProcessingEngineTriggerExists,
5-
TableNotFound,
4+
ProcessingEngineCallExists, ProcessingEngineTriggerExists, TableNotFound,
65
};
76
use bimap::BiHashMap;
87
use hashbrown::HashMap;
98
use indexmap::IndexMap;
109
use influxdb3_id::{ColumnId, DbId, SerdeVecMap, TableId};
11-
use influxdb3_processing_engine::processing_engine_plugins::{
12-
ProcessingEnginePlugin, ProcessingEngineTrigger,
13-
};
1410
use influxdb3_wal::{
1511
CatalogBatch, CatalogOp, DeleteDatabaseDefinition, DeleteTableDefinition, FieldAdditions,
1612
FieldDefinition, LastCacheDefinition, LastCacheDelete, MetaCacheDefinition, MetaCacheDelete,
@@ -519,9 +515,9 @@ pub struct DatabaseSchema {
519515
/// The database is a map of tables
520516
pub tables: SerdeVecMap<TableId, Arc<TableDefinition>>,
521517
pub table_map: BiHashMap<TableId, Arc<str>>,
522-
pub processing_engine_plugins: HashMap<String, ProcessingEnginePlugin>,
518+
pub processing_engine_plugins: HashMap<String, PluginDefinition>,
523519
// TODO: care about performance of triggers
524-
pub processing_engine_triggers: HashMap<String, ProcessingEngineTrigger>,
520+
pub processing_engine_triggers: HashMap<String, TriggerDefinition>,
525521
pub deleted: bool,
526522
}
527523

@@ -758,21 +754,19 @@ impl UpdateDatabaseSchema for PluginDefinition {
758754
&self,
759755
mut schema: Cow<'a, DatabaseSchema>,
760756
) -> Result<Cow<'a, DatabaseSchema>> {
761-
let plugin: ProcessingEnginePlugin = self.into();
762-
763757
match schema.processing_engine_plugins.get(&self.plugin_name) {
764-
Some(current) if plugin.eq(current) => {}
758+
Some(current) if self.eq(current) => {}
765759
Some(_) => {
766760
return Err(ProcessingEngineCallExists {
767761
database_name: schema.name.to_string(),
768-
call_name: plugin.plugin_name.to_string(),
762+
call_name: self.plugin_name.to_string(),
769763
})
770764
}
771765
None => {
772766
schema
773767
.to_mut()
774768
.processing_engine_plugins
775-
.insert(self.plugin_name.to_string(), plugin);
769+
.insert(self.plugin_name.to_string(), self.clone());
776770
}
777771
}
778772

@@ -785,35 +779,19 @@ impl UpdateDatabaseSchema for TriggerDefinition {
785779
&self,
786780
mut schema: Cow<'a, DatabaseSchema>,
787781
) -> Result<Cow<'a, DatabaseSchema>> {
788-
let trigger_name = self.trigger_name.to_string();
789-
let Some(plugin) = schema
790-
.processing_engine_plugins
791-
.get(&self.plugin_name)
792-
.cloned()
793-
else {
794-
return Err(ProcessingEnginePluginNotFound {
795-
plugin_name: self.plugin_name.to_string(),
796-
database_name: schema.name.to_string(),
797-
});
798-
};
799-
let trigger = ProcessingEngineTrigger {
800-
trigger_name: trigger_name.to_string(),
801-
plugin,
802-
trigger: (&self.trigger).into(),
803-
};
804-
if let Some(current) = schema.processing_engine_triggers.get(&trigger_name) {
805-
if current == &trigger {
782+
if let Some(current) = schema.processing_engine_triggers.get(&self.trigger_name) {
783+
if current == self {
806784
return Ok(schema);
807785
}
808786
return Err(ProcessingEngineTriggerExists {
809787
database_name: schema.name.to_string(),
810-
trigger_name,
788+
trigger_name: self.trigger_name.to_string(),
811789
});
812790
}
813791
schema
814792
.to_mut()
815793
.processing_engine_triggers
816-
.insert(trigger_name, trigger);
794+
.insert(self.trigger_name.to_string(), self.clone());
817795
Ok(schema)
818796
}
819797
}

influxdb3_catalog/src/serialize.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,9 @@ use influxdb3_id::ColumnId;
88
use influxdb3_id::DbId;
99
use influxdb3_id::SerdeVecMap;
1010
use influxdb3_id::TableId;
11-
use influxdb3_processing_engine::processing_engine_plugins::{
12-
ProcessingEnginePlugin, ProcessingEngineTrigger,
11+
use influxdb3_wal::{
12+
LastCacheDefinition, LastCacheValueColumnsDef, PluginDefinition, TriggerDefinition,
1313
};
14-
use influxdb3_wal::{LastCacheDefinition, LastCacheValueColumnsDef};
1514
use schema::InfluxColumnType;
1615
use schema::InfluxFieldType;
1716
use schema::TIME_DATA_TIMEZONE;
@@ -95,14 +94,15 @@ impl From<DatabaseSnapshot> for DatabaseSchema {
9594
.into_iter()
9695
.map(|(name, trigger)| {
9796
// TODO: Decide whether to handle errors
98-
let plugin = processing_engine_plugins
97+
let plugin: PluginDefinition = processing_engine_plugins
9998
.get(&trigger.plugin_name)
10099
.cloned()
101100
.expect("should have plugin");
102101
(
103102
name,
104-
ProcessingEngineTrigger {
103+
TriggerDefinition {
105104
trigger_name: trigger.trigger_name,
105+
plugin_name: plugin.plugin_name.to_string(),
106106
plugin,
107107
trigger: serde_json::from_str(&trigger.trigger_specification).unwrap(),
108108
},
@@ -405,8 +405,8 @@ impl From<TableSnapshot> for TableDefinition {
405405
}
406406
}
407407

408-
impl From<&ProcessingEnginePlugin> for ProcessingEnginePluginSnapshot {
409-
fn from(plugin: &ProcessingEnginePlugin) -> Self {
408+
impl From<&PluginDefinition> for ProcessingEnginePluginSnapshot {
409+
fn from(plugin: &PluginDefinition) -> Self {
410410
Self {
411411
plugin_name: plugin.plugin_name.to_string(),
412412
code: plugin.code.to_string(),
@@ -416,7 +416,7 @@ impl From<&ProcessingEnginePlugin> for ProcessingEnginePluginSnapshot {
416416
}
417417
}
418418

419-
impl From<ProcessingEnginePluginSnapshot> for ProcessingEnginePlugin {
419+
impl From<ProcessingEnginePluginSnapshot> for PluginDefinition {
420420
fn from(plugin: ProcessingEnginePluginSnapshot) -> Self {
421421
Self {
422422
plugin_name: plugin.plugin_type.to_string(),
@@ -427,11 +427,11 @@ impl From<ProcessingEnginePluginSnapshot> for ProcessingEnginePlugin {
427427
}
428428
}
429429

430-
impl From<&ProcessingEngineTrigger> for ProcessingEngineTriggerSnapshot {
431-
fn from(trigger: &ProcessingEngineTrigger) -> Self {
430+
impl From<&TriggerDefinition> for ProcessingEngineTriggerSnapshot {
431+
fn from(trigger: &TriggerDefinition) -> Self {
432432
ProcessingEngineTriggerSnapshot {
433433
trigger_name: trigger.trigger_name.to_string(),
434-
plugin_name: trigger.plugin.plugin_name.to_string(),
434+
plugin_name: trigger.plugin_name.to_string(),
435435
trigger_specification: serde_json::to_string(&trigger.trigger)
436436
.expect("should be able to serialize trigger specification"),
437437
}

influxdb3_processing_engine/Cargo.toml

Lines changed: 0 additions & 14 deletions
This file was deleted.

influxdb3_processing_engine/src/lib.rs

Lines changed: 0 additions & 1 deletion
This file was deleted.

influxdb3_processing_engine/src/processing_engine_plugins.rs

Lines changed: 0 additions & 102 deletions
This file was deleted.

influxdb3_py_api/Cargo.toml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,20 @@ authors.workspace = true
55
edition.workspace = true
66
license.workspace = true
77

8+
9+
[features]
10+
system-py = ["pyo3"]
811
[dependencies]
912
influxdb3_wal = { path = "../influxdb3_wal" }
1013
influxdb3_catalog = {path = "../influxdb3_catalog"}
11-
influxdb-line-protocol = { workspace = true }
1214
schema = { workspace = true }
1315

1416
[dependencies.pyo3]
1517
version = "0.23.3"
1618
# this is necessary to automatically initialize the Python interpreter
1719
features = ["auto-initialize"]
20+
optional = true
21+
1822

1923
[lints]
2024
workspace = true

0 commit comments

Comments
 (0)