Skip to content

Commit 8456fe8

Browse files
authored
fix(transcription): Sends transcription over xmpp in separate thread. (#570)
* fix(transcription): Sends transcription over xmpp in separate thread. * squash: Xmpp packet queue per conference instance. * squash: One thread pool and xmpp and xmppSend queues per instance.
1 parent 6b25725 commit 8456fe8

9 files changed

+139
-151
lines changed

src/main/java/org/jitsi/jigasi/JvbConference.java

Lines changed: 104 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
import java.io.*;
6868
import java.net.*;
6969
import java.util.*;
70+
import java.util.concurrent.*;
7071

7172
import static net.java.sip.communicator.service.protocol.event.LocalUserChatRoomPresenceChangeEvent.*;
7273
import static org.jivesoftware.smack.packet.StanzaError.Condition.*;
@@ -185,11 +186,13 @@ public class JvbConference
185186
*/
186187
private String meetingId;
187188

189+
private static ExecutorService threadPool = Util.createNewThreadPool("xmpp-executor-pool");
190+
188191
/**
189192
* A queue used to offload xmpp execution in a new thread to avoid blocking xmpp threads,
190193
* by executing the tasks in new thread
191194
*/
192-
public static final PacketQueue<Runnable> xmppInvokeQueue = new PacketQueue<>(
195+
public final PacketQueue<Runnable> xmppInvokeQueue = new PacketQueue<>(
193196
Integer.MAX_VALUE,
194197
false,
195198
"xmpp-invoke-queue",
@@ -208,7 +211,32 @@ public class JvbConference
208211
return false;
209212
}
210213
},
211-
Util.createNewThreadPool("xmpp-executor-pool")
214+
threadPool
215+
);
216+
217+
/**
218+
* A queue used for sending xmpp messages.
219+
*/
220+
public final PacketQueue<Runnable> xmppSendQueue = new PacketQueue<>(
221+
Integer.MAX_VALUE,
222+
false,
223+
"xmpp-send-queue",
224+
r -> {
225+
// do process and try
226+
try
227+
{
228+
r.run();
229+
230+
return true;
231+
}
232+
catch (Throwable e)
233+
{
234+
logger.error("Error processing xmpp queue item", e);
235+
236+
return false;
237+
}
238+
},
239+
threadPool
212240
);
213241

214242
/**
@@ -2299,6 +2327,80 @@ private void processVisitorsJson(String json)
22992327
}
23002328
}
23012329

2330+
/**
2331+
* Send a message to the muc room
2332+
*
2333+
* @param messageString the message to send
2334+
*/
2335+
public void sendMessageToRoom(String messageString)
2336+
{
2337+
xmppSendQueue.add(() -> sendMessageToRoomInternal(messageString));
2338+
}
2339+
2340+
public void sendMessageToRoomInternal(String messageString)
2341+
{
2342+
if (isInTheRoom())
2343+
{
2344+
logger.error(this.callContext + " Cannot send message as chatRoom is null");
2345+
return;
2346+
}
2347+
2348+
try
2349+
{
2350+
this.mucRoom.sendMessage(this.mucRoom.createMessage(messageString));
2351+
if (logger.isTraceEnabled())
2352+
{
2353+
logger.trace(this.callContext + " Sending message: \"" + messageString + "\"");
2354+
}
2355+
}
2356+
catch (OperationFailedException e)
2357+
{
2358+
logger.warn(this.callContext + " Failed to send message " + messageString, e);
2359+
}
2360+
}
2361+
2362+
/**
2363+
* Send a json-message to the muc room
2364+
*
2365+
* @param jsonMessage the json message to send
2366+
*/
2367+
public void sendJsonMessage(JSONObject jsonMessage)
2368+
{
2369+
xmppSendQueue.add(() -> sendJsonMessageInternal(jsonMessage));
2370+
}
2371+
2372+
private void sendJsonMessageInternal(JSONObject jsonMessage)
2373+
{
2374+
if (this.mucRoom == null)
2375+
{
2376+
logger.error(this.callContext + " Cannot send message as chatRoom is null");
2377+
return;
2378+
}
2379+
2380+
if (!isInTheRoom())
2381+
{
2382+
if (logger.isDebugEnabled())
2383+
{
2384+
logger.debug(this.callContext + " Skip sending message to room which we left!");
2385+
}
2386+
return;
2387+
}
2388+
2389+
String messageString = jsonMessage.toString();
2390+
try
2391+
{
2392+
((ChatRoomJabberImpl)this.mucRoom).sendJsonMessage(messageString);
2393+
if (logger.isTraceEnabled())
2394+
{
2395+
logger.trace(this.callContext + " Sending json message: \"" + messageString + "\"");
2396+
}
2397+
}
2398+
catch (OperationFailedException e)
2399+
{
2400+
logger.warn(this.callContext + " Failed to send json message " + messageString, e);
2401+
}
2402+
}
2403+
23022404
/**
23032405
* Threads handles the timeout for stopping the conference.
23042406
* For waiting for conference call invite sent by the focus or for waiting

src/main/java/org/jitsi/jigasi/TranscriptionGatewaySession.java

Lines changed: 11 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -85,13 +85,6 @@ public class TranscriptionGatewaySession
8585
*/
8686
private TranscriptHandler handler;
8787

