thingsboard-developers

transaction cluster mode init

12/3/2018 3:28:52 PM

Details

diff --git a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
index 50949c1..e32e622 100644
--- a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
+++ b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
@@ -56,12 +56,8 @@ import scala.concurrent.duration.Duration;
 
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
-
-import java.util.Arrays;
-import java.util.UUID;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 
 import static org.thingsboard.server.gen.cluster.ClusterAPIProtos.MessageType.CLUSTER_ACTOR_MESSAGE;
 
@@ -235,6 +231,9 @@ public class DefaultActorService implements ActorService {
             case CLUSTER_DEVICE_STATE_SERVICE_MESSAGE:
                 actorContext.getDeviceStateService().onRemoteMsg(serverAddress, msg.getPayload().toByteArray());
                 break;
+            case CLUSTER_TRANSACTION_SERVICE_MESSAGE:
+                actorContext.getRuleChainTransactionService().onRemoteTransactionMsg(serverAddress, msg.getPayload().toByteArray());
+                break;
         }
     }
 
diff --git a/application/src/main/java/org/thingsboard/server/service/state/DeviceStateService.java b/application/src/main/java/org/thingsboard/server/service/state/DeviceStateService.java
index dab9e69..28c92eb 100644
--- a/application/src/main/java/org/thingsboard/server/service/state/DeviceStateService.java
+++ b/application/src/main/java/org/thingsboard/server/service/state/DeviceStateService.java
@@ -19,8 +19,6 @@ import org.thingsboard.server.common.data.Device;
 import org.thingsboard.server.common.data.id.DeviceId;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
 
-import java.util.Optional;
-
 /**
  * Created by ashvayka on 01.05.18.
  */
diff --git a/application/src/main/java/org/thingsboard/server/service/transaction/BaseRuleChainTransactionService.java b/application/src/main/java/org/thingsboard/server/service/transaction/BaseRuleChainTransactionService.java
index f09a0f9..d0eb713 100644
--- a/application/src/main/java/org/thingsboard/server/service/transaction/BaseRuleChainTransactionService.java
+++ b/application/src/main/java/org/thingsboard/server/service/transaction/BaseRuleChainTransactionService.java
@@ -15,17 +15,29 @@
  */
 package org.thingsboard.server.service.transaction;
 
+import com.google.protobuf.InvalidProtocolBufferException;
 import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
 import org.thingsboard.rule.engine.api.RuleChainTransactionService;
 import org.thingsboard.rule.engine.api.TbContext;
+import org.thingsboard.server.common.data.EntityType;
+import org.thingsboard.server.common.data.id.AssetId;
+import org.thingsboard.server.common.data.id.DeviceId;
 import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.id.TenantId;
 import org.thingsboard.server.common.msg.TbMsg;
+import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
+import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
+import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
 
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
+import java.util.Optional;
 import java.util.Queue;
+import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -42,6 +54,12 @@ import java.util.function.Consumer;
 @Slf4j
 public class BaseRuleChainTransactionService implements RuleChainTransactionService {
 
+    @Autowired
+    private ClusterRoutingService routingService;
+
+    @Autowired
+    private ClusterRpcService clusterRpcService;
+
     @Value("${actors.rule.transaction.queue_size}")
     private int finalQueueSize;
     @Value("${actors.rule.transaction.duration}")
@@ -52,10 +70,12 @@ public class BaseRuleChainTransactionService implements RuleChainTransactionServ
     private final Queue<TbTransactionTask> timeoutQueue = new ConcurrentLinkedQueue<>();
 
     private ExecutorService timeoutExecutor;
+    private ExecutorService executor;
 
     @PostConstruct
     public void init() {
         timeoutExecutor = Executors.newSingleThreadExecutor();
+        executor = Executors.newSingleThreadExecutor();
         executeOnTimeout();
     }
 
@@ -64,6 +84,9 @@ public class BaseRuleChainTransactionService implements RuleChainTransactionServ
         if (timeoutExecutor != null) {
             timeoutExecutor.shutdownNow();
         }
+        if (executor != null) {
+            executor.shutdownNow();
+        }
     }
 
     @Override
