thingsboard-aplcache

Credentials revocation

12/20/2016 7:50:52 AM

Changes

Details

diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java
index 8b669e9..bf18df6 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java
@@ -27,6 +27,7 @@ import org.thingsboard.server.common.data.id.TenantId;
 import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
 import org.thingsboard.server.common.msg.device.ToDeviceActorMsg;
 import org.thingsboard.server.extensions.api.device.DeviceAttributesEventNotificationMsg;
+import org.thingsboard.server.extensions.api.device.DeviceCredentialsUpdateNotificationMsg;
 import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg;
 import org.thingsboard.server.extensions.api.plugins.msg.*;
 
@@ -58,6 +59,8 @@ public class DeviceActor extends ContextAwareActor {
                 processor.processAttributesUpdate(context(), (DeviceAttributesEventNotificationMsg) msg);
             } else if (msg instanceof ToDeviceRpcRequestPluginMsg) {
                 processor.processRpcRequest(context(), (ToDeviceRpcRequestPluginMsg) msg);
+            } else if (msg instanceof DeviceCredentialsUpdateNotificationMsg){
+                processor.processCredentialsUpdate(context(), (DeviceCredentialsUpdateNotificationMsg) msg);
             }
         } else if (msg instanceof TimeoutMsg) {
             processor.processTimeout(context(), (TimeoutMsg) msg);
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
index 3949691..3aef0c8 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
@@ -32,13 +32,7 @@ import org.thingsboard.server.common.data.kv.AttributeKey;
 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
 import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
-import org.thingsboard.server.common.msg.core.AttributesUpdateNotification;
-import org.thingsboard.server.common.msg.core.BasicCommandAckResponse;
-import org.thingsboard.server.common.msg.core.BasicToDeviceSessionActorMsg;
-import org.thingsboard.server.common.msg.core.SessionCloseMsg;
-import org.thingsboard.server.common.msg.core.ToDeviceRpcRequestMsg;
-import org.thingsboard.server.common.msg.core.ToDeviceRpcResponseMsg;
-import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
+import org.thingsboard.server.common.msg.core.*;
 import org.thingsboard.server.common.msg.device.ToDeviceActorMsg;
 import org.thingsboard.server.common.msg.kv.BasicAttributeKVMsg;
 import org.thingsboard.server.common.msg.session.FromDeviceMsg;
@@ -47,6 +41,7 @@ import org.thingsboard.server.common.msg.session.SessionType;
 import org.thingsboard.server.common.msg.session.ToDeviceMsg;
 import org.thingsboard.server.extensions.api.device.DeviceAttributes;
 import org.thingsboard.server.extensions.api.device.DeviceAttributesEventNotificationMsg;
+import org.thingsboard.server.extensions.api.device.DeviceCredentialsUpdateNotificationMsg;
 import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse;
 import org.thingsboard.server.extensions.api.plugins.msg.RpcError;
 import org.thingsboard.server.extensions.api.plugins.msg.TimeoutIntMsg;
@@ -74,6 +69,7 @@ import java.util.stream.Collectors;
 public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
 
     private final DeviceId deviceId;
+    private final Map<SessionId, SessionInfo> sessions;
     private final Map<SessionId, SessionInfo> attributeSubscriptions;
     private final Map<SessionId, SessionInfo> rpcSubscriptions;
 
@@ -85,6 +81,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
     public DeviceActorMessageProcessor(ActorSystemContext systemContext, LoggingAdapter logger, DeviceId deviceId) {
         super(systemContext, logger);
         this.deviceId = deviceId;
+        this.sessions = new HashMap<>();
         this.attributeSubscriptions = new HashMap<>();
         this.rpcSubscriptions = new HashMap<>();
         this.rpcPendingMap = new HashMap<>();
@@ -281,7 +278,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
         if (!msg.isAdded()) {
             logger.debug("[{}] Clearing attributes/rpc subscription for server [{}]", deviceId, msg.getServerAddress());
             Predicate<Map.Entry<SessionId, SessionInfo>> filter = e -> e.getValue().getServer()
-                .map(serverAddress -> serverAddress.equals(msg.getServerAddress())).orElse(false);
+                    .map(serverAddress -> serverAddress.equals(msg.getServerAddress())).orElse(false);
             attributeSubscriptions.entrySet().removeIf(filter);
             rpcSubscriptions.entrySet().removeIf(filter);
         }
@@ -342,8 +339,12 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
     private void processSessionStateMsgs(ToDeviceActorMsg msg) {
         SessionId sessionId = msg.getSessionId();
         FromDeviceMsg inMsg = msg.getPayload();
-        if (inMsg instanceof SessionCloseMsg) {
+        if (inMsg instanceof SessionOpenMsg) {
+            logger.debug("[{}] Processing new session [{}]", deviceId, sessionId);
+            sessions.put(sessionId, new SessionInfo(SessionType.ASYNC, msg.getServerAddress()));
+        } else if (inMsg instanceof SessionCloseMsg) {
             logger.debug("[{}] Canceling subscriptions for closed session [{}]", deviceId, sessionId);
+            sessions.remove(sessionId);
             attributeSubscriptions.remove(sessionId);
             rpcSubscriptions.remove(sessionId);
         }
@@ -363,4 +364,11 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
         return systemContext.getAttributesService().findAll(this.deviceId, attributeType);
     }
 
+    public void processCredentialsUpdate(ActorContext context, DeviceCredentialsUpdateNotificationMsg msg) {
+        sessions.forEach((k, v) -> {
+            sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(new SessionCloseNotification(), k), v.getServer());
+        });
+        attributeSubscriptions.clear();
+        rpcSubscriptions.clear();
+    }
 }
diff --git a/application/src/main/java/org/thingsboard/server/actors/service/ActorService.java b/application/src/main/java/org/thingsboard/server/actors/service/ActorService.java
index 1c64f54..3db7210 100644
--- a/application/src/main/java/org/thingsboard/server/actors/service/ActorService.java
+++ b/application/src/main/java/org/thingsboard/server/actors/service/ActorService.java
@@ -15,6 +15,7 @@
  */
 package org.thingsboard.server.actors.service;
 
+import org.thingsboard.server.common.data.id.DeviceId;
 import org.thingsboard.server.common.data.id.PluginId;
 import org.thingsboard.server.common.data.id.RuleId;
 import org.thingsboard.server.common.data.id.TenantId;
@@ -28,4 +29,6 @@ public interface ActorService extends SessionMsgProcessor, WebSocketMsgProcessor
     void onPluginStateChange(TenantId tenantId, PluginId pluginId, ComponentLifecycleEvent state);
 
     void onRuleStateChange(TenantId tenantId, RuleId ruleId, ComponentLifecycleEvent state);
+
+    void onCredentialsUpdate(TenantId tenantId, DeviceId deviceId);
 }