88-
/**
89-
* The ChatRoom of the conference which is going to be transcribed.
90-
* We will post messages to the ChatRoom to update users of progress
91-
* of transcription
92-
*/
93-
private ChatRoom chatRoom = null;
94-
9588
/**
9689
* The transcriber managing transcriptions of audio
9790
*/
@@ -213,14 +206,13 @@ Exception onConferenceCallStarted(Call jvbConferenceCall)
213206
// We can now safely set the Call connecting to the muc room
214207
// and the ChatRoom of the muc room
215208
this.jvbCall = jvbConferenceCall;
216-
this.chatRoom = super.jvbConference.getJvbRoom();
217209

218210
// If the transcription service is not correctly configured, there is no
219211
// point in continuing this session, so end it immediately
220212
if (!service.isConfiguredProperly())
221213
{
222214
logger.warn("TranscriptionService is not properly configured");
223-
sendMessageToRoom("Transcriber is not properly " +
215+
super.jvbConference.sendMessageToRoom("Transcriber is not properly " +
224216
"configured. Contact the service administrators and let them " +
225217
"know! I will now leave.");
226218
jvbConference.stop();
@@ -260,7 +252,7 @@ Exception onConferenceCallStarted(Call jvbConferenceCall)
260252

261253
if (welcomeMessage.length() > 0)
262254
{
263-
sendMessageToRoom(welcomeMessage.toString());
255+
super.jvbConference.sendMessageToRoom(welcomeMessage.toString());
264256
}
265257

266258
try
@@ -637,6 +629,13 @@ private List<ConferenceMember> getCurrentConferenceMembers()
637629
*/
638630
private List<ChatRoomMember> getCurrentChatRoomMembers()
639631
{
632+
if (super.jvbConference == null)
633+
{
634+
return null;
635+
}
636+
637+
ChatRoom chatRoom = super.jvbConference.getJvbRoom();
638+
640639
return chatRoom == null ? null : chatRoom.getMembers();
641640
}
642641

@@ -710,40 +709,14 @@ private String getParticipantIdentifier(ConferenceMember conferenceMember)
710709
return getConferenceMemberResourceID(conferenceMember);
711710
}
712711

713-
714-
/**
715-
* Send a message to the muc room
716-
*
717-
* @param messageString the message to send
718-
*/
719-
private void sendMessageToRoom(String messageString)
720-
{
721-
if (chatRoom == null)
722-
{
723-
logger.error("Cannot send message as chatRoom is null");
724-
return;
725-
}
726-
727-
Message message = chatRoom.createMessage(messageString);
728-
try
729-
{
730-
chatRoom.sendMessage(message);
731-
logger.debug("Sending message: \"" + messageString + "\"");
732-
}
733-
catch (OperationFailedException e)
734-
{
735-
logger.warn("Failed to send message " + messageString, e);
736-
}
737-
}
738-
739712
/**
740713
* Send a {@link TranscriptionResult} to the {@link ChatRoom}
741714
*
742715
* @param result the {@link TranscriptionResult} to send
743716
*/
744717
private void sendTranscriptionResultToRoom(TranscriptionResult result)
745718
{
746-
handler.publishTranscriptionResult(this.chatRoom, result);
719+
handler.publishTranscriptionResult(super.jvbConference, result);
747720
}
748721

749722
/**
@@ -753,7 +726,7 @@ private void sendTranscriptionResultToRoom(TranscriptionResult result)
753726
*/
754727
private void sendTranslationResultToRoom(TranslationResult result)
755728
{
756-
handler.publishTranslationResult(this.chatRoom, result);
729+
handler.publishTranslationResult(super.jvbConference, result);
757730
}
758731

759732
@Override

src/main/java/org/jitsi/jigasi/lobby/Lobby.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ protected void leaveRoom()
202202
@Override
203203
public void invitationReceived(ChatRoomInvitationReceivedEvent evt)
204204
{
205-
JvbConference.xmppInvokeQueue.add(() -> invitationReceivedInternal(evt));
205+
this.jvbConference.xmppInvokeQueue.add(() -> invitationReceivedInternal(evt));
206206
}
207207

208208
private void invitationReceivedInternal(ChatRoomInvitationReceivedEvent chatRoomInvitationReceivedEvent)
@@ -255,7 +255,7 @@ private void notifyAccessGranted()
255255
@Override
256256
public void localUserPresenceChanged(LocalUserChatRoomPresenceChangeEvent evt)
257257
{
258-
JvbConference.xmppInvokeQueue.add(() -> localUserPresenceChangedInternal(evt));
258+
this.jvbConference.xmppInvokeQueue.add(() -> localUserPresenceChangedInternal(evt));
259259
}
260260

261261
private void localUserPresenceChangedInternal(LocalUserChatRoomPresenceChangeEvent evt)

src/main/java/org/jitsi/jigasi/transcription/AbstractTranscriptPublisher.java

Lines changed: 0 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,6 @@
1717
*/
1818
package org.jitsi.jigasi.transcription;
1919

20-
import net.java.sip.communicator.impl.protocol.jabber.*;
21-
import net.java.sip.communicator.service.protocol.*;
22-
import net.java.sip.communicator.service.protocol.Message;
2320
import org.jitsi.jigasi.*;
2421
import org.jitsi.service.libjitsi.*;
2522
import org.jitsi.service.neomedia.*;
@@ -156,16 +153,6 @@ public abstract class AbstractTranscriptPublisher<T>
156153
private static final Logger logger
157154
= Logger.getLogger(AbstractTranscriptPublisher.class);
158155

159-
/**
160-
* Aspect for successful upload of transcript
161-
*/
162-
private static final String DD_ASPECT_SUCCESS = "upload_success";
163-
164-
/**
165-
* Aspect for failed upload of transcript
166-
*/
167-
private static final String DD_ASPECT_FAIL = "upload_fail";
168-
169156
/**
170157
* Get a string which contains a time stamp and a random UUID, with an
171158
* optional pre- and suffix attached.
@@ -187,76 +174,6 @@ protected static String generateHardToGuessTimeString(String prefix,
187174
UUID.randomUUID(), suffix);
188175
}
189176

190-
/**
191-
* Send a message to the muc room
192-
*
193-
* @param chatRoom the chatroom to send the message to
194-
* @param message the message to send
195-
*/
196-
protected void sendMessage(ChatRoom chatRoom, T message)
197-
{
198-
if (chatRoom == null)
199-
{
200-
logger.error("Cannot send message as chatRoom is null");
201-
return;
202-
}
203-
204-
String messageString = message.toString();
205-
Message chatRoomMessage = chatRoom.createMessage(messageString);
206-
try
207-
{
208-
chatRoom.sendMessage(chatRoomMessage);
209-
if (logger.isTraceEnabled())
210-
logger.trace("Sending message: \"" + messageString + "\"");
211-
}
212-
catch (OperationFailedException e)
213-
{
214-
logger.warn("Failed to send message " + messageString, e);
215-
}
216-
}
217-
218-
/**
219-
* Send a json-message to the muc room
220-
*
221-
* @param chatRoom the chatroom to send the message to
222-
* @param jsonMessage the json message to send
223-
*/
224-
protected void sendJsonMessage(ChatRoom chatRoom, T jsonMessage)
225-
{
226-
if (chatRoom == null)
227-
{
228-
logger.error("Cannot send message as chatRoom is null");
229-
return;
230-
}
231-
232-
if (!(chatRoom instanceof ChatRoomJabberImpl))
233-
{
234-
logger.error("Cannot send message as chatRoom is not an" +
235-
"instance of ChatRoomJabberImpl");
236-
return;
237-
}
238-
239-
if (!chatRoom.isJoined())
240-
{
241-
if (logger.isDebugEnabled())
242-
{
243-
logger.debug("Skip sending message to room which we left!");
244-
}
245-
return;
246-
}
247-
248-
String messageString = jsonMessage.toString();
249-
try
250-
{
251-
((ChatRoomJabberImpl)chatRoom).sendJsonMessage(messageString);
252-
if (logger.isTraceEnabled())
253-
logger.trace("Sending json message: \"" + messageString + "\"");
254-
}
255-
catch (OperationFailedException e)
256-
{
257-
logger.warn("Failed to send json message " + messageString, e);
258-
}
259-
}
260177
/**
261178
* Save a transcript given as a String to subdirectory of getLogDirPath()
262179
* with the given directory name and the given file name

src/main/java/org/jitsi/jigasi/transcription/LocalJsonTranscriptHandler.java

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.jitsi.jigasi.transcription;
1919

2020
import net.java.sip.communicator.service.protocol.*;
21+
import org.jitsi.jigasi.*;
2122
import org.json.simple.*;
2223

2324
import java.time.*;
@@ -225,19 +226,15 @@ public JSONFormatter getFormatter()
225226
}
226227

227228
@Override
228-
public void publish(ChatRoom room, TranscriptionResult result)
229+
public void publish(JvbConference jvbConference, TranscriptionResult result)
229230
{
230-
JSONObject eventObject = createTranscriptionJSONObject(result);
231-
232-
super.sendJsonMessage(room, eventObject);
231+
jvbConference.sendJsonMessage(createTranscriptionJSONObject(result));
233232
}
234233

235234
@Override
236-
public void publish(ChatRoom room, TranslationResult result)
235+
public void publish(JvbConference jvbConference, TranslationResult result)
237236
{
238-
JSONObject eventObject = createTranslationJSONObject(result);
239-
240-
super.sendJsonMessage(room, eventObject);
237+
jvbConference.sendJsonMessage(createTranslationJSONObject(result));
241238
}
242239

243240
/**

0 commit comments

Comments
 (0)