Skip to content

Commit

Permalink
with websocket URL
Browse files Browse the repository at this point in the history
  • Loading branch information
ryantxu committed Sep 30, 2019
1 parent 95c63de commit a159bd4
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 52 deletions.
14 changes: 7 additions & 7 deletions src/ConfigEditor.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ interface State {}
export class ConfigEditor extends PureComponent<Props, State> {
componentDidMount() {}

onAPIKeyChange = (event: ChangeEvent<HTMLInputElement>) => {
onURLChange = (event: ChangeEvent<HTMLInputElement>) => {
const { onOptionsChange, options } = this.props;
const jsonData = {
...options.jsonData,
apiKey: event.target.value,
};
onOptionsChange({ ...options, jsonData });
// const jsonData = {
// ...options.jsonData,
// apiKey: event.target.value,
// };
onOptionsChange({ ...options, url: event.target.value });
};

render() {
Expand All @@ -27,7 +27,7 @@ export class ConfigEditor extends PureComponent<Props, State> {
return (
<div className="gf-form-group">
<div className="gf-form">
<FormField label="API Key" labelWidth={6} onChange={this.onAPIKeyChange} value={jsonData.apiKey || ''} placeholder="Your API key" />
<FormField label="Web Socket URL" labelWidth={10} onChange={this.onURLChange} value={jsonData.apiKey || ''} placeholder="Websocket URL" />
</div>
</div>
);
Expand Down
21 changes: 7 additions & 14 deletions src/QueryEditor.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -3,34 +3,27 @@ import defaults from 'lodash/defaults';
import React, { PureComponent, ChangeEvent } from 'react';
import { FormField, QueryEditorProps } from '@grafana/ui';
import { DataSource } from './DataSource';
import { MyQuery, MyDataSourceOptions, defaultQuery } from './types';
import { StreamingQuery, MyDataSourceOptions } from './types';

type Props = QueryEditorProps<DataSource, MyQuery, MyDataSourceOptions>;
type Props = QueryEditorProps<DataSource, StreamingQuery, MyDataSourceOptions>;

interface State {}

export class QueryEditor extends PureComponent<Props, State> {
onComponentDidMount() {}

onQueryTextChange = (event: ChangeEvent<HTMLInputElement>) => {
onNameChange = (event: ChangeEvent<HTMLInputElement>) => {
const { onChange, query } = this.props;
onChange({ ...query, queryText: event.target.value });
};

onConstantChange = (event: ChangeEvent<HTMLInputElement>) => {
const { onChange, query, onRunQuery } = this.props;
onChange({ ...query, constant: parseFloat(event.target.value) });
onRunQuery(); // executes the query
onChange({ ...query, name: event.target.value });
};

render() {
const query = defaults(this.props.query, defaultQuery);
const { queryText, constant } = query;
const query = defaults(this.props.query, {});
const name = query.name || '';

return (
<div className="gf-form">
<FormField width={4} value={constant} onChange={this.onConstantChange} label="Constant" type="number" step="0.1"></FormField>
<FormField labelWidth={8} value={queryText || ''} onChange={this.onQueryTextChange} label="Query Text" tooltip="Not used yet"></FormField>
<FormField labelWidth={8} value={name} onChange={this.onNameChange} label="Name"></FormField>
</div>
);
}
Expand Down
78 changes: 78 additions & 0 deletions src/StreamingListener.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import { TimeSeriesMessage } from 'types';
import { KeyValue, CircularDataFrame, FieldType, LoadingState } from '@grafana/data';
import { Subject, Observable, ReplaySubject } from 'rxjs';
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
import { DataQueryResponse } from '@grafana/ui';

let counter = 100;

interface FrameInfo {
key: string;
subject: Subject<DataQueryResponse>;
frame: CircularDataFrame;
}

export class StreamListener {
byName: KeyValue<FrameInfo> = {};
stream: WebSocketSubject<any>;

constructor(private capacity: number, url: string) {
this.stream = webSocket(url);
this.stream.subscribe({
next: (msg: any) => {
console.log('GOT', msg);
},
});
}

getAllObservers(): Array<Observable<DataQueryResponse>> {
const all: Array<Observable<DataQueryResponse>> = [];
for (const v of Object.values(this.byName)) {
all.push(v.subject);
}
return all;
}

getOrCreate(name: string): FrameInfo {
const info = this.byName[name];
if (info) {
return info;
}
const frame = new CircularDataFrame({ capacity: this.capacity });
frame.name = name;
return (this.byName[name] = {
subject: new ReplaySubject(1),
frame,
key: 'S' + counter++,
});
}

listen(name: string): Observable<DataQueryResponse> {
return this.getOrCreate(name).subject;
}

process(msg: TimeSeriesMessage) {
const info = this.getOrCreate(msg.name);
const df = info.frame;
if (!df.fields.length) {
df.addField({ name: 'time', type: FieldType.time, config: { title: 'Time' } });
const f = df.addFieldFor(msg.value, 'value');
f.config.title = msg.name;
}
if (msg.config && df.fields[1].name === 'value') {
const f = df.fields[1];
f.config = { title: msg.name, ...msg.config };
}
if (!msg.time) {
msg.time = Date.now();
}
df.values.time.add(msg.time);
df.values.value.add(msg.value);
df.validate();
info.subject.next({
key: info.key,
state: LoadingState.Streaming, // ???
data: [df],
});
}
}
54 changes: 32 additions & 22 deletions src/datasource.ts
Original file line number Diff line number Diff line change
@@ -1,38 +1,48 @@
import defaults from 'lodash/defaults';

import { DataQueryRequest, DataQueryResponse, DataSourceApi, DataSourceInstanceSettings } from '@grafana/ui';

import { MyQuery, MyDataSourceOptions, defaultQuery } from './types';
import { MutableDataFrame, FieldType } from '@grafana/data';
import { StreamingQuery, MyDataSourceOptions } from './types';
import { Observable, of, merge } from 'rxjs';
import { StreamListener } from 'StreamingListener';

export class DataSource extends DataSourceApi<StreamingQuery, MyDataSourceOptions> {
listener?: StreamListener;

export class DataSource extends DataSourceApi<MyQuery, MyDataSourceOptions> {
constructor(instanceSettings: DataSourceInstanceSettings<MyDataSourceOptions>) {
super(instanceSettings);
if (instanceSettings.url) {
this.listener = new StreamListener(1000, instanceSettings.url);
}
}

query(options: DataQueryRequest<MyQuery>): Promise<DataQueryResponse> {
const { range } = options;
const from = range.from.valueOf();
const to = range.to.valueOf();

// Return a constant for each query
const data = options.targets.map(target => {
const query = defaults(target, defaultQuery);
return new MutableDataFrame({
refId: query.refId,
fields: [
{ name: 'Time', values: [from, to], type: FieldType.time },
{ name: 'Value', values: [query.constant, query.constant], type: FieldType.number },
],
});
query(options: DataQueryRequest<StreamingQuery>): Observable<DataQueryResponse> {
if (!this.listener) {
throw new Error('missing listener');
}

let hasStar = false;
let subs: Array<Observable<DataQueryResponse>> = [];
options.targets.forEach(t => {
if (!t.name || t.name === '*') {
hasStar = true;
} else {
subs.push(this.listener!.listen(t.name));
}
});

return Promise.resolve({ data });
if (hasStar) {
subs = this.listener.getAllObservers();
}
if (subs.length === 0) {
return of({ data: [] }); // nothing
}
if (subs.length === 1) {
return subs[0];
}
return merge(...subs);
}

testDatasource() {
// Implement a health check for your data source.

return new Promise((resolve, reject) => {
resolve({
status: 'success',
Expand Down
4 changes: 2 additions & 2 deletions src/module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ import { DataSourcePlugin } from '@grafana/ui';
import { DataSource } from './DataSource';
import { ConfigEditor } from './ConfigEditor';
import { QueryEditor } from './QueryEditor';
import { MyQuery, MyDataSourceOptions } from './types';
import { StreamingQuery, MyDataSourceOptions } from './types';

export const plugin = new DataSourcePlugin<DataSource, MyQuery, MyDataSourceOptions>(DataSource)
export const plugin = new DataSourcePlugin<DataSource, StreamingQuery, MyDataSourceOptions>(DataSource)
.setConfigEditor(ConfigEditor)
.setQueryEditor(QueryEditor);
17 changes: 10 additions & 7 deletions src/types.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
import { DataQuery, DataSourceJsonData } from '@grafana/ui';
import { FieldConfig } from '@grafana/data';

export interface MyQuery extends DataQuery {
queryText?: string;
constant: number;
export interface StreamingQuery extends DataQuery {
name?: string;
}

export const defaultQuery: Partial<MyQuery> = {
constant: 6.5,
};

/**
* These are options configured for each DataSource instance
*/
export interface MyDataSourceOptions extends DataSourceJsonData {
apiKey?: string;
}

export interface TimeSeriesMessage {
name: string; // Name of the field
config?: FieldConfig; // optionally include field config
time?: number;
value?: any;
}

0 comments on commit a159bd4

Please sign in to comment.