Skip to content

Commit

Permalink
fixed broadcast streaming mockoon#83
Browse files Browse the repository at this point in the history
  • Loading branch information
isuru89 committed Feb 1, 2024
1 parent dae108b commit d7cdca0
Show file tree
Hide file tree
Showing 5 changed files with 215 additions and 52 deletions.
135 changes: 94 additions & 41 deletions packages/commons-server/src/libs/server/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,14 @@ import {
} from '../utils';
import { WebSocketResponseRulesInterpreter } from '../ws-response-rules-interpreter';
import { CrudRouteIds, crudRoutesBuilder, databucketActions } from './crud';
import { isWebSocketOpen, messageToString, serveFileContentInWs } from './ws';
import {
BroadcastContext,
DelegatedBroadcastHandler,
isWebSocketOpen,
messageToString,
serveFileContentInWs,
ServerContext
} from './ws';

/**
* Create a server instance from an Environment object.
Expand Down Expand Up @@ -508,17 +515,18 @@ export class MockoonServer extends (EventEmitter as new () => TypedEmitter<Serve
* @param wsRoutes
*/
private createWSRoutes(wsRoutes: Route[]) {
const envPath = this.environment.endpointPrefix
? `/${this.environment.endpointPrefix}`
: '';

wsRoutes.forEach((wsRoute) => {
const webSocketServer = new WebSocket.Server({
noServer: true
noServer: true,
path: `${envPath}/${wsRoute.endpoint}`
});

this.webSocketServers.push(webSocketServer);

const envPath = this.environment.endpointPrefix
? `/${this.environment.endpointPrefix}`
: '';

webSocketServer.on(
'connection',
this.createWebSocketConnectionHandler(webSocketServer, wsRoute)
Expand Down Expand Up @@ -589,15 +597,29 @@ export class MockoonServer extends (EventEmitter as new () => TypedEmitter<Serve
);
});

let streamCloseable: NodeJS.Timeout | null;
// handle common close method.
// There would be more close methods registered, if the route is in streaming mode.
socket.on('close', (code, reason) => {
this.emit(
'ws-closed',
inflightRequest,
code,
reason ? reason.toString('utf8') : null
);
});

// This is not waiting until a messge from client. But will push messages as a stream.
if (
route.streamingMode === StreamingMode.BROADCAST ||
route.streamingMode === StreamingMode.UNICAST
) {
streamCloseable = this.handleTimedWsResponses(
if (route.streamingMode === StreamingMode.BROADCAST) {
this.handleBroadcastResponse(
webSocketServer,
socket,
route,
baseErrorMeta
);

return;
} else if (route.streamingMode === StreamingMode.UNICAST) {
this.handleOneToOneStreamingResponses(
socket,
route,
request,
Expand All @@ -607,20 +629,6 @@ export class MockoonServer extends (EventEmitter as new () => TypedEmitter<Serve
return;
}

socket.on('close', (code, reason) => {
// close any interval data pushes
if (streamCloseable) {
clearInterval(streamCloseable);
}

this.emit(
'ws-closed',
inflightRequest,
code,
reason ? reason.toString('utf8') : null
);
});

socket.on('message', (data, isBinary) => {
if (isBinary) {
this.emit(
Expand All @@ -639,11 +647,11 @@ export class MockoonServer extends (EventEmitter as new () => TypedEmitter<Serve

const enabledRouteResponse = new WebSocketResponseRulesInterpreter(
route.responses,
request,
route.responseMode,
this.environment,
this.processedDatabuckets,
this.globalVariables
this.globalVariables,
request
).chooseResponse(responseNumber, messageData);

if (!enabledRouteResponse) {
Expand Down Expand Up @@ -695,7 +703,7 @@ export class MockoonServer extends (EventEmitter as new () => TypedEmitter<Serve
socket: WebSocket,
route: Route,
enabledRouteResponse: RouteResponse,
request: IncomingMessage,
request?: IncomingMessage,
data?: string
): string | undefined {
let content: any = enabledRouteResponse.body;
Expand Down Expand Up @@ -782,6 +790,53 @@ export class MockoonServer extends (EventEmitter as new () => TypedEmitter<Serve
return content;
}

private handleBroadcastResponse(
webSocketServer: WebSocketServer,
socket: WebSocket,
route: Route,
baseErrorMeta: any
) {
const broadcastContext = BroadcastContext.getInstance();
const handler: DelegatedBroadcastHandler = (
_: number,
enabledRouteResponse: RouteResponse
) => {
// todo
const content =
this.deriveFinalResponseContentForWebSockets(
socket,
route,
enabledRouteResponse
) || '';

if (!content) {
return;
}

const errorMetaData = {
...baseErrorMeta,
selectedResponseUUID: enabledRouteResponse.uuid,
selectedResponseLabel: enabledRouteResponse.label
};

webSocketServer.clients.forEach((client) => {
if (isWebSocketOpen(client)) {
this.serveWsResponse(client, content, errorMetaData);
}
});
};

broadcastContext.registerRoute(
route,
new ServerContext(
this.environment,
this.processedDatabuckets,
this.globalVariables
),
handler
);
}

/**
* Handle streaming websocket responses.
*
Expand All @@ -791,23 +846,22 @@ export class MockoonServer extends (EventEmitter as new () => TypedEmitter<Serve
* @param baseErrorMeta
* @returns setInterval reference, so that it can be cleared when socket closes.
*/
private handleTimedWsResponses(
webSocketServer: WebSocketServer,
private handleOneToOneStreamingResponses(
socket: WebSocket,
route: Route,
request: IncomingMessage,
baseErrorMeta: any
): NodeJS.Timeout | null {
) {
let responseNumber = 1;

const intervalRef = setInterval(() => {
const enabledRouteResponse = new WebSocketResponseRulesInterpreter(
route.responses,
request,
route.responseMode,
this.environment,
this.processedDatabuckets,
this.globalVariables
this.globalVariables,
request
).chooseResponse(responseNumber);

if (!enabledRouteResponse) {
Expand Down Expand Up @@ -838,16 +892,15 @@ export class MockoonServer extends (EventEmitter as new () => TypedEmitter<Serve
if (isWebSocketOpen(socket)) {
this.serveWsResponse(socket, content, errorMetaData);
}
} else if (route.streamingMode === StreamingMode.BROADCAST) {
webSocketServer.clients.forEach((client) => {
if (isWebSocketOpen(client)) {
this.serveWsResponse(client, content, errorMetaData);
}
});
}
}, route.streamingInterval);

return intervalRef;
socket.on('close', () => {
// close any interval data pushes
if (intervalRef) {
clearInterval(intervalRef);
}
});
}

/**
Expand Down
106 changes: 106 additions & 0 deletions packages/commons-server/src/libs/server/ws.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import {
Environment,
ProcessedDatabucket,
Route,
RouteResponse,
ServerErrorCodes,
Expand All @@ -7,9 +9,113 @@ import {
import { readFile } from 'fs';
import TypedEventEmitter from 'typed-emitter';
import { RawData, WebSocket } from 'ws';
import { WebSocketResponseRulesInterpreter } from '../ws-response-rules-interpreter';

export type DelegatedTemplateParser = (content: string) => string;

export type DelegatedBroadcastHandler = (
responseNumber: number,
enabledRouteResponse: RouteResponse
) => void;

export class ServerContext {
constructor(
public environment: Environment,
public processedDatabuckets: ProcessedDatabucket[],
public globalVariables: Record<string, any>
) {}
}

/**
* Represents a single running streaming data for a route.
* For all socket clients, there will be only one single instance.
*/
class WsRunningInstance {
private closeable: NodeJS.Timeout;
constructor(
private route: Route,
private serverContext: ServerContext,
private handler: DelegatedBroadcastHandler
) {}

public run() {
let responseNumber = 1;

this.closeable = setInterval(() => {
const enabledRouteResponse = new WebSocketResponseRulesInterpreter(
this.route.responses,
this.route.responseMode,
this.serverContext.environment,
this.serverContext.processedDatabuckets,
this.serverContext.globalVariables
).chooseResponse(responseNumber);

if (!enabledRouteResponse) {
return;
}

responseNumber += 1;

this.handler(responseNumber, enabledRouteResponse);
}, this.route.streamingInterval);
}

public close() {
if (this.closeable) {
clearInterval(this.closeable);
}
}
}

/**
* Context for all websocket broadcast end points.
* This class holds all streaming data generators per route.
* This guarantees that it creates one and only generator for a given route.
*/
export class BroadcastContext {
private static context: BroadcastContext;

private readonly routeDataGenerators: Map<string, WsRunningInstance> =
new Map<string, WsRunningInstance>();

public static getInstance(): BroadcastContext {
if (!BroadcastContext.context) {
BroadcastContext.context = new BroadcastContext();
}

return BroadcastContext.context;
}

public registerRoute(
route: Route,
serverContext: ServerContext,
nextResponseHandler: DelegatedBroadcastHandler
) {
const ref = this.routeDataGenerators.get(route.endpoint);
if (ref) {
// A reference already exists. So, broadcasting will be done by it.
// No need to create another.
return;
}

const instance = new WsRunningInstance(
route,
serverContext,
nextResponseHandler
);
this.routeDataGenerators.set(route.endpoint, instance);
instance.run();
}

public closeRoute(route: Route) {
const ref = this.routeDataGenerators.get(route.endpoint);
if (ref) {
this.routeDataGenerators.delete(route.endpoint);
ref.close();
}
}
}

/**
* Convert incoming websocket message to string representation.
*
Expand Down
2 changes: 1 addition & 1 deletion packages/commons-server/src/libs/template-parser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import { WebSocketHelpers } from './templating-helpers/web-socket-helpers';

export type WebSocketRequest = {
message?: string;
request: IncomingMessage;
request?: IncomingMessage;
};

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ export const requestHelperNames: (keyof ReturnType<typeof WebSocketHelpers>)[] =
];

export const parseWebSocketMessage = (
request: IncomingMessage,
messageData: string
messageData: string,
request?: IncomingMessage
): any => {
const contentType = request.headers['content-type'];
const contentType = request?.headers['content-type'];
if (contentType) {
if (contentType.includes('application/json')) {
return JSON.parse(messageData);
Expand All @@ -37,15 +37,19 @@ export const parseWebSocketMessage = (
};

export const WebSocketHelpers = function (
request: IncomingMessage,
request?: IncomingMessage,
messageData?: string,
environment?: Environment
) {
if (!request) {
return {};
}

const location = parseUrl(request.url || '', true);

// for simplicity, we suppport only json, xml or raw types only.
const rawMessageBody = messageData || '';
const message = parseWebSocketMessage(request, rawMessageBody);
const message = parseWebSocketMessage(rawMessageBody, request);

return {
// get json property from body
Expand Down

0 comments on commit d7cdca0

Please sign in to comment.