Skip to content

Commit 0ed3beb

Browse files
committed
chore(ksql-driver): Select queries for ksql allowed only from Cube Store. In order to query ksql create pre-aggregation first if Kafka download isn't enabled -- remove unloadWithoutTempTable as it's not needed due to readOnly flag on rollups
1 parent 0c2f701 commit 0ed3beb

File tree

3 files changed

+112
-1
lines changed

3 files changed

+112
-1
lines changed

packages/cubejs-ksql-driver/src/KsqlDriver.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -363,7 +363,6 @@ export class KsqlDriver extends BaseDriver implements DriverInterface {
363363
public capabilities(): DriverCapabilities {
364364
return {
365365
streamingSource: true,
366-
unloadWithoutTempTable: !!this.config.kafkaHost,
367366
};
368367
}
369368
}
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
cube("RequestsNonReadOnly", {
2+
sql: `select 1 as tenant_id, 1 as deployment_id, 'req-1' as request_id, (NOW() - INTERVAL '1 day')::timestamp as timestamp
3+
UNION ALL
4+
select 2 as tenant_id, 1 as deployment_id, 'req-2' as request_id, (NOW() - INTERVAL '2 day')::timestamp as timestamp
5+
`,
6+
data_source: "postgres",
7+
measures: {
8+
count: {
9+
type: "count",
10+
},
11+
},
12+
dimensions: {
13+
tenant_id: {
14+
sql: `tenant_id`,
15+
type: "number",
16+
},
17+
request_id: {
18+
sql: `request_id`,
19+
type: "string",
20+
},
21+
timestamp: {
22+
sql: `timestamp`,
23+
type: "time",
24+
},
25+
},
26+
pre_aggregations: {
27+
batch_streaming_lambda: {
28+
type: `rollup_lambda`,
29+
rollups: [batch, RequestsNonReadOnlyStream.stream],
30+
},
31+
32+
batch: {
33+
external: true,
34+
type: "rollup",
35+
measures: [count],
36+
dimensions: [tenant_id, request_id, timestamp],
37+
granularity: "day",
38+
time_dimension: RequestsNonReadOnly.timestamp,
39+
partition_granularity: "day",
40+
build_range_start: { sql: "SELECT NOW() - INTERVAL '10 day'" },
41+
build_range_end: { sql: "SELECT NOW()" },
42+
},
43+
},
44+
});
45+
46+
cube("RequestsNonReadOnlyStream", {
47+
dataSource: "ksql",
48+
49+
sql: `SELECT * FROM REQUESTS`,
50+
51+
measures: {
52+
count: {
53+
type: "count",
54+
},
55+
},
56+
dimensions: {
57+
tenant_id: {
58+
sql: `TENANT_ID`,
59+
type: "number",
60+
},
61+
request_id: {
62+
sql: `REQUEST_ID`,
63+
type: "string",
64+
},
65+
timestamp: {
66+
sql: `TIMESTAMP`,
67+
type: "time",
68+
},
69+
},
70+
preAggregations: {
71+
stream: {
72+
streamOffset: "earliest",
73+
type: `rollup`,
74+
measures: [count],
75+
dimensions: [tenant_id, request_id, timestamp],
76+
time_dimension: RequestsNonReadOnlyStream.timestamp,
77+
granularity: "day",
78+
partition_granularity: "day",
79+
build_range_start: { sql: "SELECT DATE_SUB(NOW(), interval '96 hour')" },
80+
build_range_end: { sql: "SELECT NOW()" }
81+
},
82+
},
83+
});

packages/cubejs-testing/test/smoke-lambda.test.ts

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,35 @@ describe('lambda', () => {
116116
expect(response.loadResponse.results[0].data.length).toEqual(3);
117117
});
118118

119+
test('Query lambda with ksql with create table ', async () => {
120+
const query: Query = {
121+
measures: ['RequestsNonReadOnly.count'],
122+
dimensions: ['RequestsNonReadOnly.tenant_id', 'RequestsNonReadOnly.request_id'],
123+
timeDimensions: [
124+
{
125+
dimension: 'RequestsNonReadOnly.timestamp',
126+
granularity: 'day'
127+
}
128+
],
129+
};
130+
// First call to trigger the pre-aggregation build
131+
await client.load(query);
132+
// We have to wait for cubestore to consume the data from Kafka. There is no way to know when it's done right now.
133+
await pausePromise(5000);
134+
135+
const response = await client.load(query);
136+
137+
// @ts-ignore
138+
expect(response.loadResponse.results[0].data.map(i => i['RequestsNonReadOnly.request_id'])).toEqual([
139+
'req-2',
140+
'req-1',
141+
'req-stream-2'
142+
]);
143+
144+
// @ts-ignore
145+
expect(response.loadResponse.results[0].data.length).toEqual(3);
146+
});
147+
119148
test('query', async () => {
120149
const query: Query = {
121150
measures: ['Orders.count'],

0 commit comments

Comments
 (0)