thingsboard-aplcache
Changes
application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java 33(+15 -18)
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java 15(+13 -2)
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeToRuleChainTellNextMsg.java 4(+3 -1)
application/src/main/java/org/thingsboard/server/actors/session/AbstractSessionActorMsgProcessor.java 6(+3 -3)
application/src/main/java/org/thingsboard/server/service/cluster/routing/ConsistentClusterRoutingService.java 4(+2 -2)
application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterGrpcService.java 5(+0 -5)
application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterRpcService.java 8(+0 -8)
common/message/src/main/java/org/thingsboard/server/common/msg/core/ActorSystemToDeviceSessionActorMsg.java 2(+1 -1)
common/message/src/main/java/org/thingsboard/server/common/msg/core/BasicActorSystemToDeviceSessionActorMsg.java 6(+3 -3)
common/message/src/main/java/org/thingsboard/server/common/msg/device/BasicDeviceToDeviceActorMsg.java 4(+2 -2)
common/message/src/main/java/org/thingsboard/server/common/msg/session/BasicTransportToDeviceSessionActorMsg.java 18(+8 -10)
common/message/src/main/java/org/thingsboard/server/common/msg/session/ctrl/SessionCloseMsg.java 5(+5 -0)
common/message/src/main/java/org/thingsboard/server/common/msg/session/TransportToDeviceSessionActorMsg.java 3(+2 -1)
pom.xml 2(+1 -1)
transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java 4(+1 -3)
transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java 4(+2 -2)
Details
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
index 3345f81..2f5e20d 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
@@ -23,7 +23,6 @@ import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.gson.Gson;
-import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.thingsboard.server.actors.ActorSystemContext;
@@ -47,7 +46,7 @@ import org.thingsboard.server.common.msg.core.AttributesUpdateRequest;
import org.thingsboard.server.common.msg.core.BasicCommandAckResponse;
import org.thingsboard.server.common.msg.core.BasicGetAttributesResponse;
import org.thingsboard.server.common.msg.core.BasicStatusCodeResponse;
-import org.thingsboard.server.common.msg.core.BasicToDeviceSessionActorMsg;
+import org.thingsboard.server.common.msg.core.BasicActorSystemToDeviceSessionActorMsg;
import org.thingsboard.server.common.msg.core.GetAttributesRequest;
import org.thingsboard.server.common.msg.core.RuleEngineError;
import org.thingsboard.server.common.msg.core.RuleEngineErrorMsg;
@@ -57,13 +56,12 @@ import org.thingsboard.server.common.msg.core.SessionOpenMsg;
import org.thingsboard.server.common.msg.core.TelemetryUploadRequest;
import org.thingsboard.server.common.msg.core.ToDeviceRpcRequestMsg;
import org.thingsboard.server.common.msg.core.ToDeviceRpcResponseMsg;
-import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
+import org.thingsboard.server.common.msg.core.ActorSystemToDeviceSessionActorMsg;
import org.thingsboard.server.common.msg.core.ToServerRpcRequestMsg;
import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg;
import org.thingsboard.server.common.msg.kv.BasicAttributeKVMsg;
import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
import org.thingsboard.server.common.msg.session.FromDeviceMsg;
-import org.thingsboard.server.common.msg.session.FromDeviceRequestMsg;
import org.thingsboard.server.common.msg.session.SessionMsgType;
import org.thingsboard.server.common.msg.session.SessionType;
import org.thingsboard.server.common.msg.session.ToDeviceMsg;
@@ -74,7 +72,6 @@ import org.thingsboard.server.extensions.api.device.DeviceAttributesEventNotific
import org.thingsboard.server.extensions.api.device.DeviceNameOrTypeUpdateMsg;
import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse;
import org.thingsboard.server.extensions.api.plugins.msg.RpcError;
-import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
import org.thingsboard.server.service.rpc.ToServerRpcResponseActorMsg;
@@ -156,7 +153,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
boolean sent = rpcSubscriptions.size() > 0;
Set<SessionId> syncSessionSet = new HashSet<>();
rpcSubscriptions.entrySet().forEach(sub -> {
- ToDeviceSessionActorMsg response = new BasicToDeviceSessionActorMsg(rpcRequest, sub.getKey());
+ ActorSystemToDeviceSessionActorMsg response = new BasicActorSystemToDeviceSessionActorMsg(rpcRequest, sub.getKey());
sendMsgToSessionActor(response, sub.getValue().getServer());
if (SessionType.SYNC == sub.getValue().getType()) {
syncSessionSet.add(sub.getKey());
@@ -198,7 +195,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
if (data != null) {
logger.debug("[{}] Queue put [{}] timeout detected!", deviceId, msg.getId());
ToDeviceMsg toDeviceMsg = new RuleEngineErrorMsg(data.getSessionMsgType(), RuleEngineError.QUEUE_PUT_TIMEOUT);
- sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(toDeviceMsg, data.getSessionId()), data.getServerAddress());
+ sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(toDeviceMsg, data.getSessionId()), data.getServerAddress());
}
}
@@ -210,7 +207,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
logger.debug("[{}] Queue put [{}] ack detected. Remaining acks: {}!", deviceId, msg.getId(), remainingAcks);
if (remainingAcks == 0) {
ToDeviceMsg toDeviceMsg = BasicStatusCodeResponse.onSuccess(data.getSessionMsgType(), data.getRequestId());
- sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(toDeviceMsg, data.getSessionId()), data.getServerAddress());
+ sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(toDeviceMsg, data.getSessionId()), data.getServerAddress());
}
}
}
@@ -248,7 +245,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
body.getMethod(),
body.getParams()
);
- ToDeviceSessionActorMsg response = new BasicToDeviceSessionActorMsg(rpcRequest, sessionId);
+ ActorSystemToDeviceSessionActorMsg response = new BasicActorSystemToDeviceSessionActorMsg(rpcRequest, sessionId);
sendMsgToSessionActor(response, server);
};
}
@@ -302,14 +299,14 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
public void onSuccess(@Nullable List<List<AttributeKvEntry>> result) {
BasicGetAttributesResponse response = BasicGetAttributesResponse.onSuccess(request.getMsgType(),
request.getRequestId(), BasicAttributeKVMsg.from(result.get(0), result.get(1)));
- sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(response, src.getSessionId()), src.getServerAddress());
+ sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(response, src.getSessionId()), src.getServerAddress());
}
@Override
public void onFailure(Throwable t) {
if (t instanceof Exception) {
ToDeviceMsg toDeviceMsg = BasicStatusCodeResponse.onError(SessionMsgType.GET_ATTRIBUTES_REQUEST, request.getRequestId(), (Exception) t);
- sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(toDeviceMsg, src.getSessionId()), src.getServerAddress());
+ sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(toDeviceMsg, src.getSessionId()), src.getServerAddress());
} else {
logger.error("[{}] Failed to process attributes request", deviceId, t);
}
@@ -391,14 +388,14 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
if (data != null) {
logger.debug("[{}] Client side RPC request [{}] timeout detected!", deviceId, msg.getId());
ToDeviceMsg toDeviceMsg = new RuleEngineErrorMsg(SessionMsgType.TO_SERVER_RPC_REQUEST, RuleEngineError.TIMEOUT);
- sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(toDeviceMsg, data.getSessionId()), data.getServer());
+ sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(toDeviceMsg, data.getSessionId()), data.getServer());
}
}
void processToServerRPCResponse(ActorContext context, ToServerRpcResponseActorMsg msg) {
ToServerRpcRequestMetadata data = toServerRpcPendingMap.remove(msg.getMsg().getRequestId());
if (data != null) {
- sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(msg.getMsg(), data.getSessionId()), data.getServer());
+ sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(msg.getMsg(), data.getSessionId()), data.getServer());
}
}
@@ -409,7 +406,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
pendingMsgs.put(tbMsg.getId(), pendingMsgData);
scheduleMsgWithDelay(context, new DeviceActorQueueTimeoutMsg(tbMsg.getId(), systemContext.getQueuePersistenceTimeout()), systemContext.getQueuePersistenceTimeout());
} else {
- ToDeviceSessionActorMsg response = new BasicToDeviceSessionActorMsg(BasicStatusCodeResponse.onSuccess(sessionMsgType, requestId), pendingMsgData.getSessionId());
+ ActorSystemToDeviceSessionActorMsg response = new BasicActorSystemToDeviceSessionActorMsg(BasicStatusCodeResponse.onSuccess(sessionMsgType, requestId), pendingMsgData.getSessionId());
sendMsgToSessionActor(response, pendingMsgData.getServerAddress());
}
context.parent().tell(new DeviceActorToRuleEngineMsg(context.self(), tbMsg), context.self());
@@ -436,7 +433,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
if (notification != null) {
ToDeviceMsg finalNotification = notification;
attributeSubscriptions.entrySet().forEach(sub -> {
- ToDeviceSessionActorMsg response = new BasicToDeviceSessionActorMsg(finalNotification, sub.getKey());
+ ActorSystemToDeviceSessionActorMsg response = new BasicActorSystemToDeviceSessionActorMsg(finalNotification, sub.getKey());
sendMsgToSessionActor(response, sub.getValue().getServer());
});
}
@@ -462,7 +459,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
BasicCommandAckResponse response = success
? BasicCommandAckResponse.onSuccess(SessionMsgType.TO_DEVICE_RPC_REQUEST, responseMsg.getRequestId())
: BasicCommandAckResponse.onError(SessionMsgType.TO_DEVICE_RPC_REQUEST, responseMsg.getRequestId(), new TimeoutException());
- sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(response, msg.getSessionId()), msg.getServerAddress());
+ sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(response, msg.getSessionId()), msg.getServerAddress());
}
}
}
@@ -517,7 +514,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
}
}
- private void sendMsgToSessionActor(ToDeviceSessionActorMsg response, Optional<ServerAddress> sessionAddress) {
+ private void sendMsgToSessionActor(ActorSystemToDeviceSessionActorMsg response, Optional<ServerAddress> sessionAddress) {
if (sessionAddress.isPresent()) {
ServerAddress address = sessionAddress.get();
logger.debug("{} Forwarding msg: {}", address, response);
@@ -530,7 +527,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
void processCredentialsUpdate() {
sessions.forEach((k, v) -> {
- sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(new SessionCloseNotification(), k), v.getServer());
+ sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(new SessionCloseNotification(), k), v.getServer());
});
attributeSubscriptions.clear();
rpcSubscriptions.clear();
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
index f8bc94f..85d1b54 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
@@ -43,6 +43,7 @@ import org.thingsboard.server.dao.user.UserService;
import org.thingsboard.server.service.script.NashornJsEngine;
import scala.concurrent.duration.Duration;
+import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@@ -63,15 +64,24 @@ class DefaultTbContext implements TbContext {
@Override
public void tellNext(TbMsg msg, String relationType) {
- tellNext(msg, relationType, null);
+ tellNext(msg, Collections.singleton(relationType), null);
+ }
+
+ @Override
+ public void tellNext(TbMsg msg, Set<String> relationTypes) {
+ tellNext(msg, relationTypes, null);
}
@Override
public void tellNext(TbMsg msg, String relationType, Throwable th) {
+ tellNext(msg, Collections.singleton(relationType), th);
+ }
+
+ private void tellNext(TbMsg msg, Set<String> relationTypes, Throwable th) {
if (nodeCtx.getSelf().isDebugMode()) {
- mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), msg, relationType, th);
+ relationTypes.forEach(relationType -> mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), msg, relationType, th));
}
- nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(nodeCtx.getSelf().getId(), relationType, msg), nodeCtx.getSelfActor());
+ nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(nodeCtx.getSelf().getId(), relationTypes, msg), nodeCtx.getSelfActor());
}
@Override
@@ -118,12 +128,6 @@ class DefaultTbContext implements TbContext {
}
@Override
- public void tellNext(TbMsg msg, Set<String> relationTypes) {
- //TODO: fix this to send set of relations instead of loop.
- relationTypes.forEach(type -> tellNext(msg, type));
- }
-
- @Override
public ListeningExecutor getJsExecutor() {
return mainCtx.getJsExecutor();
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java
index 4812002..3ba646a 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java
@@ -54,6 +54,8 @@ public class RuleChainActor extends ComponentActor<RuleChainId, RuleChainActorMe
case RULE_CHAIN_TO_RULE_CHAIN_MSG:
processor.onRuleChainToRuleChainMsg((RuleChainToRuleChainMsg) msg);
break;
+ case CLUSTER_EVENT_MSG:
+ break;
default:
return false;
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
index 38a2ab7..ac902a7 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
@@ -206,9 +206,8 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
void onTellNext(RuleNodeToRuleChainTellNextMsg envelope) {
checkActive();
RuleNodeId originator = envelope.getOriginator();
- String targetRelationType = envelope.getRelationType();
List<RuleNodeRelation> relations = nodeRoutes.get(originator).stream()
- .filter(r -> targetRelationType == null || targetRelationType.equalsIgnoreCase(r.getType()))
+ .filter(r -> contains(envelope.getRelationTypes(), r.getType()))
.collect(Collectors.toList());
TbMsg msg = envelope.getMsg();
@@ -237,6 +236,18 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
}
}
+ private boolean contains(Set<String> relationTypes, String type) {
+ if (relationTypes == null) {
+ return true;
+ }
+ for (String relationType : relationTypes) {
+ if (relationType.equalsIgnoreCase(type)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
private void enqueueAndForwardMsgCopyToChain(TbMsg msg, EntityId target, String fromRelationType) {
RuleChainId targetRCId = new RuleChainId(target.getId());
TbMsg copyMsg = msg.copy(UUIDs.timeBased(), targetRCId, null, DEFAULT_CLUSTER_PARTITION);
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeToRuleChainTellNextMsg.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeToRuleChainTellNextMsg.java
index 054284d..c0a475c 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeToRuleChainTellNextMsg.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeToRuleChainTellNextMsg.java
@@ -21,6 +21,8 @@ import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.TbMsg;
+import java.util.Set;
+
/**
* Created by ashvayka on 19.03.18.
*/
@@ -28,7 +30,7 @@ import org.thingsboard.server.common.msg.TbMsg;
final class RuleNodeToRuleChainTellNextMsg implements TbActorMsg {
private final RuleNodeId originator;
- private final String relationType;
+ private final Set<String> relationTypes;
private final TbMsg msg;
@Override
diff --git a/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java b/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java
index 1d9c671..3624127 100644
--- a/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java
@@ -39,8 +39,12 @@ public abstract class ContextAwareActor extends UntypedActor {
logger.debug("Processing msg: {}", msg);
}
if (msg instanceof TbActorMsg) {
- if(!process((TbActorMsg) msg)){
- logger.warning("Unknown message: {}!", msg);
+ try {
+ if (!process((TbActorMsg) msg)) {
+ logger.warning("Unknown message: {}!", msg);
+ }
+ } catch (Exception e) {
+ throw e;
}
} else {
logger.warning("Unknown message: {}!", msg);
diff --git a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
index 56f3228..c80e913 100644
--- a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
+++ b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
@@ -211,6 +211,10 @@ public class DefaultActorService implements ActorService {
@Override
public void onReceivedMsg(ServerAddress source, ClusterAPIProtos.ClusterMessage msg) {
ServerAddress serverAddress = new ServerAddress(source.getHost(), source.getPort());
+ log.info("Received msg [{}] from [{}]", msg.getMessageType().name(), serverAddress);
+ if(log.isDebugEnabled()){
+ log.info("MSG: ", msg);
+ }
switch (msg.getMessageType()) {
case CLUSTER_ACTOR_MESSAGE:
java.util.Optional<TbActorMsg> decodedMsg = actorContext.getEncodingService()
diff --git a/application/src/main/java/org/thingsboard/server/actors/session/AbstractSessionActorMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/session/AbstractSessionActorMsgProcessor.java
index fab51a5..42e127e 100644
--- a/application/src/main/java/org/thingsboard/server/actors/session/AbstractSessionActorMsgProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/session/AbstractSessionActorMsgProcessor.java
@@ -45,7 +45,7 @@ abstract class AbstractSessionActorMsgProcessor extends AbstractContextAwareMsgP
this.sessionId = sessionId;
}
- protected abstract void processToDeviceActorMsg(ActorContext ctx, ToDeviceActorSessionMsg msg);
+ protected abstract void processToDeviceActorMsg(ActorContext ctx, TransportToDeviceSessionActorMsg msg);
protected abstract void processTimeoutMsg(ActorContext context, SessionTimeoutMsg msg);
@@ -63,12 +63,12 @@ abstract class AbstractSessionActorMsgProcessor extends AbstractContextAwareMsgP
protected void cleanupSession(ActorContext ctx) {
}
- protected void updateSessionCtx(ToDeviceActorSessionMsg msg, SessionType type) {
+ protected void updateSessionCtx(TransportToDeviceSessionActorMsg msg, SessionType type) {
sessionCtx = msg.getSessionMsg().getSessionContext();
deviceToDeviceActorMsgPrototype = new BasicDeviceToDeviceActorMsg(msg, type);
}
- protected DeviceToDeviceActorMsg toDeviceMsg(ToDeviceActorSessionMsg msg) {
+ protected DeviceToDeviceActorMsg toDeviceMsg(TransportToDeviceSessionActorMsg msg) {
AdaptorToSessionActorMsg adaptorMsg = msg.getSessionMsg();
return new BasicDeviceToDeviceActorMsg(deviceToDeviceActorMsgPrototype, adaptorMsg.getMsg());
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/session/ASyncMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/session/ASyncMsgProcessor.java
index fa5287f..83d56e0 100644
--- a/application/src/main/java/org/thingsboard/server/actors/session/ASyncMsgProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/session/ASyncMsgProcessor.java
@@ -46,7 +46,7 @@ class ASyncMsgProcessor extends AbstractSessionActorMsgProcessor {
}
@Override
- protected void processToDeviceActorMsg(ActorContext ctx, ToDeviceActorSessionMsg msg) {
+ protected void processToDeviceActorMsg(ActorContext ctx, TransportToDeviceSessionActorMsg msg) {
updateSessionCtx(msg, SessionType.ASYNC);
if (firstMsg) {
toDeviceMsg(new SessionOpenMsg()).ifPresent(m -> forwardToAppActor(ctx, m));
diff --git a/application/src/main/java/org/thingsboard/server/actors/session/SessionActor.java b/application/src/main/java/org/thingsboard/server/actors/session/SessionActor.java
index 9d324c5..f67d46b 100644
--- a/application/src/main/java/org/thingsboard/server/actors/session/SessionActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/session/SessionActor.java
@@ -17,7 +17,6 @@ package org.thingsboard.server.actors.session;
import akka.actor.OneForOneStrategy;
import akka.actor.SupervisorStrategy;
-import akka.japi.Function;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.service.ContextAwareActor;
import org.thingsboard.server.actors.service.ContextBasedCreator;
@@ -25,8 +24,8 @@ import org.thingsboard.server.actors.shared.SessionTimeoutMsg;
import org.thingsboard.server.common.data.id.SessionId;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
-import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
-import org.thingsboard.server.common.msg.session.ToDeviceActorSessionMsg;
+import org.thingsboard.server.common.msg.core.ActorSystemToDeviceSessionActorMsg;
+import org.thingsboard.server.common.msg.session.TransportToDeviceSessionActorMsg;
import org.thingsboard.server.common.msg.session.SessionCtrlMsg;
import org.thingsboard.server.common.msg.session.SessionMsg;
import org.thingsboard.server.common.msg.session.SessionType;
@@ -63,38 +62,37 @@ public class SessionActor extends ContextAwareActor {
@Override
protected boolean process(TbActorMsg msg) {
- //TODO Move everything here, to work with TbActorMsg
- return false;
- }
-
- @Override
- public void onReceive(Object msg) throws Exception {
- logger.debug("[{}] Processing: {}.", sessionId, msg);
- if (msg instanceof ToDeviceActorSessionMsg) {
- processDeviceMsg((ToDeviceActorSessionMsg) msg);
- } else if (msg instanceof ToDeviceSessionActorMsg) {
- processToDeviceMsg((ToDeviceSessionActorMsg) msg);
- } else if (msg instanceof SessionTimeoutMsg) {
- processTimeoutMsg((SessionTimeoutMsg) msg);
- } else if (msg instanceof SessionCtrlMsg) {
- processSessionCtrlMsg((SessionCtrlMsg) msg);
- } else if (msg instanceof ClusterEventMsg) {
- processClusterEvent((ClusterEventMsg) msg);
- } else {
- logger.warning("[{}] Unknown msg: {}", sessionId, msg);
+ switch (msg.getMsgType()) {
+ case TRANSPORT_TO_DEVICE_SESSION_ACTOR_MSG:
+ processTransportToSessionMsg((TransportToDeviceSessionActorMsg) msg);
+ break;
+ case ACTOR_SYSTEM_TO_DEVICE_SESSION_ACTOR_MSG:
+ processActorsToSessionMsg((ActorSystemToDeviceSessionActorMsg) msg);
+ break;
+ case SESSION_TIMEOUT_MSG:
+ processTimeoutMsg((SessionTimeoutMsg) msg);
+ break;
+ case SESSION_CTRL_MSG:
+ processSessionCloseMsg((SessionCtrlMsg) msg);
+ break;
+ case CLUSTER_EVENT_MSG:
+ processClusterEvent((ClusterEventMsg) msg);
+ break;
+ default: return false;
}
+ return true;
}
private void processClusterEvent(ClusterEventMsg msg) {
processor.processClusterEvent(context(), msg);
}
- private void processDeviceMsg(ToDeviceActorSessionMsg msg) {
+ private void processTransportToSessionMsg(TransportToDeviceSessionActorMsg msg) {
initProcessor(msg);
processor.processToDeviceActorMsg(context(), msg);
}
- private void processToDeviceMsg(ToDeviceSessionActorMsg msg) {
+ private void processActorsToSessionMsg(ActorSystemToDeviceSessionActorMsg msg) {
processor.processToDeviceMsg(context(), msg.getMsg());
}
@@ -106,7 +104,7 @@ public class SessionActor extends ContextAwareActor {
}
}
- private void processSessionCtrlMsg(SessionCtrlMsg msg) {
+ private void processSessionCloseMsg(SessionCtrlMsg msg) {
if (processor != null) {
processor.processSessionCtrlMsg(context(), msg);
} else if (msg instanceof SessionCloseMsg) {
@@ -116,7 +114,7 @@ public class SessionActor extends ContextAwareActor {
}
}
- private void initProcessor(ToDeviceActorSessionMsg msg) {
+ private void initProcessor(TransportToDeviceSessionActorMsg msg) {
if (processor == null) {
SessionMsg sessionMsg = (SessionMsg) msg.getSessionMsg();
if (sessionMsg.getSessionContext().getSessionType() == SessionType.SYNC) {
diff --git a/application/src/main/java/org/thingsboard/server/actors/session/SessionManagerActor.java b/application/src/main/java/org/thingsboard/server/actors/session/SessionManagerActor.java
index b5b1791..1f8bb6d 100644
--- a/application/src/main/java/org/thingsboard/server/actors/session/SessionManagerActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/session/SessionManagerActor.java
@@ -17,7 +17,6 @@ package org.thingsboard.server.actors.session;
import java.util.HashMap;
import java.util.Map;
-import java.util.UUID;
import akka.actor.*;
import org.thingsboard.server.actors.ActorSystemContext;
@@ -33,8 +32,9 @@ import akka.event.Logging;
import akka.event.LoggingAdapter;
import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
import org.thingsboard.server.common.msg.core.SessionCloseMsg;
-import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
+import org.thingsboard.server.common.msg.core.ActorSystemToDeviceSessionActorMsg;
import org.thingsboard.server.common.msg.session.SessionCtrlMsg;
+import org.thingsboard.server.common.msg.session.TransportToDeviceSessionActorMsg;
public class SessionManagerActor extends ContextAwareActor {
@@ -104,7 +104,7 @@ public class SessionManagerActor extends ContextAwareActor {
}
private void forwardToSessionActor(SessionAwareMsg msg) {
- if (msg instanceof ToDeviceSessionActorMsg || msg instanceof SessionCloseMsg) {
+ if (msg instanceof ActorSystemToDeviceSessionActorMsg || msg instanceof SessionCloseMsg) {
String sessionIdStr = msg.getSessionId().toUidStr();
ActorRef sessionActor = sessionActors.get(sessionIdStr);
if (sessionActor != null) {
diff --git a/application/src/main/java/org/thingsboard/server/actors/session/SyncMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/session/SyncMsgProcessor.java
index 6f71d79..a0f0a88 100644
--- a/application/src/main/java/org/thingsboard/server/actors/session/SyncMsgProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/session/SyncMsgProcessor.java
@@ -22,7 +22,7 @@ import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg;
import org.thingsboard.server.common.msg.session.*;
-import org.thingsboard.server.common.msg.session.ToDeviceActorSessionMsg;
+import org.thingsboard.server.common.msg.session.TransportToDeviceSessionActorMsg;
import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg;
import org.thingsboard.server.common.msg.session.ex.SessionException;
@@ -41,7 +41,7 @@ class SyncMsgProcessor extends AbstractSessionActorMsgProcessor {
}
@Override
- protected void processToDeviceActorMsg(ActorContext ctx, ToDeviceActorSessionMsg msg) {
+ protected void processToDeviceActorMsg(ActorContext ctx, TransportToDeviceSessionActorMsg msg) {
updateSessionCtx(msg, SessionType.SYNC);
pendingMsg = toDeviceMsg(msg);
pendingResponse = true;
diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/SessionTimeoutMsg.java b/application/src/main/java/org/thingsboard/server/actors/shared/SessionTimeoutMsg.java
index 7d6dbca..d015fe0 100644
--- a/application/src/main/java/org/thingsboard/server/actors/shared/SessionTimeoutMsg.java
+++ b/application/src/main/java/org/thingsboard/server/actors/shared/SessionTimeoutMsg.java
@@ -17,13 +17,20 @@ package org.thingsboard.server.actors.shared;
import lombok.Data;
import org.thingsboard.server.common.data.id.SessionId;
+import org.thingsboard.server.common.msg.MsgType;
+import org.thingsboard.server.common.msg.TbActorMsg;
import java.io.Serializable;
@Data
-public class SessionTimeoutMsg implements Serializable {
+public class SessionTimeoutMsg implements Serializable, TbActorMsg {
private static final long serialVersionUID = 1L;
private final SessionId sessionId;
+
+ @Override
+ public MsgType getMsgType() {
+ return MsgType.SESSION_TIMEOUT_MSG;
+ }
}
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/routing/ConsistentClusterRoutingService.java b/application/src/main/java/org/thingsboard/server/service/cluster/routing/ConsistentClusterRoutingService.java
index 4067797..8ab6f99 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/routing/ConsistentClusterRoutingService.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/routing/ConsistentClusterRoutingService.java
@@ -107,7 +107,7 @@ public class ConsistentClusterRoutingService implements ClusterRoutingService, D
@Override
public void onServerAdded(ServerInstance server) {
- log.debug("On server added event: {}", server);
+ log.info("On server added event: {}", server);
addNode(server);
logCircle();
}
@@ -119,7 +119,7 @@ public class ConsistentClusterRoutingService implements ClusterRoutingService, D
@Override
public void onServerRemoved(ServerInstance server) {
- log.debug("On server removed event: {}", server);
+ log.info("On server removed event: {}", server);
removeNode(server);
logCircle();
}
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterGrpcService.java b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterGrpcService.java
index b73a6f5..9236219 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterGrpcService.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterGrpcService.java
@@ -22,16 +22,11 @@ import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
-import org.springframework.util.SerializationUtils;
import org.thingsboard.server.actors.rpc.RpcBroadcastMsg;
import org.thingsboard.server.actors.rpc.RpcSessionCreateRequestMsg;
-import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
-import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
-import org.thingsboard.server.extensions.api.plugins.rpc.RpcMsg;
-import org.thingsboard.server.extensions.core.plugin.telemetry.sub.Subscription;
import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
import org.thingsboard.server.gen.cluster.ClusterRpcServiceGrpc;
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterRpcService.java b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterRpcService.java
index 0dc324b..de29b89 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterRpcService.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterRpcService.java
@@ -19,15 +19,7 @@ import io.grpc.stub.StreamObserver;
import org.thingsboard.server.actors.rpc.RpcBroadcastMsg;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
-import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg;
-import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
-import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg;
-import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg;
-import org.thingsboard.server.extensions.api.plugins.msg.ToPluginRpcResponseDeviceMsg;
-import org.thingsboard.server.extensions.api.plugins.rpc.PluginRpcMsg;
-import org.thingsboard.server.extensions.core.plugin.telemetry.sub.Subscription;
import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
-import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
import java.util.UUID;
diff --git a/application/src/main/resources/logback.xml b/application/src/main/resources/logback.xml
index 1779912..978a570 100644
--- a/application/src/main/resources/logback.xml
+++ b/application/src/main/resources/logback.xml
@@ -25,7 +25,7 @@
</encoder>
</appender>
- <logger name="org.thingsboard.server" level="TRACE" />
+ <logger name="org.thingsboard.server" level="INFO" />
<logger name="akka" level="INFO" />
<root level="INFO">
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/device/BasicDeviceToDeviceActorMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/device/BasicDeviceToDeviceActorMsg.java
index 1da19cb..92c5105 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/device/BasicDeviceToDeviceActorMsg.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/device/BasicDeviceToDeviceActorMsg.java
@@ -24,7 +24,7 @@ import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.common.msg.session.FromDeviceMsg;
import org.thingsboard.server.common.msg.session.SessionType;
-import org.thingsboard.server.common.msg.session.ToDeviceActorSessionMsg;
+import org.thingsboard.server.common.msg.session.TransportToDeviceSessionActorMsg;
import java.util.Optional;
@@ -45,7 +45,7 @@ public class BasicDeviceToDeviceActorMsg implements DeviceToDeviceActorMsg {
this(null, other.getTenantId(), other.getCustomerId(), other.getDeviceId(), other.getSessionId(), other.getSessionType(), msg);
}
- public BasicDeviceToDeviceActorMsg(ToDeviceActorSessionMsg msg, SessionType sessionType) {
+ public BasicDeviceToDeviceActorMsg(TransportToDeviceSessionActorMsg msg, SessionType sessionType) {
this(null, msg.getTenantId(), msg.getCustomerId(), msg.getDeviceId(), msg.getSessionId(), sessionType, msg.getSessionMsg().getMsg());
}
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
index c7a96db..7702788 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
@@ -101,6 +101,6 @@ public enum MsgType {
/**
* Message that is sent from Rule Engine to the Device Actor when message is successfully pushed to queue.
*/
- RULE_ENGINE_QUEUE_PUT_ACK_MSG;
+ RULE_ENGINE_QUEUE_PUT_ACK_MSG, ACTOR_SYSTEM_TO_DEVICE_SESSION_ACTOR_MSG, TRANSPORT_TO_DEVICE_SESSION_ACTOR_MSG, SESSION_TIMEOUT_MSG, SESSION_CTRL_MSG;
}
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/session/ctrl/SessionCloseMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/session/ctrl/SessionCloseMsg.java
index a24bdcd..49fad13 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/session/ctrl/SessionCloseMsg.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/session/ctrl/SessionCloseMsg.java
@@ -16,6 +16,7 @@
package org.thingsboard.server.common.msg.session.ctrl;
import org.thingsboard.server.common.data.id.SessionId;
+import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.session.SessionCtrlMsg;
public class SessionCloseMsg implements SessionCtrlMsg {
@@ -60,4 +61,8 @@ public class SessionCloseMsg implements SessionCtrlMsg {
return timeout;
}
+ @Override
+ public MsgType getMsgType() {
+ return MsgType.SESSION_CTRL_MSG;
+ }
}
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/session/SessionCtrlMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/session/SessionCtrlMsg.java
index 19ca219..8082f72 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/session/SessionCtrlMsg.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/session/SessionCtrlMsg.java
@@ -15,8 +15,9 @@
*/
package org.thingsboard.server.common.msg.session;
+import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.aware.SessionAwareMsg;
-public interface SessionCtrlMsg extends SessionAwareMsg {
+public interface SessionCtrlMsg extends SessionAwareMsg, TbActorMsg {
}
pom.xml 2(+1 -1)
diff --git a/pom.xml b/pom.xml
index d6a0bef..dfccb7d 100755
--- a/pom.xml
+++ b/pom.xml
@@ -59,7 +59,7 @@
<velocity.version>1.7</velocity.version>
<velocity-tools.version>2.0</velocity-tools.version>
<mail.version>1.4.3</mail.version>
- <curator.version>2.11.0</curator.version>
+ <curator.version>4.0.1</curator.version>
<protobuf.version>3.0.2</protobuf.version>
<grpc.version>1.12.0</grpc.version>
<lombok.version>1.16.18</lombok.version>
diff --git a/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java b/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java
index 7674549..bac68ff 100644
--- a/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java
+++ b/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java
@@ -38,8 +38,6 @@ import org.thingsboard.server.common.transport.quota.QuotaService;
import org.thingsboard.server.transport.coap.adaptors.CoapTransportAdaptor;
import org.thingsboard.server.transport.coap.session.CoapExchangeObserverProxy;
import org.thingsboard.server.transport.coap.session.CoapSessionCtx;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.springframework.util.ReflectionUtils;
@Slf4j
@@ -186,7 +184,7 @@ public class CoapTransportResource extends CoapResource {
throw new IllegalArgumentException("Unsupported msg type: " + type);
}
log.trace("Processing msg: {}", msg);
- processor.process(new BasicToDeviceActorSessionMsg(ctx.getDevice(), msg));
+ processor.process(new BasicTransportToDeviceSessionActorMsg(ctx.getDevice(), msg));
} catch (AdaptorException e) {
log.debug("Failed to decode payload {}", e);
exchange.respond(ResponseCode.BAD_REQUEST, e.getMessage());
diff --git a/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapServerTest.java b/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapServerTest.java
index 60b2220..320f06e 100644
--- a/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapServerTest.java
+++ b/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapServerTest.java
@@ -108,8 +108,8 @@ public class CoapServerTest {
@Override
public void process(SessionAwareMsg toActorMsg) {
- if (toActorMsg instanceof ToDeviceActorSessionMsg) {
- AdaptorToSessionActorMsg sessionMsg = ((ToDeviceActorSessionMsg) toActorMsg).getSessionMsg();
+ if (toActorMsg instanceof TransportToDeviceSessionActorMsg) {
+ AdaptorToSessionActorMsg sessionMsg = ((TransportToDeviceSessionActorMsg) toActorMsg).getSessionMsg();
try {
FromDeviceMsg deviceMsg = sessionMsg.getMsg();
ToDeviceMsg toDeviceMsg = null;
diff --git a/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java b/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java
index 4d90b5f..930b442 100644
--- a/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java
+++ b/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java
@@ -30,7 +30,7 @@ import org.thingsboard.server.common.data.security.DeviceTokenCredentials;
import org.thingsboard.server.common.msg.core.*;
import org.thingsboard.server.common.msg.session.AdaptorToSessionActorMsg;
import org.thingsboard.server.common.msg.session.BasicAdaptorToSessionActorMsg;
-import org.thingsboard.server.common.msg.session.BasicToDeviceActorSessionMsg;
+import org.thingsboard.server.common.msg.session.BasicTransportToDeviceSessionActorMsg;
import org.thingsboard.server.common.msg.session.FromDeviceMsg;
import org.thingsboard.server.common.transport.SessionMsgProcessor;
import org.thingsboard.server.common.transport.adaptor.JsonConverter;
@@ -219,7 +219,7 @@ public class DeviceApiController {
private void process(HttpSessionCtx ctx, FromDeviceMsg request) {
AdaptorToSessionActorMsg msg = new BasicAdaptorToSessionActorMsg(ctx, request);
- processor.process(new BasicToDeviceActorSessionMsg(ctx.getDevice(), msg));
+ processor.process(new BasicTransportToDeviceSessionActorMsg(ctx.getDevice(), msg));
}
private boolean quotaExceeded(HttpServletRequest request, DeferredResult<ResponseEntity> responseWriter) {
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
index 5ccce35..0b38817 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
@@ -30,7 +30,7 @@ import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.security.DeviceTokenCredentials;
import org.thingsboard.server.common.data.security.DeviceX509Credentials;
import org.thingsboard.server.common.msg.session.AdaptorToSessionActorMsg;
-import org.thingsboard.server.common.msg.session.BasicToDeviceActorSessionMsg;
+import org.thingsboard.server.common.msg.session.BasicTransportToDeviceSessionActorMsg;
import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg;
import org.thingsboard.server.common.transport.SessionMsgProcessor;
import org.thingsboard.server.common.transport.adaptor.AdaptorException;
@@ -207,7 +207,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);
}
if (msg != null) {
- processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(), msg));
+ processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
} else {
log.info("[{}] Closing current session due to invalid publish msg [{}][{}]", sessionId, topicName, msgId);
ctx.close();
@@ -227,11 +227,11 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
try {
if (topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) {
AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, SUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg);
- processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(), msg));
+ processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
grantedQoSList.add(getMinSupportedQos(reqQoS));
} else if (topicName.equals(DEVICE_RPC_REQUESTS_SUB_TOPIC)) {
AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, SUBSCRIBE_RPC_COMMANDS_REQUEST, mqttMsg);
- processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(), msg));
+ processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
grantedQoSList.add(getMinSupportedQos(reqQoS));
} else if (topicName.equals(DEVICE_RPC_RESPONSE_SUB_TOPIC)) {
grantedQoSList.add(getMinSupportedQos(reqQoS));
@@ -261,10 +261,10 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
try {
if (topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) {
AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, UNSUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg);
- processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(), msg));
+ processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
} else if (topicName.equals(DEVICE_RPC_REQUESTS_SUB_TOPIC)) {
AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, UNSUBSCRIBE_RPC_COMMANDS_REQUEST, mqttMsg);
- processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(), msg));
+ processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
} else if (topicName.equals(DEVICE_ATTRIBUTES_RESPONSES_TOPIC)) {
deviceSessionCtx.setDisallowAttributeResponses();
}
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java
index 2056452..f666bb8 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java
@@ -30,7 +30,7 @@ import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.msg.core.*;
import org.thingsboard.server.common.msg.session.BasicAdaptorToSessionActorMsg;
-import org.thingsboard.server.common.msg.session.BasicToDeviceActorSessionMsg;
+import org.thingsboard.server.common.msg.session.BasicTransportToDeviceSessionActorMsg;
import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg;
import org.thingsboard.server.common.transport.SessionMsgProcessor;
import org.thingsboard.server.common.transport.adaptor.AdaptorException;
@@ -96,8 +96,8 @@ public class GatewaySessionCtx {
GatewayDeviceSessionCtx ctx = new GatewayDeviceSessionCtx(this, device);
devices.put(deviceName, ctx);
log.debug("[{}] Added device [{}] to the gateway session", gatewaySessionId, deviceName);
- processor.process(new BasicToDeviceActorSessionMsg(device, new BasicAdaptorToSessionActorMsg(ctx, new AttributesSubscribeMsg())));
- processor.process(new BasicToDeviceActorSessionMsg(device, new BasicAdaptorToSessionActorMsg(ctx, new RpcSubscribeMsg())));
+ processor.process(new BasicTransportToDeviceSessionActorMsg(device, new BasicAdaptorToSessionActorMsg(ctx, new AttributesSubscribeMsg())));
+ processor.process(new BasicTransportToDeviceSessionActorMsg(device, new BasicAdaptorToSessionActorMsg(ctx, new RpcSubscribeMsg())));
}
}
@@ -136,7 +136,7 @@ public class GatewaySessionCtx {
JsonConverter.parseWithTs(request, element.getAsJsonObject());
}
GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName);
- processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(),
+ processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(),
new BasicAdaptorToSessionActorMsg(deviceSessionCtx, request)));
}
} else {
@@ -152,7 +152,7 @@ public class GatewaySessionCtx {
Integer requestId = jsonObj.get("id").getAsInt();
String data = jsonObj.get("data").toString();
GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName);
- processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(),
+ processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(),
new BasicAdaptorToSessionActorMsg(deviceSessionCtx, new ToDeviceRpcResponseMsg(requestId, data))));
} else {
throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
@@ -174,7 +174,7 @@ public class GatewaySessionCtx {
JsonObject deviceData = deviceEntry.getValue().getAsJsonObject();
request.add(JsonConverter.parseValues(deviceData).stream().map(kv -> new BaseAttributeKvEntry(kv, ts)).collect(Collectors.toList()));
GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName);
- processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(),
+ processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(),
new BasicAdaptorToSessionActorMsg(deviceSessionCtx, request)));
}
} else {
@@ -207,7 +207,7 @@ public class GatewaySessionCtx {
request = new BasicGetAttributesRequest(requestId, null, keys);
}
GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName);
- processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(),
+ processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(),
new BasicAdaptorToSessionActorMsg(deviceSessionCtx, request)));
ack(msg);
} else {