thingsboard-developers

Details

diff --git a/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java b/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java
index 82892a9..bfa262a 100644
--- a/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java
+++ b/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java
@@ -75,6 +75,8 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
     private int maxSessionsPerRegularUser;
     @Value("${server.ws.limits.max_sessions_per_public_user:0}")
     private int maxSessionsPerPublicUser;
+    @Value("${server.ws.limits.max_queue_per_ws_session:1000}")
+    private int maxMsgQueuePerSession;
 
     @Value("${server.ws.limits.max_updates_per_session:}")
     private String perSessionUpdatesConfiguration;
@@ -108,7 +110,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
         super.afterConnectionEstablished(session);
         try {
             if (session instanceof NativeWebSocketSession) {
-                Session nativeSession = ((NativeWebSocketSession)session).getNativeSession(Session.class);
+                Session nativeSession = ((NativeWebSocketSession) session).getNativeSession(Session.class);
                 if (nativeSession != null) {
                     nativeSession.getAsyncRemote().setSendTimeout(sendTimeout);
                 }
@@ -119,7 +121,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
             if (!checkLimits(session, sessionRef)) {
                 return;
             }
-            internalSessionMap.put(internalSessionId, new SessionMetaData(session, sessionRef));
+            internalSessionMap.put(internalSessionId, new SessionMetaData(session, sessionRef, maxMsgQueuePerSession));
             externalSessionMap.put(externalSessionId, internalSessionId);
             processInWebSocketService(sessionRef, SessionEvent.onEstablished());
             log.info("[{}][{}][{}] Session is opened", sessionRef.getSecurityCtx().getTenantId(), externalSessionId, session.getId());
@@ -176,31 +178,40 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
         if (!"telemetry".equalsIgnoreCase(serviceToken)) {
             throw new InvalidParameterException("Can't find plugin with specified token!");
         } else {
-            SecurityUser currentUser = (SecurityUser) ((Authentication)session.getPrincipal()).getPrincipal();
+            SecurityUser currentUser = (SecurityUser) ((Authentication) session.getPrincipal()).getPrincipal();
             return new TelemetryWebSocketSessionRef(UUID.randomUUID().toString(), currentUser, session.getLocalAddress(), session.getRemoteAddress());
         }
     }
 
-    private static class SessionMetaData implements SendHandler {
+    private class SessionMetaData implements SendHandler {
         private final WebSocketSession session;
         private final RemoteEndpoint.Async asyncRemote;
         private final TelemetryWebSocketSessionRef sessionRef;
 
         private volatile boolean isSending = false;
+        private final Queue<String> msgQueue;
 
-        private Queue<String> msgQueue = new LinkedBlockingQueue<>();
-
-        SessionMetaData(WebSocketSession session, TelemetryWebSocketSessionRef sessionRef) {
+        SessionMetaData(WebSocketSession session, TelemetryWebSocketSessionRef sessionRef, int maxMsgQueuePerSession) {
             super();
             this.session = session;
-            Session nativeSession = ((NativeWebSocketSession)session).getNativeSession(Session.class);
+            Session nativeSession = ((NativeWebSocketSession) session).getNativeSession(Session.class);
             this.asyncRemote = nativeSession.getAsyncRemote();
             this.sessionRef = sessionRef;
+            this.msgQueue = new LinkedBlockingQueue<>(maxMsgQueuePerSession);
         }
 
-        public synchronized void sendMsg(String msg) {
+        synchronized void sendMsg(String msg) {
             if (isSending) {
-                msgQueue.add(msg);
+                try {
+                    msgQueue.add(msg);
+                } catch (RuntimeException e){
+                    log.trace("[{}] Session closed due to queue error", session.getId(), e);
+                    try {
+                        close(sessionRef, CloseStatus.POLICY_VIOLATION.withReason("Max pending updates limit reached!"));
+                    } catch (IOException ioe) {
+                        log.trace("[{}] Session transport error", session.getId(), ioe);
+                    }
+                }
             } else {
                 isSending = true;
                 sendMsgInternal(msg);
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index 08808f9..0f87429 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -40,6 +40,7 @@ server:
       max_sessions_per_customer: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SESSIONS_PER_CUSTOMER:0}"
       max_sessions_per_regular_user: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SESSIONS_PER_REGULAR_USER:0}"
       max_sessions_per_public_user: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SESSIONS_PER_PUBLIC_USER:0}"
+      max_queue_per_ws_session: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_QUEUE_PER_WS_SESSION:500}"
       max_subscriptions_per_tenant: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SUBSCRIPTIONS_PER_TENANT:0}"
       max_subscriptions_per_customer: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SUBSCRIPTIONS_PER_CUSTOMER:0}"
       max_subscriptions_per_regular_user: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SUBSCRIPTIONS_PER_REGULAR_USER:0}"