thingsboard-developers
Changes
application/src/main/java/org/thingsboard/server/service/transaction/BaseRuleChainTransactionService.java 66(+65 -1)
Details
diff --git a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
index 50949c1..e32e622 100644
--- a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
+++ b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
@@ -56,12 +56,8 @@ import scala.concurrent.duration.Duration;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
-
-import java.util.Arrays;
-import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
import static org.thingsboard.server.gen.cluster.ClusterAPIProtos.MessageType.CLUSTER_ACTOR_MESSAGE;
@@ -235,6 +231,9 @@ public class DefaultActorService implements ActorService {
case CLUSTER_DEVICE_STATE_SERVICE_MESSAGE:
actorContext.getDeviceStateService().onRemoteMsg(serverAddress, msg.getPayload().toByteArray());
break;
+ case CLUSTER_TRANSACTION_SERVICE_MESSAGE:
+ actorContext.getRuleChainTransactionService().onRemoteTransactionMsg(serverAddress, msg.getPayload().toByteArray());
+ break;
}
}
diff --git a/application/src/main/java/org/thingsboard/server/service/state/DeviceStateService.java b/application/src/main/java/org/thingsboard/server/service/state/DeviceStateService.java
index dab9e69..28c92eb 100644
--- a/application/src/main/java/org/thingsboard/server/service/state/DeviceStateService.java
+++ b/application/src/main/java/org/thingsboard/server/service/state/DeviceStateService.java
@@ -19,8 +19,6 @@ import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
-import java.util.Optional;
-
/**
* Created by ashvayka on 01.05.18.
*/
diff --git a/application/src/main/java/org/thingsboard/server/service/transaction/BaseRuleChainTransactionService.java b/application/src/main/java/org/thingsboard/server/service/transaction/BaseRuleChainTransactionService.java
index f09a0f9..d0eb713 100644
--- a/application/src/main/java/org/thingsboard/server/service/transaction/BaseRuleChainTransactionService.java
+++ b/application/src/main/java/org/thingsboard/server/service/transaction/BaseRuleChainTransactionService.java
@@ -15,17 +15,29 @@
*/
package org.thingsboard.server.service.transaction;
+import com.google.protobuf.InvalidProtocolBufferException;
import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.thingsboard.rule.engine.api.RuleChainTransactionService;
import org.thingsboard.rule.engine.api.TbContext;
+import org.thingsboard.server.common.data.EntityType;
+import org.thingsboard.server.common.data.id.AssetId;
+import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.TbMsg;
+import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
+import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
+import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
+import java.util.Optional;
import java.util.Queue;
+import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -42,6 +54,12 @@ import java.util.function.Consumer;
@Slf4j
public class BaseRuleChainTransactionService implements RuleChainTransactionService {
+ @Autowired
+ private ClusterRoutingService routingService;
+
+ @Autowired
+ private ClusterRpcService clusterRpcService;
+
@Value("${actors.rule.transaction.queue_size}")
private int finalQueueSize;
@Value("${actors.rule.transaction.duration}")
@@ -52,10 +70,12 @@ public class BaseRuleChainTransactionService implements RuleChainTransactionServ
private final Queue<TbTransactionTask> timeoutQueue = new ConcurrentLinkedQueue<>();
private ExecutorService timeoutExecutor;
+ private ExecutorService executor;
@PostConstruct
public void init() {
timeoutExecutor = Executors.newSingleThreadExecutor();
+ executor = Executors.newSingleThreadExecutor();
executeOnTimeout();
}
@@ -64,6 +84,9 @@ public class BaseRuleChainTransactionService implements RuleChainTransactionServ
if (timeoutExecutor != null) {
timeoutExecutor.shutdownNow();
}
+ if (executor != null) {
+ executor.shutdownNow();
+ }
}
@Override
@@ -126,7 +149,7 @@ public class BaseRuleChainTransactionService implements RuleChainTransactionServ
}
private void executeOnTimeout() {
- timeoutExecutor.execute(() -> {
+ timeoutExecutor.submit(() -> {
while (true) {
TbTransactionTask task = timeoutQueue.peek();
if (task != null) {
@@ -171,4 +194,45 @@ public class BaseRuleChainTransactionService implements RuleChainTransactionServ
task.setExpirationTime(System.currentTimeMillis() + duration);
task.getOnStart().accept(task.getMsg());
}
+
+ @Override
+ public void onRemoteTransactionMsg(ServerAddress serverAddress, byte[] data) {
+ ClusterAPIProtos.TransactionServiceMsgProto proto;
+ try {
+ proto = ClusterAPIProtos.TransactionServiceMsgProto.parseFrom(data);
+ } catch (InvalidProtocolBufferException e) {
+ throw new RuntimeException(e);
+ }
+ TenantId tenantId = new TenantId(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB()));
+
+ String entityTypeStr = proto.getEntityType();
+ EntityId entityId;
+ if (entityTypeStr.equals(EntityType.ASSET.name())) {
+ entityId = new AssetId(new UUID(proto.getOriginatorIdMSB(), proto.getOriginatorIdLSB()));
+ } else {
+ entityId = new DeviceId(new UUID(proto.getOriginatorIdMSB(), proto.getOriginatorIdLSB()));
+ }
+ onTransactionEnd(tenantId, entityId);
+ }
+
+ @Override
+ public void onTransactionEnd(TenantId tenantId, EntityId entityId) {
+ executor.submit(() -> onTransactionEndSync(tenantId, entityId));
+ }
+
+ private void onTransactionEndSync(TenantId tenantId, EntityId entityId) {
+ Optional<ServerAddress> address = routingService.resolveById(entityId);
+ address.ifPresent(serverAddress -> sendTransactionEvent(tenantId, entityId, serverAddress));
+ }
+
+ private void sendTransactionEvent(TenantId tenantId, EntityId entityId, ServerAddress address) {
+ log.trace("[{}][{}] Originator is monitored on other server: {}", tenantId, entityId, address);
+ ClusterAPIProtos.TransactionServiceMsgProto.Builder builder = ClusterAPIProtos.TransactionServiceMsgProto.newBuilder();
+ builder.setTenantIdMSB(tenantId.getId().getMostSignificantBits());
+ builder.setTenantIdLSB(tenantId.getId().getLeastSignificantBits());
+ builder.setEntityType(entityId.getEntityType().name());
+ builder.setOriginatorIdMSB(entityId.getId().getMostSignificantBits());
+ builder.setOriginatorIdLSB(entityId.getId().getLeastSignificantBits());
+ clusterRpcService.tell(address, ClusterAPIProtos.MessageType.CLUSTER_TRANSACTION_SERVICE_MESSAGE, builder.build().toByteArray());
+ }
}
diff --git a/application/src/main/proto/cluster.proto b/application/src/main/proto/cluster.proto
index 01e9323..343de84 100644
--- a/application/src/main/proto/cluster.proto
+++ b/application/src/main/proto/cluster.proto
@@ -60,6 +60,7 @@ enum MessageType {
CLUSTER_RPC_FROM_DEVICE_RESPONSE_MESSAGE = 12;
CLUSTER_DEVICE_STATE_SERVICE_MESSAGE = 13;
+ CLUSTER_TRANSACTION_SERVICE_MESSAGE = 14;
}
// Messages related to CLUSTER_TELEMETRY_MESSAGE
@@ -142,3 +143,11 @@ message DeviceStateServiceMsgProto {
bool updated = 6;
bool deleted = 7;
}
+
+message TransactionServiceMsgProto {
+ int64 tenantIdMSB = 1;
+ int64 tenantIdLSB = 2;
+ string entityType = 3;
+ int64 originatorIdMSB = 4;
+ int64 originatorIdLSB = 5;
+}
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleChainTransactionService.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleChainTransactionService.java
index 1e34997..51aebea 100644
--- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleChainTransactionService.java
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleChainTransactionService.java
@@ -15,9 +15,11 @@
*/
package org.thingsboard.rule.engine.api;
+import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.TbMsg;
+import org.thingsboard.server.common.msg.cluster.ServerAddress;
-import java.util.concurrent.Callable;
import java.util.function.Consumer;
public interface RuleChainTransactionService {
@@ -26,4 +28,8 @@ public interface RuleChainTransactionService {
boolean endTransaction(TbContext ctx, TbMsg msg, Consumer<Throwable> onFailure);
+ void onRemoteTransactionMsg(ServerAddress serverAddress, byte[] bytes);
+
+ void onTransactionEnd(TenantId tenantId, EntityId entityId);
+
}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionEndNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionEndNode.java
index 1bcd58b..14d2e20 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionEndNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionEndNode.java
@@ -51,6 +51,7 @@ public class TbTransactionEndNode implements TbNode {
@Override
public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException {
+ ctx.getRuleChainTransactionService().onTransactionEnd(ctx.getTenantId(), msg.getTransactionData().getOriginatorId());
boolean isFailed = ctx.getRuleChainTransactionService().endTransaction(ctx, msg, throwable -> ctx.tellFailure(msg, throwable));
if (!isFailed) {
ctx.tellNext(msg, SUCCESS);