Skip to content

Commit

Permalink
support non live query
Browse files Browse the repository at this point in the history
  • Loading branch information
ryantxu committed Oct 1, 2019
1 parent 6ce847a commit 7c12e13
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 8 deletions.
16 changes: 12 additions & 4 deletions src/StreamingListener.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { TimeSeriesMessage } from 'types';
import { KeyValue, CircularDataFrame, FieldType, LoadingState } from '@grafana/data';
import { KeyValue, CircularDataFrame, FieldType, LoadingState, DataFrame } from '@grafana/data';
import { Subject, Observable, ReplaySubject } from 'rxjs';
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
import { DataQueryResponse } from '@grafana/ui';
Expand All @@ -17,7 +17,8 @@ export class StreamListener {
stream?: WebSocketSubject<any>;

constructor(private capacity: number, url?: string) {
if (url) {
if (url && url.length > 4 && url.startsWith('ws')) {
console.log('Connect to:', url);
this.stream = webSocket({
url,
openObserver: {
Expand Down Expand Up @@ -65,6 +66,14 @@ export class StreamListener {
setTimeout(this.dummyValues, 100 + Math.random() * 800); // ~1/sec
};

getAllFrames(): DataFrame[] {
const all: DataFrame[] = [];
for (const v of Object.values(this.byName)) {
all.push(v.frame);
}
return all;
}

getAllObservers(): Array<Observable<DataQueryResponse>> {
const all: Array<Observable<DataQueryResponse>> = [];
for (const v of Object.values(this.byName)) {
Expand Down Expand Up @@ -111,9 +120,8 @@ export class StreamListener {
df.validate();
info.subject.next({
key: info.key,
state: LoadingState.Streaming, // ???
state: LoadingState.Streaming,
data: [df],
});
console.log('PROCESS', msg);
}
}
42 changes: 38 additions & 4 deletions src/datasource.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import { DataQueryRequest, DataQueryResponse, DataSourceApi, DataSourceInstanceSettings } from '@grafana/ui';

import { StreamingQuery, MyDataSourceOptions } from './types';
import { Observable, of, merge } from 'rxjs';
import { Observable, of, merge, interval } from 'rxjs';
import { map } from 'rxjs/operators';
import { StreamListener } from 'StreamingListener';
import { LoadingState, DataFrame } from '@grafana/data';

export class DataSource extends DataSourceApi<StreamingQuery, MyDataSourceOptions> {
listener?: StreamListener;
Expand All @@ -13,23 +15,54 @@ export class DataSource extends DataSourceApi<StreamingQuery, MyDataSourceOption
}

query(options: DataQueryRequest<StreamingQuery>): Observable<DataQueryResponse> {
if (!this.listener) {
const { listener } = this;
if (!listener) {
throw new Error('missing listener');
}

const isLive = options.rangeRaw && options.rangeRaw!.to === 'now';
if (!isLive) {
let hasStar = false;
let data: DataFrame[] = [];
options.targets.forEach(t => {
if (!t.name || t.name === '*') {
hasStar = true;
} else {
data.push(listener.getOrCreate(t.name).frame);
}
});
if (hasStar) {
data = listener.getAllFrames();
}
return of({ data });
}

let hasStar = false;
let subs: Array<Observable<DataQueryResponse>> = [];
options.targets.forEach(t => {
if (!t.name || t.name === '*') {
hasStar = true;
} else {
subs.push(this.listener!.listen(t.name));
subs.push(listener.listen(t.name));
}
});

if (hasStar) {
subs = this.listener.getAllObservers();
subs = listener.getAllObservers();
}

// Update every 1/2 sec regardless of results
const ping = interval(1000).pipe(
map(v => {
return {
key: 'heartbeat',
state: LoadingState.Streaming,
data: [],
} as DataQueryResponse;
})
);
subs.push(ping);

if (subs.length === 0) {
return of({ data: [] }); // nothing
}
Expand All @@ -41,6 +74,7 @@ export class DataSource extends DataSourceApi<StreamingQuery, MyDataSourceOption

testDatasource() {
// Implement a health check for your data source.

return new Promise((resolve, reject) => {
resolve({
status: 'success',
Expand Down

0 comments on commit 7c12e13

Please sign in to comment.