Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(microservices): prevent grpc write promise from throwing #13368

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,15 @@ import { INestApplication } from '@nestjs/common';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
import { Test } from '@nestjs/testing';
import { fail } from 'assert';
import { expect } from 'chai';
import { expect, use } from 'chai';
import * as chaiAsPromised from 'chai-as-promised';
import { join } from 'path';
import * as sinon from 'sinon';
import * as request from 'supertest';
import { GrpcController } from '../src/grpc/grpc.controller';

use(chaiAsPromised);

describe('GRPC transport', () => {
let server;
let app: INestApplication;
Expand All @@ -32,6 +36,7 @@ describe('GRPC transport', () => {
],
},
});

// Start gRPC microservice
await app.startAllMicroservices();
await app.init();
Expand Down Expand Up @@ -149,6 +154,50 @@ describe('GRPC transport', () => {
expect(receivedIds).to.deep.equal(expectedIds);
});

describe('streaming calls that error', () => {
// We want to assert that the application does not crash when an error is encountered with an unhandledRejection
// the best way to do that is to listen for the unhandledRejection event and fail the test if it is called
let processSpy: sinon.SinonSpy;

beforeEach(() => {
processSpy = sinon.spy();
process.on('unhandledRejection', processSpy);
});

afterEach(() => {
process.off('unhandledRejection', processSpy);
});

it('should not crash when replying with an error', async () => {
const call = new Promise<void>((resolve, reject) => {
const stream = client.streamDivide({
data: [{ dividend: 1, divisor: 0 }],
});

stream.on('data', () => {
fail('Stream should not have emitted any data');
});

stream.on('error', err => {
if (err.code !== GRPC.status.CANCELLED) {
reject(err);
}
});

stream.on('end', () => {
resolve();
});
});

await expect(call).to.eventually.be.rejectedWith(
'3 INVALID_ARGUMENT: dividing by 0 is not possible',
);

// if this fails the application has crashed
expect(processSpy.called).to.be.false;
});
});

after(async () => {
await app.close();
});
Expand Down
13 changes: 12 additions & 1 deletion integration/microservices/src/grpc/grpc.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import {
RpcException,
} from '@nestjs/microservices';
import { join } from 'path';
import { Observable, of, catchError } from 'rxjs';
import { Observable, of, catchError, from, mergeMap } from 'rxjs';

