11package org .enso .jsonrpc
22
3- import akka .NotUsed
4- import akka .actor .{ActorRef , ActorSystem , Props }
5- import akka .http .scaladsl .model .ws .{BinaryMessage , Message , TextMessage }
3+ import akka .actor .{Actor , ActorRef , ActorSystem , Props }
64import akka .http .scaladsl .server .Directives ._
75import akka .http .scaladsl .server .Route
8- import akka .stream .scaladsl .{Flow , Sink , Source }
9- import akka .stream .{Materializer , OverflowStrategy }
106import com .typesafe .scalalogging .LazyLogging
11- import org .enso .jsonrpc .MessageHandler .WebMessage
7+ import org .enso .jsonrpc .MessageHandler
8+ import org .enso .ydoc .api .MessageCallbacks
9+ import org .enso .ydoc .api .YjsChannel
1210
1311import java .util .UUID
1412
15- import scala .concurrent .duration ._
1613import scala .concurrent .ExecutionContext
1714
1815/** Exposes a multi-client JSON RPC Server instance over WebSocket connections.
@@ -21,98 +18,43 @@ import scala.concurrent.ExecutionContext
2118 * @param clientControllerFactory a factory used to create a client controller
2219 * @param config a server config
2320 * @param optionalEndpoints a list of optional endpoints
24- * @param messageCallbacks a list of message callbacks
2521 * @param system an actor system
2622 * @param materializer a materializer
2723 */
2824class YdocJsonRpcServer (
2925 protocolFactory : ProtocolFactory ,
3026 clientControllerFactory : ClientControllerFactory ,
31- config : JsonRpcServer .Config = JsonRpcServer .Config .default,
32- optionalEndpoints : List [Endpoint ] = List .empty,
33- messageCallbacks : List [WebMessage => Unit ] = List .empty
34- )(
35- implicit val system : ActorSystem ,
36- implicit val materializer : Materializer
27+ config : JsonRpcServer .Config = JsonRpcServer .Config .default,
28+ optionalEndpoints : List [Endpoint ] = List .empty,
29+ messageCallbacks : List [MessageHandler .WebMessage => Unit ] = List .empty
30+ )(implicit
31+ val system : ActorSystem
3732) extends Server
3833 with LazyLogging {
3934
4035 implicit val ec : ExecutionContext = system.dispatcher
4136
42- private val messageCallbackSinks =
43- messageCallbacks.map(Sink .foreach[WebMessage ])
44-
45- private var incomingMessageHandler : ActorRef = _
46- private var outgoingMessageHandler : ActorRef = _
47-
48- private def newUser : Flow [Message , Message , NotUsed ] = {
49- val messageHandler =
37+ val yjsChannelCallbacks = {
38+ val incomingMessageHandler : ActorRef =
5039 system.actorOf(
5140 Props (
5241 new MessageHandlerSupervisor (
5342 clientControllerFactory,
5443 protocolFactory
5544 )
5645 ),
57- s " message-handler-supervisor- ${UUID .randomUUID()}"
46+ s " ydoc- message-handler-supervisor- ${UUID .randomUUID()}"
5847 )
59-
60- val incomingMessagesFlow =
61- Flow [Message ]
62- .mapConcat({
63- case textMsg : TextMessage => textMsg :: Nil
64- case _ : BinaryMessage => Nil
65- })
66- .mapAsync(1 )(
67- _.toStrict(config.lazyMessageTimeout)
68- .map(msg => MessageHandler .WebMessage (msg.text))
69- )
70- val incomingMessagesFlowWithCallbacks =
71- messageCallbackSinks.foldLeft(incomingMessagesFlow)(_ alsoTo _)
72-
73- val incomingMessages : Sink [Message , NotUsed ] =
74- incomingMessagesFlowWithCallbacks
75- .wireTap { webMessage =>
76- logger.trace(s " Received text message: ${webMessage.message}. " )
77- }
78- .to(
79- Sink .actorRef[MessageHandler .WebMessage ](
80- messageHandler, {
81- logger.trace(" JSON sink stream finished with no failure" )
82- MessageHandler .Disconnected ()
83- },
84- { e : Throwable =>
85- logger.trace(" JSON sink stream finished with a failure" , e)
86- MessageHandler .Disconnected ()
87- }
88- )
89- )
90-
91- val outgoingMessages : Source [Message , NotUsed ] =
92- Source
93- .actorRef[MessageHandler .WebMessage ](
94- PartialFunction .empty,
95- PartialFunction .empty,
96- config.outgoingBufferSize,
97- OverflowStrategy .fail
98- )
99- .mapMaterializedValue { outActor =>
100- messageHandler ! MessageHandler .Connected (outActor)
101- NotUsed
102- }
103- .map((outMsg : MessageHandler .WebMessage ) => TextMessage (outMsg.message))
104- .wireTap { textMessage =>
105- logger.trace(s " Sent text message ${textMessage.text}. " )
106- }
107-
108- Flow .fromSinkAndSourceCoupled(incomingMessages, outgoingMessages)
48+ new YdocJsonRpcServer .ServerCallbacks (
49+ incomingMessageHandler,
50+ messageCallbacks,
51+ system
52+ )
10953 }
11054
11155 override protected def serverRoute (port : Int ): Route = {
112- newUser.runWith(Source .actorRef(), sink)
113-
11456 val emptyEndpoint =
115- path(" _null " ) {
57+ path(" __null " ) {
11658 post { null }
11759 }
11860
@@ -127,35 +69,43 @@ class YdocJsonRpcServer(
12769
12870object YdocJsonRpcServer {
12971
130- /** A configuration object for properties of the YdocJsonRpcServer.
131- *
132- * @param outgoingBufferSize the number of messages buffered internally
133- * if the downstream connection is lagging behind.
134- * @param lazyMessageTimeout the timeout for downloading the whole of a lazy
135- * stream message from the user.
136- * @param path the http path that the server listen to.
137- */
138- case class Config (
139- outgoingBufferSize : Int ,
140- lazyMessageTimeout : FiniteDuration ,
141- secureConfig : Option [SecureConnectionConfig ],
142- path : String = " "
143- )
144-
145- case object Config {
72+ final class ServerCallbacks (
73+ incomingMessageHandler : ActorRef ,
74+ messageCallbacks : List [MessageHandler .WebMessage => Unit ],
75+ system : ActorSystem
76+ ) extends MessageCallbacks
77+ with LazyLogging {
14678
147- /** Creates a default instance of [[Config ]].
148- *
149- * @return a default config.
150- */
151- def default : Config =
152- Config (
153- outgoingBufferSize = 1000 ,
154- lazyMessageTimeout = 10 .seconds,
155- secureConfig = None
79+ override def onConnect (channel : YjsChannel ): Unit = {
80+ val outgoingMessageHandler = system.actorOf(
81+ Props (
82+ new OutgoingMessageHandler (channel)
83+ )
15684 )
85+ incomingMessageHandler ! MessageHandler .Connected (outgoingMessageHandler)
86+ }
87+
88+ override def onMessage (message : Object ): Unit = {
89+ message match {
90+ case m : String =>
91+ val webMessage = MessageHandler .WebMessage (m)
92+ incomingMessageHandler ! webMessage
93+ messageCallbacks.foreach(cb => cb(webMessage))
94+ case _ =>
95+ logger.error(" Received unsupported message:" , message)
96+ }
97+ }
15798 }
15899
159- case class WebConnect (webActor : ActorRef )
100+ final class OutgoingMessageHandler (channel : YjsChannel )
101+ extends Actor
102+ with LazyLogging {
160103
104+ override def receive : Receive = {
105+ case MessageHandler .WebMessage (message) =>
106+ channel.send(message)
107+ case unknown =>
108+ logger.error(" Sending unsupported message:" , unknown)
109+ }
110+ }
161111}
0 commit comments