Skip to content

Commit 7356274

Browse files
committed
add: JsonRpcServer callbacks
1 parent b18f93b commit 7356274

File tree

6 files changed

+66
-113
lines changed

6 files changed

+66
-113
lines changed

build.sbt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1733,9 +1733,11 @@ lazy val `json-rpc-server` = project
17331733
Compile / moduleDependencies ++= slf4jApi,
17341734
Compile / internalModuleDependencies := Seq(
17351735
(`akka-wrapper` / Compile / exportedModule).value,
1736+
(`ydoc-api` / Compile / exportedModule).value,
17361737
(`scala-libs-wrapper` / Compile / exportedModule).value
17371738
)
17381739
)
1740+
.dependsOn(`ydoc-api`)
17391741
.dependsOn(`runtime-utils` % "test->compile")
17401742

17411743
// An automatic JPMS module

engine/language-server/src/main/scala/org/enso/languageserver/boot/MainModule.scala

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import org.enso.distribution.{DistributionManager, Environment, LanguageHome}
1212
import org.enso.editions.EditionResolver
1313
import org.enso.profiling.events.EventsMonitor
1414
import org.enso.editions.updater.EditionManager
15-
import org.enso.jsonrpc.{JsonRpcServer, SecureConnectionConfig}
15+
import org.enso.jsonrpc.{YdocJsonRpcServer, JsonRpcServer, SecureConnectionConfig}
1616
import org.enso.runner.common.CompilerBasedDependencyExtractor
1717
import org.enso.languageserver.capability.CapabilityRouter
1818
import org.enso.languageserver.data._
@@ -502,7 +502,7 @@ class MainModule(serverConfig: LanguageServerConfig, logLevel: Level) {
502502

503503
val materializer: Materializer = Materializer.createMaterializer(system)
504504
val jsonRpcServer =
505-
new JsonRpcServer(
505+
new YdocJsonRpcServer(
506506
jsonRpcProtocolFactory,
507507
jsonRpcControllerFactory,
508508
JsonRpcServer
@@ -513,7 +513,7 @@ class MainModule(serverConfig: LanguageServerConfig, logLevel: Level) {
513513
),
514514
List(healthCheckEndpoint, idlenessEndpoint, renameProjectEndpoint),
515515
messagesCallback
516-
)(system, materializer)
516+
)(system)
517517
log.trace("Created JSON RPC Server [{}]", jsonRpcServer)
518518

519519
val binaryServer =
@@ -532,9 +532,8 @@ class MainModule(serverConfig: LanguageServerConfig, logLevel: Level) {
532532

533533
private val ydoc = {
534534
val c = org.enso.languageserver.boot.config.ApplicationConfig.load().ydoc
535-
val callbacks = org.enso.ydoc.api.NoOpMessageCallbacks.INSTANCE
536535
org.enso.runner.common.YdocServerApi
537-
.launchYdocServer(c.hostname, c.port, callbacks)
536+
.launchYdocServer(c.hostname, c.port, jsonRpcServer.yjsChannelCallbacks)
538537
}
539538

540539
log.debug(
Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,8 @@
11
package org.enso.ydoc.api;
22

33
public interface MessageCallbacks {
4-
public interface YjsChannel {
5-
public void sendText(String message);
6-
}
74

85
public void onConnect(YjsChannel channel);
96

10-
public void onText(String message);
7+
public void onMessage(Object message);
118
}

lib/java/ydoc-api/src/main/java/org/enso/ydoc/api/NoOpMessageCallbacks.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ public class NoOpMessageCallbacks implements MessageCallbacks {
66
private NoOpMessageCallbacks() {}
77

88
@Override
9-
public void sendText(String message) {}
9+
public void onConnect(YjsChannel channel) {}
1010

1111
@Override
12-
public void onText(String message) {}
12+
public void onMessage(Object message) {}
1313
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package org.enso.ydoc.api;
2+
3+
public interface YjsChannel {
4+
public void send(Object message);
5+
}
Lines changed: 52 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,15 @@
11
package org.enso.jsonrpc
22

3-
import akka.NotUsed
4-
import akka.actor.{ActorRef, ActorSystem, Props}
5-
import akka.http.scaladsl.model.ws.{BinaryMessage, Message, TextMessage}
3+
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
64
import akka.http.scaladsl.server.Directives._
75
import akka.http.scaladsl.server.Route
8-
import akka.stream.scaladsl.{Flow, Sink, Source}
9-
import akka.stream.{Materializer, OverflowStrategy}
106
import com.typesafe.scalalogging.LazyLogging
11-
import org.enso.jsonrpc.MessageHandler.WebMessage
7+
import org.enso.jsonrpc.MessageHandler
8+
import org.enso.ydoc.api.MessageCallbacks
9+
import org.enso.ydoc.api.YjsChannel
1210

1311
import java.util.UUID
1412

15-
import scala.concurrent.duration._
1613
import scala.concurrent.ExecutionContext
1714

1815
/** Exposes a multi-client JSON RPC Server instance over WebSocket connections.
@@ -21,98 +18,43 @@ import scala.concurrent.ExecutionContext
2118
* @param clientControllerFactory a factory used to create a client controller
2219
* @param config a server config
2320
* @param optionalEndpoints a list of optional endpoints
24-
* @param messageCallbacks a list of message callbacks
2521
* @param system an actor system
2622
* @param materializer a materializer
2723
*/
2824
class YdocJsonRpcServer(
2925
protocolFactory: ProtocolFactory,
3026
clientControllerFactory: ClientControllerFactory,
31-
config: JsonRpcServer.Config = JsonRpcServer.Config.default,
32-
optionalEndpoints: List[Endpoint] = List.empty,
33-
messageCallbacks: List[WebMessage => Unit] = List.empty
34-
)(
35-
implicit val system: ActorSystem,
36-
implicit val materializer: Materializer
27+
config: JsonRpcServer.Config = JsonRpcServer.Config.default,
28+
optionalEndpoints: List[Endpoint] = List.empty,
29+
messageCallbacks: List[MessageHandler.WebMessage => Unit] = List.empty
30+
)(implicit
31+
val system: ActorSystem
3732
) extends Server
3833
with LazyLogging {
3934

4035
implicit val ec: ExecutionContext = system.dispatcher
4136

42-
private val messageCallbackSinks =
43-
messageCallbacks.map(Sink.foreach[WebMessage])
44-
45-
private var incomingMessageHandler: ActorRef = _
46-
private var outgoingMessageHandler: ActorRef = _
47-
48-
private def newUser: Flow[Message, Message, NotUsed] = {
49-
val messageHandler =
37+
val yjsChannelCallbacks = {
38+
val incomingMessageHandler: ActorRef =
5039
system.actorOf(
5140
Props(
5241
new MessageHandlerSupervisor(
5342
clientControllerFactory,
5443
protocolFactory
5544
)
5645
),
57-
s"message-handler-supervisor-${UUID.randomUUID()}"
46+
s"ydoc-message-handler-supervisor-${UUID.randomUUID()}"
5847
)
59-
60-
val incomingMessagesFlow =
61-
Flow[Message]
62-
.mapConcat({
63-
case textMsg: TextMessage => textMsg :: Nil
64-
case _: BinaryMessage => Nil
65-
})
66-
.mapAsync(1)(
67-
_.toStrict(config.lazyMessageTimeout)
68-
.map(msg => MessageHandler.WebMessage(msg.text))
69-
)
70-
val incomingMessagesFlowWithCallbacks =
71-
messageCallbackSinks.foldLeft(incomingMessagesFlow)(_ alsoTo _)
72-
73-
val incomingMessages: Sink[Message, NotUsed] =
74-
incomingMessagesFlowWithCallbacks
75-
.wireTap { webMessage =>
76-
logger.trace(s"Received text message: ${webMessage.message}.")
77-
}
78-
.to(
79-
Sink.actorRef[MessageHandler.WebMessage](
80-
messageHandler, {
81-
logger.trace("JSON sink stream finished with no failure")
82-
MessageHandler.Disconnected()
83-
},
84-
{ e: Throwable =>
85-
logger.trace("JSON sink stream finished with a failure", e)
86-
MessageHandler.Disconnected()
87-
}
88-
)
89-
)
90-
91-
val outgoingMessages: Source[Message, NotUsed] =
92-
Source
93-
.actorRef[MessageHandler.WebMessage](
94-
PartialFunction.empty,
95-
PartialFunction.empty,
96-
config.outgoingBufferSize,
97-
OverflowStrategy.fail
98-
)
99-
.mapMaterializedValue { outActor =>
100-
messageHandler ! MessageHandler.Connected(outActor)
101-
NotUsed
102-
}
103-
.map((outMsg: MessageHandler.WebMessage) => TextMessage(outMsg.message))
104-
.wireTap { textMessage =>
105-
logger.trace(s"Sent text message ${textMessage.text}.")
106-
}
107-
108-
Flow.fromSinkAndSourceCoupled(incomingMessages, outgoingMessages)
48+
new YdocJsonRpcServer.ServerCallbacks(
49+
incomingMessageHandler,
50+
messageCallbacks,
51+
system
52+
)
10953
}
11054

11155
override protected def serverRoute(port: Int): Route = {
112-
newUser.runWith(Source.actorRef(), sink)
113-
11456
val emptyEndpoint =
115-
path("_null") {
57+
path("__null") {
11658
post { null }
11759
}
11860

@@ -127,35 +69,43 @@ class YdocJsonRpcServer(
12769

12870
object YdocJsonRpcServer {
12971

130-
/** A configuration object for properties of the YdocJsonRpcServer.
131-
*
132-
* @param outgoingBufferSize the number of messages buffered internally
133-
* if the downstream connection is lagging behind.
134-
* @param lazyMessageTimeout the timeout for downloading the whole of a lazy
135-
* stream message from the user.
136-
* @param path the http path that the server listen to.
137-
*/
138-
case class Config(
139-
outgoingBufferSize: Int,
140-
lazyMessageTimeout: FiniteDuration,
141-
secureConfig: Option[SecureConnectionConfig],
142-
path: String = ""
143-
)
144-
145-
case object Config {
72+
final class ServerCallbacks(
73+
incomingMessageHandler: ActorRef,
74+
messageCallbacks: List[MessageHandler.WebMessage => Unit],
75+
system: ActorSystem
76+
) extends MessageCallbacks
77+
with LazyLogging {
14678

147-
/** Creates a default instance of [[Config]].
148-
*
149-
* @return a default config.
150-
*/
151-
def default: Config =
152-
Config(
153-
outgoingBufferSize = 1000,
154-
lazyMessageTimeout = 10.seconds,
155-
secureConfig = None
79+
override def onConnect(channel: YjsChannel): Unit = {
80+
val outgoingMessageHandler = system.actorOf(
81+
Props(
82+
new OutgoingMessageHandler(channel)
83+
)
15684
)
85+
incomingMessageHandler ! MessageHandler.Connected(outgoingMessageHandler)
86+
}
87+
88+
override def onMessage(message: Object): Unit = {
89+
message match {
90+
case m: String =>
91+
val webMessage = MessageHandler.WebMessage(m)
92+
incomingMessageHandler ! webMessage
93+
messageCallbacks.foreach(cb => cb(webMessage))
94+
case _ =>
95+
logger.error("Received unsupported message:", message)
96+
}
97+
}
15798
}
15899

159-
case class WebConnect(webActor: ActorRef)
100+
final class OutgoingMessageHandler(channel: YjsChannel)
101+
extends Actor
102+
with LazyLogging {
160103

104+
override def receive: Receive = {
105+
case MessageHandler.WebMessage(message) =>
106+
channel.send(message)
107+
case unknown =>
108+
logger.error("Sending unsupported message:", unknown)
109+
}
110+
}
161111
}

0 commit comments

Comments
 (0)