thingsboard-developers

Details

diff --git a/application/src/main/java/org/thingsboard/server/config/WebSocketConfiguration.java b/application/src/main/java/org/thingsboard/server/config/WebSocketConfiguration.java
index 59b7da2..a0d3727 100644
--- a/application/src/main/java/org/thingsboard/server/config/WebSocketConfiguration.java
+++ b/application/src/main/java/org/thingsboard/server/config/WebSocketConfiguration.java
@@ -41,7 +41,6 @@ import java.util.Map;
 public class WebSocketConfiguration implements WebSocketConfigurer {
 
     public static final String WS_PLUGIN_PREFIX = "/api/ws/plugins/";
-    public static final String WS_SECURITY_USER_ATTRIBUTE = "SECURITY_USER";
     private static final String WS_PLUGIN_MAPPING = WS_PLUGIN_PREFIX + "**";
 
     @Bean
@@ -68,7 +67,6 @@ public class WebSocketConfiguration implements WebSocketConfigurer {
                             response.setStatusCode(HttpStatus.UNAUTHORIZED);
                             return false;
                         } else {
-                            attributes.put(WS_SECURITY_USER_ATTRIBUTE, user);
                             return true;
                         }
                     }
diff --git a/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java b/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java
index f579db8..c8b4553 100644
--- a/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java
+++ b/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java
@@ -16,14 +16,17 @@
 package org.thingsboard.server.controller.plugin;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.tomcat.websocket.Constants;
 import org.springframework.beans.factory.BeanCreationNotAllowedException;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
+import org.springframework.security.core.Authentication;
 import org.springframework.stereotype.Service;
 import org.springframework.util.StringUtils;
 import org.springframework.web.socket.CloseStatus;
 import org.springframework.web.socket.TextMessage;
 import org.springframework.web.socket.WebSocketSession;
+import org.springframework.web.socket.adapter.NativeWebSocketSession;
 import org.springframework.web.socket.handler.TextWebSocketHandler;
 import org.thingsboard.server.common.data.exception.ThingsboardErrorCode;
 import org.thingsboard.server.common.data.id.CustomerId;
@@ -38,6 +41,7 @@ import org.thingsboard.server.service.telemetry.TelemetryWebSocketMsgEndpoint;
 import org.thingsboard.server.service.telemetry.TelemetryWebSocketService;
 import org.thingsboard.server.service.telemetry.TelemetryWebSocketSessionRef;
 
+import javax.websocket.Session;
 import java.io.IOException;
 import java.net.URI;
 import java.security.InvalidParameterException;
@@ -56,6 +60,9 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
     @Autowired
     private TelemetryWebSocketService webSocketService;
 
+    @Value("${server.ws.blocking_send_timeout:5000}")
+    private long blockingSendTimeout;
+
     @Value("${server.ws.limits.max_sessions_per_tenant:0}")
     private int maxSessionsPerTenant;
     @Value("${server.ws.limits.max_sessions_per_customer:0}")
