diff --git a/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java b/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java
index 136bbfd..34b6833 100644
--- a/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java
+++ b/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java
@@ -51,9 +51,9 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
@Override
public void handleTextMessage(WebSocketSession session, TextMessage message) {
try {
- log.info("[{}] Processing {}", session.getId(), message);
SessionMetaData sessionMd = internalSessionMap.get(session.getId());
if (sessionMd != null) {
+ log.info("[{}][{}] Processing {}", sessionMd.sessionRef.getSecurityCtx().getTenantId(), session.getId(), message);
webSocketService.handleWebSocketMsg(sessionMd.sessionRef, message.getPayload());
} else {
log.warn("[{}] Failed to find session", session.getId());
@@ -74,7 +74,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
internalSessionMap.put(internalSessionId, new SessionMetaData(session, sessionRef));
externalSessionMap.put(externalSessionId, internalSessionId);
processInWebSocketService(sessionRef, SessionEvent.onEstablished());
- log.info("[{}][{}] Session is started", externalSessionId, session.getId());
+ log.info("[{}][{}][{}] Session is opened", sessionRef.getSecurityCtx().getTenantId(), externalSessionId, session.getId());
} catch (InvalidParameterException e) {
log.warn("[[{}] Failed to start session", session.getId(), e);
session.close(CloseStatus.BAD_DATA.withReason(e.getMessage()));
diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java
index bd9e759..ef4f819 100644
--- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java
+++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java
@@ -72,6 +72,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -85,8 +86,8 @@ import java.util.stream.Collectors;
@Slf4j
public class DefaultTelemetryWebSocketService implements TelemetryWebSocketService {
- public static final int DEFAULT_LIMIT = 100;
- public static final Aggregation DEFAULT_AGGREGATION = Aggregation.NONE;
+ private static final int DEFAULT_LIMIT = 100;
+ private static final Aggregation DEFAULT_AGGREGATION = Aggregation.NONE;
private static final int UNKNOWN_SUBSCRIPTION_ID = 0;
private static final String PROCESSING_MSG = "[{}] Processing: {}";
private static final ObjectMapper jsonMapper = new ObjectMapper();
@@ -115,7 +116,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
@PostConstruct
public void initExecutor() {
- executor = new ThreadPoolExecutor(0, 50, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());
+ executor = new ThreadPoolExecutor(0, 50, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
}
@PreDestroy