thingsboard-aplcache

Max count of sessions per device

5/27/2018 4:22:53 PM

Details

diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
index 94581b9..d33734e 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
@@ -203,6 +203,10 @@ public class ActorSystemContext {
     @Getter
     private long queuePartitionId;
 
+    @Value("${actors.session.max_concurrent_sessions_per_device:1}")
+    @Getter
+    private long maxConcurrentSessionsPerDevice;
+
     @Value("${actors.session.sync.timeout}")
     @Getter
     private long syncSessionTimeout;
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
index d416497..85b3285 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
@@ -81,6 +81,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -117,7 +118,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
         super(systemContext, logger);
         this.tenantId = tenantId;
         this.deviceId = deviceId;
-        this.sessions = new HashMap<>();
+        this.sessions = new LinkedHashMap<>();
         this.attributeSubscriptions = new HashMap<>();
         this.rpcSubscriptions = new HashMap<>();
         this.toDeviceRpcPendingMap = new HashMap<>();
@@ -501,6 +502,12 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
         FromDeviceMsg inMsg = msg.getPayload();
         if (inMsg instanceof SessionOpenMsg) {
             logger.debug("[{}] Processing new session [{}]", deviceId, sessionId);
+            if (sessions.size() >= systemContext.getMaxConcurrentSessionsPerDevice()) {
+                SessionId sessionIdToRemove = sessions.keySet().stream().findFirst().orElse(null);
+                if (sessionIdToRemove != null) {
+                    closeSession(sessionIdToRemove, sessions.remove(sessionIdToRemove));
+                }
+            }
             sessions.put(sessionId, new SessionInfo(SessionType.ASYNC, msg.getServerAddress()));
             if (sessions.size() == 1) {
                 reportSessionOpen();
@@ -528,13 +535,15 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
     }
 
     void processCredentialsUpdate() {
-        sessions.forEach((k, v) -> {
-            sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(new SessionCloseNotification(), k), v.getServer());
-        });
+        sessions.forEach(this::closeSession);
         attributeSubscriptions.clear();
         rpcSubscriptions.clear();
     }
 
+    private void closeSession(SessionId sessionId, SessionInfo sessionInfo) {
+        sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(new SessionCloseNotification(), sessionId), sessionInfo.getServer());
+    }
+
     void processNameOrTypeUpdate(DeviceNameOrTypeUpdateMsg msg) {
         this.deviceName = msg.getDeviceName();
         this.deviceType = msg.getDeviceType();
diff --git a/application/src/main/java/org/thingsboard/server/actors/session/ASyncMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/session/ASyncMsgProcessor.java
index 70534ae..a8f14fe 100644
--- a/application/src/main/java/org/thingsboard/server/actors/session/ASyncMsgProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/session/ASyncMsgProcessor.java
@@ -56,12 +56,14 @@ class ASyncMsgProcessor extends AbstractSessionActorMsgProcessor {
     @Override
     protected void processToDeviceActorMsg(ActorContext ctx, TransportToDeviceSessionActorMsg msg) {
         updateSessionCtx(msg, SessionType.ASYNC);
+        DeviceToDeviceActorMsg pendingMsg = toDeviceMsg(msg);
+        FromDeviceMsg fromDeviceMsg = pendingMsg.getPayload();
         if (firstMsg) {
-            toDeviceMsg(new SessionOpenMsg()).ifPresent(m -> forwardToAppActor(ctx, m));
+            if (fromDeviceMsg.getMsgType() != SessionMsgType.SESSION_OPEN) {
+                toDeviceMsg(new SessionOpenMsg()).ifPresent(m -> forwardToAppActor(ctx, m));
+            }
             firstMsg = false;
         }
-        DeviceToDeviceActorMsg pendingMsg = toDeviceMsg(msg);
-        FromDeviceMsg fromDeviceMsg = pendingMsg.getPayload();
         switch (fromDeviceMsg.getMsgType()) {
             case POST_TELEMETRY_REQUEST:
             case POST_ATTRIBUTES_REQUEST:
diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java
index 6630d7c..678495b 100644
--- a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java
+++ b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java
@@ -97,7 +97,7 @@ public class DefaultDeviceRpcService implements DeviceRpcService {
 
     @Override
     public void processRpcResponseFromDevice(FromDeviceRpcResponse response) {
-        log.error("[{}] response the request: [{}]", this.hashCode(), response.getId());
+        log.error("[{}] response to request: [{}]", this.hashCode(), response.getId());
         if (routingService.getCurrentServer().equals(response.getServerAddress())) {
             UUID requestId = response.getId();
             Consumer<FromDeviceRpcResponse> consumer = localRpcRequests.remove(requestId);
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index 188291a..5653193 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -227,6 +227,7 @@ actors:
   tenant:
     create_components_on_init: true
   session:
+    max_concurrent_sessions_per_device: "${ACTORS_MAX_CONCURRENT_SESSION_PER_DEVICE:1}"
     sync:
       # Default timeout for processing request using synchronous session (HTTP, CoAP) in milliseconds
       timeout: "${ACTORS_SESSION_SYNC_TIMEOUT:10000}"
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/core/SessionOpenMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/core/SessionOpenMsg.java
index c121a71..28e319e 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/core/SessionOpenMsg.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/core/SessionOpenMsg.java
@@ -15,6 +15,9 @@
  */
 package org.thingsboard.server.common.msg.core;
 
+import lombok.Data;
+import org.thingsboard.server.common.data.id.SessionId;
+import org.thingsboard.server.common.msg.aware.SessionAwareMsg;
 import org.thingsboard.server.common.msg.session.FromDeviceMsg;
 import org.thingsboard.server.common.msg.session.SessionMsgType;
 import org.thingsboard.server.common.msg.session.SessionMsgType;
@@ -22,7 +25,9 @@ import org.thingsboard.server.common.msg.session.SessionMsgType;
 /**
  * @author Andrew Shvayka
  */
+@Data
 public class SessionOpenMsg implements FromDeviceMsg {
+
     @Override
     public SessionMsgType getMsgType() {
         return SessionMsgType.SESSION_OPEN;
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
index 0b38817..185b7a8 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
@@ -29,7 +29,9 @@ import org.springframework.util.StringUtils;
 import org.thingsboard.server.common.data.Device;
 import org.thingsboard.server.common.data.security.DeviceTokenCredentials;
 import org.thingsboard.server.common.data.security.DeviceX509Credentials;
+import org.thingsboard.server.common.msg.core.SessionOpenMsg;
 import org.thingsboard.server.common.msg.session.AdaptorToSessionActorMsg;
+import org.thingsboard.server.common.msg.session.BasicAdaptorToSessionActorMsg;
 import org.thingsboard.server.common.msg.session.BasicTransportToDeviceSessionActorMsg;
 import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg;
 import org.thingsboard.server.common.transport.SessionMsgProcessor;
@@ -95,6 +97,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
         log.trace("[{}] Processing msg: {}", sessionId, msg);
         if (msg instanceof MqttMessage) {
             processMqttMsg(ctx, (MqttMessage) msg);
+        } else {
+            ctx.close();
         }
     }
 
@@ -303,6 +307,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
         } else {
             ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
             connected = true;
+            processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(),
+                    new BasicAdaptorToSessionActorMsg(deviceSessionCtx, new SessionOpenMsg())));
             checkGatewaySession();
         }
     }
@@ -314,6 +320,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
             if (deviceSessionCtx.login(new DeviceX509Credentials(sha3Hash))) {
                 ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
                 connected = true;
+                processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(),
+                        new BasicAdaptorToSessionActorMsg(deviceSessionCtx, new SessionOpenMsg())));
                 checkGatewaySession();
             } else {
                 ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));