thingsboard-memoizeit

Changes

Details

diff --git a/application/pom.xml b/application/pom.xml
index 8449246..95322fa 100644
--- a/application/pom.xml
+++ b/application/pom.xml
@@ -54,6 +54,10 @@
             <artifactId>extensions-api</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.thingsboard.rule-engine</groupId>
+            <artifactId>rule-engine-api</artifactId>
+        </dependency>
+        <dependency>
             <groupId>org.thingsboard</groupId>
             <artifactId>extensions-core</artifactId>
         </dependency>
diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
index f59ec63..68d5547 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
@@ -136,6 +136,12 @@ public class ActorSystemContext {
     @Value("${actors.plugin.error_persist_frequency}")
     @Getter private long pluginErrorPersistFrequency;
 
+    @Value("${actors.rule.chain.error_persist_frequency}")
+    @Getter private long ruleChainErrorPersistFrequency;
+
+    @Value("${actors.rule.node.error_persist_frequency}")
+    @Getter private long ruleNodeErrorPersistFrequency;
+
     @Value("${actors.rule.termination.delay}")
     @Getter private long ruleActorTerminationDelay;
 
diff --git a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
index 549d71d..5e05a60 100644
--- a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
@@ -33,9 +33,11 @@ import org.thingsboard.server.common.data.id.PluginId;
 import org.thingsboard.server.common.data.id.RuleChainId;
 import org.thingsboard.server.common.data.id.TenantId;
 import org.thingsboard.server.common.data.page.PageDataIterable;
+import org.thingsboard.server.common.msg.TbActorMsg;
 import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
 import org.thingsboard.server.common.msg.device.ToDeviceActorMsg;
 import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
+import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
 import org.thingsboard.server.dao.model.ModelConstants;
 import org.thingsboard.server.dao.tenant.TenantService;
 import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg;
@@ -86,27 +88,48 @@ public class AppActor extends RuleChainManagerActor {
     }
 
     @Override
-    public void onReceive(Object msg) throws Exception {
-        logger.debug("Received message: {}", msg);
-        if (msg instanceof ToDeviceActorMsg) {
-            processDeviceMsg((ToDeviceActorMsg) msg);
-        } else if (msg instanceof ToPluginActorMsg) {
-            onToPluginMsg((ToPluginActorMsg) msg);
-        } else if (msg instanceof ToDeviceActorNotificationMsg) {
-            onToDeviceActorMsg((ToDeviceActorNotificationMsg) msg);
-        } else if (msg instanceof Terminated) {
-            processTermination((Terminated) msg);
-        } else if (msg instanceof ClusterEventMsg) {
-            broadcast(msg);
-        } else if (msg instanceof ComponentLifecycleMsg) {
-            onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
-        } else if (msg instanceof PluginTerminationMsg) {
-            onPluginTerminated((PluginTerminationMsg) msg);
+    protected void process(TbActorMsg msg) {
+        switch (msg.getMsgType()) {
+            case COMPONENT_LIFE_CYCLE_MSG:
+                onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
+                break;
+            case SERVICE_TO_RULE_ENGINE_MSG:
+                onServiceToRuleEngineMsg((ServiceToRuleEngineMsg) msg);
+                break;
+        }
+    }
+
+    private void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg msg) {
+        if (SYSTEM_TENANT.equals(msg.getTenantId())) {
+            //TODO: ashvayka handle this.
         } else {
-            logger.warning("Unknown message: {}!", msg);
+            getOrCreateTenantActor(msg.getTenantId()).tell(msg, self());
         }
     }
 
+
+//    @Override
+//    public void onReceive(Object msg) throws Exception {
+//        logger.debug("Received message: {}", msg);
+//        if (msg instanceof ToDeviceActorMsg) {
+//            processDeviceMsg((ToDeviceActorMsg) msg);
+//        } else if (msg instanceof ToPluginActorMsg) {
+//            onToPluginMsg((ToPluginActorMsg) msg);
+//        } else if (msg instanceof ToDeviceActorNotificationMsg) {
+//            onToDeviceActorMsg((ToDeviceActorNotificationMsg) msg);
+//        } else if (msg instanceof Terminated) {
+//            processTermination((Terminated) msg);
+//        } else if (msg instanceof ClusterEventMsg) {
+//            broadcast(msg);
+//        } else if (msg instanceof ComponentLifecycleMsg) {
+//            onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
+//        } else if (msg instanceof PluginTerminationMsg) {
+//            onPluginTerminated((PluginTerminationMsg) msg);
+//        } else {
+//            logger.warning("Unknown message: {}!", msg);
+//        }
+//    }
+
     private void onPluginTerminated(PluginTerminationMsg msg) {
         pluginManager.remove(msg.getId());
     }
diff --git a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginActorMessageProcessor.java
index 6e78e20..09aaf80 100644
--- a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginActorMessageProcessor.java
@@ -57,7 +57,7 @@ public class PluginActorMessageProcessor extends ComponentMsgProcessor<PluginId>
     }
 
     @Override
-    public void start() throws Exception {
+    public void start(ActorContext context) throws Exception {
         logger.info("[{}] Going to start plugin actor.", entityId);
         pluginMd = systemContext.getPluginService().findPluginById(entityId);
         if (pluginMd == null) {
@@ -76,7 +76,7 @@ public class PluginActorMessageProcessor extends ComponentMsgProcessor<PluginId>
     }
 
     @Override
-    public void stop() throws Exception {
+    public void stop(ActorContext context) throws Exception {
         onStop();
     }
 
@@ -191,7 +191,7 @@ public class PluginActorMessageProcessor extends ComponentMsgProcessor<PluginId>
             if (pluginImpl != null) {
                 pluginImpl.stop(trustedCtx);
             }
-            start();
+            start(context);
         }
     }
 
@@ -217,7 +217,7 @@ public class PluginActorMessageProcessor extends ComponentMsgProcessor<PluginId>
             pluginImpl.resume(trustedCtx);
             logger.info("[{}] Plugin resumed.", entityId);
         } else {
-            start();
+            start(context);
         }
     }
 
diff --git a/application/src/main/java/org/thingsboard/server/actors/rule/RuleActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/rule/RuleActorMessageProcessor.java
index 2ebebfc..236fca7 100644
--- a/application/src/main/java/org/thingsboard/server/actors/rule/RuleActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/rule/RuleActorMessageProcessor.java
@@ -68,7 +68,7 @@ class RuleActorMessageProcessor extends ComponentMsgProcessor<RuleId> {
     }
 
     @Override