@@ -126,7 +149,7 @@ public class BaseRuleChainTransactionService implements RuleChainTransactionServ
     }
 
     private void executeOnTimeout() {
-        timeoutExecutor.execute(() -> {
+        timeoutExecutor.submit(() -> {
             while (true) {
                 TbTransactionTask task = timeoutQueue.peek();
                 if (task != null) {
@@ -171,4 +194,45 @@ public class BaseRuleChainTransactionService implements RuleChainTransactionServ
         task.setExpirationTime(System.currentTimeMillis() + duration);
         task.getOnStart().accept(task.getMsg());
     }
+
+    @Override
+    public void onRemoteTransactionMsg(ServerAddress serverAddress, byte[] data) {
+        ClusterAPIProtos.TransactionServiceMsgProto proto;
+        try {
+            proto = ClusterAPIProtos.TransactionServiceMsgProto.parseFrom(data);
+        } catch (InvalidProtocolBufferException e) {
+            throw new RuntimeException(e);
+        }
+        TenantId tenantId = new TenantId(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB()));
+
+        String entityTypeStr = proto.getEntityType();
+        EntityId entityId;
+        if (entityTypeStr.equals(EntityType.ASSET.name())) {
+            entityId = new AssetId(new UUID(proto.getOriginatorIdMSB(), proto.getOriginatorIdLSB()));
+        } else {
+            entityId = new DeviceId(new UUID(proto.getOriginatorIdMSB(), proto.getOriginatorIdLSB()));
+        }
+        onTransactionEnd(tenantId, entityId);
+    }
+
+    @Override
+    public void onTransactionEnd(TenantId tenantId, EntityId entityId) {
+        executor.submit(() -> onTransactionEndSync(tenantId, entityId));
+    }
+
+    private void onTransactionEndSync(TenantId tenantId, EntityId entityId) {
+        Optional<ServerAddress> address = routingService.resolveById(entityId);
+        address.ifPresent(serverAddress -> sendTransactionEvent(tenantId, entityId, serverAddress));
+    }
+
+    private void sendTransactionEvent(TenantId tenantId, EntityId entityId, ServerAddress address) {
+        log.trace("[{}][{}] Originator is monitored on other server: {}", tenantId, entityId, address);
+        ClusterAPIProtos.TransactionServiceMsgProto.Builder builder = ClusterAPIProtos.TransactionServiceMsgProto.newBuilder();
+        builder.setTenantIdMSB(tenantId.getId().getMostSignificantBits());
+        builder.setTenantIdLSB(tenantId.getId().getLeastSignificantBits());
+        builder.setEntityType(entityId.getEntityType().name());
+        builder.setOriginatorIdMSB(entityId.getId().getMostSignificantBits());
+        builder.setOriginatorIdLSB(entityId.getId().getLeastSignificantBits());
+        clusterRpcService.tell(address, ClusterAPIProtos.MessageType.CLUSTER_TRANSACTION_SERVICE_MESSAGE, builder.build().toByteArray());
+    }
 }
diff --git a/application/src/main/proto/cluster.proto b/application/src/main/proto/cluster.proto
index 01e9323..343de84 100644
--- a/application/src/main/proto/cluster.proto
+++ b/application/src/main/proto/cluster.proto
@@ -60,6 +60,7 @@ enum MessageType {
   CLUSTER_RPC_FROM_DEVICE_RESPONSE_MESSAGE = 12;
 
   CLUSTER_DEVICE_STATE_SERVICE_MESSAGE = 13;
+  CLUSTER_TRANSACTION_SERVICE_MESSAGE = 14;
 }
 
 // Messages related to CLUSTER_TELEMETRY_MESSAGE
@@ -142,3 +143,11 @@ message DeviceStateServiceMsgProto {
     bool updated = 6;
     bool deleted = 7;
 }
+
+message TransactionServiceMsgProto {
+    int64 tenantIdMSB = 1;
+    int64 tenantIdLSB = 2;
+    string entityType = 3;
+    int64 originatorIdMSB = 4;
+    int64 originatorIdLSB = 5;
+}
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleChainTransactionService.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleChainTransactionService.java
index 1e34997..51aebea 100644
--- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleChainTransactionService.java
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleChainTransactionService.java
@@ -15,9 +15,11 @@
  */
 package org.thingsboard.rule.engine.api;
 
+import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.id.TenantId;
 import org.thingsboard.server.common.msg.TbMsg;
+import org.thingsboard.server.common.msg.cluster.ServerAddress;
 
-import java.util.concurrent.Callable;
 import java.util.function.Consumer;
 
 public interface RuleChainTransactionService {
@@ -26,4 +28,8 @@ public interface RuleChainTransactionService {
 
     boolean endTransaction(TbContext ctx, TbMsg msg, Consumer<Throwable> onFailure);
 
+    void onRemoteTransactionMsg(ServerAddress serverAddress, byte[] bytes);
+
+    void onTransactionEnd(TenantId tenantId, EntityId entityId);
+
 }
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionEndNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionEndNode.java
index 1bcd58b..14d2e20 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionEndNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionEndNode.java
@@ -51,6 +51,7 @@ public class TbTransactionEndNode implements TbNode {
 
     @Override
     public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException {
+        ctx.getRuleChainTransactionService().onTransactionEnd(ctx.getTenantId(), msg.getTransactionData().getOriginatorId());
         boolean isFailed = ctx.getRuleChainTransactionService().endTransaction(ctx, msg, throwable -> ctx.tellFailure(msg, throwable));
         if (!isFailed) {
             ctx.tellNext(msg, SUCCESS);