Skip to content

Groundwork to support telemetry for Akka gRPC responses #2035

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion codegen/src/main/twirl/templates/JavaServer/Handler.scala.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import io.grpc.Status;

import akka.japi.function.Function;
import akka.actor.ActorSystem;
import akka.actor.ClassicActorSystemProvider;
Expand Down Expand Up @@ -141,7 +143,18 @@ public class @{serviceName}HandlerFactory {
String method = segments.next();
if (segments.hasNext()) return notFound; // we don't allow any random `/prefix/Method/anything/here
else {
return handle(spi.onRequest(prefix, method, req), method, implementation, mat, eHandler, system);
final akka.japi.Function<ActorSystem, akka.japi.Function<Throwable, Trailers>> instrumentedEHandler = s -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This means we'll allocate two lambdas for every request, while handling it/matching the path, also when SPI/telemetry is not used.

It would be better if a single callback per method has created up front on handler creation, and not happen at all if telemetry not used. That will require some tricker changes than these minimal amends though, it's not immediately obvious how to go about it to me, probably a private field per case and then use that inside the generated pattern match branches in the handle method.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call. I mainly proposed this as an "as simple as possible" solution that disrupts the existing code-base as little as possible. But let me see if I can come up with something that addresses your concerns.

return t -> {
final Trailers ts = eHandler.apply(s).apply(t);
spi.onResponse(prefix, method, ts);

return ts;
};
};
return handle(spi.onRequest(prefix, method, req), method, implementation, mat, instrumentedEHandler, system)
.whenComplete((resp, e) -> {
if (resp != null && e == null) spi.onResponse(prefix, method, new Trailers(Status.OK));
});
}
} else {
return notFound;
Expand Down
17 changes: 14 additions & 3 deletions codegen/src/main/twirl/templates/ScalaServer/Handler.scala.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
package @service.packageName

import scala.concurrent.ExecutionContext
import scala.util.Success

import io.grpc.Status

import akka.grpc.scaladsl.{ GrpcExceptionHandler, GrpcMarshalling }
import akka.grpc.Trailers
Expand Down Expand Up @@ -114,20 +117,28 @@ object @{serviceName}Handler {

import @{service.name}.Serializers._

def handle(request: model.HttpRequest, method: String): scala.concurrent.Future[model.HttpResponse] =
def handle(request: model.HttpRequest, method: String): scala.concurrent.Future[model.HttpResponse] = {
val instrumentedEHandler: ActorSystem => PartialFunction[Throwable, Trailers] = eHandler(_).andThen { trailer =>
spi.onResponse(prefix, method, trailer)
trailer
}
GrpcMarshalling.negotiated(request, (reader, writer) =>
(method match {
@for(method <- service.methods) {
case "@method.grpcName" =>
@{if(powerApis) { "val metadata = MetadataBuilder.fromHttpMessage(request)" } else { "" }}
@{method.unmarshal}(request.entity)(@method.deserializer.name, mat, reader)
.@{if(method.outputStreaming) { "map" } else { "flatMap" }}(implementation.@{method.nameSafe}(_@{if(powerApis) { ", metadata" } else { "" }}))
.map(e => @{method.marshal}(e, eHandler)(@method.serializer.name, writer, system))
.map(e => @{method.marshal}(e, instrumentedEHandler)(@method.serializer.name, writer, system))
}
case m => scala.concurrent.Future.failed(new NotImplementedError(s"Not implemented: $m"))
})
.recoverWith(GrpcExceptionHandler.from(eHandler(system.classicSystem))(system, writer))
.andThen {
case Success(_) => spi.onResponse(prefix, method, Trailers(Status.OK))
}
.recoverWith(GrpcExceptionHandler.from(instrumentedEHandler(system.classicSystem))(system, writer))
).getOrElse(unsupportedMediaType)
}

def isThisService(path: model.Uri.Path): Boolean =
path match {
Expand Down
2 changes: 2 additions & 0 deletions runtime/src/main/scala/akka/grpc/internal/TelemetrySpi.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import akka.actor.{
ExtensionIdProvider
}
import akka.annotation.{ InternalApi, InternalStableApi }
import akka.grpc.Trailers
import akka.http.javadsl.model.HttpRequest

import scala.annotation.nowarn
Expand Down Expand Up @@ -61,6 +62,7 @@ private[internal] object TelemetrySpi {
trait TelemetrySpi {
@nowarn
def onRequest[T <: HttpRequest](prefix: String, method: String, request: T): T = request
def onResponse(prefix: String, method: String, trailers: Trailers): Unit = ()
}

@InternalApi
Expand Down