thingsboard-developers

Add async websocket send support.

11/22/2018 10:41:33 AM

Details

diff --git a/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java b/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java
index c8b4553..82892a9 100644
--- a/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java
+++ b/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java
@@ -16,7 +16,6 @@
 package org.thingsboard.server.controller.plugin;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.tomcat.websocket.Constants;
 import org.springframework.beans.factory.BeanCreationNotAllowedException;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
@@ -41,14 +40,19 @@ import org.thingsboard.server.service.telemetry.TelemetryWebSocketMsgEndpoint;
 import org.thingsboard.server.service.telemetry.TelemetryWebSocketService;
 import org.thingsboard.server.service.telemetry.TelemetryWebSocketSessionRef;
 
+import javax.websocket.RemoteEndpoint;
+import javax.websocket.SendHandler;
+import javax.websocket.SendResult;
 import javax.websocket.Session;
 import java.io.IOException;
 import java.net.URI;
 import java.security.InvalidParameterException;
+import java.util.Queue;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
 
 @Service
 @Slf4j
@@ -60,8 +64,8 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
     @Autowired
     private TelemetryWebSocketService webSocketService;
 
-    @Value("${server.ws.blocking_send_timeout:5000}")
-    private long blockingSendTimeout;
+    @Value("${server.ws.send_timeout:5000}")
+    private long sendTimeout;
 
     @Value("${server.ws.limits.max_sessions_per_tenant:0}")
     private int maxSessionsPerTenant;
@@ -106,7 +110,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
             if (session instanceof NativeWebSocketSession) {
                 Session nativeSession = ((NativeWebSocketSession)session).getNativeSession(Session.class);
                 if (nativeSession != null) {
-                    nativeSession.getUserProperties().put(Constants.BLOCKING_SEND_TIMEOUT_PROPERTY, new Long(blockingSendTimeout));
+                    nativeSession.getAsyncRemote().setSendTimeout(sendTimeout);
                 }
             }
             String internalSessionId = session.getId();
@@ -177,15 +181,52 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
         }
     }
 
-    private static class SessionMetaData {
+    private static class SessionMetaData implements SendHandler {
         private final WebSocketSession session;
+        private final RemoteEndpoint.Async asyncRemote;
         private final TelemetryWebSocketSessionRef sessionRef;
 
+        private volatile boolean isSending = false;
+
+        private Queue<String> msgQueue = new LinkedBlockingQueue<>();
+
         SessionMetaData(WebSocketSession session, TelemetryWebSocketSessionRef sessionRef) {
             super();
             this.session = session;
+            Session nativeSession = ((NativeWebSocketSession)session).getNativeSession(Session.class);
+            this.asyncRemote = nativeSession.getAsyncRemote();
             this.sessionRef = sessionRef;
         }
+
+        public synchronized void sendMsg(String msg) {
+            if (isSending) {
+                msgQueue.add(msg);
+            } else {
+                isSending = true;
+                sendMsgInternal(msg);
+            }
+        }
+
+        private void sendMsgInternal(String msg) {
+            try {
+                this.asyncRemote.sendText(msg, this);
+            } catch (Exception e) {
+                log.error("[{}] Failed to send msg", session.getId(), e);
+            }
+        }
+
+        @Override
+        public void onResult(SendResult result) {
+            if (!result.isOK()) {
+                log.error("[{}] Failed to send msg", session.getId(), result.getException());
+            }
+            String msg = msgQueue.poll();
+            if (msg != null) {
+                sendMsgInternal(msg);
+            } else {
+                isSending = false;
+            }
+        }
     }
 
     @Override
@@ -202,9 +243,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
                         if (blacklistedSessions.putIfAbsent(externalId, sessionRef) == null) {
                             log.info("[{}][{}][{}] Failed to process session update. Max session updates limit reached"
                                     , sessionRef.getSecurityCtx().getTenantId(), sessionRef.getSecurityCtx().getId(), externalId);
-                            synchronized (sessionMd) {
-                                sessionMd.session.sendMessage(new TextMessage("{\"subscriptionId\":" + subscriptionId + ", \"errorCode\":" + ThingsboardErrorCode.TOO_MANY_UPDATES.getErrorCode() + ", \"errorMsg\":\"Too many updates!\"}"));
-                            }
+                            sessionMd.sendMsg("{\"subscriptionId\":" + subscriptionId + ", \"errorCode\":" + ThingsboardErrorCode.TOO_MANY_UPDATES.getErrorCode() + ", \"errorMsg\":\"Too many updates!\"}");
                         }
                         return;
                     } else {
@@ -212,14 +251,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
                         blacklistedSessions.remove(externalId);
                     }
                 }
-                synchronized (sessionMd) {
-                    long start = System.currentTimeMillis();
-                    sessionMd.session.sendMessage(new TextMessage(msg));
-                    long took = System.currentTimeMillis() - start;
-                    if (took >= 1000) {
-                        log.info("[{}][{}] Sending message took more than 1 second [{}ms] {}", sessionRef.getSecurityCtx().getTenantId(), externalId, took, msg);
-                    }
-                }
+                sessionMd.sendMsg(msg);
             } else {
                 log.warn("[{}][{}] Failed to find session by internal id", externalId, internalId);
             }
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index e98ea17..08808f9 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -33,7 +33,7 @@ server:
     key-alias: "${SSL_KEY_ALIAS:tomcat}"
   log_controller_error_stack_trace: "${HTTP_LOG_CONTROLLER_ERROR_STACK_TRACE:true}"
   ws:
-    blocking_send_timeout: "${TB_SERVER_WS_BLOCKING_SEND_TIMEOUT:5000}"
+    send_timeout: "${TB_SERVER_WS_SEND_TIMEOUT:5000}"
     limits:
       # Limit the amount of sessions and subscriptions available on each server. Put values to zero to disable particular limitation
       max_sessions_per_tenant: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SESSIONS_PER_TENANT:0}"