thingsboard-aplcache
Changes
application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java 26(+17 -9)
common/message/src/main/java/org/thingsboard/server/common/msg/core/SessionCloseNotification.java 38(+38 -0)
common/message/src/main/java/org/thingsboard/server/common/msg/session/ctrl/SessionCloseMsg.java 20(+19 -1)
extensions-api/src/main/java/org/thingsboard/server/extensions/api/device/DeviceCredentialsUpdateNotificationMsg.java 36(+36 -0)
transport/coap/src/main/java/org/thingsboard/server/transport/coap/session/CoapSessionCtx.java 5(+4 -1)
Details
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java
index 8b669e9..bf18df6 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java
@@ -27,6 +27,7 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
import org.thingsboard.server.common.msg.device.ToDeviceActorMsg;
import org.thingsboard.server.extensions.api.device.DeviceAttributesEventNotificationMsg;
+import org.thingsboard.server.extensions.api.device.DeviceCredentialsUpdateNotificationMsg;
import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg;
import org.thingsboard.server.extensions.api.plugins.msg.*;
@@ -58,6 +59,8 @@ public class DeviceActor extends ContextAwareActor {
processor.processAttributesUpdate(context(), (DeviceAttributesEventNotificationMsg) msg);
} else if (msg instanceof ToDeviceRpcRequestPluginMsg) {
processor.processRpcRequest(context(), (ToDeviceRpcRequestPluginMsg) msg);
+ } else if (msg instanceof DeviceCredentialsUpdateNotificationMsg){
+ processor.processCredentialsUpdate(context(), (DeviceCredentialsUpdateNotificationMsg) msg);
}
} else if (msg instanceof TimeoutMsg) {
processor.processTimeout(context(), (TimeoutMsg) msg);
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
index 3949691..3aef0c8 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
@@ -32,13 +32,7 @@ import org.thingsboard.server.common.data.kv.AttributeKey;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
-import org.thingsboard.server.common.msg.core.AttributesUpdateNotification;
-import org.thingsboard.server.common.msg.core.BasicCommandAckResponse;
-import org.thingsboard.server.common.msg.core.BasicToDeviceSessionActorMsg;
-import org.thingsboard.server.common.msg.core.SessionCloseMsg;
-import org.thingsboard.server.common.msg.core.ToDeviceRpcRequestMsg;
-import org.thingsboard.server.common.msg.core.ToDeviceRpcResponseMsg;
-import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
+import org.thingsboard.server.common.msg.core.*;
import org.thingsboard.server.common.msg.device.ToDeviceActorMsg;
import org.thingsboard.server.common.msg.kv.BasicAttributeKVMsg;
import org.thingsboard.server.common.msg.session.FromDeviceMsg;
@@ -47,6 +41,7 @@ import org.thingsboard.server.common.msg.session.SessionType;
import org.thingsboard.server.common.msg.session.ToDeviceMsg;
import org.thingsboard.server.extensions.api.device.DeviceAttributes;
import org.thingsboard.server.extensions.api.device.DeviceAttributesEventNotificationMsg;
+import org.thingsboard.server.extensions.api.device.DeviceCredentialsUpdateNotificationMsg;
import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse;
import org.thingsboard.server.extensions.api.plugins.msg.RpcError;
import org.thingsboard.server.extensions.api.plugins.msg.TimeoutIntMsg;
@@ -74,6 +69,7 @@ import java.util.stream.Collectors;
public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
private final DeviceId deviceId;
+ private final Map<SessionId, SessionInfo> sessions;
private final Map<SessionId, SessionInfo> attributeSubscriptions;
private final Map<SessionId, SessionInfo> rpcSubscriptions;
@@ -85,6 +81,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
public DeviceActorMessageProcessor(ActorSystemContext systemContext, LoggingAdapter logger, DeviceId deviceId) {
super(systemContext, logger);
this.deviceId = deviceId;
+ this.sessions = new HashMap<>();
this.attributeSubscriptions = new HashMap<>();
this.rpcSubscriptions = new HashMap<>();
this.rpcPendingMap = new HashMap<>();
@@ -281,7 +278,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
if (!msg.isAdded()) {
logger.debug("[{}] Clearing attributes/rpc subscription for server [{}]", deviceId, msg.getServerAddress());
Predicate<Map.Entry<SessionId, SessionInfo>> filter = e -> e.getValue().getServer()
- .map(serverAddress -> serverAddress.equals(msg.getServerAddress())).orElse(false);
+ .map(serverAddress -> serverAddress.equals(msg.getServerAddress())).orElse(false);
attributeSubscriptions.entrySet().removeIf(filter);
rpcSubscriptions.entrySet().removeIf(filter);
}
@@ -342,8 +339,12 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
private void processSessionStateMsgs(ToDeviceActorMsg msg) {
SessionId sessionId = msg.getSessionId();
FromDeviceMsg inMsg = msg.getPayload();
- if (inMsg instanceof SessionCloseMsg) {
+ if (inMsg instanceof SessionOpenMsg) {
+ logger.debug("[{}] Processing new session [{}]", deviceId, sessionId);
+ sessions.put(sessionId, new SessionInfo(SessionType.ASYNC, msg.getServerAddress()));
+ } else if (inMsg instanceof SessionCloseMsg) {
logger.debug("[{}] Canceling subscriptions for closed session [{}]", deviceId, sessionId);
+ sessions.remove(sessionId);
attributeSubscriptions.remove(sessionId);
rpcSubscriptions.remove(sessionId);
}
@@ -363,4 +364,11 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
return systemContext.getAttributesService().findAll(this.deviceId, attributeType);
}
+ public void processCredentialsUpdate(ActorContext context, DeviceCredentialsUpdateNotificationMsg msg) {
+ sessions.forEach((k, v) -> {
+ sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(new SessionCloseNotification(), k), v.getServer());
+ });
+ attributeSubscriptions.clear();
+ rpcSubscriptions.clear();
+ }
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/service/ActorService.java b/application/src/main/java/org/thingsboard/server/actors/service/ActorService.java
index 1c64f54..3db7210 100644
--- a/application/src/main/java/org/thingsboard/server/actors/service/ActorService.java
+++ b/application/src/main/java/org/thingsboard/server/actors/service/ActorService.java
@@ -15,6 +15,7 @@
*/
package org.thingsboard.server.actors.service;
+import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.PluginId;
import org.thingsboard.server.common.data.id.RuleId;
import org.thingsboard.server.common.data.id.TenantId;
@@ -28,4 +29,6 @@ public interface ActorService extends SessionMsgProcessor, WebSocketMsgProcessor
void onPluginStateChange(TenantId tenantId, PluginId pluginId, ComponentLifecycleEvent state);
void onRuleStateChange(TenantId tenantId, RuleId ruleId, ComponentLifecycleEvent state);
+
+ void onCredentialsUpdate(TenantId tenantId, DeviceId deviceId);
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
index db6526d..bbf1300 100644
--- a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
+++ b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
@@ -32,16 +32,19 @@ import org.thingsboard.server.actors.rpc.RpcSessionCreateRequestMsg;
import org.thingsboard.server.actors.rpc.RpcSessionTellMsg;
import org.thingsboard.server.actors.session.SessionManagerActor;
import org.thingsboard.server.actors.stats.StatsActor;
+import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.PluginId;
import org.thingsboard.server.common.data.id.RuleId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
import org.thingsboard.server.common.msg.aware.SessionAwareMsg;
import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
+import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg;
import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
import org.thingsboard.server.common.msg.device.ToDeviceActorMsg;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
+import org.thingsboard.server.extensions.api.device.DeviceCredentialsUpdateNotificationMsg;
import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg;
import org.thingsboard.server.extensions.api.plugins.msg.ToPluginActorMsg;
import org.thingsboard.server.extensions.api.plugins.rest.PluginRestMsg;
@@ -56,6 +59,7 @@ import scala.concurrent.duration.Duration;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
+import java.util.Optional;
@Service
@Slf4j
@@ -221,6 +225,17 @@ public class DefaultActorService implements ActorService {
broadcast(ComponentLifecycleMsg.forRule(tenantId, ruleId, state));
}
+ @Override
+ public void onCredentialsUpdate(TenantId tenantId, DeviceId deviceId) {
+ DeviceCredentialsUpdateNotificationMsg msg = new DeviceCredentialsUpdateNotificationMsg(tenantId, deviceId);
+ Optional<ServerAddress> address = actorContext.getRoutingService().resolve(deviceId);
+ if (address.isPresent()) {
+ rpcService.tell(address.get(), msg);
+ } else {
+ onMsg(msg);
+ }
+ }
+
public void broadcast(ToAllNodesMsg msg) {
rpcService.broadcast(msg);
appActor.tell(msg, ActorRef.noSender());
diff --git a/application/src/main/java/org/thingsboard/server/actors/session/ASyncMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/session/ASyncMsgProcessor.java
index 90b0cb3..916e678 100644
--- a/application/src/main/java/org/thingsboard/server/actors/session/ASyncMsgProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/session/ASyncMsgProcessor.java
@@ -20,15 +20,14 @@ import org.thingsboard.server.actors.shared.SessionTimeoutMsg;
import org.thingsboard.server.common.data.id.SessionId;
import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
-import org.thingsboard.server.common.msg.core.AttributesSubscribeMsg;
-import org.thingsboard.server.common.msg.core.ResponseMsg;
-import org.thingsboard.server.common.msg.core.RpcSubscribeMsg;
+import org.thingsboard.server.common.msg.core.*;
import org.thingsboard.server.common.msg.core.SessionCloseMsg;
import org.thingsboard.server.common.msg.device.ToDeviceActorMsg;
import org.thingsboard.server.common.msg.session.*;
import akka.actor.ActorContext;
import akka.event.LoggingAdapter;
+import org.thingsboard.server.common.msg.session.ctrl.*;
import org.thingsboard.server.common.msg.session.ex.SessionException;
import java.util.HashMap;
@@ -37,7 +36,8 @@ import java.util.Optional;
class ASyncMsgProcessor extends AbstractSessionActorMsgProcessor {
- Map<Integer, ToDeviceActorMsg> pendingMap = new HashMap<>();
+ private boolean firstMsg = true;
+ private Map<Integer, ToDeviceActorMsg> pendingMap = new HashMap<>();
private Optional<ServerAddress> currentTargetServer;
private boolean subscribedToAttributeUpdates;
private boolean subscribedToRpcCommands;
@@ -49,6 +49,10 @@ class ASyncMsgProcessor extends AbstractSessionActorMsgProcessor {
@Override
protected void processToDeviceActorMsg(ActorContext ctx, ToDeviceActorSessionMsg msg) {
updateSessionCtx(msg, SessionType.ASYNC);
+ if (firstMsg) {
+ toDeviceMsg(new SessionOpenMsg()).ifPresent(m -> forwardToAppActor(ctx, m));
+ firstMsg = false;
+ }
ToDeviceActorMsg pendingMsg = toDeviceMsg(msg);
FromDeviceMsg fromDeviceMsg = pendingMsg.getPayload();
switch (fromDeviceMsg.getMsgType()) {
@@ -80,17 +84,21 @@ class ASyncMsgProcessor extends AbstractSessionActorMsgProcessor {
@Override
public void processToDeviceMsg(ActorContext context, ToDeviceMsg msg) {
try {
- switch (msg.getMsgType()) {
- case STATUS_CODE_RESPONSE:
- case GET_ATTRIBUTES_RESPONSE:
- ResponseMsg responseMsg = (ResponseMsg) msg;
- if (responseMsg.getRequestId() >= 0) {
- logger.debug("[{}] Pending request processed: {}", responseMsg.getRequestId(), responseMsg);
- pendingMap.remove(responseMsg.getRequestId());
- }
- break;
+ if (msg.getMsgType() != MsgType.SESSION_CLOSE) {
+ switch (msg.getMsgType()) {
+ case STATUS_CODE_RESPONSE:
+ case GET_ATTRIBUTES_RESPONSE:
+ ResponseMsg responseMsg = (ResponseMsg) msg;
+ if (responseMsg.getRequestId() >= 0) {
+ logger.debug("[{}] Pending request processed: {}", responseMsg.getRequestId(), responseMsg);
+ pendingMap.remove(responseMsg.getRequestId());
+ }
+ break;
+ }
+ sessionCtx.onMsg(new BasicSessionActorToAdaptorMsg(this.sessionCtx, msg));
+ } else {
+ sessionCtx.onMsg(org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg.onCredentialsRevoked(sessionCtx.getSessionId()));
}
- sessionCtx.onMsg(new BasicSessionActorToAdaptorMsg(this.sessionCtx, msg));
} catch (SessionException e) {
logger.warning("Failed to push session response msg", e);
}
@@ -102,7 +110,7 @@ class ASyncMsgProcessor extends AbstractSessionActorMsgProcessor {
}
protected void cleanupSession(ActorContext ctx) {
- toDeviceMsg(new SessionCloseMsg()).ifPresent(msg -> forwardToAppActor(ctx, msg));
+ toDeviceMsg(new SessionCloseMsg()).ifPresent(m -> forwardToAppActor(ctx, m));
}
@Override
@@ -110,6 +118,7 @@ class ASyncMsgProcessor extends AbstractSessionActorMsgProcessor {
if (pendingMap.size() > 0 || subscribedToAttributeUpdates || subscribedToRpcCommands) {
Optional<ServerAddress> newTargetServer = systemContext.getRoutingService().resolve(getDeviceId());
if (!newTargetServer.equals(currentTargetServer)) {
+ firstMsg = true;
currentTargetServer = newTargetServer;
pendingMap.values().forEach(v -> {
forwardToAppActor(context, v, currentTargetServer);
diff --git a/application/src/main/java/org/thingsboard/server/actors/session/SyncMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/session/SyncMsgProcessor.java
index afb35ac..9fb13d3 100644
--- a/application/src/main/java/org/thingsboard/server/actors/session/SyncMsgProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/session/SyncMsgProcessor.java
@@ -52,7 +52,7 @@ class SyncMsgProcessor extends AbstractSessionActorMsgProcessor {
public void processTimeoutMsg(ActorContext context, SessionTimeoutMsg msg) {
if (pendingResponse) {
try {
- sessionCtx.onMsg(new SessionCloseMsg(sessionId, true));
+ sessionCtx.onMsg(SessionCloseMsg.onTimeout(sessionId));
} catch (SessionException e) {
logger.warning("Failed to push session close msg", e);
}
diff --git a/application/src/main/java/org/thingsboard/server/controller/DeviceController.java b/application/src/main/java/org/thingsboard/server/controller/DeviceController.java
index 1c0a7be..43416ce 100644
--- a/application/src/main/java/org/thingsboard/server/controller/DeviceController.java
+++ b/application/src/main/java/org/thingsboard/server/controller/DeviceController.java
@@ -28,6 +28,7 @@ import org.thingsboard.server.common.data.security.DeviceCredentials;
import org.thingsboard.server.dao.exception.IncorrectParameterException;
import org.thingsboard.server.dao.model.ModelConstants;
import org.thingsboard.server.exception.ThingsboardException;
+import org.thingsboard.server.extensions.api.device.DeviceCredentialsUpdateNotificationMsg;
@RestController
@RequestMapping("/api")
@@ -48,7 +49,7 @@ public class DeviceController extends BaseController {
@PreAuthorize("hasAuthority('TENANT_ADMIN')")
@RequestMapping(value = "/device", method = RequestMethod.POST)
- @ResponseBody
+ @ResponseBody
public Device saveDevice(@RequestBody Device device) throws ThingsboardException {
try {
device.setTenantId(getCurrentUser().getTenantId());
@@ -74,7 +75,7 @@ public class DeviceController extends BaseController {
@PreAuthorize("hasAuthority('TENANT_ADMIN')")
@RequestMapping(value = "/customer/{customerId}/device/{deviceId}", method = RequestMethod.POST)
- @ResponseBody
+ @ResponseBody
public Device assignDeviceToCustomer(@PathVariable("customerId") String strCustomerId,
@PathVariable("deviceId") String strDeviceId) throws ThingsboardException {
checkParameter("customerId", strCustomerId);
@@ -85,7 +86,7 @@ public class DeviceController extends BaseController {
DeviceId deviceId = new DeviceId(toUUID(strDeviceId));
checkDeviceId(deviceId);
-
+
return checkNotNull(deviceService.assignDeviceToCustomer(deviceId, customerId));
} catch (Exception e) {
throw handleException(e);
@@ -94,7 +95,7 @@ public class DeviceController extends BaseController {
@PreAuthorize("hasAuthority('TENANT_ADMIN')")
@RequestMapping(value = "/customer/device/{deviceId}", method = RequestMethod.DELETE)
- @ResponseBody
+ @ResponseBody
public Device unassignDeviceFromCustomer(@PathVariable("deviceId") String strDeviceId) throws ThingsboardException {
checkParameter("deviceId", strDeviceId);
try {
@@ -125,19 +126,21 @@ public class DeviceController extends BaseController {
@PreAuthorize("hasAuthority('TENANT_ADMIN')")
@RequestMapping(value = "/device/credentials", method = RequestMethod.POST)
- @ResponseBody
+ @ResponseBody
public DeviceCredentials saveDeviceCredentials(@RequestBody DeviceCredentials deviceCredentials) throws ThingsboardException {
checkNotNull(deviceCredentials);
try {
checkDeviceId(deviceCredentials.getDeviceId());
- return checkNotNull(deviceCredentialsService.updateDeviceCredentials(deviceCredentials));
+ DeviceCredentials result = checkNotNull(deviceCredentialsService.updateDeviceCredentials(deviceCredentials));
+ actorService.onCredentialsUpdate(getCurrentUser().getTenantId(), deviceCredentials.getDeviceId());
+ return result;
} catch (Exception e) {
throw handleException(e);
}
}
@PreAuthorize("hasAuthority('TENANT_ADMIN')")
- @RequestMapping(value = "/tenant/devices", params = { "limit" }, method = RequestMethod.GET)
+ @RequestMapping(value = "/tenant/devices", params = {"limit"}, method = RequestMethod.GET)
@ResponseBody
public TextPageData<Device> getTenantDevices(
@RequestParam int limit,
@@ -154,7 +157,7 @@ public class DeviceController extends BaseController {
}
@PreAuthorize("hasAnyAuthority('TENANT_ADMIN', 'CUSTOMER_USER')")
- @RequestMapping(value = "/customer/{customerId}/devices", params = { "limit" }, method = RequestMethod.GET)
+ @RequestMapping(value = "/customer/{customerId}/devices", params = {"limit"}, method = RequestMethod.GET)
@ResponseBody
public TextPageData<Device> getCustomerDevices(
@PathVariable("customerId") String strCustomerId,
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index aa7fcc1..0695413 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -29,7 +29,7 @@ server:
# Zookeeper connection parameters. Used for service discovery.
zk:
# Enable/disable zookeeper discovery service.
- enabled: "${ZOOKEEPER_ENABLED:false}"
+ enabled: "${ZOOKEEPER_ENABLED:true}"
# Zookeeper connect string
url: "${ZOOKEEPER_URL:localhost:2181}"
# Zookeeper retry interval in milliseconds
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/core/SessionCloseNotification.java b/common/message/src/main/java/org/thingsboard/server/common/msg/core/SessionCloseNotification.java
new file mode 100644
index 0000000..3e96e40
--- /dev/null
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/core/SessionCloseNotification.java
@@ -0,0 +1,38 @@
+/**
+ * Copyright © 2016 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.common.msg.core;
+
+import lombok.ToString;
+import org.thingsboard.server.common.msg.kv.AttributesKVMsg;
+import org.thingsboard.server.common.msg.session.MsgType;
+import org.thingsboard.server.common.msg.session.ToDeviceMsg;
+
+@ToString
+public class SessionCloseNotification implements ToDeviceMsg {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public boolean isSuccess() {
+ return true;
+ }
+
+ @Override
+ public MsgType getMsgType() {
+ return MsgType.SESSION_CLOSE;
+ }
+
+}
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/core/SessionOpenMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/core/SessionOpenMsg.java
new file mode 100644
index 0000000..d18dc9f
--- /dev/null
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/core/SessionOpenMsg.java
@@ -0,0 +1,29 @@
+/**
+ * Copyright © 2016 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.common.msg.core;
+
+import org.thingsboard.server.common.msg.session.FromDeviceMsg;
+import org.thingsboard.server.common.msg.session.MsgType;
+
+/**
+ * @author Andrew Shvayka
+ */
+public class SessionOpenMsg implements FromDeviceMsg {
+ @Override
+ public MsgType getMsgType() {
+ return MsgType.SESSION_OPEN;
+ }
+}
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/session/ctrl/SessionCloseMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/session/ctrl/SessionCloseMsg.java
index d188527..03b611e 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/session/ctrl/SessionCloseMsg.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/session/ctrl/SessionCloseMsg.java
@@ -21,11 +21,25 @@ import org.thingsboard.server.common.msg.session.SessionCtrlMsg;
public class SessionCloseMsg implements SessionCtrlMsg {
private final SessionId sessionId;
+ private final boolean revoked;
private final boolean timeout;
- public SessionCloseMsg(SessionId sessionId, boolean timeout) {
+ public static SessionCloseMsg onError(SessionId sessionId) {
+ return new SessionCloseMsg(sessionId, false, false);
+ }
+
+ public static SessionCloseMsg onTimeout(SessionId sessionId) {
+ return new SessionCloseMsg(sessionId, false, true);
+ }
+
+ public static SessionCloseMsg onCredentialsRevoked(SessionId sessionId) {
+ return new SessionCloseMsg(sessionId, true, false);
+ }
+
+ private SessionCloseMsg(SessionId sessionId, boolean unauthorized, boolean timeout) {
super();
this.sessionId = sessionId;
+ this.revoked = unauthorized;
this.timeout = timeout;
}
@@ -34,6 +48,10 @@ public class SessionCloseMsg implements SessionCtrlMsg {
return sessionId;
}
+ public boolean isCredentialsRevoked() {
+ return revoked;
+ }
+
public boolean isTimeout() {
return timeout;
}
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/session/MsgType.java b/common/message/src/main/java/org/thingsboard/server/common/msg/session/MsgType.java
index 1b91425..549a143 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/session/MsgType.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/session/MsgType.java
@@ -28,7 +28,7 @@ public enum MsgType {
RULE_ENGINE_ERROR,
- SESSION_CLOSE;
+ SESSION_OPEN, SESSION_CLOSE;
private final boolean requiresRulesProcessing;
diff --git a/extensions-api/src/main/java/org/thingsboard/server/extensions/api/device/DeviceCredentialsUpdateNotificationMsg.java b/extensions-api/src/main/java/org/thingsboard/server/extensions/api/device/DeviceCredentialsUpdateNotificationMsg.java
new file mode 100644
index 0000000..0104824
--- /dev/null
+++ b/extensions-api/src/main/java/org/thingsboard/server/extensions/api/device/DeviceCredentialsUpdateNotificationMsg.java
@@ -0,0 +1,36 @@
+/**
+ * Copyright © 2016 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.extensions.api.device;
+
+import lombok.Data;
+import lombok.Getter;
+import lombok.ToString;
+import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.data.kv.AttributeKey;
+
+import java.util.Set;
+
+/**
+ * @author Andrew Shvayka
+ */
+@Data
+public class DeviceCredentialsUpdateNotificationMsg implements ToDeviceActorNotificationMsg {
+
+ private final TenantId tenantId;
+ private final DeviceId deviceId;
+
+}
diff --git a/tools/src/main/resources/test.properties b/tools/src/main/resources/test.properties
new file mode 100644
index 0000000..6e9ed89
--- /dev/null
+++ b/tools/src/main/resources/test.properties
@@ -0,0 +1,5 @@
+restUrl=http://localhost:8080
+mqttUrls=tcp://localhost:1883
+deviceCount=1
+durationMs=60000
+iterationIntervalMs=1000
diff --git a/transport/coap/src/main/java/org/thingsboard/server/transport/coap/session/CoapSessionCtx.java b/transport/coap/src/main/java/org/thingsboard/server/transport/coap/session/CoapSessionCtx.java
index a61f0bf..6f6e35a 100644
--- a/transport/coap/src/main/java/org/thingsboard/server/transport/coap/session/CoapSessionCtx.java
+++ b/transport/coap/src/main/java/org/thingsboard/server/transport/coap/session/CoapSessionCtx.java
@@ -36,6 +36,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.atomic.AtomicInteger;
+
@Slf4j
public class CoapSessionCtx extends DeviceAwareSessionContext {
@@ -87,6 +88,8 @@ public class CoapSessionCtx extends DeviceAwareSessionContext {
private void onSessionClose(SessionCloseMsg msg) {
if (msg.isTimeout()) {
exchange.respond(ResponseCode.SERVICE_UNAVAILABLE);
+ } else if (msg.isCredentialsRevoked()) {
+ exchange.respond(ResponseCode.UNAUTHORIZED);
} else {
exchange.respond(ResponseCode.INTERNAL_SERVER_ERROR);
}
@@ -120,7 +123,7 @@ public class CoapSessionCtx extends DeviceAwareSessionContext {
public void close() {
log.info("[{}] Closing processing context. Timeout: {}", sessionId, exchange.advanced().isTimedOut());
- processor.process(new SessionCloseMsg(sessionId, exchange.advanced().isTimedOut()));
+ processor.process(exchange.advanced().isTimedOut() ? SessionCloseMsg.onTimeout(sessionId) : SessionCloseMsg.onError(sessionId));
}
@Override
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
index e1bb45c..b4d8108 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
@@ -210,7 +210,6 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
private void processDisconnect(ChannelHandlerContext ctx) {
- processor.process(new SessionCloseMsg(sessionCtx.getSessionId(), false));
ctx.close();
}
@@ -255,6 +254,6 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
- processor.process(new SessionCloseMsg(sessionCtx.getSessionId(), false));
+ processor.process(SessionCloseMsg.onError(sessionCtx.getSessionId()));
}
}
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/MqttSessionCtx.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/MqttSessionCtx.java
index 5cae9f5..f653682 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/MqttSessionCtx.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/MqttSessionCtx.java
@@ -16,12 +16,13 @@
package org.thingsboard.server.transport.mqtt.session;
import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.mqtt.MqttMessage;
+import io.netty.handler.codec.mqtt.*;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.id.SessionId;
import org.thingsboard.server.common.msg.session.SessionActorToAdaptorMsg;
import org.thingsboard.server.common.msg.session.SessionCtrlMsg;
import org.thingsboard.server.common.msg.session.SessionType;
+import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg;
import org.thingsboard.server.common.msg.session.ex.SessionException;
import org.thingsboard.server.common.transport.SessionMsgProcessor;
import org.thingsboard.server.common.transport.adaptor.AdaptorException;
@@ -75,7 +76,10 @@ public class MqttSessionCtx extends DeviceAwareSessionContext {
@Override
public void onMsg(SessionCtrlMsg msg) throws SessionException {
-
+ if (msg instanceof SessionCloseMsg) {
+ pushToNetwork(new MqttMessage(new MqttFixedHeader(MqttMessageType.DISCONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0)));
+ channel.close();
+ }
}
@Override