thingsboard-aplcache

Merge branch 'master' of https://github.com/thingsboard/thingsboard

7/14/2018 1:20:14 PM

Details

diff --git a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
index d453e59..8e7a9ae 100644
--- a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
@@ -111,6 +111,7 @@ public class AppActor extends RuleChainManagerActor {
             case DEVICE_NAME_OR_TYPE_UPDATE_TO_DEVICE_ACTOR_MSG:
             case DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG:
             case SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG:
+            case REMOTE_TO_RULE_CHAIN_TELL_NEXT_MSG:
                 onToDeviceActorMsg((TenantAwareMsg) msg);
                 break;
             case ACTOR_SYSTEM_TO_DEVICE_SESSION_ACTOR_MSG:
diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java
index 3f3f70b..9e38c17 100644
--- a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java
@@ -29,11 +29,7 @@ import org.thingsboard.server.common.msg.cluster.ServerAddress;
 import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
 import org.thingsboard.server.service.cluster.discovery.ServerInstance;
 
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.Queue;
-import java.util.UUID;
+import java.util.*;
 
 /**
  * @author Andrew Shvayka
@@ -88,7 +84,17 @@ public class RpcManagerActor extends ContextAwareActor {
 
     private void onMsg(RpcBroadcastMsg msg) {
         log.debug("Forwarding msg to session actors {}", msg);
-        sessionActors.keySet().forEach(address -> onMsg(msg.getMsg()));
+        sessionActors.keySet().forEach(address -> {
+            ClusterAPIProtos.ClusterMessage msgWithServerAddress = msg.getMsg()
+                    .toBuilder()
+                    .setServerAddress(ClusterAPIProtos.ServerAddress
+                            .newBuilder()
+                            .setHost(address.getHost())
+                            .setPort(address.getPort())
+                            .build())
+                    .build();
+            onMsg(msgWithServerAddress);
+        });
         pendingMsgs.values().forEach(queue -> queue.add(msg.getMsg()));
     }
 
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RemoteToRuleChainTellNextMsg.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RemoteToRuleChainTellNextMsg.java
new file mode 100644
index 0000000..5c1769a
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RemoteToRuleChainTellNextMsg.java
@@ -0,0 +1,45 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.actors.ruleChain;
+
+import lombok.Data;
+import org.thingsboard.server.common.data.id.RuleChainId;
+import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.msg.MsgType;
+import org.thingsboard.server.common.msg.aware.RuleChainAwareMsg;
+import org.thingsboard.server.common.msg.aware.TenantAwareMsg;
+
+/**
+ * Created by ashvayka on 19.03.18.
+ */
+@Data
+final class RemoteToRuleChainTellNextMsg extends RuleNodeToRuleChainTellNextMsg implements TenantAwareMsg, RuleChainAwareMsg {
+
+    private final TenantId tenantId;
+    private final RuleChainId ruleChainId;
+
+    public RemoteToRuleChainTellNextMsg(RuleNodeToRuleChainTellNextMsg original, TenantId tenantId, RuleChainId ruleChainId) {
+        super(original.getOriginator(), original.getRelationTypes(), original.getMsg());
+        this.tenantId = tenantId;
+        this.ruleChainId = ruleChainId;
+    }
+
+    @Override
+    public MsgType getMsgType() {
+        return MsgType.REMOTE_TO_RULE_CHAIN_TELL_NEXT_MSG;
+    }
+
+}
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java
index 3ba646a..c1a55fb 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java
@@ -49,6 +49,7 @@ public class RuleChainActor extends ComponentActor<RuleChainId, RuleChainActorMe
                 processor.onDeviceActorToRuleEngineMsg((DeviceActorToRuleEngineMsg) msg);
                 break;
             case RULE_TO_RULE_CHAIN_TELL_NEXT_MSG:
+            case REMOTE_TO_RULE_CHAIN_TELL_NEXT_MSG:
                 processor.onTellNext((RuleNodeToRuleChainTellNextMsg) msg);
                 break;
             case RULE_CHAIN_TO_RULE_CHAIN_MSG:
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
index 7d560db..fe02335 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
@@ -20,6 +20,9 @@ import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.event.LoggingAdapter;
 import com.datastax.driver.core.utils.UUIDs;
+
+import java.util.Optional;
+
 import org.thingsboard.server.actors.ActorSystemContext;
 import org.thingsboard.server.actors.device.DeviceActorToRuleEngineMsg;
 import org.thingsboard.server.actors.device.RuleEngineQueuePutAckMsg;
@@ -37,6 +40,7 @@ import org.thingsboard.server.common.data.rule.RuleChain;
 import org.thingsboard.server.common.data.rule.RuleNode;
 import org.thingsboard.server.common.msg.TbMsg;
 import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
+import org.thingsboard.server.common.msg.cluster.ServerAddress;
 import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
 import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
 import org.thingsboard.server.dao.rule.RuleChainService;
@@ -217,16 +221,36 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
 
     void onTellNext(RuleNodeToRuleChainTellNextMsg envelope) {
         checkActive();
-        RuleNodeId originator = envelope.getOriginator();
-        List<RuleNodeRelation> relations = nodeRoutes.get(originator).stream()
-                .filter(r -> contains(envelope.getRelationTypes(), r.getType()))
-                .collect(Collectors.toList());
+        TbMsg msg = envelope.getMsg();
+        EntityId originatorEntityId = msg.getOriginator();
+        Optional<ServerAddress> address = systemContext.getRoutingService().resolveById(originatorEntityId);
+
+        if (address.isPresent()) {
+            onRemoteTellNext(address.get(), envelope);
+        } else {
+            onLocalTellNext(envelope);
+        }
+    }
+
+    private void onRemoteTellNext(ServerAddress serverAddress, RuleNodeToRuleChainTellNextMsg envelope) {
+        TbMsg msg = envelope.getMsg();
+        logger.debug("Forwarding [{}] msg to remote server [{}] due to changed originator id: [{}]", msg.getId(), serverAddress, msg.getOriginator());
+        envelope = new RemoteToRuleChainTellNextMsg(envelope, tenantId, entityId);
+        systemContext.getRpcService().tell(systemContext.getEncodingService().convertToProtoDataMessage(serverAddress, envelope));
+    }
 
+    private void onLocalTellNext(RuleNodeToRuleChainTellNextMsg envelope) {
         TbMsg msg = envelope.getMsg();
+        RuleNodeId originatorNodeId = envelope.getOriginator();
+        List<RuleNodeRelation> relations = nodeRoutes.get(originatorNodeId).stream()
+                .filter(r -> contains(envelope.getRelationTypes(), r.getType()))
+                .collect(Collectors.toList());
         int relationsCount = relations.size();
         EntityId ackId = msg.getRuleNodeId() != null ? msg.getRuleNodeId() : msg.getRuleChainId();
         if (relationsCount == 0) {
-            queue.ack(tenantId, msg, ackId.getId(), msg.getClusterPartition());
+            if (ackId != null) {
+                queue.ack(tenantId, msg, ackId.getId(), msg.getClusterPartition());
+            }
         } else if (relationsCount == 1) {
             for (RuleNodeRelation relation : relations) {
                 pushToTarget(msg, relation.getOut(), relation.getType());
@@ -244,7 +268,9 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
                 }
             }
             //TODO: Ideally this should happen in async way when all targets confirm that the copied messages are successfully written to corresponding target queues.
-            queue.ack(tenantId, msg, ackId.getId(), msg.getClusterPartition());
+            if (ackId != null) {
+                queue.ack(tenantId, msg, ackId.getId(), msg.getClusterPartition());
+            }
         }
     }
 
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleChainMsg.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleChainMsg.java
index 8b13747..ae4ae45 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleChainMsg.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleChainMsg.java
@@ -20,12 +20,13 @@ import org.thingsboard.server.common.data.id.RuleChainId;
 import org.thingsboard.server.common.msg.MsgType;
 import org.thingsboard.server.common.msg.TbActorMsg;
 import org.thingsboard.server.common.msg.TbMsg;