@@ -96,6 +103,12 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
     public void afterConnectionEstablished(WebSocketSession session) throws Exception {
         super.afterConnectionEstablished(session);
         try {
+            if (session instanceof NativeWebSocketSession) {
+                Session nativeSession = ((NativeWebSocketSession)session).getNativeSession(Session.class);
+                if (nativeSession != null) {
+                    nativeSession.getUserProperties().put(Constants.BLOCKING_SEND_TIMEOUT_PROPERTY, new Long(blockingSendTimeout));
+                }
+            }
             String internalSessionId = session.getId();
             TelemetryWebSocketSessionRef sessionRef = toRef(session);
             String externalSessionId = sessionRef.getSessionId();
@@ -159,7 +172,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
         if (!"telemetry".equalsIgnoreCase(serviceToken)) {
             throw new InvalidParameterException("Can't find plugin with specified token!");
         } else {
-            SecurityUser currentUser = (SecurityUser) session.getAttributes().get(WebSocketConfiguration.WS_SECURITY_USER_ATTRIBUTE);
+            SecurityUser currentUser = (SecurityUser) ((Authentication)session.getPrincipal()).getPrincipal();
             return new TelemetryWebSocketSessionRef(UUID.randomUUID().toString(), currentUser, session.getLocalAddress(), session.getRemoteAddress());
         }
     }
diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java
index 6b87e03..6bda32d 100644
--- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java
+++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java
@@ -75,14 +75,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
@@ -137,7 +130,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
 
     @PostConstruct
     public void initExecutor() {
-        executor = new ThreadPoolExecutor(0, 50, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
+        executor = Executors.newWorkStealingPool(50);
     }
 
     @PreDestroy
diff --git a/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java b/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java
index 3d12938..7d4d93f 100644
--- a/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java
+++ b/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java
@@ -23,28 +23,10 @@ import org.springframework.stereotype.Service;
 import org.thingsboard.rule.engine.api.util.DonAsynchron;
 import org.thingsboard.server.actors.ActorSystemContext;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
-import org.thingsboard.server.common.transport.SessionMsgListener;
-import org.thingsboard.server.common.transport.TransportService;
 import org.thingsboard.server.common.transport.TransportServiceCallback;
 import org.thingsboard.server.common.transport.service.AbstractTransportService;
 import org.thingsboard.server.gen.transport.TransportProtos;
-import org.thingsboard.server.gen.transport.TransportProtos.DeviceActorToTransportMsg;
-import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg;
-import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg;
-import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayResponseMsg;
-import org.thingsboard.server.gen.transport.TransportProtos.PostAttributeMsg;
-import org.thingsboard.server.gen.transport.TransportProtos.PostTelemetryMsg;
-import org.thingsboard.server.gen.transport.TransportProtos.SessionEventMsg;
-import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
-import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToAttributeUpdatesMsg;
-import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToRPCMsg;
-import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcResponseMsg;
-import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcRequestMsg;
-import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
-import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg;
-import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceCredentialsResponseMsg;
-import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg;
-import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.*;
 import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
 import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
 import org.thingsboard.server.service.encoding.DataDecodingEncodingService;
@@ -53,15 +35,6 @@ import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWra
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
 import java.util.Optional;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 
 /**
diff --git a/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java b/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java
index 2ced33a..5afd09e 100644
--- a/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java
+++ b/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java
@@ -28,11 +28,7 @@ import org.thingsboard.server.kafka.*;
 
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 
 /**
  * Created by ashvayka on 05.10.18.
@@ -68,7 +64,7 @@ public class RemoteTransportApiService {
 
     @PostConstruct
     public void init() {
-        this.transportCallbackExecutor = new ThreadPoolExecutor(0, 100, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
+        this.transportCallbackExecutor = Executors.newWorkStealingPool(100);
 
         TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<TransportApiResponseMsg> responseBuilder = TBKafkaProducerTemplate.builder();
         responseBuilder.settings(kafkaSettings);
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index c19f694..e98ea17 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -33,6 +33,7 @@ server:
     key-alias: "${SSL_KEY_ALIAS:tomcat}"
   log_controller_error_stack_trace: "${HTTP_LOG_CONTROLLER_ERROR_STACK_TRACE:true}"
   ws:
+    blocking_send_timeout: "${TB_SERVER_WS_BLOCKING_SEND_TIMEOUT:5000}"
     limits:
       # Limit the amount of sessions and subscriptions available on each server. Put values to zero to disable particular limitation
       max_sessions_per_tenant: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SESSIONS_PER_TENANT:0}"
@@ -167,6 +168,9 @@ cassandra:
     buffer_size: "${CASSANDRA_QUERY_BUFFER_SIZE:200000}"
     concurrent_limit: "${CASSANDRA_QUERY_CONCURRENT_LIMIT:1000}"
     permit_max_wait_time: "${PERMIT_MAX_WAIT_TIME:120000}"
+    dispatcher_threads: "${CASSANDRA_QUERY_DISPATCHER_THREADS:2}"
+    callback_threads: "${CASSANDRA_QUERY_CALLBACK_THREADS:4}"
+    poll_ms: "${CASSANDRA_QUERY_POLL_MS:50}"
     rate_limit_print_interval_ms: "${CASSANDRA_QUERY_RATE_LIMIT_PRINT_MS:10000}"
     tenant_rate_limits:
       enabled: "${CASSANDRA_QUERY_TENANT_RATE_LIMITS_ENABLED:false}"
diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java
index 9385844..5a81b97 100644
--- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java
+++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java
@@ -28,15 +28,7 @@ import org.thingsboard.server.common.transport.TransportServiceCallback;
 import org.thingsboard.server.gen.transport.TransportProtos;
 
 import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 
 /**
  * Created by ashvayka on 17.10.18.
@@ -278,7 +270,7 @@ public abstract class AbstractTransportService implements TransportService {
             new TbRateLimits(perDevicesLimitsConf);
         }
         this.schedulerExecutor = Executors.newSingleThreadScheduledExecutor();
-        this.transportCallbackExecutor = new ThreadPoolExecutor(0, 20, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
+        this.transportCallbackExecutor = Executors.newWorkStealingPool(20);
         this.schedulerExecutor.scheduleAtFixedRate(this::checkInactivityAndReportActivity, sessionReportTimeout, sessionReportTimeout, TimeUnit.MILLISECONDS);
     }
 
diff --git a/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraBufferedRateExecutor.java b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraBufferedRateExecutor.java
index 7de20d9..f58f035 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraBufferedRateExecutor.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraBufferedRateExecutor.java
@@ -52,7 +52,7 @@ public class CassandraBufferedRateExecutor extends AbstractBufferedRateExecutor<
             @Value("${cassandra.query.concurrent_limit}") int concurrencyLimit,
             @Value("${cassandra.query.permit_max_wait_time}") long maxWaitTime,
             @Value("${cassandra.query.dispatcher_threads:2}") int dispatcherThreads,
-            @Value("${cassandra.query.callback_threads:2}") int callbackThreads,
+            @Value("${cassandra.query.callback_threads:4}") int callbackThreads,
             @Value("${cassandra.query.poll_ms:50}") long pollMs,
             @Value("${cassandra.query.tenant_rate_limits.enabled}") boolean tenantRateLimitsEnabled,
             @Value("${cassandra.query.tenant_rate_limits.configuration}") String tenantRateLimitsConfiguration,
diff --git a/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java b/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java
index b204a44..a5415f0 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java
@@ -25,17 +25,7 @@ import org.thingsboard.server.common.msg.tools.TbRateLimits;
 
 import javax.annotation.Nullable;
 import java.util.UUID;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
@@ -72,7 +62,7 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
         this.concurrencyLimit = concurrencyLimit;
         this.queue = new LinkedBlockingDeque<>(queueLimit);
         this.dispatcherExecutor = Executors.newFixedThreadPool(dispatcherThreads);
-        this.callbackExecutor = new ThreadPoolExecutor(callbackThreads, 50, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
+        this.callbackExecutor = Executors.newWorkStealingPool(callbackThreads);
         this.timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
         this.perTenantLimitsEnabled = perTenantLimitsEnabled;
         this.perTenantLimitsConfiguration = perTenantLimitsConfiguration;