Skip to content

Commit 0dc65f5

Browse files
committed
create CubeStoreResultWrapper class and switch to lazy evaluation of results set
1 parent 0c0b5fe commit 0dc65f5

File tree

5 files changed

+166
-63
lines changed

5 files changed

+166
-63
lines changed

packages/cubejs-backend-native/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
import { getCubestoreResult, ResultRow } from './index';
2+
3+
export class CubeStoreResultWrapper {
4+
private readonly proxy: any;
5+
6+
private cache: any;
7+
8+
public constructor(private readonly nativeReference: any) {
9+
this.proxy = new Proxy(this, {
10+
get: (target, prop: string | symbol) => {
11+
// intercept indexes
12+
if (typeof prop === 'string' && !Number.isNaN(Number(prop))) {
13+
const array = this.getArray();
14+
return array[Number(prop)];
15+
}
16+
17+
// intercept array methods
18+
if (typeof prop === 'string' && prop in Array.prototype) {
19+
const arrayMethod = (Array.prototype as any)[prop];
20+
if (typeof arrayMethod === 'function') {
21+
return (...args: any[]) => this.invokeArrayMethod(prop, ...args);
22+
}
23+
}
24+
25+
// intercept isNative
26+
if (prop === 'isNative') {
27+
return true;
28+
}
29+
30+
// intercept array length
31+
if (prop === 'length') {
32+
return this.getArray().length;
33+
}
34+
35+
// intercept JSON.stringify or toJSON()
36+
if (prop === 'toJSON') {
37+
return () => this.getArray();
38+
}
39+
40+
return (target as any)[prop];
41+
},
42+
43+
// intercept array length
44+
getOwnPropertyDescriptor: (target, prop) => {
45+
if (prop === 'length') {
46+
const array = this.getArray();
47+
return {
48+
configurable: true,
49+
enumerable: true,
50+
value: array.length,
51+
writable: false
52+
};
53+
}
54+
return Object.getOwnPropertyDescriptor(target, prop);
55+
},
56+
57+
ownKeys: (target) => {
58+
const array = this.getArray();
59+
return [...Object.keys(target), ...Object.keys(array), 'length', 'isNative'];
60+
}
61+
});
62+
63+
return this.proxy;
64+
}
65+
66+
private getArray(): ResultRow[] {
67+
if (!this.cache) {
68+
this.cache = getCubestoreResult(this.nativeReference);
69+
}
70+
return this.cache;
71+
}
72+
73+
private invokeArrayMethod(method: string, ...args: any[]): any {
74+
const array = this.getArray();
75+
return (array as any)[method](...args);
76+
}
77+
}

packages/cubejs-backend-native/js/index.ts

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@ import fs from 'fs';
33
import path from 'path';
44
import { Writable } from 'stream';
55
import type { Request as ExpressRequest } from 'express';
6+
import { CubeStoreResultWrapper } from './CubeStoreResultWrapper';
7+
8+
export * from './CubeStoreResultWrapper';
69