class ErrorHandlingProxy extends ClientGrpcProxy {
serializeError(err) {
Expand Down Expand Up @@ -107,6 +107,17 @@ export class GrpcController {
};
}

// contrived example meant to show when an error is encountered, like dividing by zero, the
// application does not crash and the error is returned appropriately to the client
@GrpcMethod('Math', 'StreamDivide')
streamDivide({
data,
}: {
data: { dividend: number; divisor: number }[];
}): Observable<any> {
return from(data).pipe(mergeMap(request => this.divide(request)));
}

@GrpcMethod('Math2')
async sum2({ data }: { data: number[] }): Promise<any> {
return of({
Expand Down
12 changes: 11 additions & 1 deletion integration/microservices/src/grpc/math.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ service Math {
rpc SumStream(stream RequestSum) returns(stream SumResult);
rpc SumStreamPass(stream RequestSum) returns(stream SumResult);
rpc Divide (RequestDivide) returns (DivideResult);
rpc StreamLargeMessages(Empty) returns (stream BackpressureData) {}
rpc StreamLargeMessages(Empty) returns (stream BackpressureData);
/* Given a series of dividend and divisor, stream back the division results for each */
rpc StreamDivide (StreamDivideRequest) returns (stream StreamDivideResponse);
}

message BackpressureData {
Expand All @@ -33,3 +35,11 @@ message RequestDivide {
message DivideResult {
int32 result = 1;
}

message StreamDivideRequest {
repeated RequestDivide data = 1;
}

message StreamDivideResponse {
DivideResult data = 1;
}
32 changes: 10 additions & 22 deletions packages/microservices/server/server-grpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -241,13 +241,7 @@ export class ServerGrpc extends Server implements CustomTransportStrategy {
return async (call: GrpcCall, callback: Function) => {
const handler = methodHandler(call.request, call.metadata, call);
const result$ = this.transformToObservable(await handler);

try {
await this.writeObservableToGrpc(result$, call);
} catch (err) {
call.emit('error', err);
return;
}
await this.writeObservableToGrpc(result$, call);
};
}

Expand All @@ -261,11 +255,13 @@ export class ServerGrpc extends Server implements CustomTransportStrategy {
* @param call The GRPC call we want to write to.
* @returns A promise that resolves when we're done writing to the call.
*/
public writeObservableToGrpc<T>(
private writeObservableToGrpc<T>(
source: Observable<T>,
call: GrpcCall<T>,
): Promise<void> {
return new Promise((resolve, reject) => {
// this promise should **not** reject, as we're handling errors in the observable for the Call
// the promise is only needed to signal when writing/draining has been completed
return new Promise((resolve, _doNotUse) => {
const valuesWaitingToBeDrained: T[] = [];
let shouldErrorAfterDraining = false;
let error: any;
Expand All @@ -278,17 +274,14 @@ export class ServerGrpc extends Server implements CustomTransportStrategy {
// If the call is cancelled, unsubscribe from the source
const cancelHandler = () => {
subscription.unsubscribe();
// The call has been cancelled, so we need to either resolve
// or reject the promise. We're resolving in this case because
// rejection is noisy. If at any point in the future, we need to
// know that cancellation happened, we can either reject or
// start resolving with some sort of outcome value.
// Calls that are cancelled by the client should be successfully resolved here
resolve();
};
call.on(CANCEL_EVENT, cancelHandler);
subscription.add(() => call.off(CANCEL_EVENT, cancelHandler));

// In all cases, when we finalize, end the writable stream
// being careful that errors and writes must be emitted _before_ this call is ended
subscription.add(() => call.end());

const drain = () => {
Expand All @@ -312,7 +305,7 @@ export class ServerGrpc extends Server implements CustomTransportStrategy {
} else if (shouldErrorAfterDraining) {
call.emit('error', error);
subscription.unsubscribe();
reject(error);
resolve();
}
};

Expand All @@ -337,7 +330,7 @@ export class ServerGrpc extends Server implements CustomTransportStrategy {
// reject and teardown.
call.emit('error', err);
subscription.unsubscribe();
reject(err);
resolve();
} else {
// We're waiting for a drain event, record the
// error so it can be handled after everything is drained.
Expand Down Expand Up @@ -386,12 +379,7 @@ export class ServerGrpc extends Server implements CustomTransportStrategy {
const handler = methodHandler(req.asObservable(), call.metadata, call);
const res = this.transformToObservable(await handler);
if (isResponseStream) {
try {
await this.writeObservableToGrpc(res, call);
} catch (err) {
call.emit('error', err);
return;
}
await this.writeObservableToGrpc(res, call);
} else {
const response = await lastValueFrom(
res.pipe(
Expand Down
21 changes: 21 additions & 0 deletions packages/microservices/test/server/server-grpc.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,7 @@ describe('ServerGrpc', () => {
const fn = server.createStreamServiceMethod(sinon.spy());
expect(fn).to.be.a('function');
});

describe('on call', () => {
it('should call native method', async () => {
const call = {
Expand All @@ -392,6 +393,26 @@ describe('ServerGrpc', () => {
expect(call.off.calledWith('cancelled')).to.be.true;
});

it('should handle error thrown in handler', async () => {
const call = {
write: sinon.spy(() => true),
end: sinon.spy(),
on: sinon.spy(),
off: sinon.spy(),
emit: sinon.spy(),
};

const callback = sinon.spy();
const error = new Error('handler threw');
const native = sinon.spy(() => throwError(() => error));

// implicit assertion that this will never throw when call.emit emits an error event
await server.createStreamServiceMethod(native)(call, callback);
expect(native.called).to.be.true;
expect(call.emit.calledWith('error', error)).to.be.ok;
expect(call.end.called).to.be.true;
});

it(`should close the result observable when receiving an 'cancelled' event from the client`, async () => {
const et = new EventTarget();
const cancel = () => et.dispatchEvent(new Event('cancelled'));
Expand Down