diff --git a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
index db6526d..bbf1300 100644
--- a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
+++ b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
@@ -32,16 +32,19 @@ import org.thingsboard.server.actors.rpc.RpcSessionCreateRequestMsg;
 import org.thingsboard.server.actors.rpc.RpcSessionTellMsg;
 import org.thingsboard.server.actors.session.SessionManagerActor;
 import org.thingsboard.server.actors.stats.StatsActor;
+import org.thingsboard.server.common.data.id.DeviceId;
 import org.thingsboard.server.common.data.id.PluginId;
 import org.thingsboard.server.common.data.id.RuleId;
 import org.thingsboard.server.common.data.id.TenantId;
 import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
 import org.thingsboard.server.common.msg.aware.SessionAwareMsg;
 import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
+import org.thingsboard.server.common.msg.cluster.ServerAddress;
 import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg;
 import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
 import org.thingsboard.server.common.msg.device.ToDeviceActorMsg;
 import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
+import org.thingsboard.server.extensions.api.device.DeviceCredentialsUpdateNotificationMsg;
 import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg;
 import org.thingsboard.server.extensions.api.plugins.msg.ToPluginActorMsg;
 import org.thingsboard.server.extensions.api.plugins.rest.PluginRestMsg;
@@ -56,6 +59,7 @@ import scala.concurrent.duration.Duration;
 
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
+import java.util.Optional;
 
 @Service
 @Slf4j
