thingsboard-memoizeit

Details

diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
index 6ed400a..a2e0aed 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
@@ -34,6 +34,7 @@ import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Lazy;
 import org.springframework.stereotype.Component;
 import org.thingsboard.rule.engine.api.MailService;
+import org.thingsboard.rule.engine.api.RuleChainTransactionService;
 import org.thingsboard.server.actors.service.ActorService;
 import org.thingsboard.server.common.data.DataConstants;
 import org.thingsboard.server.common.data.Event;
@@ -222,6 +223,11 @@ public class ActorSystemContext {
     @Getter
     private RuleEngineTransportService ruleEngineTransportService;
 
+    @Lazy
+    @Autowired
+    @Getter
+    private RuleChainTransactionService ruleChainTransactionService;
+
     @Value("${cluster.partition_id}")
     @Getter
     private long queuePartitionId;
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
index 27a766e..78dc1f5 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
@@ -20,6 +20,7 @@ import com.datastax.driver.core.utils.UUIDs;
 import org.springframework.util.StringUtils;
 import org.thingsboard.rule.engine.api.ListeningExecutor;
 import org.thingsboard.rule.engine.api.MailService;
+import org.thingsboard.rule.engine.api.RuleChainTransactionService;
 import org.thingsboard.rule.engine.api.RuleEngineDeviceRpcRequest;
 import org.thingsboard.rule.engine.api.RuleEngineDeviceRpcResponse;
 import org.thingsboard.rule.engine.api.RuleEngineRpcService;
@@ -124,7 +125,7 @@ class DefaultTbContext implements TbContext {
 
     @Override
     public TbMsg transformMsg(TbMsg origMsg, String type, EntityId originator, TbMsgMetaData metaData, String data) {
-        return new TbMsg(origMsg.getId(), type, originator, metaData.copy(), data, origMsg.getRuleChainId(), origMsg.getRuleNodeId(), mainCtx.getQueuePartitionId());
+        return new TbMsg(origMsg.getId(), type, originator, metaData.copy(), origMsg.getDataType(), data, origMsg.getTransactionData(), origMsg.getRuleChainId(), origMsg.getRuleNodeId(), mainCtx.getQueuePartitionId());
     }
 
     @Override
@@ -233,6 +234,11 @@ class DefaultTbContext implements TbContext {
     }
 
     @Override
+    public RuleChainTransactionService getRuleChainTransactionService() {
+        return mainCtx.getRuleChainTransactionService();
+    }
+
+    @Override
     public MailService getMailService() {
         if (mainCtx.isAllowSystemMailService()) {
             return mainCtx.getMailService();
diff --git a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
index 50949c1..e32e622 100644
--- a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
+++ b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
@@ -56,12 +56,8 @@ import scala.concurrent.duration.Duration;
 
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
-
-import java.util.Arrays;
-import java.util.UUID;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 
 import static org.thingsboard.server.gen.cluster.ClusterAPIProtos.MessageType.CLUSTER_ACTOR_MESSAGE;
 
@@ -235,6 +231,9 @@ public class DefaultActorService implements ActorService {
             case CLUSTER_DEVICE_STATE_SERVICE_MESSAGE:
                 actorContext.getDeviceStateService().onRemoteMsg(serverAddress, msg.getPayload().toByteArray());
                 break;
+            case CLUSTER_TRANSACTION_SERVICE_MESSAGE:
+                actorContext.getRuleChainTransactionService().onRemoteTransactionMsg(serverAddress, msg.getPayload().toByteArray());
+                break;
         }
     }
 
diff --git a/application/src/main/java/org/thingsboard/server/service/state/DeviceStateService.java b/application/src/main/java/org/thingsboard/server/service/state/DeviceStateService.java
index dab9e69..28c92eb 100644
--- a/application/src/main/java/org/thingsboard/server/service/state/DeviceStateService.java
+++ b/application/src/main/java/org/thingsboard/server/service/state/DeviceStateService.java
@@ -19,8 +19,6 @@ import org.thingsboard.server.common.data.Device;
 import org.thingsboard.server.common.data.id.DeviceId;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
 
-import java.util.Optional;
-
 /**
  * Created by ashvayka on 01.05.18.
  */
diff --git a/application/src/main/java/org/thingsboard/server/service/transaction/BaseRuleChainTransactionService.java b/application/src/main/java/org/thingsboard/server/service/transaction/BaseRuleChainTransactionService.java
new file mode 100644
index 0000000..d7aa766
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/transaction/BaseRuleChainTransactionService.java
@@ -0,0 +1,255 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.service.transaction;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+import org.thingsboard.rule.engine.api.RuleChainTransactionService;
+import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.id.EntityIdFactory;
+import org.thingsboard.server.common.msg.TbMsg;
+import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
+import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
+import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
+import org.thingsboard.server.service.executors.DbCallbackExecutorService;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+
+@Service
+@Slf4j
+public class BaseRuleChainTransactionService implements RuleChainTransactionService {
+
+    @Autowired
+    private ClusterRoutingService routingService;
+
+    @Autowired
+    private ClusterRpcService clusterRpcService;
+
+    @Autowired
+    private DbCallbackExecutorService callbackExecutor;
+
+    @Value("${actors.rule.transaction.queue_size}")
+    private int finalQueueSize;
+    @Value("${actors.rule.transaction.duration}")
+    private long duration;
+
+    private final Lock transactionLock = new ReentrantLock();
+    private final ConcurrentMap<EntityId, BlockingQueue<TbTransactionTask>> transactionMap = new ConcurrentHashMap<>();
+    private final Queue<TbTransactionTask> timeoutQueue = new ConcurrentLinkedQueue<>();
+
+    private ExecutorService timeoutExecutor;
+
+    @PostConstruct
+    public void init() {
+        timeoutExecutor = Executors.newSingleThreadExecutor();
+        executeOnTimeout();
+    }
+
+    @PreDestroy
+    public void destroy() {
+        if (timeoutExecutor != null) {
+            timeoutExecutor.shutdownNow();
+        }
+    }
+
+    @Override
+    public void beginTransaction(TbMsg msg, Consumer<TbMsg> onStart, Consumer<TbMsg> onEnd, Consumer<Throwable> onFailure) {
+        transactionLock.lock();
+        try {
+            BlockingQueue<TbTransactionTask> queue = transactionMap.computeIfAbsent(msg.getTransactionData().getOriginatorId(), id ->
+                    new LinkedBlockingQueue<>(finalQueueSize));
+
+            TbTransactionTask transactionTask = new TbTransactionTask(msg, onStart, onEnd, onFailure, System.currentTimeMillis() + duration);
+            int queueSize = queue.size();
+            if (queueSize >= finalQueueSize) {
+                executeOnFailure(transactionTask.getOnFailure(), "Queue has no space!");
+            } else {
+                addMsgToQueues(queue, transactionTask);
+                if (queueSize == 0) {
+                    executeOnSuccess(transactionTask.getOnStart(), transactionTask.getMsg());
+                } else {
+                    log.trace("Msg [{}][{}] is waiting to start transaction!", msg.getId(), msg.getType());
+                }
+            }
+        } finally {
+            transactionLock.unlock();
+        }
+    }
+
+    @Override
+    public void endTransaction(TbMsg msg, Consumer<TbMsg> onSuccess, Consumer<Throwable> onFailure) {
+        EntityId originatorId = msg.getTransactionData().getOriginatorId();
+        UUID transactionId = msg.getTransactionData().getTransactionId();
+
+        Optional<ServerAddress> address = routingService.resolveById(originatorId);
+        if (address.isPresent()) {
+            sendTransactionEventToRemoteServer(originatorId, transactionId, address.get());
+            executeOnSuccess(onSuccess, msg);
+        } else {
+            endLocalTransaction(transactionId, originatorId, onSuccess, onFailure);
+        }
+    }
+
+    @Override
+    public void onRemoteTransactionMsg(ServerAddress serverAddress, byte[] data) {
+        ClusterAPIProtos.TransactionEndServiceMsgProto proto;
+        try {
+            proto = ClusterAPIProtos.TransactionEndServiceMsgProto.parseFrom(data);
+        } catch (InvalidProtocolBufferException e) {
+            throw new RuntimeException(e);
+        }
+        EntityId originatorId = EntityIdFactory.getByTypeAndUuid(proto.getEntityType(), new UUID(proto.getOriginatorIdMSB(), proto.getOriginatorIdLSB()));
+        UUID transactionId = new UUID(proto.getTransactionIdMSB(), proto.getTransactionIdLSB());
+        endLocalTransaction(transactionId, originatorId, msg -> {
+        }, error -> {
+        });
+    }
+
+    private void addMsgToQueues(BlockingQueue<TbTransactionTask> queue, TbTransactionTask transactionTask) {
+        queue.offer(transactionTask);
+        timeoutQueue.offer(transactionTask);
+        log.trace("Added msg to queue, size: [{}]", queue.size());
+    }
+
+    private void endLocalTransaction(UUID transactionId, EntityId originatorId, Consumer<TbMsg> onSuccess, Consumer<Throwable> onFailure) {
+        transactionLock.lock();
+        try {
+            BlockingQueue<TbTransactionTask> queue = transactionMap.computeIfAbsent(originatorId, id ->
+                    new LinkedBlockingQueue<>(finalQueueSize));
+
+            TbTransactionTask currentTransactionTask = queue.peek();
+            if (currentTransactionTask != null) {
+                if (currentTransactionTask.getMsg().getTransactionData().getTransactionId().equals(transactionId)) {
+                    currentTransactionTask.setCompleted(true);
+                    queue.poll();
+                    log.trace("Removed msg from queue, size [{}]", queue.size());
+
+                    executeOnSuccess(currentTransactionTask.getOnEnd(), currentTransactionTask.getMsg());
+                    executeOnSuccess(onSuccess, currentTransactionTask.getMsg());
+
+                    TbTransactionTask nextTransactionTask = queue.peek();
+                    if (nextTransactionTask != null) {
+                        executeOnSuccess(nextTransactionTask.getOnStart(), nextTransactionTask.getMsg());
+                    }
+                } else {
+                    log.trace("Task has expired!");
+                    executeOnFailure(onFailure, "Task has expired!");
+                }
+            } else {
+                log.trace("Queue is empty, previous task has expired!");
+                executeOnFailure(onFailure, "Queue is empty, previous task has expired!");
+            }
+        } finally {
+            transactionLock.unlock();
+        }
+    }
+
+    private void executeOnTimeout() {
+        timeoutExecutor.submit(() -> {
+            while (true) {
+                TbTransactionTask transactionTask = timeoutQueue.peek();
+                if (transactionTask != null) {
+                    transactionLock.lock();
+                    try {
+                        if (transactionTask.isCompleted()) {
+                            timeoutQueue.poll();
+                        } else {
+                            if (System.currentTimeMillis() > transactionTask.getExpirationTime()) {
+                                log.trace("Task has expired! Deleting it...[{}][{}]", transactionTask.getMsg().getId(), transactionTask.getMsg().getType());
+                                timeoutQueue.poll();
+                                executeOnFailure(transactionTask.getOnFailure(), "Task has expired!");
+
+                                BlockingQueue<TbTransactionTask> queue = transactionMap.get(transactionTask.getMsg().getTransactionData().getOriginatorId());
+                                if (queue != null) {
+                                    queue.poll();
+                                    TbTransactionTask nextTransactionTask = queue.peek();
+                                    if (nextTransactionTask != null) {
+                                        executeOnSuccess(nextTransactionTask.getOnStart(), nextTransactionTask.getMsg());
+                                    }
+                                }
+                            } else {
+                                try {
+                                    log.trace("Task has not expired! Continue executing...[{}][{}]", transactionTask.getMsg().getId(), transactionTask.getMsg().getType());
+                                    TimeUnit.MILLISECONDS.sleep(duration);
+                                } catch (InterruptedException e) {
+                                    throw new IllegalStateException("Thread interrupted", e);
+                                }
+                            }
+                        }
+                    } finally {
+                        transactionLock.unlock();
+                    }
+                } else {
+                    try {
+                        log.trace("Queue is empty, waiting for tasks!");
+                        TimeUnit.SECONDS.sleep(1);
+                    } catch (InterruptedException e) {
+                        throw new IllegalStateException("Thread interrupted", e);
+                    }
+                }
+            }
+        });
+    }
+
+    private void executeOnFailure(Consumer<Throwable> onFailure, String exception) {
+        executeCallback(() -> {
+            onFailure.accept(new RuntimeException(exception));
+            return null;
+        });
+    }
+
+    private void executeOnSuccess(Consumer<TbMsg> onSuccess, TbMsg tbMsg) {
+        executeCallback(() -> {
+            onSuccess.accept(tbMsg);
+            return null;
+        });
+    }
+
+    private void executeCallback(Callable<Void> task) {
+        callbackExecutor.executeAsync(task);
+    }
+
+    private void sendTransactionEventToRemoteServer(EntityId entityId, UUID transactionId, ServerAddress address) {
+        log.trace("[{}][{}] Originator is monitored on other server: {}", entityId, transactionId, address);
+        ClusterAPIProtos.TransactionEndServiceMsgProto.Builder builder = ClusterAPIProtos.TransactionEndServiceMsgProto.newBuilder();
+        builder.setEntityType(entityId.getEntityType().name());
+        builder.setOriginatorIdMSB(entityId.getId().getMostSignificantBits());
+        builder.setOriginatorIdLSB(entityId.getId().getLeastSignificantBits());
+        builder.setTransactionIdMSB(transactionId.getMostSignificantBits());
+        builder.setTransactionIdLSB(transactionId.getLeastSignificantBits());
+        clusterRpcService.tell(address, ClusterAPIProtos.MessageType.CLUSTER_TRANSACTION_SERVICE_MESSAGE, builder.build().toByteArray());
+    }
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/transaction/TbTransactionTask.java b/application/src/main/java/org/thingsboard/server/service/transaction/TbTransactionTask.java
new file mode 100644
index 0000000..49e29ba
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/transaction/TbTransactionTask.java
@@ -0,0 +1,44 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.service.transaction;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import org.thingsboard.server.common.msg.TbMsg;
+
+import java.util.function.Consumer;
+
+@Data
+@AllArgsConstructor
+public final class TbTransactionTask {
+
+    private final TbMsg msg;
+    private final Consumer<TbMsg> onStart;
+    private final Consumer<TbMsg> onEnd;
+    private final Consumer<Throwable> onFailure;
+    private final long expirationTime;
+
+    private boolean isCompleted;
+
+    public TbTransactionTask(TbMsg msg, Consumer<TbMsg> onStart, Consumer<TbMsg> onEnd, Consumer<Throwable> onFailure, long expirationTime) {
+        this.msg = msg;
+        this.onStart = onStart;
+        this.onEnd = onEnd;
+        this.onFailure = onFailure;
+        this.expirationTime = expirationTime;
+        this.isCompleted = false;
+    }
+}
diff --git a/application/src/main/proto/cluster.proto b/application/src/main/proto/cluster.proto
index 01e9323..4200fee 100644
--- a/application/src/main/proto/cluster.proto
+++ b/application/src/main/proto/cluster.proto
@@ -60,6 +60,7 @@ enum MessageType {
   CLUSTER_RPC_FROM_DEVICE_RESPONSE_MESSAGE = 12;
 
   CLUSTER_DEVICE_STATE_SERVICE_MESSAGE = 13;
+  CLUSTER_TRANSACTION_SERVICE_MESSAGE = 14;
 }
 
 // Messages related to CLUSTER_TELEMETRY_MESSAGE
@@ -142,3 +143,11 @@ message DeviceStateServiceMsgProto {
     bool updated = 6;
     bool deleted = 7;
 }
+
+message TransactionEndServiceMsgProto {
+    string entityType = 1;
+    int64 originatorIdMSB = 2;
+    int64 originatorIdLSB = 3;
+    int64 transactionIdMSB = 4;
+    int64 transactionIdLSB = 5;
+}
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index 0f87429..85b8f0a 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -213,6 +213,11 @@ actors:
     node:
       # Errors for particular actor are persisted once per specified amount of milliseconds
       error_persist_frequency: "${ACTORS_RULE_NODE_ERROR_FREQUENCY:3000}"
+    transaction:
+      # Size of queues which store messages for transaction rule nodes
+      queue_size: "${ACTORS_RULE_TRANSACTION_QUEUE_SIZE:20}"
+      # Time in milliseconds for transaction to complete
+      duration: "${ACTORS_RULE_TRANSACTION_DURATION:15000}"
   statistics:
     # Enable/disable actor statistics
     enabled: "${ACTORS_STATISTICS_ENABLED:true}"
@@ -310,7 +315,7 @@ spring:
     password: "${SPRING_DATASOURCE_PASSWORD:}"
 
 # PostgreSQL DAO Configuration
-# spring:
+#spring:
 #  data:
 #    sql:
 #      repositories:
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java
index a6eb64e..325f8ee 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java
@@ -41,6 +41,7 @@ public final class TbMsg implements Serializable {
     private final TbMsgMetaData metaData;
     private final TbMsgDataType dataType;
     private final String data;
+    private final TbMsgTransactionData transactionData;
 
     //The following fields are not persisted to DB, because they can always be recovered from the context;
     private final RuleChainId ruleChainId;
@@ -55,11 +56,17 @@ public final class TbMsg implements Serializable {
         this.metaData = metaData;
         this.data = data;
         this.dataType = TbMsgDataType.JSON;
+        this.transactionData = new TbMsgTransactionData(id, originator);
         this.ruleChainId = ruleChainId;
         this.ruleNodeId = ruleNodeId;
         this.clusterPartition = clusterPartition;
     }
 
+    public TbMsg(UUID id, String type, EntityId originator, TbMsgMetaData metaData, TbMsgDataType dataType, String data,
+                 RuleChainId ruleChainId, RuleNodeId ruleNodeId, long clusterPartition) {
+        this(id, type, originator, metaData, dataType, data, new TbMsgTransactionData(id, originator), ruleChainId, ruleNodeId, clusterPartition);
+    }
+
     public static ByteBuffer toBytes(TbMsg msg) {
         MsgProtos.TbMsgProto.Builder builder = MsgProtos.TbMsgProto.newBuilder();
         builder.setId(msg.getId().toString());
@@ -82,6 +89,16 @@ public final class TbMsg implements Serializable {
             builder.setMetaData(MsgProtos.TbMsgMetaDataProto.newBuilder().putAllData(msg.getMetaData().getData()).build());
         }
 
+        TbMsgTransactionData transactionData = msg.getTransactionData();
+        if (transactionData != null) {
+            MsgProtos.TbMsgTransactionDataProto.Builder transactionBuilder = MsgProtos.TbMsgTransactionDataProto.newBuilder();
+            transactionBuilder.setId(transactionData.getTransactionId().toString());
+            transactionBuilder.setEntityType(transactionData.getOriginatorId().getEntityType().name());
+            transactionBuilder.setEntityIdMSB(transactionData.getOriginatorId().getId().getMostSignificantBits());
+            transactionBuilder.setEntityIdLSB(transactionData.getOriginatorId().getId().getLeastSignificantBits());
+            builder.setTransactionData(transactionBuilder.build());
+        }
+
         builder.setDataType(msg.getDataType().ordinal());
         builder.setData(msg.getData());
         byte[] bytes = builder.build().toByteArray();
@@ -92,6 +109,9 @@ public final class TbMsg implements Serializable {
         try {
             MsgProtos.TbMsgProto proto = MsgProtos.TbMsgProto.parseFrom(buffer.array());
             TbMsgMetaData metaData = new TbMsgMetaData(proto.getMetaData().getDataMap());
+            EntityId transactionEntityId = EntityIdFactory.getByTypeAndUuid(proto.getTransactionData().getEntityType(),
+                    new UUID(proto.getTransactionData().getEntityIdMSB(), proto.getTransactionData().getEntityIdLSB()));
+            TbMsgTransactionData transactionData = new TbMsgTransactionData(UUID.fromString(proto.getTransactionData().getId()), transactionEntityId);
             EntityId entityId = EntityIdFactory.getByTypeAndUuid(proto.getEntityType(), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB()));
             RuleChainId ruleChainId = new RuleChainId(new UUID(proto.getRuleChainIdMSB(), proto.getRuleChainIdLSB()));
             RuleNodeId ruleNodeId = null;
@@ -99,7 +119,7 @@ public final class TbMsg implements Serializable {
                  ruleNodeId = new RuleNodeId(new UUID(proto.getRuleNodeIdMSB(), proto.getRuleNodeIdLSB()));
             }
             TbMsgDataType dataType = TbMsgDataType.values()[proto.getDataType()];
-            return new TbMsg(UUID.fromString(proto.getId()), proto.getType(), entityId, metaData, dataType, proto.getData(), ruleChainId, ruleNodeId, proto.getClusterPartition());
+            return new TbMsg(UUID.fromString(proto.getId()), proto.getType(), entityId, metaData, dataType, proto.getData(), transactionData, ruleChainId, ruleNodeId, proto.getClusterPartition());
         } catch (InvalidProtocolBufferException e) {
             throw new IllegalStateException("Could not parse protobuf for TbMsg", e);
         }
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsgTransactionData.java b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsgTransactionData.java
new file mode 100644
index 0000000..466f9ce
--- /dev/null
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsgTransactionData.java
@@ -0,0 +1,30 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.common.msg;
+
+import lombok.Data;
+import org.thingsboard.server.common.data.id.EntityId;
+
+import java.io.Serializable;
+import java.util.UUID;
+
+@Data
+public final class TbMsgTransactionData implements Serializable {
+
+    private final UUID transactionId;
+    private final EntityId originatorId;
+
+}
diff --git a/common/message/src/main/proto/tbmsg.proto b/common/message/src/main/proto/tbmsg.proto
index 60003dc..8bd0e4b 100644
--- a/common/message/src/main/proto/tbmsg.proto
+++ b/common/message/src/main/proto/tbmsg.proto
@@ -23,6 +23,13 @@ message TbMsgMetaDataProto {
     map<string, string> data = 1;
 }
 
+message TbMsgTransactionDataProto {
+    string id = 1;
+    string entityType = 2;
+    int64 entityIdMSB = 3;
+    int64 entityIdLSB = 4;
+}
+
 message TbMsgProto {
     string id = 1;
     string type = 2;
@@ -39,7 +46,9 @@ message TbMsgProto {
 
     TbMsgMetaDataProto metaData = 11;
 
-    int32 dataType = 12;
-    string data = 13;
+    TbMsgTransactionDataProto transactionData = 12;
+
+    int32 dataType = 13;
+    string data = 14;
 
 }
\ No newline at end of file
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleChainTransactionService.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleChainTransactionService.java
new file mode 100644
index 0000000..920af74
--- /dev/null
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleChainTransactionService.java
@@ -0,0 +1,31 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.api;
+
+import org.thingsboard.server.common.msg.TbMsg;
+import org.thingsboard.server.common.msg.cluster.ServerAddress;
+
+import java.util.function.Consumer;
+
+public interface RuleChainTransactionService {
+
+    void beginTransaction(TbMsg msg, Consumer<TbMsg> onStart, Consumer<TbMsg> onEnd, Consumer<Throwable> onFailure);
+
+    void endTransaction(TbMsg msg, Consumer<TbMsg> onSuccess, Consumer<Throwable> onFailure);
+
+    void onRemoteTransactionMsg(ServerAddress serverAddress, byte[] bytes);
+
+}
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
index 37c4a51..29e7b26 100644
--- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
@@ -103,4 +103,6 @@ public interface TbContext {
     ScriptEngine createJsScriptEngine(String script, String... argNames);
 
     String getNodeId();
+
+    RuleChainTransactionService getRuleChainTransactionService();
 }
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionBeginNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionBeginNode.java
new file mode 100644
index 0000000..75175ef
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionBeginNode.java
@@ -0,0 +1,75 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.transaction;
+
+import lombok.extern.slf4j.Slf4j;
+import org.thingsboard.rule.engine.api.EmptyNodeConfiguration;
+import org.thingsboard.rule.engine.api.RuleNode;
+import org.thingsboard.rule.engine.api.TbContext;
+import org.thingsboard.rule.engine.api.TbNode;
+import org.thingsboard.rule.engine.api.TbNodeConfiguration;
+import org.thingsboard.rule.engine.api.TbNodeException;
+import org.thingsboard.rule.engine.api.util.TbNodeUtils;
+import org.thingsboard.server.common.data.plugin.ComponentType;
+import org.thingsboard.server.common.msg.TbMsg;
+import org.thingsboard.server.common.msg.TbMsgDataType;
+import org.thingsboard.server.common.msg.TbMsgTransactionData;
+
+import java.util.concurrent.ExecutionException;
+
+import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
+
+@Slf4j
+@RuleNode(
+        type = ComponentType.ACTION,
+        name = "transaction start",
+        configClazz = EmptyNodeConfiguration.class,
+        nodeDescription = "",
+        nodeDetails = "",
+        uiResources = {"static/rulenode/rulenode-core-config.js"},
+        configDirective = "tbNodeEmptyConfig")
+public class TbTransactionBeginNode implements TbNode {
+
+    private EmptyNodeConfiguration config;
+
+    @Override
+    public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
+        this.config = TbNodeUtils.convert(configuration, EmptyNodeConfiguration.class);
+    }
+
+    @Override
+    public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException {
+        log.trace("Msg enters transaction - [{}][{}]", msg.getId(), msg.getType());
+
+        TbMsgTransactionData transactionData = new TbMsgTransactionData(msg.getId(), msg.getOriginator());
+        TbMsg tbMsg = new TbMsg(msg.getId(), msg.getType(), msg.getOriginator(), msg.getMetaData(), TbMsgDataType.JSON,
+                msg.getData(), transactionData, msg.getRuleChainId(), msg.getRuleNodeId(), msg.getClusterPartition());
+
+        ctx.getRuleChainTransactionService().beginTransaction(tbMsg, startMsg -> {
+                    log.trace("Transaction starting...[{}][{}]", startMsg.getId(), startMsg.getType());
+                    ctx.tellNext(startMsg, SUCCESS);
+                }, endMsg -> log.trace("Transaction ended successfully...[{}][{}]", endMsg.getId(), endMsg.getType()),
+                throwable -> {
+                    log.error("Transaction failed! [{}][{}]", tbMsg.getId(), tbMsg.getType(), throwable);
+                    ctx.tellFailure(tbMsg, throwable);
+                });
+    }
+
+    @Override
+    public void destroy() {
+
+    }
+}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionEndNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionEndNode.java
new file mode 100644
index 0000000..a51d97e
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionEndNode.java
@@ -0,0 +1,64 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.transaction;
+
+import lombok.extern.slf4j.Slf4j;
+import org.thingsboard.rule.engine.api.EmptyNodeConfiguration;
+import org.thingsboard.rule.engine.api.RuleNode;
+import org.thingsboard.rule.engine.api.TbContext;
+import org.thingsboard.rule.engine.api.TbNode;
+import org.thingsboard.rule.engine.api.TbNodeConfiguration;
+import org.thingsboard.rule.engine.api.TbNodeException;
+import org.thingsboard.rule.engine.api.util.TbNodeUtils;
+import org.thingsboard.server.common.data.plugin.ComponentType;
+import org.thingsboard.server.common.msg.TbMsg;
+
+import java.util.concurrent.ExecutionException;
+
+import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
+
+@Slf4j
+@RuleNode(
+        type = ComponentType.ACTION,
+        name = "transaction end",
+        configClazz = EmptyNodeConfiguration.class,
+        nodeDescription = "",
+        nodeDetails = "",
+        uiResources = {"static/rulenode/rulenode-core-config.js"},
+        configDirective = ("tbNodeEmptyConfig")
+)
+public class TbTransactionEndNode implements TbNode {
+
+    private EmptyNodeConfiguration config;
+
+    @Override
+    public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
+        this.config = TbNodeUtils.convert(configuration, EmptyNodeConfiguration.class);
+    }
+
+    @Override
+    public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException {
+        ctx.getRuleChainTransactionService().endTransaction(msg,
+                successMsg -> ctx.tellNext(successMsg, SUCCESS),
+                throwable -> ctx.tellFailure(msg, throwable));
+        log.trace("Msg left transaction - [{}][{}]", msg.getId(), msg.getType());
+    }
+
+    @Override
+    public void destroy() {
+
+    }
+}