Skip to content

Commit

Permalink
Fix memory leak with old WS subscriptions
Browse files Browse the repository at this point in the history
  • Loading branch information
ashvayka committed May 22, 2024
1 parent 30d0e3b commit eeb72de
Showing 1 changed file with 30 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import jakarta.annotation.Nullable;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -143,6 +144,7 @@ public class DefaultWebSocketService implements WebSocketService {
private final ConcurrentMap<CustomerId, Set<String>> customerSubscriptionsMap = new ConcurrentHashMap<>();
private final ConcurrentMap<UserId, Set<String>> regularUserSubscriptionsMap = new ConcurrentHashMap<>();
private final ConcurrentMap<UserId, Set<String>> publicUserSubscriptionsMap = new ConcurrentHashMap<>();
private final ConcurrentMap<String, Map<Integer, Integer>> sessionCmdMap = new ConcurrentHashMap<>();

private ExecutorService executor;
private ScheduledExecutorService pingExecutor;
Expand Down Expand Up @@ -201,9 +203,7 @@ public void handleSessionEvent(WebSocketSessionRef sessionRef, SessionEvent even
event.getError().orElse(new RuntimeException("No error specified")));
break;
case CLOSED:
wsSessionsMap.remove(sessionId);
oldSubService.cancelAllSessionSubscriptions(sessionId);
entityDataSubService.cancelAllSessionSubscriptions(sessionId);
cleanupSessionById(sessionId);
processSessionClose(sessionRef);
break;
}
Expand Down Expand Up @@ -299,9 +299,7 @@ public void close(String sessionId, CloseStatus status) {
public void cleanupIfStale(String sessionId) {
if (!msgEndpoint.isOpen(sessionId)) {
log.info("[{}] Cleaning up stale session ", sessionId);
wsSessionsMap.remove(sessionId);
oldSubService.cancelAllSessionSubscriptions(sessionId);
entityDataSubService.cancelAllSessionSubscriptions(sessionId);
cleanupSessionById(sessionId);
}
}

Expand Down Expand Up @@ -451,7 +449,7 @@ public void onSuccess(List<AttributeKvEntry> data) {
TbAttributeSubscription sub = TbAttributeSubscription.builder()
.serviceId(serviceId)
.sessionId(sessionId)
.subscriptionId(sessionRef.getSessionSubIdSeq().incrementAndGet())
.subscriptionId(registerNewSessionSubId(sessionId, sessionRef, cmd.getCmdId()))
.tenantId(sessionRef.getSecurityCtx().getTenantId())
.entityId(entityId)
.queryTs(queryTs)
Expand Down Expand Up @@ -500,6 +498,13 @@ public void onFailure(Throwable e) {
}
}

private int registerNewSessionSubId(String sessionId, WebSocketSessionRef sessionRef, int cmdId) {
var cmdMap = sessionCmdMap.computeIfAbsent(sessionId, id -> new ConcurrentHashMap<>());
var subId = sessionRef.getSessionSubIdSeq().incrementAndGet();
cmdMap.put(cmdId, subId);
return subId;
}

private void handleWsHistoryCmd(WebSocketSessionRef sessionRef, GetHistoryCmd cmd) {
if (!validateCmd(sessionRef, cmd, () -> {
if (cmd.getEntityId() == null || cmd.getEntityId().isEmpty() || cmd.getEntityType() == null || cmd.getEntityType().isEmpty()) {
Expand Down Expand Up @@ -557,7 +562,7 @@ public void onSuccess(List<AttributeKvEntry> data) {
TbAttributeSubscription sub = TbAttributeSubscription.builder()
.serviceId(serviceId)
.sessionId(sessionId)
.subscriptionId(sessionRef.getSessionSubIdSeq().incrementAndGet())
.subscriptionId(registerNewSessionSubId(sessionId, sessionRef, cmd.getCmdId()))
.tenantId(sessionRef.getSecurityCtx().getTenantId())
.entityId(entityId)
.queryTs(queryTs)
Expand Down Expand Up @@ -655,7 +660,7 @@ public void onSuccess(List<TsKvEntry> data) {
TbTimeSeriesSubscription sub = TbTimeSeriesSubscription.builder()
.serviceId(serviceId)
.sessionId(sessionId)
.subscriptionId(sessionRef.getSessionSubIdSeq().incrementAndGet())
.subscriptionId(registerNewSessionSubId(sessionId, sessionRef, cmd.getCmdId()))
.tenantId(sessionRef.getSecurityCtx().getTenantId())
.entityId(entityId)
.updateProcessor((subscription, update) -> {
Expand Down Expand Up @@ -710,7 +715,7 @@ public void onSuccess(List<TsKvEntry> data) {
TbTimeSeriesSubscription sub = TbTimeSeriesSubscription.builder()
.serviceId(serviceId)
.sessionId(sessionId)
.subscriptionId(sessionRef.getSessionSubIdSeq().incrementAndGet())
.subscriptionId(registerNewSessionSubId(sessionId, sessionRef, cmd.getCmdId()))
.tenantId(sessionRef.getSecurityCtx().getTenantId())
.entityId(entityId)
.updateProcessor((subscription, update) -> {
Expand Down Expand Up @@ -749,12 +754,25 @@ public void onFailure(Throwable e) {

private void unsubscribe(WebSocketSessionRef sessionRef, SubscriptionCmd cmd, String sessionId) {
if (cmd.getEntityId() == null || cmd.getEntityId().isEmpty()) {
oldSubService.cancelAllSessionSubscriptions(sessionId);
log.warn("[{}][{}] Cleanup session due to empty entity id.", sessionId, cmd.getCmdId());
cleanupSessionById(sessionId);
} else {
oldSubService.cancelSubscription(sessionId, cmd.getCmdId());
Integer subId = sessionCmdMap.getOrDefault(sessionId, Collections.emptyMap()).remove(cmd.getCmdId());
if (subId == null) {
log.trace("[{}][{}] Failed to lookup subscription id mapping", sessionId, cmd.getCmdId());
subId = cmd.getCmdId();
}
oldSubService.cancelSubscription(sessionId, subId);
}
}

private void cleanupSessionById(String sessionId) {
wsSessionsMap.remove(sessionId);
oldSubService.cancelAllSessionSubscriptions(sessionId);
sessionCmdMap.remove(sessionId);
entityDataSubService.cancelAllSessionSubscriptions(sessionId);
}

private boolean validateSubscriptionCmd(WebSocketSessionRef sessionRef, EntityDataCmd cmd) {
return validateCmd(sessionRef, cmd, () -> {
if (cmd.getQuery() == null && !cmd.hasAnyCmd()) {
Expand Down

0 comments on commit eeb72de

Please sign in to comment.