@@ -221,6 +225,17 @@ public class DefaultActorService implements ActorService {
         broadcast(ComponentLifecycleMsg.forRule(tenantId, ruleId, state));
     }
 
+    @Override
+    public void onCredentialsUpdate(TenantId tenantId, DeviceId deviceId) {
+        DeviceCredentialsUpdateNotificationMsg msg = new DeviceCredentialsUpdateNotificationMsg(tenantId, deviceId);
+        Optional<ServerAddress> address = actorContext.getRoutingService().resolve(deviceId);
+        if (address.isPresent()) {
+            rpcService.tell(address.get(), msg);
+        } else {
+            onMsg(msg);
+        }
+    }
+
     public void broadcast(ToAllNodesMsg msg) {
         rpcService.broadcast(msg);
         appActor.tell(msg, ActorRef.noSender());
diff --git a/application/src/main/java/org/thingsboard/server/actors/session/ASyncMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/session/ASyncMsgProcessor.java
index 90b0cb3..916e678 100644
--- a/application/src/main/java/org/thingsboard/server/actors/session/ASyncMsgProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/session/ASyncMsgProcessor.java
@@ -20,15 +20,14 @@ import org.thingsboard.server.actors.shared.SessionTimeoutMsg;
 import org.thingsboard.server.common.data.id.SessionId;
 import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
-import org.thingsboard.server.common.msg.core.AttributesSubscribeMsg;
-import org.thingsboard.server.common.msg.core.ResponseMsg;
-import org.thingsboard.server.common.msg.core.RpcSubscribeMsg;
+import org.thingsboard.server.common.msg.core.*;
 import org.thingsboard.server.common.msg.core.SessionCloseMsg;
 import org.thingsboard.server.common.msg.device.ToDeviceActorMsg;
 import org.thingsboard.server.common.msg.session.*;
 
 import akka.actor.ActorContext;
 import akka.event.LoggingAdapter;
+import org.thingsboard.server.common.msg.session.ctrl.*;
 import org.thingsboard.server.common.msg.session.ex.SessionException;
 
 import java.util.HashMap;
@@ -37,7 +36,8 @@ import java.util.Optional;
 
 class ASyncMsgProcessor extends AbstractSessionActorMsgProcessor {
 
-    Map<Integer, ToDeviceActorMsg> pendingMap = new HashMap<>();
+    private boolean firstMsg = true;
+    private Map<Integer, ToDeviceActorMsg> pendingMap = new HashMap<>();
     private Optional<ServerAddress> currentTargetServer;
     private boolean subscribedToAttributeUpdates;
     private boolean subscribedToRpcCommands;
@@ -49,6 +49,10 @@ class ASyncMsgProcessor extends AbstractSessionActorMsgProcessor {
     @Override
     protected void processToDeviceActorMsg(ActorContext ctx, ToDeviceActorSessionMsg msg) {
         updateSessionCtx(msg, SessionType.ASYNC);
+        if (firstMsg) {
+            toDeviceMsg(new SessionOpenMsg()).ifPresent(m -> forwardToAppActor(ctx, m));
+            firstMsg = false;
+        }
         ToDeviceActorMsg pendingMsg = toDeviceMsg(msg);
         FromDeviceMsg fromDeviceMsg = pendingMsg.getPayload();
         switch (fromDeviceMsg.getMsgType()) {
@@ -80,17 +84,21 @@ class ASyncMsgProcessor extends AbstractSessionActorMsgProcessor {
     @Override
     public void processToDeviceMsg(ActorContext context, ToDeviceMsg msg) {
         try {
-            switch (msg.getMsgType()) {
-                case STATUS_CODE_RESPONSE:
-                case GET_ATTRIBUTES_RESPONSE:
-                    ResponseMsg responseMsg = (ResponseMsg) msg;
-                    if (responseMsg.getRequestId() >= 0) {
-                        logger.debug("[{}] Pending request processed: {}", responseMsg.getRequestId(), responseMsg);
-                        pendingMap.remove(responseMsg.getRequestId());
-                    }
-                    break;
+            if (msg.getMsgType() != MsgType.SESSION_CLOSE) {
+                switch (msg.getMsgType()) {
+                    case STATUS_CODE_RESPONSE:
+                    case GET_ATTRIBUTES_RESPONSE:
+                        ResponseMsg responseMsg = (ResponseMsg) msg;
+                        if (responseMsg.getRequestId() >= 0) {
+                            logger.debug("[{}] Pending request processed: {}", responseMsg.getRequestId(), responseMsg);
+                            pendingMap.remove(responseMsg.getRequestId());
+                        }
+                        break;
+                }
+                sessionCtx.onMsg(new BasicSessionActorToAdaptorMsg(this.sessionCtx, msg));
+            } else {
+                sessionCtx.onMsg(org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg.onCredentialsRevoked(sessionCtx.getSessionId()));
             }
-            sessionCtx.onMsg(new BasicSessionActorToAdaptorMsg(this.sessionCtx, msg));
         } catch (SessionException e) {
             logger.warning("Failed to push session response msg", e);
         }
@@ -102,7 +110,7 @@ class ASyncMsgProcessor extends AbstractSessionActorMsgProcessor {
     }
 
     protected void cleanupSession(ActorContext ctx) {
-        toDeviceMsg(new SessionCloseMsg()).ifPresent(msg -> forwardToAppActor(ctx, msg));
+        toDeviceMsg(new SessionCloseMsg()).ifPresent(m -> forwardToAppActor(ctx, m));
     }
 
     @Override
@@ -110,6 +118,7 @@ class ASyncMsgProcessor extends AbstractSessionActorMsgProcessor {
         if (pendingMap.size() > 0 || subscribedToAttributeUpdates || subscribedToRpcCommands) {
             Optional<ServerAddress> newTargetServer = systemContext.getRoutingService().resolve(getDeviceId());
             if (!newTargetServer.equals(currentTargetServer)) {
+                firstMsg = true;
                 currentTargetServer = newTargetServer;
                 pendingMap.values().forEach(v -> {
                     forwardToAppActor(context, v, currentTargetServer);
diff --git a/application/src/main/java/org/thingsboard/server/actors/session/SyncMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/session/SyncMsgProcessor.java
index afb35ac..9fb13d3 100644
--- a/application/src/main/java/org/thingsboard/server/actors/session/SyncMsgProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/session/SyncMsgProcessor.java
@@ -52,7 +52,7 @@ class SyncMsgProcessor extends AbstractSessionActorMsgProcessor {
     public void processTimeoutMsg(ActorContext context, SessionTimeoutMsg msg) {
         if (pendingResponse) {
             try {
-                sessionCtx.onMsg(new SessionCloseMsg(sessionId, true));
+                sessionCtx.onMsg(SessionCloseMsg.onTimeout(sessionId));
             } catch (SessionException e) {
                 logger.warning("Failed to push session close msg", e);
             }
diff --git a/application/src/main/java/org/thingsboard/server/controller/DeviceController.java b/application/src/main/java/org/thingsboard/server/controller/DeviceController.java
index 1c0a7be..43416ce 100644
--- a/application/src/main/java/org/thingsboard/server/controller/DeviceController.java
+++ b/application/src/main/java/org/thingsboard/server/controller/DeviceController.java
@@ -28,6 +28,7 @@ import org.thingsboard.server.common.data.security.DeviceCredentials;
 import org.thingsboard.server.dao.exception.IncorrectParameterException;
 import org.thingsboard.server.dao.model.ModelConstants;
 import org.thingsboard.server.exception.ThingsboardException;
+import org.thingsboard.server.extensions.api.device.DeviceCredentialsUpdateNotificationMsg;
 
 @RestController
 @RequestMapping("/api")
@@ -48,7 +49,7 @@ public class DeviceController extends BaseController {
 
     @PreAuthorize("hasAuthority('TENANT_ADMIN')")
     @RequestMapping(value = "/device", method = RequestMethod.POST)
-    @ResponseBody 
+    @ResponseBody
     public Device saveDevice(@RequestBody Device device) throws ThingsboardException {
         try {
             device.setTenantId(getCurrentUser().getTenantId());
@@ -74,7 +75,7 @@ public class DeviceController extends BaseController {
 
     @PreAuthorize("hasAuthority('TENANT_ADMIN')")
     @RequestMapping(value = "/customer/{customerId}/device/{deviceId}", method = RequestMethod.POST)
-    @ResponseBody 
+    @ResponseBody
     public Device assignDeviceToCustomer(@PathVariable("customerId") String strCustomerId,
                                          @PathVariable("deviceId") String strDeviceId) throws ThingsboardException {
         checkParameter("customerId", strCustomerId);
@@ -85,7 +86,7 @@ public class DeviceController extends BaseController {
 
             DeviceId deviceId = new DeviceId(toUUID(strDeviceId));
             checkDeviceId(deviceId);
-            
+
             return checkNotNull(deviceService.assignDeviceToCustomer(deviceId, customerId));
         } catch (Exception e) {
             throw handleException(e);
@@ -94,7 +95,7 @@ public class DeviceController extends BaseController {
 
     @PreAuthorize("hasAuthority('TENANT_ADMIN')")
     @RequestMapping(value = "/customer/device/{deviceId}", method = RequestMethod.DELETE)
-    @ResponseBody 
+    @ResponseBody
     public Device unassignDeviceFromCustomer(@PathVariable("deviceId") String strDeviceId) throws ThingsboardException {
         checkParameter("deviceId", strDeviceId);
         try {
@@ -125,19 +126,21 @@ public class DeviceController extends BaseController {
 
     @PreAuthorize("hasAuthority('TENANT_ADMIN')")
     @RequestMapping(value = "/device/credentials", method = RequestMethod.POST)
-    @ResponseBody 
+    @ResponseBody
     public DeviceCredentials saveDeviceCredentials(@RequestBody DeviceCredentials deviceCredentials) throws ThingsboardException {
         checkNotNull(deviceCredentials);
         try {
             checkDeviceId(deviceCredentials.getDeviceId());
-            return checkNotNull(deviceCredentialsService.updateDeviceCredentials(deviceCredentials));
+            DeviceCredentials result = checkNotNull(deviceCredentialsService.updateDeviceCredentials(deviceCredentials));
+            actorService.onCredentialsUpdate(getCurrentUser().getTenantId(), deviceCredentials.getDeviceId());
+            return result;
         } catch (Exception e) {
             throw handleException(e);
         }
     }
 
     @PreAuthorize("hasAuthority('TENANT_ADMIN')")
-    @RequestMapping(value = "/tenant/devices", params = { "limit" }, method = RequestMethod.GET)
+    @RequestMapping(value = "/tenant/devices", params = {"limit"}, method = RequestMethod.GET)
     @ResponseBody
     public TextPageData<Device> getTenantDevices(
             @RequestParam int limit,
@@ -154,7 +157,7 @@ public class DeviceController extends BaseController {
     }
 
     @PreAuthorize("hasAnyAuthority('TENANT_ADMIN', 'CUSTOMER_USER')")
-    @RequestMapping(value = "/customer/{customerId}/devices", params = { "limit" }, method = RequestMethod.GET)
+    @RequestMapping(value = "/customer/{customerId}/devices", params = {"limit"}, method = RequestMethod.GET)
     @ResponseBody
     public TextPageData<Device> getCustomerDevices(
             @PathVariable("customerId") String strCustomerId,
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index aa7fcc1..0695413 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -29,7 +29,7 @@ server:
 # Zookeeper connection parameters. Used for service discovery.
 zk:
   # Enable/disable zookeeper discovery service.
-  enabled: "${ZOOKEEPER_ENABLED:false}"
+  enabled: "${ZOOKEEPER_ENABLED:true}"
   # Zookeeper connect string
   url: "${ZOOKEEPER_URL:localhost:2181}"
   # Zookeeper retry interval in milliseconds
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/core/SessionCloseNotification.java b/common/message/src/main/java/org/thingsboard/server/common/msg/core/SessionCloseNotification.java
new file mode 100644
index 0000000..3e96e40
--- /dev/null
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/core/SessionCloseNotification.java
@@ -0,0 +1,38 @@
+/**
+ * Copyright © 2016 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.common.msg.core;
+
+import lombok.ToString;
+import org.thingsboard.server.common.msg.kv.AttributesKVMsg;
+import org.thingsboard.server.common.msg.session.MsgType;
+import org.thingsboard.server.common.msg.session.ToDeviceMsg;
+
+@ToString
+public class SessionCloseNotification implements ToDeviceMsg {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public boolean isSuccess() {
+        return true;
+    }
+
+    @Override
+    public MsgType getMsgType() {
+        return MsgType.SESSION_CLOSE;
+    }
+
+}
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/core/SessionOpenMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/core/SessionOpenMsg.java
new file mode 100644
index 0000000..d18dc9f
--- /dev/null
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/core/SessionOpenMsg.java
@@ -0,0 +1,29 @@
+/**
+ * Copyright © 2016 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.common.msg.core;
+
+import org.thingsboard.server.common.msg.session.FromDeviceMsg;
+import org.thingsboard.server.common.msg.session.MsgType;
+
+/**
+ * @author Andrew Shvayka
+ */
+public class SessionOpenMsg implements FromDeviceMsg {
+    @Override
+    public MsgType getMsgType() {
+        return MsgType.SESSION_OPEN;
+    }
+}
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/session/ctrl/SessionCloseMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/session/ctrl/SessionCloseMsg.java
index d188527..03b611e 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/session/ctrl/SessionCloseMsg.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/session/ctrl/SessionCloseMsg.java
@@ -21,11 +21,25 @@ import org.thingsboard.server.common.msg.session.SessionCtrlMsg;
 public class SessionCloseMsg implements SessionCtrlMsg {
 
     private final SessionId sessionId;
+    private final boolean revoked;
     private final boolean timeout;
 
-    public SessionCloseMsg(SessionId sessionId, boolean timeout) {
+    public static SessionCloseMsg onError(SessionId sessionId) {
+        return new SessionCloseMsg(sessionId, false, false);
+    }
+
+    public static SessionCloseMsg onTimeout(SessionId sessionId) {
+        return new SessionCloseMsg(sessionId, false, true);
+    }
+
+    public static SessionCloseMsg onCredentialsRevoked(SessionId sessionId) {
+        return new SessionCloseMsg(sessionId, true, false);
+    }
+
+    private SessionCloseMsg(SessionId sessionId, boolean unauthorized, boolean timeout) {
         super();
         this.sessionId = sessionId;
+        this.revoked = unauthorized;
         this.timeout = timeout;
     }
 
@@ -34,6 +48,10 @@ public class SessionCloseMsg implements SessionCtrlMsg {
         return sessionId;
     }
 
+    public boolean isCredentialsRevoked() {
+        return revoked;
+    }
+
     public boolean isTimeout() {
         return timeout;
     }
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/session/MsgType.java b/common/message/src/main/java/org/thingsboard/server/common/msg/session/MsgType.java
index 1b91425..549a143 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/session/MsgType.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/session/MsgType.java
@@ -28,7 +28,7 @@ public enum MsgType {
 
     RULE_ENGINE_ERROR,
 
-    SESSION_CLOSE;
+    SESSION_OPEN, SESSION_CLOSE;
 
     private final boolean requiresRulesProcessing;
 
diff --git a/extensions-api/src/main/java/org/thingsboard/server/extensions/api/device/DeviceCredentialsUpdateNotificationMsg.java b/extensions-api/src/main/java/org/thingsboard/server/extensions/api/device/DeviceCredentialsUpdateNotificationMsg.java
new file mode 100644
index 0000000..0104824
--- /dev/null
+++ b/extensions-api/src/main/java/org/thingsboard/server/extensions/api/device/DeviceCredentialsUpdateNotificationMsg.java
@@ -0,0 +1,36 @@
+/**
+ * Copyright © 2016 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.extensions.api.device;
+
+import lombok.Data;
+import lombok.Getter;
+import lombok.ToString;
+import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.data.kv.AttributeKey;
+
+import java.util.Set;
+
+/**
+ * @author Andrew Shvayka
+ */
+@Data
+public class DeviceCredentialsUpdateNotificationMsg implements ToDeviceActorNotificationMsg {
+
+    private final TenantId tenantId;
+    private final DeviceId deviceId;
+
+}
diff --git a/tools/src/main/resources/test.properties b/tools/src/main/resources/test.properties
new file mode 100644
index 0000000..6e9ed89
--- /dev/null
+++ b/tools/src/main/resources/test.properties
@@ -0,0 +1,5 @@
+restUrl=http://localhost:8080
+mqttUrls=tcp://localhost:1883
+deviceCount=1
+durationMs=60000
+iterationIntervalMs=1000
diff --git a/transport/coap/src/main/java/org/thingsboard/server/transport/coap/session/CoapSessionCtx.java b/transport/coap/src/main/java/org/thingsboard/server/transport/coap/session/CoapSessionCtx.java
index a61f0bf..6f6e35a 100644
--- a/transport/coap/src/main/java/org/thingsboard/server/transport/coap/session/CoapSessionCtx.java
+++ b/transport/coap/src/main/java/org/thingsboard/server/transport/coap/session/CoapSessionCtx.java
@@ -36,6 +36,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.atomic.AtomicInteger;
+
 @Slf4j
 public class CoapSessionCtx extends DeviceAwareSessionContext {
 
@@ -87,6 +88,8 @@ public class CoapSessionCtx extends DeviceAwareSessionContext {
     private void onSessionClose(SessionCloseMsg msg) {
         if (msg.isTimeout()) {
             exchange.respond(ResponseCode.SERVICE_UNAVAILABLE);
+        } else if (msg.isCredentialsRevoked()) {
+            exchange.respond(ResponseCode.UNAUTHORIZED);
         } else {
             exchange.respond(ResponseCode.INTERNAL_SERVER_ERROR);
         }
@@ -120,7 +123,7 @@ public class CoapSessionCtx extends DeviceAwareSessionContext {
 
     public void close() {
         log.info("[{}] Closing processing context. Timeout: {}", sessionId, exchange.advanced().isTimedOut());
-        processor.process(new SessionCloseMsg(sessionId, exchange.advanced().isTimedOut()));
+        processor.process(exchange.advanced().isTimedOut() ? SessionCloseMsg.onTimeout(sessionId) : SessionCloseMsg.onError(sessionId));
     }
 
     @Override
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
index e1bb45c..b4d8108 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
@@ -210,7 +210,6 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
     }
 
     private void processDisconnect(ChannelHandlerContext ctx) {
-        processor.process(new SessionCloseMsg(sessionCtx.getSessionId(), false));
         ctx.close();
     }
 
@@ -255,6 +254,6 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
 
     @Override
     public void operationComplete(Future<? super Void> future) throws Exception {
-        processor.process(new SessionCloseMsg(sessionCtx.getSessionId(), false));
+        processor.process(SessionCloseMsg.onError(sessionCtx.getSessionId()));
     }
 }
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/MqttSessionCtx.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/MqttSessionCtx.java
index 5cae9f5..f653682 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/MqttSessionCtx.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/MqttSessionCtx.java
@@ -16,12 +16,13 @@
 package org.thingsboard.server.transport.mqtt.session;
 
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.mqtt.MqttMessage;
+import io.netty.handler.codec.mqtt.*;
 import lombok.extern.slf4j.Slf4j;
 import org.thingsboard.server.common.data.id.SessionId;
 import org.thingsboard.server.common.msg.session.SessionActorToAdaptorMsg;
 import org.thingsboard.server.common.msg.session.SessionCtrlMsg;
 import org.thingsboard.server.common.msg.session.SessionType;
+import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg;
 import org.thingsboard.server.common.msg.session.ex.SessionException;
 import org.thingsboard.server.common.transport.SessionMsgProcessor;
 import org.thingsboard.server.common.transport.adaptor.AdaptorException;
@@ -75,7 +76,10 @@ public class MqttSessionCtx extends DeviceAwareSessionContext {
 
     @Override
     public void onMsg(SessionCtrlMsg msg) throws SessionException {
-
+        if (msg instanceof SessionCloseMsg) {
+            pushToNetwork(new MqttMessage(new MqttFixedHeader(MqttMessageType.DISCONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0)));
+            channel.close();
+        }
     }
 
     @Override