+import org.thingsboard.server.common.msg.aware.RuleChainAwareMsg;
 
 /**
  * Created by ashvayka on 19.03.18.
  */
 @Data
-public final class RuleChainToRuleChainMsg implements TbActorMsg {
+public final class RuleChainToRuleChainMsg implements TbActorMsg, RuleChainAwareMsg {
 
     private final RuleChainId target;
     private final RuleChainId source;
@@ -34,6 +35,11 @@ public final class RuleChainToRuleChainMsg implements TbActorMsg {
     private final boolean enqueue;
 
     @Override
+    public RuleChainId getRuleChainId() {
+        return target;
+    }
+
+    @Override
     public MsgType getMsgType() {
         return MsgType.RULE_CHAIN_TO_RULE_CHAIN_MSG;
     }
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeToRuleChainTellNextMsg.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeToRuleChainTellNextMsg.java
index c0a475c..9414892 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeToRuleChainTellNextMsg.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeToRuleChainTellNextMsg.java
@@ -27,7 +27,7 @@ import java.util.Set;
  * Created by ashvayka on 19.03.18.
  */
 @Data
-final class RuleNodeToRuleChainTellNextMsg implements TbActorMsg {
+class RuleNodeToRuleChainTellNextMsg implements TbActorMsg {
 
     private final RuleNodeId originator;
     private final Set<String> relationTypes;
diff --git a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
index dc48e88..b4ab0d2 100644
--- a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
@@ -35,6 +35,7 @@ import org.thingsboard.server.common.data.id.TenantId;
 import org.thingsboard.server.common.data.rule.RuleChain;
 import org.thingsboard.server.common.msg.TbActorMsg;
 import org.thingsboard.server.common.msg.aware.DeviceAwareMsg;
+import org.thingsboard.server.common.msg.aware.RuleChainAwareMsg;
 import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
 import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
 import scala.concurrent.duration.Duration;
@@ -94,7 +95,8 @@ public class TenantActor extends RuleChainManagerActor {
                 onToDeviceActorMsg((DeviceAwareMsg) msg);
                 break;
             case RULE_CHAIN_TO_RULE_CHAIN_MSG:
-                onRuleChainMsg((RuleChainToRuleChainMsg) msg);
+            case REMOTE_TO_RULE_CHAIN_TELL_NEXT_MSG:
+                onRuleChainMsg((RuleChainAwareMsg) msg);
                 break;
             default:
                 return false;
@@ -120,8 +122,8 @@ public class TenantActor extends RuleChainManagerActor {
     	else logger.info("[{}] No Root Chain", msg);
     }
 
-    private void onRuleChainMsg(RuleChainToRuleChainMsg msg) {
-        ruleChainManager.getOrCreateActor(context(), msg.getTarget()).tell(msg, self());
+    private void onRuleChainMsg(RuleChainAwareMsg msg) {
+        ruleChainManager.getOrCreateActor(context(), msg.getRuleChainId()).tell(msg, self());
     }
 
 
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/aware/RuleChainAwareMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/aware/RuleChainAwareMsg.java
new file mode 100644
index 0000000..e261cbb
--- /dev/null
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/aware/RuleChainAwareMsg.java
@@ -0,0 +1,24 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.common.msg.aware;
+
+import org.thingsboard.server.common.data.id.RuleChainId;
+
+public interface RuleChainAwareMsg {
+
+	RuleChainId getRuleChainId();
+	
+}
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
index 7702788..c8b5c4e 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
@@ -63,6 +63,11 @@ public enum MsgType {
     RULE_TO_RULE_CHAIN_TELL_NEXT_MSG,
 
     /**
+     * Message forwarded from original rule chain to remote rule chain due to change in the cluster structure or originator entity of the TbMsg.
+     */
+    REMOTE_TO_RULE_CHAIN_TELL_NEXT_MSG,
+
+    /**
      * Message that is sent by RuleActor implementation to RuleActor itself to log the error.
      */
     RULE_TO_SELF_ERROR_MSG,
@@ -101,6 +106,10 @@ public enum MsgType {
     /**
      * Message that is sent from Rule Engine to the Device Actor when message is successfully pushed to queue.
      */
-    RULE_ENGINE_QUEUE_PUT_ACK_MSG, ACTOR_SYSTEM_TO_DEVICE_SESSION_ACTOR_MSG, TRANSPORT_TO_DEVICE_SESSION_ACTOR_MSG, SESSION_TIMEOUT_MSG, SESSION_CTRL_MSG;
+    RULE_ENGINE_QUEUE_PUT_ACK_MSG,
+    ACTOR_SYSTEM_TO_DEVICE_SESSION_ACTOR_MSG,
+    TRANSPORT_TO_DEVICE_SESSION_ACTOR_MSG,
+    SESSION_TIMEOUT_MSG,
+    SESSION_CTRL_MSG;
 
 }
diff --git a/docker/cassandra-upgrade/Dockerfile b/docker/cassandra-upgrade/Dockerfile
new file mode 100644
index 0000000..312db0d
--- /dev/null
+++ b/docker/cassandra-upgrade/Dockerfile
@@ -0,0 +1,24 @@
+#
+# Copyright © 2016-2018 The Thingsboard Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+FROM openjdk:8-jre
+
+ADD upgrade.sh /upgrade.sh
+ADD thingsboard.deb /thingsboard.deb
+
+RUN apt-get update \
+        && apt-get install -y nmap \
+        && chmod +x /upgrade.sh
diff --git a/docker/cassandra-upgrade/Makefile b/docker/cassandra-upgrade/Makefile
new file mode 100644
index 0000000..f4c6825
--- /dev/null
+++ b/docker/cassandra-upgrade/Makefile
@@ -0,0 +1,12 @@
+VERSION=2.0.3
+PROJECT=thingsboard
+APP=cassandra-upgrade
+
+build:
+	cp ../../application/target/thingsboard.deb .
+	docker build --pull -t ${PROJECT}/${APP}:${VERSION} -t ${PROJECT}/${APP}:latest .
+	rm thingsboard.deb
+
+push: build
+	docker push ${PROJECT}/${APP}:${VERSION}
+	docker push ${PROJECT}/${APP}:latest
diff --git a/docker/cassandra-upgrade/upgrade.sh b/docker/cassandra-upgrade/upgrade.sh
new file mode 100755
index 0000000..dac4919
--- /dev/null
+++ b/docker/cassandra-upgrade/upgrade.sh
@@ -0,0 +1,28 @@
+#!/bin/bash
+#
+# Copyright © 2016-2018 The Thingsboard Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+dpkg -i /thingsboard.deb
+
+until nmap $CASSANDRA_HOST -p $CASSANDRA_PORT | grep "$CASSANDRA_PORT/tcp open"
+do
+  echo "Wait for cassandra db to start..."
+  sleep 10
+done
+
+echo "Upgrading 'Thingsboard' schema..."
+/usr/share/thingsboard/bin/install/upgrade.sh --fromVersion=$UPGRADE_FROM_VERSION
diff --git a/docker/k8s/cassandra-upgrade.yaml b/docker/k8s/cassandra-upgrade.yaml
new file mode 100644
index 0000000..881c5b8
--- /dev/null
+++ b/docker/k8s/cassandra-upgrade.yaml
@@ -0,0 +1,43 @@
+#
+# Copyright © 2016-2018 The Thingsboard Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+apiVersion: v1
+kind: Pod
+metadata:
+  name: cassandra-upgrade
+spec:
+  containers:
+  - name: cassandra-upgrade
+    imagePullPolicy: Always
+    image: thingsboard/cassandra-upgrade:2.0.3
+    env:
+    - name: ADD_DEMO_DATA
+      value: "true"
+    - name : CASSANDRA_HOST
+      value: "cassandra-headless"
+    - name : CASSANDRA_PORT
+      value: "9042"
+    - name : DATABASE_TYPE
+      value: "cassandra"
+    - name : CASSANDRA_URL
+      value: "cassandra-headless:9042"
+    - name : UPGRADE_FROM_VERSION
+      value: "1.4.0"
+    command:
+    - sh
+    - -c
+    - /upgrade.sh
+  restartPolicy: Never
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java
index 5a30dcb..e7a54af 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java
@@ -47,11 +47,12 @@ import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
 
 public class TbMsgGeneratorNode implements TbNode {
 
-    public static final String TB_MSG_GENERATOR_NODE_MSG = "TbMsgGeneratorNodeMsg";
+    private static final String TB_MSG_GENERATOR_NODE_MSG = "TbMsgGeneratorNodeMsg";
 
     private TbMsgGeneratorNodeConfiguration config;
     private ScriptEngine jsEngine;
     private long delay;
+    private long lastScheduledTs;
     private EntityId originatorId;
     private UUID nextTickId;
     private TbMsg prevMsg;
@@ -66,28 +67,40 @@ public class TbMsgGeneratorNode implements TbNode {
             originatorId = ctx.getSelfId();
         }
         this.jsEngine = ctx.createJsScriptEngine(config.getJsScript(), "prevMsg", "prevMetadata", "prevMsgType");
-        sentTickMsg(ctx);
+        scheduleTickMsg(ctx);
     }
 
     @Override
     public void onMsg(TbContext ctx, TbMsg msg) {
         if (msg.getType().equals(TB_MSG_GENERATOR_NODE_MSG) && msg.getId().equals(nextTickId)) {
             withCallback(generate(ctx),
-                    m -> {ctx.tellNext(m, SUCCESS); sentTickMsg(ctx);},
-                    t -> {ctx.tellFailure(msg, t); sentTickMsg(ctx);});
+                    m -> {
+                        ctx.tellNext(m, SUCCESS);
+                        scheduleTickMsg(ctx);
+                    },
+                    t -> {
+                        ctx.tellFailure(msg, t);
+                        scheduleTickMsg(ctx);
+                    });
         }
     }
 
-    private void sentTickMsg(TbContext ctx) {
+    private void scheduleTickMsg(TbContext ctx) {
+        long curTs = System.currentTimeMillis();
+        if (lastScheduledTs == 0L) {
+            lastScheduledTs = curTs;
+        }
+        lastScheduledTs = lastScheduledTs + delay;
+        long curDelay = Math.max(0L, (lastScheduledTs - curTs));
         TbMsg tickMsg = ctx.newMsg(TB_MSG_GENERATOR_NODE_MSG, ctx.getSelfId(), new TbMsgMetaData(), "");
         nextTickId = tickMsg.getId();
-        ctx.tellSelf(tickMsg, delay);
+        ctx.tellSelf(tickMsg, curDelay);
     }
 
     private ListenableFuture<TbMsg> generate(TbContext ctx) {
         return ctx.getJsExecutor().executeAsync(() -> {
             if (prevMsg == null) {
-                prevMsg = ctx.newMsg( "", originatorId, new TbMsgMetaData(), "{}");
+                prevMsg = ctx.newMsg("", originatorId, new TbMsgMetaData(), "{}");
             }
             TbMsg generated = jsEngine.executeGenerate(prevMsg);
             prevMsg = ctx.newMsg(generated.getType(), originatorId, generated.getMetaData(), generated.getData());