-    public void start() throws Exception {
+    public void start(ActorContext context) throws Exception {
         logger.info("[{}][{}] Starting rule actor.", entityId, tenantId);
         ruleMd = systemContext.getRuleService().findRuleById(entityId);
         if (ruleMd == null) {
@@ -86,7 +86,7 @@ class RuleActorMessageProcessor extends ComponentMsgProcessor<RuleId> {
     }
 
     @Override
-    public void stop() throws Exception {
+    public void stop(ActorContext context) throws Exception {
         onStop();
     }
 
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
new file mode 100644
index 0000000..0cd7601
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
@@ -0,0 +1,61 @@
+package org.thingsboard.server.actors.ruleChain;
+
+import org.thingsboard.rule.engine.api.TbContext;
+import org.thingsboard.server.actors.ActorSystemContext;
+import org.thingsboard.server.common.msg.TbMsg;
+import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.dao.attributes.AttributesService;
+
+/**
+ * Created by ashvayka on 19.03.18.
+ */
+class DefaultTbContext implements TbContext {
+
+    private final ActorSystemContext mainCtx;
+    private final RuleNodeCtx nodeCtx;
+
+    public DefaultTbContext(ActorSystemContext mainCtx, RuleNodeCtx nodeCtx) {
+        this.mainCtx = mainCtx;
+        this.nodeCtx = nodeCtx;
+    }
+
+    @Override
+    public void tellNext(TbMsg msg) {
+        tellNext(msg, null);
+    }
+
+    @Override
+    public void tellNext(TbMsg msg, String relationType) {
+        nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(nodeCtx.getSelfId(), relationType, msg), nodeCtx.getSelf());
+    }
+
+    @Override
+    public void tellSelf(TbMsg msg, long delayMs) {
+
+    }
+
+    @Override
+    public void tellOthers(TbMsg msg) {
+
+    }
+
+    @Override
+    public void tellSibling(TbMsg msg, ServerAddress address) {
+
+    }
+
+    @Override
+    public void spawn(TbMsg msg) {
+
+    }
+
+    @Override
+    public void ack(TbMsg msg) {
+
+    }
+
+    @Override
+    public AttributesService getAttributesService() {
+        return mainCtx.getAttributesService();
+    }
+}
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java
index 83536f8..20ea05d 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java
@@ -20,6 +20,9 @@ import org.thingsboard.server.actors.service.ComponentActor;
 import org.thingsboard.server.actors.service.ContextBasedCreator;
 import org.thingsboard.server.common.data.id.RuleChainId;
 import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.msg.TbActorMsg;
+import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
+import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
 
 public class RuleChainActor extends ComponentActor<RuleChainId, RuleChainActorMessageProcessor> {
 
@@ -30,8 +33,18 @@ public class RuleChainActor extends ComponentActor<RuleChainId, RuleChainActorMe
     }
 
     @Override
-    public void onReceive(Object msg) throws Exception {
-        logger.debug("[{}][{}] Unknown msg type.", tenantId, id, msg.getClass().getName());
+    protected void process(TbActorMsg msg) {
+        switch (msg.getMsgType()) {
+            case COMPONENT_LIFE_CYCLE_MSG:
+                onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
+                break;
+            case SERVICE_TO_RULE_ENGINE_MSG:
+                processor.onServiceToRuleEngineMsg((ServiceToRuleEngineMsg) msg);
+                break;
+            case RULE_TO_RULE_CHAIN_TELL_NEXT_MSG:
+                processor.onTellNext((RuleNodeToRuleChainTellNextMsg) msg);
+                break;
+        }
     }
 
     public static class ActorCreator extends ContextBasedCreator<RuleChainActor> {
@@ -54,6 +67,6 @@ public class RuleChainActor extends ComponentActor<RuleChainId, RuleChainActorMe
 
     @Override
     protected long getErrorPersistFrequency() {
-        return systemContext.getPluginErrorPersistFrequency();
+        return systemContext.getRuleChainErrorPersistFrequency();
     }
 }
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
index 3075367..5c946bd 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,77 +17,127 @@ package org.thingsboard.server.actors.ruleChain;
 
 import akka.actor.ActorContext;
 import akka.actor.ActorRef;
+import akka.actor.Props;
 import akka.event.LoggingAdapter;
-import com.fasterxml.jackson.core.JsonProcessingException;
 import org.thingsboard.server.actors.ActorSystemContext;
-import org.thingsboard.server.actors.plugin.*;
+import org.thingsboard.server.actors.service.DefaultActorService;
 import org.thingsboard.server.actors.shared.ComponentMsgProcessor;
-import org.thingsboard.server.common.data.id.PluginId;
+import org.thingsboard.server.common.data.EntityType;
+import org.thingsboard.server.common.data.id.EntityId;
 import org.thingsboard.server.common.data.id.RuleChainId;
+import org.thingsboard.server.common.data.id.RuleNodeId;
 import org.thingsboard.server.common.data.id.TenantId;
-import org.thingsboard.server.common.data.plugin.ComponentLifecycleState;
-import org.thingsboard.server.common.data.plugin.ComponentType;
-import org.thingsboard.server.common.data.plugin.PluginMetaData;
+import org.thingsboard.server.common.data.relation.EntityRelation;
+import org.thingsboard.server.common.data.rule.RuleChain;
+import org.thingsboard.server.common.data.rule.RuleNode;
+import org.thingsboard.server.common.msg.TbMsg;
 import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
-import org.thingsboard.server.common.msg.cluster.ServerAddress;
-import org.thingsboard.server.extensions.api.plugins.Plugin;
-import org.thingsboard.server.extensions.api.plugins.PluginInitializationException;
-import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse;
-import org.thingsboard.server.extensions.api.plugins.msg.TimeoutMsg;
-import org.thingsboard.server.extensions.api.plugins.rest.PluginRestMsg;
-import org.thingsboard.server.extensions.api.plugins.rpc.PluginRpcMsg;
-import org.thingsboard.server.extensions.api.plugins.ws.msg.PluginWebsocketMsg;
-import org.thingsboard.server.extensions.api.rules.RuleException;
+import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
+import org.thingsboard.server.dao.rule.RuleChainService;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 /**
  * @author Andrew Shvayka
  */
 public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleChainId> {
 
-    private ComponentLifecycleState state;
+    private final ActorRef parent;
+    private final ActorRef self;
+    private final Map<RuleNodeId, RuleNodeCtx> nodeActors;
+    private final Map<RuleNodeId, List<RuleNodeRelation>> nodeRoutes;
+    private final RuleChainService service;
+
+    private RuleNodeId firstId;
+    private RuleNodeCtx firstNode;
 
-    protected RuleChainActorMessageProcessor(TenantId tenantId, RuleChainId pluginId, ActorSystemContext systemContext
+    RuleChainActorMessageProcessor(TenantId tenantId, RuleChainId ruleChainId, ActorSystemContext systemContext
             , LoggingAdapter logger, ActorRef parent, ActorRef self) {
-        super(systemContext, logger, tenantId, pluginId);
+        super(systemContext, logger, tenantId, ruleChainId);
+        this.parent = parent;
+        this.self = self;
+        this.nodeActors = new HashMap<>();
+        this.nodeRoutes = new HashMap<>();
+        this.service = systemContext.getRuleChainService();
     }
 
     @Override
-    public void start() throws Exception {
-
+    public void start(ActorContext context) throws Exception {
+        RuleChain ruleChain = service.findRuleChainById(entityId);
+        List<RuleNode> ruleNodeList = service.getRuleChainNodes(entityId);
+        // Creating and starting the actors;
+        for (RuleNode ruleNode : ruleNodeList) {
+            String dispatcherName = tenantId.getId().equals(EntityId.NULL_UUID) ?
+                    DefaultActorService.SYSTEM_RULE_DISPATCHER_NAME : DefaultActorService.TENANT_RULE_DISPATCHER_NAME;
+            ActorRef ruleNodeActor = context.actorOf(
+                    Props.create(new RuleNodeActor.ActorCreator(systemContext, tenantId, entityId, ruleNode.getId()))
+                            .withDispatcher(dispatcherName), ruleNode.toString());
+            nodeActors.put(ruleNode.getId(), new RuleNodeCtx(self, ruleNodeActor, ruleNode.getId()));
+        }
+        // Populating the routes map;
+        for (RuleNode ruleNode : ruleNodeList) {
+            List<EntityRelation> relations = service.getRuleNodeRelations(ruleNode.getId());
+            for (EntityRelation relation : relations) {
+                if (relation.getTo().getEntityType() == EntityType.RULE_NODE) {
+                    RuleNodeCtx ruleNodeCtx = nodeActors.get(new RuleNodeId(relation.getTo().getId()));
+                    if (ruleNodeCtx == null) {
+                        throw new IllegalArgumentException("Rule Node [" + relation.getFrom() + "] has invalid relation to Rule node [" + relation.getTo() + "]");
+                    }
+                }
+                nodeRoutes.computeIfAbsent(ruleNode.getId(), k -> new ArrayList<>())
+                        .add(new RuleNodeRelation(ruleNode.getId(), relation.getTo(), relation.getType()));
+            }
+        }
+
+        firstId = ruleChain.getFirstRuleNodeId();
+        firstNode = nodeActors.get(ruleChain.getFirstRuleNodeId());
     }
 
     @Override
-    public void stop() throws Exception {
-
+    public void stop(ActorContext context) throws Exception {
+        nodeActors.values().stream().map(RuleNodeCtx::getSelf).forEach(context::stop);
+        nodeActors.clear();
+        nodeRoutes.clear();
     }
 
     @Override
-    public void onCreated(ActorContext context) throws Exception {
+    public void onClusterEventMsg(ClusterEventMsg msg) throws Exception {
 
     }
 
-    @Override
-    public void onUpdate(ActorContext context) throws Exception {
-
+    void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg envelope) {
+        TbMsg tbMsg = envelope.getTbMsg();
+        //TODO: push to queue and act on ack in async way
+        pushMstToNode(firstNode, tbMsg);
     }
 
-    @Override
-    public void onActivate(ActorContext context) throws Exception {
-
+    void onTellNext(RuleNodeToRuleChainTellNextMsg envelope) {
+        RuleNodeId originator = envelope.getOriginator();
+        String targetRelationType = envelope.getRelationType();
+        //TODO: log debug output
+        List<RuleNodeRelation> relations = nodeRoutes.get(originator);
+        for (RuleNodeRelation relation : relations) {
+            if (targetRelationType == null || targetRelationType.equalsIgnoreCase(relation.getType())) {
+                switch (relation.getOut().getEntityType()) {
+                    case RULE_NODE:
+                        RuleNodeId targetRuleNodeId = new RuleNodeId(relation.getOut().getId());
+                        RuleNodeCtx targetRuleNode = nodeActors.get(targetRuleNodeId);
+                        pushMstToNode(targetRuleNode, envelope.getMsg());
+                        break;
+                    case RULE_CHAIN:
+//                        TODO: implement
+                        break;
+                }
+            }
+        }
     }
 
-    @Override
-    public void onSuspend(ActorContext context) throws Exception {
-
-    }
-
-    @Override
-    public void onStop(ActorContext context) throws Exception {
-
+    private void pushMstToNode(RuleNodeCtx nodeCtx, TbMsg msg) {
+        //TODO: log debug input
+        firstNode.getSelf().tell(new RuleChainToRuleNodeMsg(new DefaultTbContext(systemContext, nodeCtx), msg), self);
     }
 
-    @Override
-    public void onClusterEventMsg(ClusterEventMsg msg) throws Exception {
-
-    }
 }
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainManagerActor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainManagerActor.java
index 2dca8dd..cb76f04 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainManagerActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainManagerActor.java
@@ -8,6 +8,7 @@ import org.thingsboard.server.actors.shared.rulechain.RuleChainManager;
 import org.thingsboard.server.common.data.id.EntityId;
 import org.thingsboard.server.common.data.id.PluginId;
 import org.thingsboard.server.common.data.id.RuleChainId;
+import org.thingsboard.server.dao.rule.RuleChainService;
 
 /**
  * Created by ashvayka on 15.03.18.
@@ -16,11 +17,13 @@ public abstract class RuleChainManagerActor extends ContextAwareActor {
 
     protected final RuleChainManager ruleChainManager;
     protected final PluginManager pluginManager;
+    protected final RuleChainService ruleChainService;
 
     public RuleChainManagerActor(ActorSystemContext systemContext, RuleChainManager ruleChainManager, PluginManager pluginManager) {
         super(systemContext);
         this.ruleChainManager = ruleChainManager;
         this.pluginManager = pluginManager;
+        this.ruleChainService = systemContext.getRuleChainService();
     }
 
     protected void initRuleChains() {
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleNodeMsg.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleNodeMsg.java
new file mode 100644
index 0000000..d6e8262
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleNodeMsg.java
@@ -0,0 +1,23 @@
+package org.thingsboard.server.actors.ruleChain;
+
+import lombok.Data;
+import org.thingsboard.rule.engine.api.TbContext;
+import org.thingsboard.server.common.data.id.RuleNodeId;
+import org.thingsboard.server.common.msg.MsgType;
+import org.thingsboard.server.common.msg.TbActorMsg;
+import org.thingsboard.server.common.msg.TbMsg;
+
+/**
+ * Created by ashvayka on 19.03.18.
+ */
+@Data
+final class RuleChainToRuleNodeMsg implements TbActorMsg {
+
+    private final TbContext ctx;
+    private final TbMsg msg;
+
+    @Override
+    public MsgType getMsgType() {
+        return MsgType.RULE_CHAIN_TO_RULE_MSG;
+    }
+}
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActor.java
new file mode 100644
index 0000000..cba19d1
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActor.java
@@ -0,0 +1,72 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.actors.ruleChain;
+
+import org.thingsboard.server.actors.ActorSystemContext;
+import org.thingsboard.server.actors.service.ComponentActor;
+import org.thingsboard.server.actors.service.ContextBasedCreator;
+import org.thingsboard.server.common.data.id.RuleChainId;
+import org.thingsboard.server.common.data.id.RuleNodeId;
+import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.msg.TbActorMsg;
+
+public class RuleNodeActor extends ComponentActor<RuleNodeId, RuleNodeActorMessageProcessor> {
+
+    private final RuleChainId ruleChainId;
+
+    private RuleNodeActor(ActorSystemContext systemContext, TenantId tenantId, RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
+        super(systemContext, tenantId, ruleNodeId);
+        this.ruleChainId = ruleChainId;
+        setProcessor(new RuleNodeActorMessageProcessor(tenantId, ruleChainId, ruleNodeId, systemContext,
+                logger, context().parent(), context().self()));
+    }
+
+    @Override
+    protected void process(TbActorMsg msg) {
+        switch (msg.getMsgType()) {
+            case RULE_CHAIN_TO_RULE_MSG:
+                processor.onRuleChainToRuleNodeMsg((RuleChainToRuleNodeMsg) msg);
+                break;
+        }
+    }
+
+    public static class ActorCreator extends ContextBasedCreator<RuleNodeActor> {
+        private static final long serialVersionUID = 1L;
+
+        private final TenantId tenantId;
+        private final RuleChainId ruleChainId;
+        private final RuleNodeId ruleNodeId;
+
+        public ActorCreator(ActorSystemContext context, TenantId tenantId, RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
+            super(context);
+            this.tenantId = tenantId;
+            this.ruleChainId = ruleChainId;
+            this.ruleNodeId = ruleNodeId;
+
+        }
+
+        @Override
+        public RuleNodeActor create() throws Exception {
+            return new RuleNodeActor(context, tenantId, ruleChainId, ruleNodeId);
+        }
+    }
+
+    @Override
+    protected long getErrorPersistFrequency() {
+        return systemContext.getRuleNodeErrorPersistFrequency();
+    }
+
+}
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java
new file mode 100644
index 0000000..99e48d8
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java
@@ -0,0 +1,78 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.actors.ruleChain;
+
+import akka.actor.ActorContext;
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.event.LoggingAdapter;
+import org.thingsboard.server.actors.ActorSystemContext;
+import org.thingsboard.server.actors.service.DefaultActorService;
+import org.thingsboard.server.actors.shared.ComponentMsgProcessor;
+import org.thingsboard.server.common.data.EntityType;
+import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.id.RuleChainId;
+import org.thingsboard.server.common.data.id.RuleNodeId;
+import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.data.relation.EntityRelation;
+import org.thingsboard.server.common.data.rule.RuleChain;
+import org.thingsboard.server.common.data.rule.RuleNode;
+import org.thingsboard.server.common.msg.TbMsg;
+import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
+import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
+import org.thingsboard.server.dao.rule.RuleChainService;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author Andrew Shvayka
+ */
+public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNodeId> {
+
+    private final ActorRef parent;
+    private final ActorRef self;
+    private final RuleChainService service;
+
+    RuleNodeActorMessageProcessor(TenantId tenantId, RuleChainId ruleChainId, RuleNodeId ruleNodeId, ActorSystemContext systemContext
+            , LoggingAdapter logger, ActorRef parent, ActorRef self) {
+        super(systemContext, logger, tenantId, ruleNodeId);
+        this.parent = parent;
+        this.self = self;
+        this.service = systemContext.getRuleChainService();
+    }
+
+    @Override
+    public void start(ActorContext context) throws Exception {
+
+    }
+
+    @Override
+    public void stop(ActorContext context) throws Exception {
+    }
+
+    @Override
+    public void onClusterEventMsg(ClusterEventMsg msg) throws Exception {
+
+    }
+
+    void onRuleChainToRuleNodeMsg(RuleChainToRuleNodeMsg msg) {
+
+    }
+
+}
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeCtx.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeCtx.java
new file mode 100644
index 0000000..dd25f8b
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeCtx.java
@@ -0,0 +1,15 @@
+package org.thingsboard.server.actors.ruleChain;
+
+import akka.actor.ActorRef;
+import lombok.Data;
+import org.thingsboard.server.common.data.id.RuleNodeId;
+
+/**
+ * Created by ashvayka on 19.03.18.
+ */
+@Data
+final class RuleNodeCtx {
+    private final ActorRef chainActor;
+    private final ActorRef self;
+    private final RuleNodeId selfId;
+}
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeRelation.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeRelation.java
new file mode 100644
index 0000000..bd2d544
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeRelation.java
@@ -0,0 +1,17 @@
+package org.thingsboard.server.actors.ruleChain;
+
+import lombok.Data;
+import org.thingsboard.server.common.data.id.EntityId;
+
+/**
+ * Created by ashvayka on 19.03.18.
+ */
+
+@Data
+final class RuleNodeRelation {
+
+    private final EntityId in;
+    private final EntityId out;
+    private final String type;
+
+}
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeToRuleChainTellNextMsg.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeToRuleChainTellNextMsg.java
new file mode 100644
index 0000000..95b3625
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeToRuleChainTellNextMsg.java
@@ -0,0 +1,24 @@
+package org.thingsboard.server.actors.ruleChain;
+
+import lombok.Data;
+import org.thingsboard.server.common.data.id.RuleNodeId;
+import org.thingsboard.server.common.msg.MsgType;
+import org.thingsboard.server.common.msg.TbActorMsg;
+import org.thingsboard.server.common.msg.TbMsg;
+
+/**
+ * Created by ashvayka on 19.03.18.
+ */
+@Data
+final class RuleNodeToRuleChainTellNextMsg implements TbActorMsg {
+
+    private final RuleNodeId originator;
+    private final String relationType;
+    private final TbMsg msg;
+
+    @Override
+    public MsgType getMsgType() {
+        return MsgType.RULE_TO_RULE_CHAIN_TELL_NEXT_MSG;
+    }
+
+}
diff --git a/application/src/main/java/org/thingsboard/server/actors/service/ActorService.java b/application/src/main/java/org/thingsboard/server/actors/service/ActorService.java
index 0f66d25..0be0385 100644
--- a/application/src/main/java/org/thingsboard/server/actors/service/ActorService.java
+++ b/application/src/main/java/org/thingsboard/server/actors/service/ActorService.java
@@ -17,6 +17,8 @@ package org.thingsboard.server.actors.service;
 
 import org.thingsboard.server.common.data.id.*;
 import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
+import org.thingsboard.server.common.msg.TbMsg;
+import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
 import org.thingsboard.server.common.transport.SessionMsgProcessor;
 import org.thingsboard.server.service.cluster.discovery.DiscoveryServiceListener;
 import org.thingsboard.server.service.cluster.rpc.RpcMsgListener;
@@ -25,6 +27,8 @@ public interface ActorService extends SessionMsgProcessor, WebSocketMsgProcessor
 
     void onEntityStateChange(TenantId tenantId, EntityId entityId, ComponentLifecycleEvent state);
 
+    void onMsg(ServiceToRuleEngineMsg msg);
+
     void onCredentialsUpdate(TenantId tenantId, DeviceId deviceId);
 
     void onDeviceNameOrTypeUpdate(TenantId tenantId, DeviceId deviceId, String deviceName, String deviceType);
diff --git a/application/src/main/java/org/thingsboard/server/actors/service/ComponentActor.java b/application/src/main/java/org/thingsboard/server/actors/service/ComponentActor.java
index 76b9be9..d0260dd 100644
--- a/application/src/main/java/org/thingsboard/server/actors/service/ComponentActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/service/ComponentActor.java
@@ -54,7 +54,7 @@ public abstract class ComponentActor<T extends EntityId, P extends ComponentMsgP
     @Override
     public void preStart() {
         try {
-            processor.start();
+            processor.start(context());
             logLifecycleEvent(ComponentLifecycleEvent.STARTED);
             if (systemContext.isStatisticsEnabled()) {
                 scheduleStatsPersistTick();
@@ -78,7 +78,7 @@ public abstract class ComponentActor<T extends EntityId, P extends ComponentMsgP
     @Override
     public void postStop() {
         try {
-            processor.stop();
+            processor.stop(context());
             logLifecycleEvent(ComponentLifecycleEvent.STOPPED);
         } catch (Exception e) {
             logger.warning("[{}][{}] Failed to stop {} processor: {}", tenantId, id, id.getEntityType(), e.getMessage());
diff --git a/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java b/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java
index 825c971..9cb0fc4 100644
--- a/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java
@@ -16,9 +16,13 @@
 package org.thingsboard.server.actors.service;
 
 import akka.actor.UntypedActor;
+import akka.event.Logging;
+import akka.event.LoggingAdapter;
 import org.thingsboard.server.actors.ActorSystemContext;
+import org.thingsboard.server.common.msg.TbActorMsg;
 
 public abstract class ContextAwareActor extends UntypedActor {
+    protected final LoggingAdapter logger = Logging.getLogger(getContext().system(), this);
 
     public static final int ENTITY_PACK_LIMIT = 1024;
 
@@ -28,4 +32,19 @@ public abstract class ContextAwareActor extends UntypedActor {
         super();
         this.systemContext = systemContext;
     }
+
+    @Override
+    public void onReceive(Object msg) throws Exception {
+        if (logger.isDebugEnabled()) {
+            logger.debug("Processing msg: {}", msg);
+        }
+        if (msg instanceof TbActorMsg) {
+            process((TbActorMsg) msg);
+        }
+        else {
+            logger.warning("Unknown message: {}!", msg);
+        }
+    }
+
+    protected abstract void process(TbActorMsg msg);
 }
diff --git a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
index 7b62687..166f37b 100644
--- a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
+++ b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
@@ -37,6 +37,7 @@ import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
 import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg;
 import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
+import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
 import org.thingsboard.server.extensions.api.device.DeviceNameOrTypeUpdateMsg;
 import org.thingsboard.server.common.msg.device.ToDeviceActorMsg;
 import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
@@ -126,6 +127,11 @@ public class DefaultActorService implements ActorService {
     }
 
     @Override
+    public void onMsg(ServiceToRuleEngineMsg msg) {
+        appActor.tell(msg, ActorRef.noSender());
+    }
+
+    @Override
     public void process(SessionAwareMsg msg) {
         log.debug("Processing session aware msg: {}", msg);
         sessionManagerActor.tell(msg, ActorRef.noSender());
diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/AbstractContextAwareMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/shared/AbstractContextAwareMsgProcessor.java
index 73b221f..e1313d2 100644
--- a/application/src/main/java/org/thingsboard/server/actors/shared/AbstractContextAwareMsgProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/shared/AbstractContextAwareMsgProcessor.java
@@ -102,9 +102,6 @@ public abstract class AbstractContextAwareMsgProcessor {
             case FILTER:
                 configurationClazz = ((Filter) componentClazz.getAnnotation(Filter.class)).configuration();
                 break;
-            case PROCESSOR:
-                configurationClazz = ((Processor) componentClazz.getAnnotation(Processor.class)).configuration();
-                break;
             case ACTION:
                 configurationClazz = ((Action) componentClazz.getAnnotation(Action.class)).configuration();
                 break;
diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java
index 18d32d9..fd72bb1 100644
--- a/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java
@@ -33,21 +33,36 @@ public abstract class ComponentMsgProcessor<T> extends AbstractContextAwareMsgPr
         this.entityId = id;
     }
 
-    public abstract void start() throws Exception;
+    public abstract void start(ActorContext context) throws Exception;
 
-    public abstract void stop() throws Exception;
+    public abstract void stop(ActorContext context) throws Exception;
 
-    public abstract void onCreated(ActorContext context) throws Exception;
+    public abstract void onClusterEventMsg(ClusterEventMsg msg) throws Exception;
 
-    public abstract void onUpdate(ActorContext context) throws Exception;
+    public void onCreated(ActorContext context) throws Exception {
+        start(context);
+    }
 
-    public abstract void onActivate(ActorContext context) throws Exception;
+    public void onUpdate(ActorContext context) throws Exception {
+        restart(context);
+    }
 
-    public abstract void onSuspend(ActorContext context) throws Exception;
+    public void onActivate(ActorContext context) throws Exception {
+        restart(context);
+    }
 
-    public abstract void onStop(ActorContext context) throws Exception;
+    public void onSuspend(ActorContext context) throws Exception {
+        stop(context);
+    }
 
-    public abstract void onClusterEventMsg(ClusterEventMsg msg) throws Exception;
+    public void onStop(ActorContext context) throws Exception {
+        stop(context);
+    }
+
+    private void restart(ActorContext context) throws Exception {
+        stop(context);
+        start(context);
+    }
 
     public void scheduleStatsPersistTick(ActorContext context, long statsPersistFrequency) {
         schedulePeriodicMsgWithDelay(context, new StatsPersistTick(), statsPersistFrequency, statsPersistFrequency);
diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/EntityActorsManager.java b/application/src/main/java/org/thingsboard/server/actors/shared/EntityActorsManager.java
index a98609c..917a645 100644
--- a/application/src/main/java/org/thingsboard/server/actors/shared/EntityActorsManager.java
+++ b/application/src/main/java/org/thingsboard/server/actors/shared/EntityActorsManager.java
@@ -43,13 +43,16 @@ public abstract class EntityActorsManager<T extends EntityId, A extends UntypedA
 
     public void init(ActorContext context) {
         for (M entity : new PageDataIterable<>(getFetchEntitiesFunction(), ContextAwareActor.ENTITY_PACK_LIMIT)) {
-            log.debug("[{}] Creating plugin actor", entity.getId());
+            T entityId = (T) entity.getId();
+            log.debug("[{}|{}] Creating entity actor", entityId.getEntityType(), entityId.getId());
             //TODO: remove this cast making UUIDBased subclass of EntityId an interface and vice versa.
-            getOrCreateActor(context, (T) entity.getId());
-            log.debug("[{}] Plugin actor created.", entity.getId());
+            ActorRef actorRef = getOrCreateActor(context, entityId);
+            visit(entity, actorRef);
+            log.debug("[{}|{}] Entity actor created.", entityId.getEntityType(), entityId.getId());
         }
     }
 
+    protected void visit(M entity, ActorRef actorRef) {}
 
     public ActorRef getOrCreateActor(ActorContext context, T entityId) {
         return actors.computeIfAbsent(entityId, eId ->
diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/rulechain/RuleChainManager.java b/application/src/main/java/org/thingsboard/server/actors/shared/rulechain/RuleChainManager.java
index 0939611..97acb6c 100644
--- a/application/src/main/java/org/thingsboard/server/actors/shared/rulechain/RuleChainManager.java
+++ b/application/src/main/java/org/thingsboard/server/actors/shared/rulechain/RuleChainManager.java
@@ -1,6 +1,8 @@
 package org.thingsboard.server.actors.shared.rulechain;
 
+import akka.actor.ActorRef;
 import akka.japi.Creator;
+import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.thingsboard.server.actors.ActorSystemContext;
 import org.thingsboard.server.actors.ruleChain.RuleChainActor;
@@ -16,6 +18,10 @@ import org.thingsboard.server.dao.rule.RuleChainService;
 public abstract class RuleChainManager extends EntityActorsManager<RuleChainId, RuleChainActor, RuleChain> {
 
     protected final RuleChainService service;
+    @Getter
+    protected RuleChain rootChain;
+    @Getter
+    protected ActorRef rootChainActor;
 
     public RuleChainManager(ActorSystemContext systemContext) {
         super(systemContext);
@@ -27,4 +33,12 @@ public abstract class RuleChainManager extends EntityActorsManager<RuleChainId, 
         return new RuleChainActor.ActorCreator(systemContext, getTenantId(), entityId);
     }
 
+    @Override
+    protected void visit(RuleChain entity, ActorRef actorRef) {
+        if (entity.isRoot()) {
+            rootChain = entity;
+            rootChainActor = actorRef;
+        }
+    }
+
 }
diff --git a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
index 8621179..a51e7a2 100644
--- a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
@@ -27,10 +27,12 @@ import org.thingsboard.server.actors.service.ContextBasedCreator;
 import org.thingsboard.server.actors.service.DefaultActorService;
 import org.thingsboard.server.actors.shared.plugin.TenantPluginManager;
 import org.thingsboard.server.actors.shared.rulechain.TenantRuleChainManager;
-import org.thingsboard.server.common.data.id.*;
-import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
+import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.msg.TbActorMsg;
 import org.thingsboard.server.common.msg.device.ToDeviceActorMsg;
 import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
+import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
 import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg;
 import org.thingsboard.server.extensions.api.plugins.msg.ToPluginActorMsg;
 
@@ -39,8 +41,6 @@ import java.util.Map;
 
 public class TenantActor extends RuleChainManagerActor {
 
-    private final LoggingAdapter logger = Logging.getLogger(getContext().system(), this);
-
     private final TenantId tenantId;
     private final Map<DeviceId, ActorRef> deviceActors;
 
@@ -62,25 +62,42 @@ public class TenantActor extends RuleChainManagerActor {
     }
 
     @Override
-    public void onReceive(Object msg) throws Exception {
-        logger.debug("[{}] Received message: {}", tenantId, msg);
-        if (msg instanceof ToDeviceActorMsg) {
-            onToDeviceActorMsg((ToDeviceActorMsg) msg);
-        } else if (msg instanceof ToPluginActorMsg) {
-            onToPluginMsg((ToPluginActorMsg) msg);
-        } else if (msg instanceof ToDeviceActorNotificationMsg) {
-            onToDeviceActorMsg((ToDeviceActorNotificationMsg) msg);
-        } else if (msg instanceof ClusterEventMsg) {
-            broadcast(msg);
-        } else if (msg instanceof ComponentLifecycleMsg) {
-            onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
-        } else if (msg instanceof PluginTerminationMsg) {
-            onPluginTerminated((PluginTerminationMsg) msg);
-        } else {
-            logger.warning("[{}] Unknown message: {}!", tenantId, msg);
+    protected void process(TbActorMsg msg) {
+        switch (msg.getMsgType()) {
+            case COMPONENT_LIFE_CYCLE_MSG:
+                onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
+                break;
+            case SERVICE_TO_RULE_ENGINE_MSG:
+                onServiceToRuleEngineMsg((ServiceToRuleEngineMsg) msg);
+                break;
         }
     }
 
+    private void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg msg) {
+        ruleChainManager.getRootChainActor().tell(msg, self());
+    }
+
+
+//    @Override
+//    public void onReceive(Object msg) throws Exception {
+//        logger.debug("[{}] Received message: {}", tenantId, msg);
+//        if (msg instanceof ToDeviceActorMsg) {
+//            onToDeviceActorMsg((ToDeviceActorMsg) msg);
+//        } else if (msg instanceof ToPluginActorMsg) {
+//            onToPluginMsg((ToPluginActorMsg) msg);
+//        } else if (msg instanceof ToDeviceActorNotificationMsg) {
+//            onToDeviceActorMsg((ToDeviceActorNotificationMsg) msg);
+//        } else if (msg instanceof ClusterEventMsg) {
+//            broadcast(msg);
+//        } else if (msg instanceof ComponentLifecycleMsg) {
+//            onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
+//        } else if (msg instanceof PluginTerminationMsg) {
+//            onPluginTerminated((PluginTerminationMsg) msg);
+//        } else {
+//            logger.warning("[{}] Unknown message: {}!", tenantId, msg);
+//        }
+//    }
+
     private void broadcast(Object msg) {
         pluginManager.broadcast(msg);
         deviceActors.values().forEach(actorRef -> actorRef.tell(msg, ActorRef.noSender()));
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index 1c842c8..778c2a3 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -215,6 +215,12 @@ actors:
     termination.delay: "${ACTORS_RULE_TERMINATION_DELAY:30000}"
     # Errors for particular actor are persisted once per specified amount of milliseconds
     error_persist_frequency: "${ACTORS_RULE_ERROR_FREQUENCY:3000}"
+      chain:
+        # Errors for particular actor are persisted once per specified amount of milliseconds
+        error_persist_frequency: "${ACTORS_RULE_CHAIN_ERROR_FREQUENCY:3000}"
+      node:
+        # Errors for particular actor are persisted once per specified amount of milliseconds
+        error_persist_frequency: "${ACTORS_RULE_NODE_ERROR_FREQUENCY:3000}"
   statistics:
     # Enable/disable actor statistics
     enabled: "${ACTORS_STATISTICS_ENABLED:true}"
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java
index a776d7b..5624ed8 100644
--- a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java
@@ -37,6 +37,8 @@ public class DataConstants {
     public static final String ERROR = "ERROR";
     public static final String LC_EVENT = "LC_EVENT";
     public static final String STATS = "STATS";
+    public static final String RULE_CHAIN_DEBUG = "DEBUG_RULE_CHAIN";
+    public static final String RULE_NODE_DEBUG = "DEBUG_RULE_NODE";
 
     public static final String ONEWAY = "ONEWAY";
     public static final String TWOWAY = "TWOWAY";
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/plugin/ComponentType.java b/common/data/src/main/java/org/thingsboard/server/common/data/plugin/ComponentType.java
index 45fb590..ab6acca 100644
--- a/common/data/src/main/java/org/thingsboard/server/common/data/plugin/ComponentType.java
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/plugin/ComponentType.java
@@ -20,6 +20,6 @@ package org.thingsboard.server.common.data.plugin;
  */
 public enum ComponentType {
 
-    FILTER, PROCESSOR, ACTION, PLUGIN
+    ENRICHMENT, FILTER, PROCESSOR, TRANSFORMATION, ACTION, PLUGIN
 
 }
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
index 55c131d..22fc428 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
@@ -4,4 +4,34 @@ package org.thingsboard.server.common.msg;
  * Created by ashvayka on 15.03.18.
  */
 public enum MsgType {
+
+    /**
+     * ADDED/UPDATED/DELETED events for main entities.
+     *
+     * @See {@link org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg}
+     */
+    COMPONENT_LIFE_CYCLE_MSG,
+
+    /**
+     * Misc messages from the REST API/SERVICE layer to the new rule engine.
+     *
+     * @See {@link org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg}
+     */
+    SERVICE_TO_RULE_ENGINE_MSG,
+
+
+    SESSION_TO_DEVICE_ACTOR_MSG,
+    DEVICE_ACTOR_TO_SESSION_MSG,
+
+
+    /**
+     * Message that is sent by RuleChainActor to RuleActor with command to process TbMsg.
+     */
+    RULE_CHAIN_TO_RULE_MSG,
+
+    /**
+     * Message that is sent by RuleActor to RuleChainActor with command to process TbMsg by next nodes in chain.
+     */
+    RULE_TO_RULE_CHAIN_TELL_NEXT_MSG,
+
 }
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/plugin/ComponentLifecycleMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/plugin/ComponentLifecycleMsg.java
index b77722e..315eb86 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/plugin/ComponentLifecycleMsg.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/plugin/ComponentLifecycleMsg.java
@@ -21,6 +21,8 @@ import org.thingsboard.server.common.data.EntityType;
 import org.thingsboard.server.common.data.id.*;
 import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
 import org.thingsboard.server.common.data.rule.RuleChain;
+import org.thingsboard.server.common.msg.MsgType;
+import org.thingsboard.server.common.msg.TbActorMsg;
 import org.thingsboard.server.common.msg.aware.TenantAwareMsg;
 import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg;
 
@@ -30,7 +32,7 @@ import java.util.Optional;
  * @author Andrew Shvayka
  */
 @ToString
-public class ComponentLifecycleMsg implements TenantAwareMsg, ToAllNodesMsg {
+public class ComponentLifecycleMsg implements TbActorMsg, TenantAwareMsg, ToAllNodesMsg {
     @Getter
     private final TenantId tenantId;
     @Getter
@@ -56,4 +58,8 @@ public class ComponentLifecycleMsg implements TenantAwareMsg, ToAllNodesMsg {
         return entityId.getEntityType() == EntityType.RULE_CHAIN ? Optional.of((RuleChainId) entityId) : Optional.empty();
     }
 
+    @Override
+    public MsgType getMsgType() {
+        return MsgType.COMPONENT_LIFE_CYCLE_MSG;
+    }
 }
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/system/ServiceToRuleEngineMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/system/ServiceToRuleEngineMsg.java
new file mode 100644
index 0000000..3d82033
--- /dev/null
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/system/ServiceToRuleEngineMsg.java
@@ -0,0 +1,22 @@
+package org.thingsboard.server.common.msg.system;
+
+import lombok.Data;
+import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.msg.MsgType;
+import org.thingsboard.server.common.msg.TbActorMsg;
+import org.thingsboard.server.common.msg.TbMsg;
+
+/**
+ * Created by ashvayka on 15.03.18.
+ */
+@Data
+public final class ServiceToRuleEngineMsg implements TbActorMsg {
+
+    private final TenantId tenantId;
+    private final TbMsg tbMsg;
+
+    @Override
+    public MsgType getMsgType() {
+        return MsgType.SERVICE_TO_RULE_ENGINE_MSG;
+    }
+}
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java
index 5163b6c..c689b24 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -36,7 +36,7 @@ public final class TbMsg implements Serializable {
     private final String type;
     private final EntityId originator;
     private final TbMsgMetaData metaData;
-
+    private final TbMsgDataType dataType;
     private final byte[] data;
 
     public static ByteBuffer toBytes(TbMsg msg) {
@@ -49,11 +49,10 @@ public final class TbMsg implements Serializable {
         }
 
         if (msg.getMetaData() != null) {
-            MsgProtos.TbMsgProto.TbMsgMetaDataProto.Builder metadataBuilder = MsgProtos.TbMsgProto.TbMsgMetaDataProto.newBuilder();
-            metadataBuilder.putAllData(msg.getMetaData().getData());
-            builder.addMetaData(metadataBuilder.build());
+            builder.setMetaData(MsgProtos.TbMsgMetaDataProto.newBuilder().putAllData(msg.getMetaData().getData()).build());
         }
 
+        builder.setDataType(msg.getDataType().ordinal());
         builder.setData(ByteString.copyFrom(msg.getData()));
         byte[] bytes = builder.build().toByteArray();
         return ByteBuffer.wrap(bytes);
@@ -63,16 +62,11 @@ public final class TbMsg implements Serializable {
         try {
             MsgProtos.TbMsgProto proto = MsgProtos.TbMsgProto.parseFrom(buffer.array());
             TbMsgMetaData metaData = new TbMsgMetaData();
-            if (proto.getMetaDataCount() > 0) {
-                metaData.setData(proto.getMetaData(0).getDataMap());
-            }
-
-            EntityId entityId = null;
-            if (proto.getEntityId() != null) {
-                entityId = EntityIdFactory.getByTypeAndId(proto.getEntityType(), proto.getEntityId());
-            }
+            metaData.setData(proto.getMetaData().getDataMap());
 
-            return new TbMsg(UUID.fromString(proto.getId()), proto.getType(), entityId, metaData, proto.getData().toByteArray());
+            EntityId entityId = EntityIdFactory.getByTypeAndId(proto.getEntityType(), proto.getEntityId());
+            TbMsgDataType dataType = TbMsgDataType.values()[proto.getDataType()];
+            return new TbMsg(UUID.fromString(proto.getId()), proto.getType(), entityId, metaData, dataType, proto.getData().toByteArray());
         } catch (InvalidProtocolBufferException e) {
             throw new IllegalStateException("Could not parse protobuf for TbMsg", e);
         }
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsgDataType.java b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsgDataType.java
new file mode 100644
index 0000000..b6e2d5a
--- /dev/null
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsgDataType.java
@@ -0,0 +1,11 @@
+package org.thingsboard.server.common.msg;
+
+/**
+ * Created by ashvayka on 15.03.18.
+ */
+public enum TbMsgDataType {
+
+    // Do not change ordering. We use ordinal to save some bytes on serialization
+    JSON, TEXT, BINARY;
+
+}
diff --git a/common/message/src/main/proto/tbmsg.proto b/common/message/src/main/proto/tbmsg.proto
index 90fa2bd..62acff2 100644
--- a/common/message/src/main/proto/tbmsg.proto
+++ b/common/message/src/main/proto/tbmsg.proto
@@ -19,6 +19,9 @@ package msgqueue;
 option java_package = "org.thingsboard.server.common.msg.gen";
 option java_outer_classname = "MsgProtos";
 
+message TbMsgMetaDataProto {
+    map<string, string> data = 1;
+}
 
 message TbMsgProto {
     string id = 1;
@@ -26,11 +29,8 @@ message TbMsgProto {
     string entityType = 3;
     string entityId = 4;
 
-    message TbMsgMetaDataProto {
-        map<string, string> data = 1;
-    }
+    TbMsgMetaDataProto metaData = 5;
 
-    repeated TbMsgMetaDataProto metaData = 5;
-
-    bytes data = 6;
+    int32 dataType = 6;
+    bytes data = 7;
 }
\ No newline at end of file
diff --git a/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleService.java b/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleService.java
index f1df09e..fff3f6d 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleService.java
@@ -16,7 +16,6 @@
 package org.thingsboard.server.dao.rule;
 
 import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.google.common.util.concurrent.ListenableFuture;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
@@ -67,67 +66,7 @@ public class BaseRuleService extends AbstractEntityService implements RuleServic
 
     @Override
     public RuleMetaData saveRule(RuleMetaData rule) {
-        ruleValidator.validate(rule);
-        if (rule.getTenantId() == null) {
-            log.trace("Save system rule metadata with predefined id {}", systemTenantId);
-            rule.setTenantId(systemTenantId);
-        }
-        if (rule.getId() != null) {
-            RuleMetaData oldVersion = ruleDao.findById(rule.getId());
-            if (rule.getState() == null) {
-                rule.setState(oldVersion.getState());
-            } else if (rule.getState() != oldVersion.getState()) {
-                throw new IncorrectParameterException("Use Activate/Suspend method to control state of the rule!");
-            }
-        } else {
-            if (rule.getState() == null) {
-                rule.setState(ComponentLifecycleState.SUSPENDED);
-            } else if (rule.getState() != ComponentLifecycleState.SUSPENDED) {
-                throw new IncorrectParameterException("Use Activate/Suspend method to control state of the rule!");
-            }
-        }
-
-        validateFilters(rule.getFilters());
-        if (rule.getProcessor() != null && !rule.getProcessor().isNull()) {
-            validateComponentJson(rule.getProcessor(), ComponentType.PROCESSOR);
-        }
-        if (rule.getAction() != null && !rule.getAction().isNull()) {
-            validateComponentJson(rule.getAction(), ComponentType.ACTION);
-        }
-        validateRuleAndPluginState(rule);
-        return ruleDao.save(rule);
-    }
-
-    private void validateFilters(JsonNode filtersJson) {
-        if (filtersJson == null || filtersJson.isNull()) {
-            throw new IncorrectParameterException("Rule filters are required!");
-        }
-        if (!filtersJson.isArray()) {
-            throw new IncorrectParameterException("Filters json is not an array!");
-        }
-        ArrayNode filtersArray = (ArrayNode) filtersJson;
-        for (int i = 0; i < filtersArray.size(); i++) {
-            validateComponentJson(filtersArray.get(i), ComponentType.FILTER);
-        }
-    }
-
-    private void validateComponentJson(JsonNode json, ComponentType type) {
-        if (json == null || json.isNull()) {
-            throw new IncorrectParameterException(type.name() + " is required!");
-        }
-        String clazz = getIfValid(type.name(), json, "clazz", JsonNode::isTextual, JsonNode::asText);
-        String name = getIfValid(type.name(), json, "name", JsonNode::isTextual, JsonNode::asText);
-        JsonNode configuration = getIfValid(type.name(), json, "configuration", JsonNode::isObject, node -> node);
-        ComponentDescriptor descriptor = componentDescriptorService.findByClazz(clazz);
-        if (descriptor == null) {
-            throw new IncorrectParameterException(type.name() + " clazz " + clazz + " is not a valid component!");
-        }
-        if (descriptor.getType() != type) {
-            throw new IncorrectParameterException("Clazz " + clazz + " is not a valid " + type.name() + " component!");
-        }
-        if (!componentDescriptorService.validate(descriptor, configuration)) {
-            throw new IncorrectParameterException(type.name() + " configuration is not valid!");
-        }
+        throw new RuntimeException("Not supported since v1.5!");
     }
 
     private void validateRuleAndPluginState(RuleMetaData rule) {
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
index 07cd72c..a1c85e2 100644
--- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
@@ -38,7 +38,7 @@ public interface TbContext {
 
     void spawn(TbMsg msg);
 
-    void ack(UUID msg);
+    void ack(TbMsg msg);
 
     AttributesService getAttributesService();
 
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformNode.java
index 9edd14c..d1a4adc 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformNode.java
@@ -44,6 +44,7 @@ public class TbTransformNode implements TbNode {
         try {
             //TODO: refactor this to work async and fetch attributes from cache.
             AttributesService service = ctx.getAttributesService();
+
             fetchAttributes(msg, service, config.getClientAttributeNames(), DataConstants.CLIENT_SCOPE, "cs.");
             fetchAttributes(msg, service, config.getServerAttributeNames(), DataConstants.SERVER_SCOPE, "ss.");
             fetchAttributes(msg, service, config.getSharedAttributeNames(), DataConstants.SHARED_SCOPE, "shared.");