Skip to content

Commit 27df1af

Browse files
committed
update: yjs callbacks handle Uint8Arrays
1 parent 5baf1a1 commit 27df1af

File tree

6 files changed

+122
-11
lines changed

6 files changed

+122
-11
lines changed

app/ydoc-channel/src/YjsChannel.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ export class YjsChannel<T = unknown> extends ObservableV2<WebSocketEventHandlers
173173
/**
174174
* Notifies all subscribed handlers with the received message.
175175
*/
176-
private notifyHandlers(message: T): void {
176+
protected notifyHandlers(message: any): void {
177177
// Create a MessageEvent-like object for WebSocket compatibility
178178
const messageEvent = { data: message } as MessageEvent
179179

@@ -229,4 +229,9 @@ export class YjsDataChannel<T = unknown> extends YjsChannel<T> {
229229
return new YjsDataChannel(doc, channelName, callbacks)
230230
})
231231
}
232+
233+
override notifyHandlers(message: any): void {
234+
const arr = message as Uint8Array
235+
super.notifyHandlers(arr.buffer)
236+
}
232237
}

engine/language-server/src/main/scala/org/enso/languageserver/http/server/BinaryYdocServer.scala

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,15 +48,16 @@ object BinaryYdocServer {
4848
): Unit = {
4949
logger.info(s"BinaryServerCallbacks.onMessage ${message.getClass}")
5050
message match {
51-
case bytes: ByteBuffer =>
52-
logger.info(s"Received binary message ${bytes.getClass}")
53-
decoder.decode(bytes) match {
51+
case bytes: Array[Byte] =>
52+
//val bytes = value.as(classOf[Array[Byte]])
53+
logger.info(s"Received binary message")
54+
decoder.decode(ByteBuffer.wrap(bytes)) match {
5455
case Right(message) =>
5556
incomingMessageHandler ! message
5657
case Left(error) =>
5758
logger.error("Failed to decode binary message", error)
5859
}
59-
messageCallbacks.foreach(cb => cb(bytes))
60+
messageCallbacks.foreach(cb => cb(ByteBuffer.wrap(bytes)))
6061
case _ =>
6162
logger.error(
6263
s"Received unsupported message: ${message.getClass}"

lib/java/ydoc-polyfill/src/test/java/org/enso/ydoc/api/CallbacksTest.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,4 +104,31 @@ class YjsChannel {
104104

105105
Assert.assertEquals("World!", res.get());
106106
}
107+
108+
@Test
109+
public void onConnectSubscribeBuffer() throws Exception {
110+
var res = new AtomicReference<>();
111+
var code =
112+
"""
113+
class YjsChannel {
114+
subscribe(messageHandler) {
115+
var arr = new Uint8Array([0, 128, 255]);
116+
messageHandler(arr.buffer);
117+
}
118+
}
119+
120+
var channel = new YjsChannel();
121+
callbacks.onConnect(channel);
122+
""";
123+
124+
var callbacks =
125+
new TestCallbacks((channel) -> channel.subscribe((message) -> res.set(message)));
126+
context.getBindings("js").putMember("callbacks", callbacks);
127+
128+
CompletableFuture.runAsync(() -> context.eval("js", code), executor).get();
129+
var value = context.asValue(res.get());
130+
var arr = value.as(byte[].class);
131+
132+
Assert.assertArrayEquals(new byte[] {0, -128, -1}, arr);
133+
}
107134
}

lib/java/ydoc-server/src/main/java/org/enso/ydoc/server/Ydoc.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -146,14 +146,10 @@ public Ydoc build() {
146146

147147
if (jsonChannelCallbacks == null) {
148148
jsonChannelCallbacks = NoOpMessageCallbacks.INSTANCE;
149-
} else {
150-
jsonChannelCallbacks = new YjsCallbacksSynchronized(jsonChannelCallbacks, executor);
151149
}
152150

153151
if (binaryChannelCallbacks == null) {
154152
binaryChannelCallbacks = NoOpMessageCallbacks.INSTANCE;
155-
} else {
156-
binaryChannelCallbacks = new YjsCallbacksSynchronized(binaryChannelCallbacks, executor);
157153
}
158154

159155
return new Ydoc(
@@ -175,6 +171,14 @@ public Context.Builder getContextBuilder() {
175171
return contextBuilder;
176172
}
177173

174+
public MessageCallbacks getJsonChannelCallbacksSynchronized() {
175+
return new YjsCallbacksSynchronized(jsonChannelCallbacks, executor);
176+
}
177+
178+
public MessageCallbacks getBinaryChannelCallbacksSynchronized(Context context) {
179+
return new YjsBinaryChannelCallbacksSynchronized(binaryChannelCallbacks, executor, context);
180+
}
181+
178182
public void start() throws ExecutionException, InterruptedException, IOException {
179183
var ydoc = Main.class.getResource(YDOC_PATH);
180184
if (ydoc == null) {
@@ -195,8 +199,10 @@ public void start() throws ExecutionException, InterruptedException, IOException
195199
var bindings = ctx.getBindings("js");
196200
bindings.putMember("YDOC_HOST", hostname);
197201
bindings.putMember("YDOC_PORT", port);
198-
bindings.putMember("YDOC_JSON_CHANNEL_CALLBACKS", jsonChannelCallbacks);
199-
bindings.putMember("YDOC_BINARY_CHANNEL_CALLBACKS", binaryChannelCallbacks);
202+
bindings.putMember(
203+
"YDOC_JSON_CHANNEL_CALLBACKS", getJsonChannelCallbacksSynchronized());
204+
bindings.putMember(
205+
"YDOC_BINARY_CHANNEL_CALLBACKS", getBinaryChannelCallbacksSynchronized(ctx));
200206
bindings.putMember("YDOC_LS_DEBUG", "false");
201207

202208
ctx.eval(ydocJs);
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package org.enso.ydoc.server;
2+
3+
import java.util.concurrent.ExecutorService;
4+
import org.enso.ydoc.api.MessageCallbacks;
5+
import org.enso.ydoc.api.YjsChannel;
6+
import org.graalvm.polyglot.Context;
7+
8+
public class YjsBinaryChannelCallbacksSynchronized implements MessageCallbacks {
9+
10+
private final MessageCallbacks callbacks;
11+
private final ExecutorService executor;
12+
private final Context context;
13+
14+
public YjsBinaryChannelCallbacksSynchronized(
15+
MessageCallbacks callbacks, ExecutorService executor, Context context) {
16+
this.callbacks = callbacks;
17+
this.executor = executor;
18+
this.context = context;
19+
}
20+
21+
@Override
22+
public void onConnect(YjsChannel channel) {
23+
var synchronizedChannel =
24+
new YjsBinaryChannelSynchronized(channel, this.executor, this.context);
25+
this.callbacks.onConnect(synchronizedChannel);
26+
}
27+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package org.enso.ydoc.server;
2+
3+
import java.util.concurrent.ExecutorService;
4+
import java.util.function.Consumer;
5+
import org.enso.ydoc.api.YjsChannel;
6+
import org.graalvm.polyglot.Context;
7+
8+
public class YjsBinaryChannelSynchronized implements YjsChannel {
9+
10+
private final YjsChannel channel;
11+
private final ExecutorService executor;
12+
private final Context context;
13+
14+
public YjsBinaryChannelSynchronized(
15+
YjsChannel channel, ExecutorService executor, Context context) {
16+
this.channel = channel;
17+
this.executor = executor;
18+
this.context = context;
19+
}
20+
21+
@Override
22+
public void send(Object message) {
23+
executor.execute(() -> channel.send(message));
24+
}
25+
26+
@Override
27+
public void subscribe(Consumer<Object> messageHandler) {
28+
executor.execute(
29+
() ->
30+
channel.subscribe(
31+
(message) -> {
32+
System.out.println("DBG YjsBinaryChannel message " + message);
33+
var value = context.asValue(message);
34+
System.out.println("DBG YjsBinaryChannel value " + value);
35+
byte[] bytes = null;
36+
try {
37+
bytes = value.as(byte[].class);
38+
} catch (Throwable e) {
39+
System.out.println("DBG YjsBinaryChannel ERR " + e);
40+
}
41+
System.out.println("DBG YjsBinaryChannel bytes " + bytes);
42+
messageHandler.accept(bytes);
43+
}));
44+
}
45+
}

0 commit comments

Comments
 (0)