710
export interface BaseMeta {
811
// postgres or mysql
@@ -350,10 +353,17 @@ export const buildSqlAndParams = (cubeEvaluator: any): String => {
350353

351354
export type ResultRow = Record<string, string>;
352355

353-
export const parseCubestoreResultMessage = (message: ArrayBuffer): ResultRow[] => {
356+
export const parseCubestoreResultMessage = (message: ArrayBuffer): CubeStoreResultWrapper => {
357+
const native = loadNative();
358+
359+
const msg = native.parseCubestoreResultMessage(message);
360+
return new CubeStoreResultWrapper(msg);
361+
};
362+
363+
export const getCubestoreResult = (ref: CubeStoreResultWrapper): ResultRow[] => {
354364
const native = loadNative();
355365

356-
return native.parseCubestoreResultMessage(message);
366+
return native.getCubestoreResult(ref);
357367
};
358368

359369
export interface PyConfiguration {

packages/cubejs-backend-native/src/node_export.rs

Lines changed: 31 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use cubenativeutils::wrappers::NativeContextHolder;
3232
use cubesqlplanner::cube_bridge::base_query_options::NativeBaseQueryOptions;
3333
use cubesqlplanner::planner::base_query::BaseQuery;
3434

35-
use cubeorchestrator::cubestore_message_parser::parse_cubestore_ws_result;
35+
use cubeorchestrator::cubestore_message_parser::CubeStoreResult;
3636

3737
use cubesql::{telemetry::ReportingLogger, CubeError};
3838

@@ -508,34 +508,41 @@ fn debug_js_to_clrepr_to_js(mut cx: FunctionContext) -> JsResult<JsValue> {
508508

509509
//============ sql orchestrator ===================
510510

511-
fn parse_cubestore_ws_result_message(mut cx: FunctionContext) -> JsResult<JsValue> {
511+
fn parse_cubestore_ws_result_message(mut cx: FunctionContext) -> JsResult<JsBox<CubeStoreResult>> {
512512
let msg = cx.argument::<JsBuffer>(0)?;
513-
let msg_data = msg.as_slice(&cx).to_vec();
514-
match parse_cubestore_ws_result(&msg_data) {
513+
let msg_data = msg.as_slice(&cx);
514+
match CubeStoreResult::new(msg_data) {
515515
Ok(result) => {
516-
let js_array = cx.execute_scoped(|mut cx| {
517-
let js_array = JsArray::new(&mut cx, result.rows.len());
518-
519-
for (i, row) in result.rows.iter().enumerate() {
520-
let js_row = cx.execute_scoped(|mut cx| {
521-
let js_row = JsObject::new(&mut cx);
522-
for (key, value) in result.columns.iter().zip(row.into_iter()) {
523-
let js_value = cx.string(*value);
524-
js_row.set(&mut cx, *key, js_value)?;
525-
}
526-
Ok(js_row)
527-
})?;
528-
529-
js_array.set(&mut cx, i as u32, js_row)?;
530-
}
516+
Ok(cx.boxed(result))
517+
}
518+
Err(err) => cx.throw_error(err.to_string()),
519+
}
520+
}
531521

532-
Ok(js_array)
522+
fn get_cubestore_result(mut cx: FunctionContext) -> JsResult<JsValue> {
523+
let result = cx.argument::<JsBox<CubeStoreResult>>(0)?;
524+
525+
let js_array = cx.execute_scoped(|mut cx| {
526+
let js_array = JsArray::new(&mut cx, result.rows.len());
527+
528+
for (i, row) in result.rows.iter().enumerate() {
529+
let js_row = cx.execute_scoped(|mut cx| {
530+
let js_row = JsObject::new(&mut cx);
531+
for (key, value) in result.columns.iter().zip(row.into_iter()) {
532+
let js_key = cx.string(key);
533+
let js_value = cx.string(value);
534+
js_row.set(&mut cx, js_key, js_value)?;
535+
}
536+
Ok(js_row)
533537
})?;
534538

535-
Ok(js_array.upcast())
539+
js_array.set(&mut cx, i as u32, js_row)?;
536540
}
537-
Err(err) => cx.throw_error(err.to_string()),
538-
}
541+
542+
Ok(js_array)
543+
})?;
544+
545+
Ok(js_array.upcast())
539546
}
540547

541548
pub fn register_module_exports<C: NodeConfiguration + 'static>(
@@ -556,6 +563,7 @@ pub fn register_module_exports<C: NodeConfiguration + 'static>(
556563
"parseCubestoreResultMessage",
557564
parse_cubestore_ws_result_message,
558565
)?;
566+
cx.export_function("getCubestoreResult", get_cubestore_result)?;
559567

560568
crate::template::template_register_module(&mut cx)?;
561569

Lines changed: 45 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use neon::prelude::Finalize;
12
use cubeshared::codegen::{root_as_http_message, HttpCommand};
23

34
#[derive(Debug)]
@@ -23,55 +24,61 @@ impl std::fmt::Display for ParseError {
2324
}
2425
}
2526

26-
pub struct CubeStoreResult<'a> {
27-
pub columns: Vec<&'a str>,
28-
pub rows: Vec<Vec<&'a str>>,
29-
}
30-
3127
impl std::error::Error for ParseError {}
3228

33-
pub fn parse_cubestore_ws_result(msg_data: &[u8]) -> Result<CubeStoreResult, ParseError> {
34-
let http_message = root_as_http_message(msg_data).map_err(|_| ParseError::FlatBufferError)?;
29+
pub struct CubeStoreResult {
30+
pub columns: Vec<String>,
31+
pub rows: Vec<Vec<String>>,
32+
}
3533

36-
let command_type = http_message.command_type();
34+
impl Finalize for CubeStoreResult {}
3735

38-
match command_type {
39-
HttpCommand::HttpError => {
40-
let http_error = http_message
41-
.command_as_http_error()
42-
.ok_or(ParseError::FlatBufferError)?;
43-
let error_message = http_error.error().unwrap_or("Unknown error").to_string();
44-
Err(ParseError::ErrorMessage(error_message))
45-
}
46-
HttpCommand::HttpResultSet => {
47-
let result_set = http_message
48-
.command_as_http_result_set()
49-
.ok_or(ParseError::EmptyResultSet)?;
36+
impl CubeStoreResult {
37+
pub fn new(msg_data: &[u8]) -> Result<Self, ParseError> {
38+
let mut result = CubeStoreResult {
39+
columns: vec![],
40+
rows: vec![],
41+
};
5042

51-
let result_set_columns = result_set.columns().ok_or(ParseError::EmptyResultSet)?;
43+
let http_message = root_as_http_message(msg_data)
44+
.map_err(|_| ParseError::FlatBufferError)?;
5245

53-
if result_set_columns.iter().any(|c| c.is_empty()) {
54-
return Err(ParseError::ColumnNameNotDefined);
46+
match http_message.command_type() {
47+
HttpCommand::HttpError => {
48+
let http_error = http_message
49+
.command_as_http_error()
50+
.ok_or(ParseError::FlatBufferError)?;
51+
let error_message = http_error.error().unwrap_or("Unknown error").to_string();
52+
Err(ParseError::ErrorMessage(error_message))
5553
}
54+
HttpCommand::HttpResultSet => {
55+
let result_set = http_message
56+
.command_as_http_result_set()
57+
.ok_or(ParseError::EmptyResultSet)?;
5658

57-
let result_set_rows = result_set.rows().ok_or(ParseError::EmptyResultSet)?;
58-
let mut result = CubeStoreResult{
59-
columns: result_set_columns.iter().collect(),
60-
rows: Vec::with_capacity(result_set_rows.len())
61-
};
59+
let result_set_columns = result_set.columns().ok_or(ParseError::EmptyResultSet)?;
6260

63-
for row in result_set_rows.iter() {
64-
let values = row.values().ok_or(ParseError::NullRow)?;
65-
let row_obj: Vec<_> = values
66-
.iter()
67-
.map(|val| val.string_value().unwrap_or(""))
68-
.collect();
61+
if result_set_columns.iter().any(|c| c.is_empty()) {
62+
return Err(ParseError::ColumnNameNotDefined);
63+
}
6964

70-
result.rows.push(row_obj);
71-
}
65+
result.columns = result_set_columns.iter().map(|val| val.to_owned()).collect();
66+
let result_set_rows = result_set.rows().ok_or(ParseError::EmptyResultSet)?;
67+
result.rows = Vec::with_capacity(result_set_rows.len());
68+
69+
for row in result_set_rows.iter() {
70+
let values = row.values().ok_or(ParseError::NullRow)?;
71+
let row_obj: Vec<_> = values
72+
.iter()
73+
.map(|val| val.string_value().unwrap_or("").to_owned())
74+
.collect();
75+
76+
result.rows.push(row_obj);
77+
}
7278

73-
Ok(result)
79+
Ok(result)
80+
},
81+
_ => Err(ParseError::UnsupportedCommand),
7482
}
75-
_ => Err(ParseError::UnsupportedCommand),
7683
}
7784
}

0 commit comments

Comments
 (0)