thingsboard-memoizeit
Changes
application/src/main/java/org/thingsboard/server/actors/ruleChain/RemoteToRuleChainTellNextMsg.java 45(+45 -0)
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java 30(+26 -4)
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleChainMsg.java 8(+7 -1)
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeToRuleChainTellNextMsg.java 2(+1 -1)
Details
diff --git a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
index d453e59..8e7a9ae 100644
--- a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
@@ -111,6 +111,7 @@ public class AppActor extends RuleChainManagerActor {
case DEVICE_NAME_OR_TYPE_UPDATE_TO_DEVICE_ACTOR_MSG:
case DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG:
case SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG:
+ case REMOTE_TO_RULE_CHAIN_TELL_NEXT_MSG:
onToDeviceActorMsg((TenantAwareMsg) msg);
break;
case ACTOR_SYSTEM_TO_DEVICE_SESSION_ACTOR_MSG:
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RemoteToRuleChainTellNextMsg.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RemoteToRuleChainTellNextMsg.java
new file mode 100644
index 0000000..5c1769a
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RemoteToRuleChainTellNextMsg.java
@@ -0,0 +1,45 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.actors.ruleChain;
+
+import lombok.Data;
+import org.thingsboard.server.common.data.id.RuleChainId;
+import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.msg.MsgType;
+import org.thingsboard.server.common.msg.aware.RuleChainAwareMsg;
+import org.thingsboard.server.common.msg.aware.TenantAwareMsg;
+
+/**
+ * Created by ashvayka on 19.03.18.
+ */
+@Data
+final class RemoteToRuleChainTellNextMsg extends RuleNodeToRuleChainTellNextMsg implements TenantAwareMsg, RuleChainAwareMsg {
+
+ private final TenantId tenantId;
+ private final RuleChainId ruleChainId;
+
+ public RemoteToRuleChainTellNextMsg(RuleNodeToRuleChainTellNextMsg original, TenantId tenantId, RuleChainId ruleChainId) {
+ super(original.getOriginator(), original.getRelationTypes(), original.getMsg());
+ this.tenantId = tenantId;
+ this.ruleChainId = ruleChainId;
+ }
+
+ @Override
+ public MsgType getMsgType() {
+ return MsgType.REMOTE_TO_RULE_CHAIN_TELL_NEXT_MSG;
+ }
+
+}
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java
index 3ba646a..c1a55fb 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java
@@ -49,6 +49,7 @@ public class RuleChainActor extends ComponentActor<RuleChainId, RuleChainActorMe
processor.onDeviceActorToRuleEngineMsg((DeviceActorToRuleEngineMsg) msg);
break;
case RULE_TO_RULE_CHAIN_TELL_NEXT_MSG:
+ case REMOTE_TO_RULE_CHAIN_TELL_NEXT_MSG:
processor.onTellNext((RuleNodeToRuleChainTellNextMsg) msg);
break;
case RULE_CHAIN_TO_RULE_CHAIN_MSG:
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
index 7d560db..cae377c 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
@@ -20,6 +20,9 @@ import akka.actor.ActorRef;
import akka.actor.Props;
import akka.event.LoggingAdapter;
import com.datastax.driver.core.utils.UUIDs;
+
+import java.util.Optional;
+
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.device.DeviceActorToRuleEngineMsg;
import org.thingsboard.server.actors.device.RuleEngineQueuePutAckMsg;
@@ -37,6 +40,7 @@ import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.data.rule.RuleNode;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
+import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
import org.thingsboard.server.dao.rule.RuleChainService;
@@ -217,12 +221,30 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
void onTellNext(RuleNodeToRuleChainTellNextMsg envelope) {
checkActive();
- RuleNodeId originator = envelope.getOriginator();
- List<RuleNodeRelation> relations = nodeRoutes.get(originator).stream()
- .filter(r -> contains(envelope.getRelationTypes(), r.getType()))
- .collect(Collectors.toList());
+ TbMsg msg = envelope.getMsg();
+ EntityId originatorEntityId = msg.getOriginator();
+ Optional<ServerAddress> address = systemContext.getRoutingService().resolveById(originatorEntityId);
+
+ if (address.isPresent()) {
+ onRemoteTellNext(address.get(), envelope);
+ } else {
+ onLocalTellNext(envelope);
+ }
+ }
+ private void onRemoteTellNext(ServerAddress serverAddress, RuleNodeToRuleChainTellNextMsg envelope) {
TbMsg msg = envelope.getMsg();
+ logger.debug("Forwarding [{}] msg to remote server [{}] due to changed originator id: [{}]", msg.getId(), serverAddress, msg.getOriginator());
+ envelope = new RemoteToRuleChainTellNextMsg(envelope, tenantId, entityId);
+ systemContext.getRpcService().tell(systemContext.getEncodingService().convertToProtoDataMessage(serverAddress, envelope));
+ }
+
+ private void onLocalTellNext(RuleNodeToRuleChainTellNextMsg envelope) {
+ TbMsg msg = envelope.getMsg();
+ RuleNodeId originatorNodeId = envelope.getOriginator();
+ List<RuleNodeRelation> relations = nodeRoutes.get(originatorNodeId).stream()
+ .filter(r -> contains(envelope.getRelationTypes(), r.getType()))
+ .collect(Collectors.toList());
int relationsCount = relations.size();
EntityId ackId = msg.getRuleNodeId() != null ? msg.getRuleNodeId() : msg.getRuleChainId();
if (relationsCount == 0) {
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleChainMsg.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleChainMsg.java
index 8b13747..ae4ae45 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleChainMsg.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleChainMsg.java
@@ -20,12 +20,13 @@ import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.TbMsg;
+import org.thingsboard.server.common.msg.aware.RuleChainAwareMsg;
/**
* Created by ashvayka on 19.03.18.
*/
@Data
-public final class RuleChainToRuleChainMsg implements TbActorMsg {
+public final class RuleChainToRuleChainMsg implements TbActorMsg, RuleChainAwareMsg {
private final RuleChainId target;
private final RuleChainId source;
@@ -34,6 +35,11 @@ public final class RuleChainToRuleChainMsg implements TbActorMsg {
private final boolean enqueue;
@Override
+ public RuleChainId getRuleChainId() {
+ return target;
+ }
+
+ @Override
public MsgType getMsgType() {
return MsgType.RULE_CHAIN_TO_RULE_CHAIN_MSG;
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeToRuleChainTellNextMsg.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeToRuleChainTellNextMsg.java
index c0a475c..9414892 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeToRuleChainTellNextMsg.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeToRuleChainTellNextMsg.java
@@ -27,7 +27,7 @@ import java.util.Set;
* Created by ashvayka on 19.03.18.
*/
@Data
-final class RuleNodeToRuleChainTellNextMsg implements TbActorMsg {
+class RuleNodeToRuleChainTellNextMsg implements TbActorMsg {
private final RuleNodeId originator;
private final Set<String> relationTypes;
diff --git a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
index dc48e88..b4ab0d2 100644
--- a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
@@ -35,6 +35,7 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.aware.DeviceAwareMsg;
+import org.thingsboard.server.common.msg.aware.RuleChainAwareMsg;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
import scala.concurrent.duration.Duration;
@@ -94,7 +95,8 @@ public class TenantActor extends RuleChainManagerActor {
onToDeviceActorMsg((DeviceAwareMsg) msg);
break;
case RULE_CHAIN_TO_RULE_CHAIN_MSG:
- onRuleChainMsg((RuleChainToRuleChainMsg) msg);
+ case REMOTE_TO_RULE_CHAIN_TELL_NEXT_MSG:
+ onRuleChainMsg((RuleChainAwareMsg) msg);
break;
default:
return false;
@@ -120,8 +122,8 @@ public class TenantActor extends RuleChainManagerActor {
else logger.info("[{}] No Root Chain", msg);
}
- private void onRuleChainMsg(RuleChainToRuleChainMsg msg) {
- ruleChainManager.getOrCreateActor(context(), msg.getTarget()).tell(msg, self());
+ private void onRuleChainMsg(RuleChainAwareMsg msg) {
+ ruleChainManager.getOrCreateActor(context(), msg.getRuleChainId()).tell(msg, self());
}
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/aware/RuleChainAwareMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/aware/RuleChainAwareMsg.java
new file mode 100644
index 0000000..e261cbb
--- /dev/null
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/aware/RuleChainAwareMsg.java
@@ -0,0 +1,24 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.common.msg.aware;
+
+import org.thingsboard.server.common.data.id.RuleChainId;
+
+public interface RuleChainAwareMsg {
+
+ RuleChainId getRuleChainId();
+
+}
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
index 7702788..c8b5c4e 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
@@ -63,6 +63,11 @@ public enum MsgType {
RULE_TO_RULE_CHAIN_TELL_NEXT_MSG,
/**
+ * Message forwarded from original rule chain to remote rule chain due to change in the cluster structure or originator entity of the TbMsg.
+ */
+ REMOTE_TO_RULE_CHAIN_TELL_NEXT_MSG,
+
+ /**
* Message that is sent by RuleActor implementation to RuleActor itself to log the error.
*/
RULE_TO_SELF_ERROR_MSG,
@@ -101,6 +106,10 @@ public enum MsgType {
/**
* Message that is sent from Rule Engine to the Device Actor when message is successfully pushed to queue.
*/
- RULE_ENGINE_QUEUE_PUT_ACK_MSG, ACTOR_SYSTEM_TO_DEVICE_SESSION_ACTOR_MSG, TRANSPORT_TO_DEVICE_SESSION_ACTOR_MSG, SESSION_TIMEOUT_MSG, SESSION_CTRL_MSG;
+ RULE_ENGINE_QUEUE_PUT_ACK_MSG,
+ ACTOR_SYSTEM_TO_DEVICE_SESSION_ACTOR_MSG,
+ TRANSPORT_TO_DEVICE_SESSION_ACTOR_MSG,
+ SESSION_TIMEOUT_MSG,
+ SESSION_CTRL_MSG;
}