thingsboard-memoizeit

Implementation draft

9/26/2018 8:54:59 AM

Changes

common/pom.xml 3(+2 -1)

pom.xml 27(+11 -16)

Details

diff --git a/application/pom.xml b/application/pom.xml
index 3d7cbe1..eadcf7b 100644
--- a/application/pom.xml
+++ b/application/pom.xml
@@ -77,6 +77,10 @@
             <artifactId>dao</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.thingsboard.common</groupId>
+            <artifactId>queue</artifactId>
+        </dependency>
+        <dependency>
             <groupId>org.thingsboard</groupId>
             <artifactId>dao</artifactId>
             <type>test-jar</type>
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/DiscoveryService.java b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/DiscoveryService.java
index f9caafa..04c4ced 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/DiscoveryService.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/DiscoveryService.java
@@ -26,6 +26,8 @@ public interface DiscoveryService {
 
     void unpublishCurrentServer();
 
+    String getNodeId();
+
     ServerInstance getCurrentServer();
 
     List<ServerInstance> getOtherServers();
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/DummyDiscoveryService.java b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/DummyDiscoveryService.java
index fd91643..64d1456 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/DummyDiscoveryService.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/DummyDiscoveryService.java
@@ -43,6 +43,11 @@ public class DummyDiscoveryService implements DiscoveryService {
     }
 
     @Override
+    public String getNodeId() {
+        return null;
+    }
+
+    @Override
     public void publishCurrentServer() {
         //Do nothing
     }
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java
index 14196a6..b4829f2 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java
@@ -16,6 +16,7 @@
 package org.thingsboard.server.service.cluster.discovery;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.lang3.SerializationException;
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.curator.framework.CuratorFramework;
@@ -93,11 +94,13 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
     private CuratorFramework client;
     private PathChildrenCache cache;
     private String nodePath;
-
+    //TODO: make persistent?
+    private String nodeId;
 
     @PostConstruct
     public void init() {
         log.info("Initializing...");
+        this.nodeId = RandomStringUtils.randomAlphabetic(10);
         Assert.hasLength(zkUrl, MiscUtils.missingProperty("zk.url"));
         Assert.notNull(zkRetryInterval, MiscUtils.missingProperty("zk.retry_interval_ms"));
         Assert.notNull(zkConnectionTimeout, MiscUtils.missingProperty("zk.connection_timeout_ms"));
@@ -181,6 +184,11 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
     }
 
     @Override
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    @Override
     public ServerInstance getCurrentServer() {
         return serverInstance.getSelf();
     }
diff --git a/application/src/main/java/org/thingsboard/server/service/script/AbstractJsInvokeService.java b/application/src/main/java/org/thingsboard/server/service/script/AbstractJsInvokeService.java
new file mode 100644
index 0000000..461b894
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/script/AbstractJsInvokeService.java
@@ -0,0 +1,102 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.service.script;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Created by ashvayka on 26.09.18.
+ */
+@Slf4j
+public abstract class AbstractJsInvokeService implements JsInvokeService {
+
+    protected Map<UUID, String> scriptIdToNameMap = new ConcurrentHashMap<>();
+    protected Map<UUID, AtomicInteger> blackListedFunctions = new ConcurrentHashMap<>();
+
+    @Override
+    public ListenableFuture<UUID> eval(JsScriptType scriptType, String scriptBody, String... argNames) {
+        UUID scriptId = UUID.randomUUID();
+        String functionName = "invokeInternal_" + scriptId.toString().replace('-', '_');
+        String jsScript = generateJsScript(scriptType, functionName, scriptBody, argNames);
+        return doEval(scriptId, functionName, jsScript);
+    }
+
+    @Override
+    public ListenableFuture<Object> invokeFunction(UUID scriptId, Object... args) {
+        String functionName = scriptIdToNameMap.get(scriptId);
+        if (functionName == null) {
+            return Futures.immediateFailedFuture(new RuntimeException("No compiled script found for scriptId: [" + scriptId + "]!"));
+        }
+        if (!isBlackListed(scriptId)) {
+            return doInvokeFunction(scriptId, functionName, args);
+        } else {
+            return Futures.immediateFailedFuture(
+                    new RuntimeException("Script is blacklisted due to maximum error count " + getMaxErrors() + "!"));
+        }
+    }
+
+    @Override
+    public ListenableFuture<Void> release(UUID scriptId) {
+        String functionName = scriptIdToNameMap.get(scriptId);
+        if (functionName != null) {
+            try {
+                scriptIdToNameMap.remove(scriptId);
+                blackListedFunctions.remove(scriptId);
+                doRelease(scriptId, functionName);
+            } catch (Exception e) {
+                return Futures.immediateFailedFuture(e);
+            }
+        }
+        return Futures.immediateFuture(null);
+    }
+
+    protected abstract ListenableFuture<UUID> doEval(UUID scriptId, String functionName, String scriptBody);
+
+    protected abstract ListenableFuture<Object> doInvokeFunction(UUID scriptId, String functionName, Object[] args);
+
+    protected abstract void doRelease(UUID scriptId, String functionName) throws Exception;
+
+    protected abstract int getMaxErrors();
+
+    protected void onScriptExecutionError(UUID scriptId) {
+        blackListedFunctions.computeIfAbsent(scriptId, key -> new AtomicInteger(0)).incrementAndGet();
+    }
+
+    private String generateJsScript(JsScriptType scriptType, String functionName, String scriptBody, String... argNames) {
+        switch (scriptType) {
+            case RULE_NODE_SCRIPT:
+                return RuleNodeScriptFactory.generateRuleNodeScript(functionName, scriptBody, argNames);
+            default:
+                throw new RuntimeException("No script factory implemented for scriptType: " + scriptType);
+        }
+    }
+
+    private boolean isBlackListed(UUID scriptId) {
+        if (blackListedFunctions.containsKey(scriptId)) {
+            AtomicInteger errorCount = blackListedFunctions.get(scriptId);
+            return errorCount.get() >= getMaxErrors();
+        } else {
+            return false;
+        }
+    }
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/script/AbstractNashornJsInvokeService.java b/application/src/main/java/org/thingsboard/server/service/script/AbstractNashornJsInvokeService.java
index 4acef05..7c8eb2a 100644
--- a/application/src/main/java/org/thingsboard/server/service/script/AbstractNashornJsInvokeService.java
+++ b/application/src/main/java/org/thingsboard/server/service/script/AbstractNashornJsInvokeService.java
@@ -20,38 +20,24 @@ import com.google.common.util.concurrent.ListenableFuture;
 import delight.nashornsandbox.NashornSandbox;
 import delight.nashornsandbox.NashornSandboxes;
 import jdk.nashorn.api.scripting.NashornScriptEngineFactory;
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.thingsboard.server.common.data.id.EntityId;
 
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
 import javax.script.Invocable;
 import javax.script.ScriptEngine;
 import javax.script.ScriptException;
-import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicInteger;
 
 @Slf4j
-public abstract class AbstractNashornJsInvokeService implements JsInvokeService {
+public abstract class AbstractNashornJsInvokeService extends AbstractJsInvokeService {
 
     private NashornSandbox sandbox;
     private ScriptEngine engine;
     private ExecutorService monitorExecutorService;
 
-    private final Map<UUID, String> functionsMap = new ConcurrentHashMap<>();
-    private final Map<BlackListKey, BlackListInfo> blackListedFunctions = new ConcurrentHashMap<>();
-
-    private final Map<String, ScriptInfo> scriptKeyToInfo = new ConcurrentHashMap<>();
-    private final Map<UUID, ScriptInfo> scriptIdToInfo = new ConcurrentHashMap<>();
-
     @PostConstruct
     public void init() {
         if (useJsSandbox()) {
@@ -80,181 +66,44 @@ public abstract class AbstractNashornJsInvokeService implements JsInvokeService 
 
     protected abstract long getMaxCpuTime();
 
-    protected abstract int getMaxErrors();
-
     @Override
-    public ListenableFuture<UUID> eval(JsScriptType scriptType, String scriptBody, String... argNames) {
-        ScriptInfo scriptInfo = deduplicate(scriptType, scriptBody);
-        UUID scriptId = scriptInfo.getId();
-        AtomicInteger duplicateCount = scriptInfo.getCount();
-
-        synchronized (scriptInfo.getLock()) {
-            if (duplicateCount.compareAndSet(0, 1)) {
-                try {
-                    evaluate(scriptId, scriptType, scriptBody, argNames);
-                } catch (Exception e) {
-                    duplicateCount.decrementAndGet();
-                    log.warn("Failed to compile JS script: {}", e.getMessage(), e);
-                    return Futures.immediateFailedFuture(e);
-                }
+    protected ListenableFuture<UUID> doEval(UUID scriptId, String functionName, String jsScript) {
+        try {
+            if (useJsSandbox()) {
+                sandbox.eval(jsScript);
             } else {
-                duplicateCount.incrementAndGet();
+                engine.eval(jsScript);
             }
+            scriptIdToNameMap.put(scriptId, functionName);
+        } catch (Exception e) {
+            log.warn("Failed to compile JS script: {}", e.getMessage(), e);
+            return Futures.immediateFailedFuture(e);
         }
         return Futures.immediateFuture(scriptId);
     }
 
-    private void evaluate(UUID scriptId, JsScriptType scriptType, String scriptBody, String... argNames) throws ScriptException {
-        String functionName = "invokeInternal_" + scriptId.toString().replace('-', '_');
-        String jsScript = generateJsScript(scriptType, functionName, scriptBody, argNames);
-        if (useJsSandbox()) {
-            sandbox.eval(jsScript);
-        } else {
-            engine.eval(jsScript);
-        }
-        functionsMap.put(scriptId, functionName);
-    }
-
     @Override
-    public ListenableFuture<Object> invokeFunction(UUID scriptId, EntityId entityId, Object... args) {
-        String functionName = functionsMap.get(scriptId);
-        if (functionName == null) {
-            String message = "No compiled script found for scriptId: [" + scriptId + "]!";
-            log.warn(message);
-            return Futures.immediateFailedFuture(new RuntimeException(message));
-        }
-
-        BlackListInfo blackListInfo = blackListedFunctions.get(new BlackListKey(scriptId, entityId));
-        if (blackListInfo != null && blackListInfo.getCount() >= getMaxErrors()) {
-            RuntimeException throwable = new RuntimeException("Script is blacklisted due to maximum error count " + getMaxErrors() + "!", blackListInfo.getCause());
-            throwable.printStackTrace();
-            return Futures.immediateFailedFuture(throwable);
-        }
-
+    protected ListenableFuture<Object> doInvokeFunction(UUID scriptId, String functionName, Object[] args) {
         try {
-            return invoke(functionName, args);
+            Object result;
+            if (useJsSandbox()) {
+                result = sandbox.getSandboxedInvocable().invokeFunction(functionName, args);
+            } else {
+                result = ((Invocable) engine).invokeFunction(functionName, args);
+            }
+            return Futures.immediateFuture(result);
         } catch (Exception e) {
-            BlackListKey blackListKey = new BlackListKey(scriptId, entityId);
-            blackListedFunctions.computeIfAbsent(blackListKey, key -> new BlackListInfo()).incrementWithReason(e);
+            onScriptExecutionError(scriptId);
             return Futures.immediateFailedFuture(e);
         }
     }
 
-    private ListenableFuture<Object> invoke(String functionName, Object... args) throws ScriptException, NoSuchMethodException {
-        Object result;
+    protected void doRelease(UUID scriptId, String functionName) throws ScriptException {
         if (useJsSandbox()) {
-            result = sandbox.getSandboxedInvocable().invokeFunction(functionName, args);
+            sandbox.eval(functionName + " = undefined;");
         } else {
-            result = ((Invocable) engine).invokeFunction(functionName, args);
-        }
-        return Futures.immediateFuture(result);
-    }
-
-    @Override
-    public ListenableFuture<Void> release(UUID scriptId, EntityId entityId) {
-        ScriptInfo scriptInfo = scriptIdToInfo.get(scriptId);
-        if (scriptInfo == null) {
-            log.warn("Script release called for not existing script id [{}]", scriptId);
-            return Futures.immediateFuture(null);
-        }
-
-        synchronized (scriptInfo.getLock()) {
-            int remainingDuplicates = scriptInfo.getCount().decrementAndGet();
-            if (remainingDuplicates > 0) {
-                return Futures.immediateFuture(null);
-            }
-
-            String functionName = functionsMap.get(scriptId);
-            if (functionName != null) {
-                try {
-                    if (useJsSandbox()) {
-                        sandbox.eval(functionName + " = undefined;");
-                    } else {
-                        engine.eval(functionName + " = undefined;");
-                    }
-                    functionsMap.remove(scriptId);
-                    blackListedFunctions.remove(new BlackListKey(scriptId, entityId));
-                } catch (ScriptException e) {
-                    log.error("Could not release script [{}] [{}]", scriptId, remainingDuplicates);
-                    return Futures.immediateFailedFuture(e);
-                }
-            } else {
-                log.warn("Function name do not exist for script [{}] [{}]", scriptId, remainingDuplicates);
-            }
+            engine.eval(functionName + " = undefined;");
         }
-        return Futures.immediateFuture(null);
-    }
-
-
-    private String generateJsScript(JsScriptType scriptType, String functionName, String scriptBody, String... argNames) {
-        switch (scriptType) {
-            case RULE_NODE_SCRIPT:
-                return RuleNodeScriptFactory.generateRuleNodeScript(functionName, scriptBody, argNames);
-            default:
-                throw new RuntimeException("No script factory implemented for scriptType: " + scriptType);
-        }
-    }
-
-    private ScriptInfo deduplicate(JsScriptType scriptType, String scriptBody) {
-        ScriptInfo meta = ScriptInfo.preInit();
-        String key = deduplicateKey(scriptType, scriptBody);
-        ScriptInfo latestMeta = scriptKeyToInfo.computeIfAbsent(key, i -> meta);
-        return scriptIdToInfo.computeIfAbsent(latestMeta.getId(), i -> latestMeta);
     }
 
-    private String deduplicateKey(JsScriptType scriptType, String scriptBody) {
-        return scriptType + "_" + scriptBody;
-    }
-
-    @Getter
-    private static class ScriptInfo {
-        private final UUID id;
-        private final Object lock;
-        private final AtomicInteger count;
-
-        ScriptInfo(UUID id, Object lock, AtomicInteger count) {
-            this.id = id;
-            this.lock = lock;
-            this.count = count;
-        }
-
-        static ScriptInfo preInit() {
-            UUID preId = UUID.randomUUID();
-            AtomicInteger preCount = new AtomicInteger();
-            Object preLock = new Object();
-            return new ScriptInfo(preId, preLock, preCount);
-        }
-    }
-
-    @EqualsAndHashCode
-    @Getter
-    @RequiredArgsConstructor
-    private static class BlackListKey {
-        private final UUID scriptId;
-        private final EntityId entityId;
-
-    }
-
-    @Data
-    private static class BlackListInfo {
-        private final AtomicInteger count;
-        private Exception ex;
-
-        BlackListInfo() {
-            this.count = new AtomicInteger(0);
-        }
-
-        void incrementWithReason(Exception e) {
-            count.incrementAndGet();
-            ex = e;
-        }
-
-        int getCount() {
-            return count.get();
-        }
-
-        Exception getCause() {
-            return ex;
-        }
-    }
 }
diff --git a/application/src/main/java/org/thingsboard/server/service/script/JsInvokeRequest.java b/application/src/main/java/org/thingsboard/server/service/script/JsInvokeRequest.java
new file mode 100644
index 0000000..1041a26
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/script/JsInvokeRequest.java
@@ -0,0 +1,29 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.service.script;
+
+import java.util.List;
+
+/**
+ * Created by ashvayka on 25.09.18.
+ */
+public class JsInvokeRequest {
+
+    private String scriptId;
+    private String scriptBody;
+    private List<String> args;
+
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/script/JsInvokeResponse.java b/application/src/main/java/org/thingsboard/server/service/script/JsInvokeResponse.java
new file mode 100644
index 0000000..1638de8
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/script/JsInvokeResponse.java
@@ -0,0 +1,29 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.service.script;
+
+import java.util.List;
+
+/**
+ * Created by ashvayka on 25.09.18.
+ */
+public class JsInvokeResponse {
+
+    private String scriptId;
+    private String scriptBody;
+    private List<String> args;
+
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/script/JsInvokeService.java b/application/src/main/java/org/thingsboard/server/service/script/JsInvokeService.java
index 1da9859..031a5ee 100644
--- a/application/src/main/java/org/thingsboard/server/service/script/JsInvokeService.java
+++ b/application/src/main/java/org/thingsboard/server/service/script/JsInvokeService.java
@@ -25,8 +25,8 @@ public interface JsInvokeService {
 
     ListenableFuture<UUID> eval(JsScriptType scriptType, String scriptBody, String... argNames);
 
-    ListenableFuture<Object> invokeFunction(UUID scriptId, EntityId entityId, Object... args);
+    ListenableFuture<Object> invokeFunction(UUID scriptId, Object... args);
 
-    ListenableFuture<Void> release(UUID scriptId, EntityId entityId);
+    ListenableFuture<Void> release(UUID scriptId);
 
 }
diff --git a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java
index 3a85cc5..da8d62a 100644
--- a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java
+++ b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java
@@ -15,32 +15,172 @@
  */
 package org.thingsboard.server.service.script;
 
+import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.stereotype.Service;
-import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.gen.js.JsInvokeProtos;
+import org.thingsboard.server.kafka.TBKafkaConsumerTemplate;
+import org.thingsboard.server.kafka.TBKafkaProducerTemplate;
+import org.thingsboard.server.kafka.TbKafkaRequestTemplate;
+import org.thingsboard.server.kafka.TbKafkaSettings;
+import org.thingsboard.server.service.cluster.discovery.DiscoveryService;
 
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 
 @Slf4j
-@ConditionalOnProperty(prefix = "js", value = "evaluator", havingValue = "local", matchIfMissing = true)
+@ConditionalOnProperty(prefix = "js", value = "evaluator", havingValue = "remote", matchIfMissing = true)
 @Service
-public class RemoteJsInvokeService implements JsInvokeService {
+public class RemoteJsInvokeService extends AbstractJsInvokeService {
+
+    @Autowired
+    private DiscoveryService discoveryService;
+
+    @Autowired
+    private TbKafkaSettings kafkaSettings;
+
+    @Value("${js.remote.use_js_sandbox}")
+    private boolean useJsSandbox;
+
+    @Value("${js.remote.request_topic}")
+    private String requestTopic;
+
+    @Value("${js.remote.response_topic_prefix}")
+    private String responseTopicPrefix;
+
+    @Value("${js.remote.max_pending_requests}")
+    private long maxPendingRequests;
+
+    @Value("${js.remote.max_requests_timeout}")
+    private long maxRequestsTimeout;
+
+    @Value("${js.remote.response_poll_duration}")
+    private long responsePollDuration;
+
+    @Getter
+    @Value("${js.remote.max_errors}")
+    private int maxErrors;
+
+    private TbKafkaRequestTemplate<JsInvokeProtos.RemoteJsRequest, JsInvokeProtos.RemoteJsResponse> kafkaTemplate;
+    protected Map<UUID, String> scriptIdToBodysMap = new ConcurrentHashMap<>();
+
+    @PostConstruct
+    public void init() {
+        TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<JsInvokeProtos.RemoteJsRequest> requestBuilder = TBKafkaProducerTemplate.builder();
+        requestBuilder.settings(kafkaSettings);
+        requestBuilder.defaultTopic(requestTopic);
+        requestBuilder.encoder(new RemoteJsRequestEncoder());
+
+        TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder<JsInvokeProtos.RemoteJsResponse> responseBuilder = TBKafkaConsumerTemplate.builder();
+        responseBuilder.settings(kafkaSettings);
+        responseBuilder.topic(responseTopicPrefix + "." + discoveryService.getNodeId());
+        responseBuilder.clientId(discoveryService.getNodeId());
+        responseBuilder.groupId("rule-engine-node");
+        responseBuilder.autoCommit(true);
+        responseBuilder.autoCommitIntervalMs(100);
+        responseBuilder.decoder(new RemoteJsResponseDecoder());
+
+        TbKafkaRequestTemplate.TbKafkaRequestTemplateBuilder
+                <JsInvokeProtos.RemoteJsRequest, JsInvokeProtos.RemoteJsResponse> builder = TbKafkaRequestTemplate.builder();
+        builder.requestTemplate(requestBuilder.build());
+        builder.responseTemplate(responseBuilder.build());
+        builder.maxPendingRequests(maxPendingRequests);
+        builder.maxRequestTimeout(maxRequestsTimeout);
+        builder.pollInterval(responsePollDuration);
+        kafkaTemplate = builder.build();
+    }
+
+    @PreDestroy
+    public void destroy(){
+        if(kafkaTemplate != null){
+            kafkaTemplate.stop();
+        }
+    }
 
     @Override
-    public ListenableFuture<UUID> eval(JsScriptType scriptType, String scriptBody, String... argNames) {
-        return null;
+    protected ListenableFuture<UUID> doEval(UUID scriptId, String functionName, String scriptBody) {
+        JsInvokeProtos.JsCompileRequest jsRequest = JsInvokeProtos.JsCompileRequest.newBuilder()
+                .setScriptIdMSB(scriptId.getMostSignificantBits())
+                .setScriptIdLSB(scriptId.getLeastSignificantBits())
+                .setFunctionName(functionName)
+                .setScriptBody(scriptBody).build();
+
+        JsInvokeProtos.RemoteJsRequest jsRequestWrapper = JsInvokeProtos.RemoteJsRequest.newBuilder()
+                .setCompileRequest(jsRequest)
+                .build();
+
+        ListenableFuture<JsInvokeProtos.RemoteJsResponse> future = kafkaTemplate.post(scriptId.toString(), jsRequestWrapper);
+        return Futures.transform(future, response -> {
+            JsInvokeProtos.JsCompileResponse compilationResult = response.getCompileResponse();
+            UUID compiledScriptId = new UUID(compilationResult.getScriptIdMSB(), compilationResult.getScriptIdLSB());
+            if (compilationResult.getSuccess()) {
+                scriptIdToNameMap.put(scriptId, functionName);
+                scriptIdToBodysMap.put(scriptId, scriptBody);
+                return compiledScriptId;
+            } else {
+                log.debug("[{}] Failed to compile script due to [{}]: {}", compiledScriptId, compilationResult.getErrorCode().name(), compilationResult.getErrorDetails());
+                throw new RuntimeException(compilationResult.getErrorCode().name());
+            }
+        });
     }
 
     @Override
-    public ListenableFuture<Object> invokeFunction(UUID scriptId, EntityId entityId, Object... args) {
-        return null;
+    protected ListenableFuture<Object> doInvokeFunction(UUID scriptId, String functionName, Object[] args) {
+        String scriptBody = scriptIdToBodysMap.get(scriptId);
+        if (scriptBody == null) {
+            return Futures.immediateFailedFuture(new RuntimeException("No script body found for scriptId: [" + scriptId + "]!"));
+        }
+        JsInvokeProtos.JsInvokeRequest jsRequest = JsInvokeProtos.JsInvokeRequest.newBuilder()
+                .setScriptIdMSB(scriptId.getMostSignificantBits())
+                .setScriptIdLSB(scriptId.getLeastSignificantBits())
+                .setFunctionName(functionName)
+                .setScriptBody(scriptIdToBodysMap.get(scriptId)).build();
+
+        JsInvokeProtos.RemoteJsRequest jsRequestWrapper = JsInvokeProtos.RemoteJsRequest.newBuilder()
+                .setInvokeRequest(jsRequest)
+                .build();
+
+        ListenableFuture<JsInvokeProtos.RemoteJsResponse> future = kafkaTemplate.post(scriptId.toString(), jsRequestWrapper);
+        return Futures.transform(future, response -> {
+            JsInvokeProtos.JsInvokeResponse invokeResult = response.getInvokeResponse();
+            if (invokeResult.getSuccess()) {
+                return invokeResult.getResult();
+            } else {
+                log.debug("[{}] Failed to compile script due to [{}]: {}", scriptId, invokeResult.getErrorCode().name(), invokeResult.getErrorDetails());
+                throw new RuntimeException(invokeResult.getErrorCode().name());
+            }
+        });
     }
 
     @Override
-    public ListenableFuture<Void> release(UUID scriptId, EntityId entityId) {
-        return null;
+    protected void doRelease(UUID scriptId, String functionName) throws Exception {
+        JsInvokeProtos.JsReleaseRequest jsRequest = JsInvokeProtos.JsReleaseRequest.newBuilder()
+                .setScriptIdMSB(scriptId.getMostSignificantBits())
+                .setScriptIdLSB(scriptId.getLeastSignificantBits())
+                .setFunctionName(functionName).build();
+
+        JsInvokeProtos.RemoteJsRequest jsRequestWrapper = JsInvokeProtos.RemoteJsRequest.newBuilder()
+                .setReleaseRequest(jsRequest)
+                .build();
+
+        ListenableFuture<JsInvokeProtos.RemoteJsResponse> future = kafkaTemplate.post(scriptId.toString(), jsRequestWrapper);
+        JsInvokeProtos.RemoteJsResponse response = future.get();
+
+        JsInvokeProtos.JsReleaseResponse compilationResult = response.getReleaseResponse();
+        UUID compiledScriptId = new UUID(compilationResult.getScriptIdMSB(), compilationResult.getScriptIdLSB());
+        if (compilationResult.getSuccess()) {
+            scriptIdToBodysMap.remove(scriptId);
+        } else {
+            log.debug("[{}] Failed to release script due", compiledScriptId);
+        }
     }
 
 }
diff --git a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsRequestEncoder.java b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsRequestEncoder.java
new file mode 100644
index 0000000..7d5d547
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsRequestEncoder.java
@@ -0,0 +1,29 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.service.script;
+
+import org.thingsboard.server.gen.js.JsInvokeProtos;
+import org.thingsboard.server.kafka.TbKafkaEncoder;
+
+/**
+ * Created by ashvayka on 25.09.18.
+ */
+public class RemoteJsRequestEncoder implements TbKafkaEncoder<JsInvokeProtos.RemoteJsRequest> {
+    @Override
+    public byte[] encode(JsInvokeProtos.RemoteJsRequest value) {
+        return value.toByteArray();
+    }
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsResponseDecoder.java b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsResponseDecoder.java
new file mode 100644
index 0000000..1975988
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsResponseDecoder.java
@@ -0,0 +1,32 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.service.script;
+
+import org.thingsboard.server.gen.js.JsInvokeProtos;
+import org.thingsboard.server.kafka.TbKafkaDecoder;
+
+import java.io.IOException;
+
+/**
+ * Created by ashvayka on 25.09.18.
+ */
+public class RemoteJsResponseDecoder implements TbKafkaDecoder<JsInvokeProtos.RemoteJsResponse> {
+
+    @Override
+    public JsInvokeProtos.RemoteJsResponse decode(byte[] data) throws IOException {
+        return JsInvokeProtos.RemoteJsResponse.parseFrom(data);
+    }
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngine.java b/application/src/main/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngine.java
index c001a1a..021e6da 100644
--- a/application/src/main/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngine.java
+++ b/application/src/main/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngine.java
@@ -165,7 +165,7 @@ public class RuleNodeJsScriptEngine implements org.thingsboard.rule.engine.api.S
     private JsonNode executeScript(TbMsg msg) throws ScriptException {
         try {
             String[] inArgs = prepareArgs(msg);
-            String eval = sandboxService.invokeFunction(this.scriptId, this.entityId, inArgs[0], inArgs[1], inArgs[2]).get().toString();
+            String eval = sandboxService.invokeFunction(this.scriptId, inArgs[0], inArgs[1], inArgs[2]).get().toString();
             return mapper.readTree(eval);
         } catch (ExecutionException e) {
             if (e.getCause() instanceof ScriptException) {
@@ -179,6 +179,6 @@ public class RuleNodeJsScriptEngine implements org.thingsboard.rule.engine.api.S
     }
 
     public void destroy() {
-        sandboxService.release(this.scriptId, this.entityId);
+        sandboxService.release(this.scriptId);
     }
 }
diff --git a/application/src/main/proto/jsinvoke.proto b/application/src/main/proto/jsinvoke.proto
index c465309..475c7bd 100644
--- a/application/src/main/proto/jsinvoke.proto
+++ b/application/src/main/proto/jsinvoke.proto
@@ -14,24 +14,68 @@
  * limitations under the License.
  */
 syntax = "proto3";
-package cluster;
+package js;
 
 option java_package = "org.thingsboard.server.gen.js";
 option java_outer_classname = "JsInvokeProtos";
 
-service JsInvokeRpcService {
-  rpc handleMsgs(stream JsInvokeRequest) returns (stream JsInvokeResponse) {}
+enum JsInvokeErrorCode{
+  COMPILATION_ERROR = 0;
+  RUNTIME_ERROR = 1;
+  CPU_USAGE_ERROR = 2;
+}
+
+message RemoteJsRequest {
+  JsCompileRequest compileRequest = 1;
+  JsInvokeRequest invokeRequest = 2;
+  JsReleaseRequest releaseRequest = 3;
+}
+
+message RemoteJsResponse {
+  JsCompileResponse compileResponse = 1;
+  JsInvokeResponse invokeResponse = 2;
+  JsReleaseResponse releaseResponse = 3;
+}
+
+message JsCompileRequest {
+  int64 scriptIdMSB = 1;
+  int64 scriptIdLSB = 2;
+  string functionName = 3;
+  string scriptBody = 4;
+}
+
+message JsReleaseRequest {
+  int64 scriptIdMSB = 1;
+  int64 scriptIdLSB = 2;
+  string functionName = 3;
+}
+
+message JsReleaseResponse {
+  bool success = 1;
+  int64 scriptIdMSB = 2;
+  int64 scriptIdLSB = 3;
+}
+
+message JsCompileResponse {
+  bool success = 1;
+  int64 scriptIdMSB = 2;
+  int64 scriptIdLSB = 3;
+  JsInvokeErrorCode errorCode = 4;
+  string errorDetails = 5;
 }
 
 message JsInvokeRequest {
-  string scriptId = 1;
-  string scriptBody = 2;
-  repeated string args = 3;
+  int64 scriptIdMSB = 1;
+  int64 scriptIdLSB = 2;
+  string functionName = 3;
+  string scriptBody = 4;
+  repeated string args = 5;
 }
 
 message JsInvokeResponse {
-  string result = 1;
-  string errorName = 2;
-  string errorDetails = 3;
+  bool success = 1;
+  string result = 2;
+  JsInvokeErrorCode errorCode = 3;
+  string errorDetails = 4;
 }
 
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index 6851b58..eb40ac5 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -405,8 +405,17 @@ state:
   defaultInactivityTimeoutInSec: 10
   defaultStateCheckIntervalInSec: 10
 
+kafka:
+  enabled: true
+  bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}"
+  acks: "${TB_KAFKA_ACKS:all}"
+  retries: "${TB_KAFKA_RETRIES:1}"
+  batch.size: "${TB_KAFKA_BATCH_SIZE:16384}"
+  linger.ms: "${TB_KAFKA_LINGER_MS:1}"
+  buffer.memory: "${TB_BUFFER_MEMORY:33554432}"
+
 js:
-  evaluator: "${JS_EVALUATOR:external}" # local/external
+  evaluator: "${JS_EVALUATOR:local}" # local/external
   # Built-in JVM JavaScript environment properties
   local:
     # Use Sandboxed (secured) JVM JavaScript environment
@@ -421,3 +430,15 @@ js:
   remote:
     # Use Sandboxed (secured) JVM JavaScript environment
     use_js_sandbox: "${USE_REMOTE_JS_SANDBOX:true}"
+    # JS Eval request topic
+    request_topic: "${REMOTE_JS_EVAL_REQUEST_TOPIC:js.eval.requests}"
+    # JS Eval responses topic prefix that is combined with node id
+    response_topic_prefix: "${REMOTE_JS_EVAL_REQUEST_TOPIC:js.eval.responses}"
+    # JS Eval max pending requests
+    max_pending_requests: "${REMOTE_JS_MAX_PENDING_REQUESTS:10000}"
+    # JS Eval max request timeout
+    max_requests_timeout: "${REMOTE_JS_MAX_REQUEST_TIMEOUT:20000}"
+    # JS response poll interval
+    response_poll_interval: "${REMOTE_JS_RESPONSE_POLL_INTERVAL_MS:25}"
+    # Maximum allowed JavaScript execution errors before JavaScript will be blacklisted
+    max_errors: "${REMOTE_JS_SANDBOX_MAX_ERRORS:3}"
diff --git a/application/src/test/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngineTest.java b/application/src/test/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngineTest.java
index 730121f..fe0f381 100644
--- a/application/src/test/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngineTest.java
+++ b/application/src/test/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngineTest.java
@@ -236,8 +236,8 @@ public class RuleNodeJsScriptEngineTest {
                 startLatch.await();
                 UUID scriptId = jsSandboxService.eval(JsScriptType.RULE_NODE_SCRIPT, code).get();
                 scriptIds.put(scriptId, new Object());
-                jsSandboxService.invokeFunction(scriptId, ruleNodeId, "{}", "{}", "TEXT").get();
-                jsSandboxService.release(scriptId, ruleNodeId).get();
+                jsSandboxService.invokeFunction(scriptId, "{}", "{}", "TEXT").get();
+                jsSandboxService.release(scriptId).get();
             }
         } catch (Throwable th) {
             failedCount.incrementAndGet();

common/pom.xml 3(+2 -1)

diff --git a/common/pom.xml b/common/pom.xml
index fbff206..9cac6a7 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -37,7 +37,8 @@
     <modules>
         <module>data</module>
         <module>message</module>
-	<module>transport</module>
+        <module>transport</module>
+        <module>queue</module>
     </modules>
 
 </project>
diff --git a/common/queue/pom.xml b/common/queue/pom.xml
new file mode 100644
index 0000000..eb72264
--- /dev/null
+++ b/common/queue/pom.xml
@@ -0,0 +1,95 @@
+<!--
+
+    Copyright © 2016-2018 The Thingsboard Authors
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.thingsboard</groupId>
+        <version>2.1.1-SNAPSHOT</version>
+        <artifactId>common</artifactId>
+    </parent>
+    <groupId>org.thingsboard.common</groupId>
+    <artifactId>queue</artifactId>
+    <packaging>jar</packaging>
+
+    <name>Thingsboard Server Queue components</name>
+    <url>https://thingsboard.io</url>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <main.dir>${basedir}/../..</main.dir>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.thingsboard.common</groupId>
+            <artifactId>data</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.thingsboard.common</groupId>
+            <artifactId>message</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-context-support</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-autoconfigure</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.google.code.gson</groupId>
+            <artifactId>gson</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>log4j-over-slf4j</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+</project>
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TbJsEvaluator.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TbJsEvaluator.java
new file mode 100644
index 0000000..92e46b5
--- /dev/null
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TbJsEvaluator.java
@@ -0,0 +1,68 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.kafka;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.Header;
+
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.LongAdder;
+
+/**
+ * Created by ashvayka on 24.09.18.
+ */
+@Slf4j
+public class TbJsEvaluator {
+
+//    public static void main(String[] args) {
+//        ExecutorService executorService = Executors.newCachedThreadPool();
+//
+//        TBKafkaConsumerTemplate requestConsumer = new TBKafkaConsumerTemplate();
+//        requestConsumer.subscribe("requests");
+//
+//        LongAdder responseCounter = new LongAdder();
+//        TBKafkaProducerTemplate responseProducer = new TBKafkaProducerTemplate();
+//        executorService.submit((Runnable) () -> {
+//            while (true) {
+//                ConsumerRecords<String, String> requests = requestConsumer.poll(100);
+//                requests.forEach(request -> {
+//                    Header header = request.headers().lastHeader("responseTopic");
+//                    ProducerRecord<String, String> response = new ProducerRecord<>(new String(header.value(), StandardCharsets.UTF_8),
+//                            request.key(), request.value());
+//                    responseProducer.send(response);
+//                    responseCounter.add(1);
+//                });
+//            }
+//        });
+//
+//        executorService.submit((Runnable) () -> {
+//            while (true) {
+//                log.warn("Requests: [{}], Responses: [{}]", responseCounter.longValue(), responseCounter.longValue());
+//                try {
+//                    Thread.sleep(1000L);
+//                } catch (InterruptedException e) {
+//                    e.printStackTrace();
+//                }
+//            }
+//        });
+//
+//    }
+
+}
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaAdmin.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaAdmin.java
new file mode 100644
index 0000000..7ea40cd
--- /dev/null
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaAdmin.java
@@ -0,0 +1,49 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.kafka;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Properties;
+
+/**
+ * Created by ashvayka on 24.09.18.
+ */
+public class TBKafkaAdmin {
+
+    AdminClient client;
+
+    public TBKafkaAdmin() {
+        Properties props = new Properties();
+        props.put("bootstrap.servers", "localhost:9092");
+        props.put("group.id", "test");
+        props.put("enable.auto.commit", "true");
+        props.put("auto.commit.interval.ms", "1000");
+        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+        client = AdminClient.create(props);
+    }
+
+    public CreateTopicsResult createTopic(NewTopic topic){
+        return client.createTopics(Collections.singletonList(topic));
+    }
+}
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaConsumerTemplate.java
new file mode 100644
index 0000000..1b1bb5b
--- /dev/null
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaConsumerTemplate.java
@@ -0,0 +1,71 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.kafka;
+
+import lombok.Builder;
+import lombok.Getter;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Properties;
+
+/**
+ * Created by ashvayka on 24.09.18.
+ */
+public class TBKafkaConsumerTemplate<T> {
+
+    private final KafkaConsumer<String, byte[]> consumer;
+    private final TbKafkaDecoder<T> decoder;
+    @Getter
+    private final String topic;
+
+    @Builder
+    private TBKafkaConsumerTemplate(TbKafkaSettings settings, TbKafkaDecoder<T> decoder,
+                                    String clientId, String groupId, String topic,
+                                    boolean autoCommit, long autoCommitIntervalMs) {
+        Properties props = settings.toProps();
+        props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
+        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+        this.consumer = new KafkaConsumer<>(props);
+        this.decoder = decoder;
+        this.topic = topic;
+    }
+
+    public void subscribe() {
+        consumer.subscribe(Collections.singletonList(topic));
+    }
+
+    public void unsubscribe() {
+        consumer.unsubscribe();
+    }
+
+    public ConsumerRecords<String, byte[]> poll(Duration duration) {
+        return consumer.poll(duration);
+    }
+
+    public T decode(ConsumerRecord<String, byte[]> record) throws IOException {
+        return decoder.decode(record.value());
+    }
+}
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaDecoder.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaDecoder.java
new file mode 100644
index 0000000..746dfda
--- /dev/null
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaDecoder.java
@@ -0,0 +1,27 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.kafka;
+
+import java.io.IOException;
+
+/**
+ * Created by ashvayka on 25.09.18.
+ */
+public interface TbKafkaDecoder<T> {
+
+    T decode(byte[] data) throws IOException;
+
+}
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaEncoder.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaEncoder.java
new file mode 100644
index 0000000..a3616cb
--- /dev/null
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaEncoder.java
@@ -0,0 +1,25 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.kafka;
+
+/**
+ * Created by ashvayka on 25.09.18.
+ */
+public interface TbKafkaEncoder<T> {
+
+    byte[] encode(T value);
+
+}
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaPartitioner.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaPartitioner.java
new file mode 100644
index 0000000..2d6e0d2
--- /dev/null
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaPartitioner.java
@@ -0,0 +1,30 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.kafka;
+
+import org.apache.kafka.clients.producer.Partitioner;
+import org.apache.kafka.common.PartitionInfo;
+
+import java.util.List;
+
+/**
+ * Created by ashvayka on 25.09.18.
+ */
+public interface TbKafkaPartitioner<T> extends Partitioner {
+
+    int partition(String topic, String key, T value, byte[] encodedValue, List<PartitionInfo> partitions);
+
+}
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java
new file mode 100644
index 0000000..b39e63e
--- /dev/null
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java
@@ -0,0 +1,83 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.kafka;
+
+import lombok.Builder;
+import lombok.Getter;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.header.Header;
+
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.Future;
+
+/**
+ * Created by ashvayka on 24.09.18.
+ */
+public class TBKafkaProducerTemplate<T> {
+
+    private final KafkaProducer<String, byte[]> producer;
+    private final TbKafkaEncoder<T> encoder;
+    private final TbKafkaPartitioner<T> partitioner;
+    private final List<PartitionInfo> partitionInfoList;
+    @Getter
+    private final String defaultTopic;
+
+    @Builder
+    private TBKafkaProducerTemplate(TbKafkaSettings settings, TbKafkaEncoder<T> encoder, TbKafkaPartitioner<T> partitioner, String defaultTopic) {
+        Properties props = settings.toProps();
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+        this.producer = new KafkaProducer<>(props);
+        //Maybe this should not be cached, but we don't plan to change size of partitions
+        this.partitionInfoList = producer.partitionsFor(defaultTopic);
+        this.encoder = encoder;
+        this.partitioner = partitioner;
+        this.defaultTopic = defaultTopic;
+    }
+
+    public Future<RecordMetadata> send(String key, T value) {
+        return send(key, value, null, null);
+    }
+
+    public Future<RecordMetadata> send(String key, T value, Iterable<Header> headers) {
+        return send(key, value, null, headers);
+    }
+
+    public Future<RecordMetadata> send(String key, T value, Long timestamp, Iterable<Header> headers) {
+        return send(this.defaultTopic, key, value, timestamp, headers);
+    }
+
+    public Future<RecordMetadata> send(String topic, String key, T value, Long timestamp, Iterable<Header> headers) {
+        byte[] data = encoder.encode(value);
+        ProducerRecord<String, byte[]> record;
+        Integer partition = getPartition(topic, key, value, data);
+        record = new ProducerRecord<>(this.defaultTopic, partition, timestamp, key, data, headers);
+        return producer.send(record);
+    }
+
+    private Integer getPartition(String topic, String key, T value, byte[] data) {
+        if (partitioner == null) {
+            return null;
+        } else {
+            return partitioner.partition(this.defaultTopic, key, value, data, partitionInfoList);
+        }
+    }
+}
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaProperty.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaProperty.java
new file mode 100644
index 0000000..784a5c5
--- /dev/null
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaProperty.java
@@ -0,0 +1,32 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.kafka;
+
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.stereotype.Component;
+
+/**
+ * Created by ashvayka on 25.09.18.
+ */
+@Data
+public class TbKafkaProperty {
+
+    private String key;
+    private String value;
+}
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java
new file mode 100644
index 0000000..be8c087
--- /dev/null
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java
@@ -0,0 +1,173 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.kafka;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import lombok.Builder;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Created by ashvayka on 25.09.18.
+ */
+@Slf4j
+public class TbKafkaRequestTemplate<Request, Response> {
+
+    private final TBKafkaProducerTemplate<Request> requestTemplate;
+    private final TBKafkaConsumerTemplate<Response> responseTemplate;
+    private final ConcurrentMap<UUID, ResponseMetaData<Response>> pendingRequests;
+    private final ExecutorService executor;
+    private final long maxRequestTimeout;
+    private final long maxPendingRequests;
+    private final long pollInterval;
+    private volatile long tickTs = 0L;
+    private volatile long tickSize = 0L;
+    private volatile boolean stopped = false;
+
+    @Builder
+    public TbKafkaRequestTemplate(TBKafkaProducerTemplate<Request> requestTemplate, TBKafkaConsumerTemplate<Response> responseTemplate,
+                                  long maxRequestTimeout,
+                                  long maxPendingRequests,
+                                  long pollInterval,
+                                  ExecutorService executor) {
+        this.requestTemplate = requestTemplate;
+        this.responseTemplate = responseTemplate;
+        this.pendingRequests = new ConcurrentHashMap<>();
+        this.maxRequestTimeout = maxRequestTimeout;
+        this.maxPendingRequests = maxPendingRequests;
+        this.pollInterval = pollInterval;
+        if (executor != null) {
+            this.executor = executor;
+        } else {
+            this.executor = Executors.newSingleThreadExecutor();
+        }
+    }
+
+    public void init() {
+        try {
+            TBKafkaAdmin admin = new TBKafkaAdmin();
+            CreateTopicsResult result = admin.createTopic(new NewTopic(responseTemplate.getTopic(), 1, (short) 1));
+            result.all().get();
+        } catch (Exception e) {
+            log.trace("Failed to create topic: {}", e.getMessage(), e);
+        }
+        tickTs = System.currentTimeMillis();
+        responseTemplate.subscribe();
+        executor.submit(() -> {
+            long nextCleanupMs = 0L;
+            while (!stopped) {
+                ConsumerRecords<String, byte[]> responses = responseTemplate.poll(Duration.ofMillis(pollInterval));
+                responses.forEach(response -> {
+                    Header requestIdHeader = response.headers().lastHeader(TbKafkaSettings.REQUEST_ID_HEADER);
+                    if (requestIdHeader == null) {
+                        log.error("[{}] Missing requestIdHeader", response);
+                    }
+                    UUID requestId = bytesToUuid(requestIdHeader.value());
+                    ResponseMetaData<Response> expectedResponse = pendingRequests.remove(requestId);
+                    if (expectedResponse == null) {
+                        log.trace("[{}] Invalid or stale request", requestId);
+                    } else {
+                        try {
+                            expectedResponse.future.set(responseTemplate.decode(response));
+                        } catch (IOException e) {
+                            expectedResponse.future.setException(e);
+                        }
+                    }
+                });
+                tickTs = System.currentTimeMillis();
+                tickSize = pendingRequests.size();
+                if (nextCleanupMs < tickTs) {
+                    //cleanup;
+                    pendingRequests.entrySet().forEach(kv -> {
+                        if (kv.getValue().expTime < tickTs) {
+                            ResponseMetaData<Response> staleRequest = pendingRequests.remove(kv.getKey());
+                            if (staleRequest != null) {
+                                staleRequest.future.setException(new TimeoutException());
+                            }
+                        }
+                    });
+                    nextCleanupMs = tickTs + maxRequestTimeout;
+                }
+            }
+        });
+    }
+
+    public void stop() {
+        stopped = true;
+    }
+
+    public ListenableFuture<Response> post(String key, Request request) {
+        if (tickSize > maxPendingRequests) {
+            return Futures.immediateFailedFuture(new RuntimeException("Pending request map is full!"));
+        }
+        UUID requestId = UUID.randomUUID();
+        List<Header> headers = new ArrayList<>(2);
+        headers.add(new RecordHeader(TbKafkaSettings.REQUEST_ID_HEADER, uuidToBytes(requestId)));
+        headers.add(new RecordHeader(TbKafkaSettings.RESPONSE_TOPIC_HEADER, stringToBytes(responseTemplate.getTopic())));
+        SettableFuture<Response> future = SettableFuture.create();
+        pendingRequests.putIfAbsent(requestId, new ResponseMetaData<>(tickTs + maxRequestTimeout, future));
+        requestTemplate.send(key, request, headers);
+        return future;
+    }
+
+    private byte[] uuidToBytes(UUID uuid) {
+        ByteBuffer buf = ByteBuffer.allocate(16);
+        buf.putLong(uuid.getMostSignificantBits());
+        buf.putLong(uuid.getLeastSignificantBits());
+        return buf.array();
+    }
+
+    private static UUID bytesToUuid(byte[] bytes) {
+        ByteBuffer bb = ByteBuffer.wrap(bytes);
+        long firstLong = bb.getLong();
+        long secondLong = bb.getLong();
+        return new UUID(firstLong, secondLong);
+    }
+
+    private byte[] stringToBytes(String string) {
+        return string.getBytes(StandardCharsets.UTF_8);
+    }
+
+    private static class ResponseMetaData<T> {
+        private final long expTime;
+        private final SettableFuture<T> future;
+
+        ResponseMetaData(long ts, SettableFuture<T> future) {
+            this.expTime = ts;
+            this.future = future;
+        }
+    }
+
+}
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaSettings.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaSettings.java
new file mode 100644
index 0000000..757902f
--- /dev/null
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaSettings.java
@@ -0,0 +1,73 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.kafka;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Created by ashvayka on 25.09.18.
+ */
+@Slf4j
+@ConditionalOnProperty(prefix = "kafka", value = "enabled", havingValue = "true", matchIfMissing = false)
+@Component
+public class TbKafkaSettings {
+
+    public static final String REQUEST_ID_HEADER = "requestId";
+    public static final String RESPONSE_TOPIC_HEADER = "responseTopic";
+
+
+    @Value("${kafka.bootstrap.server}")
+    private String servers;
+
+    @Value("${kafka.acks}")
+    private String acks;
+
+    @Value("${kafka.retries}")
+    private int retries;
+
+    @Value("${kafka.batch.size}")
+    private long batchSize;
+
+    @Value("${kafka.linger.ms}")
+    private long lingerMs;
+
+    @Value("${kafka.buffer.memory}")
+    private long bufferMemory;
+
+    @Value("${kafka.other:null}")
+    private List<TbKafkaProperty> other;
+
+    public Properties toProps() {
+        Properties props = new Properties();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
+        props.put(ProducerConfig.ACKS_CONFIG, acks);
+        props.put(ProducerConfig.RETRIES_CONFIG, retries);
+        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
+        props.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs);
+        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
+        if(other != null){
+            other.forEach(kv -> props.put(kv.getKey(), kv.getValue()));
+        }
+        return props;
+    }
+}
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TbRuleEngineEmulator.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TbRuleEngineEmulator.java
new file mode 100644
index 0000000..ecac2e6
--- /dev/null
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TbRuleEngineEmulator.java
@@ -0,0 +1,114 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.kafka;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.LongAdder;
+
+/**
+ * Created by ashvayka on 24.09.18.
+ */
+@Slf4j
+public class TbRuleEngineEmulator {
+//
+//    public static void main(String[] args) throws InterruptedException, ExecutionException {
+//        ConcurrentMap<String, String> pendingRequestsMap = new ConcurrentHashMap<>();
+//
+//        ExecutorService executorService = Executors.newCachedThreadPool();
+//
+//        String responseTopic = "server" + Math.abs((int) (5000.0 * Math.random()));
+//        try {
+//            TBKafkaAdmin admin = new TBKafkaAdmin();
+//            CreateTopicsResult result = admin.createTopic(new NewTopic(responseTopic, 1, (short) 1));
+//            result.all().get();
+//        } catch (Exception e) {
+//            log.warn("Failed to create topic: {}", e.getMessage(), e);
+//        }
+//
+//        List<Header> headers = Collections.singletonList(new RecordHeader("responseTopic", responseTopic.getBytes(StandardCharsets.UTF_8)));
+//
+//        TBKafkaConsumerTemplate responseConsumer = new TBKafkaConsumerTemplate();
+//        TBKafkaProducerTemplate requestProducer = new TBKafkaProducerTemplate();
+//
+//        LongAdder requestCounter = new LongAdder();
+//        LongAdder responseCounter = new LongAdder();
+//
+//        responseConsumer.subscribe(responseTopic);
+//        executorService.submit((Runnable) () -> {
+//            while (true) {
+//                ConsumerRecords<String, String> responses = responseConsumer.poll(100);
+//                responses.forEach(response -> {
+//                    String expectedResponse = pendingRequestsMap.remove(response.key());
+//                    if (expectedResponse == null) {
+//                        log.error("[{}] Invalid request", response.key());
+//                    } else if (!expectedResponse.equals(response.value())) {
+//                        log.error("[{}] Invalid response: {} instead of {}", response.key(), response.value(), expectedResponse);
+//                    }
+//                    responseCounter.add(1);
+//                });
+//            }
+//        });
+//
+//        executorService.submit((Runnable) () -> {
+//            int i = 0;
+//            while (true) {
+//                String requestId = UUID.randomUUID().toString();
+//                String expectedResponse = UUID.randomUUID().toString();
+//                pendingRequestsMap.put(requestId, expectedResponse);
+//                requestProducer.send(new ProducerRecord<>("requests", null, requestId, expectedResponse, headers));
+//                requestCounter.add(1);
+//                i++;
+//                if (i % 10000 == 0) {
+//                    try {
+//                        Thread.sleep(500L);
+//                    } catch (InterruptedException e) {
+//                        e.printStackTrace();
+//                    }
+//                }
+//            }
+//        });
+//
+//        executorService.submit((Runnable) () -> {
+//            while (true) {
+//                log.warn("Requests: [{}], Responses: [{}]", requestCounter.longValue(), responseCounter.longValue());
+//                try {
+//                    Thread.sleep(1000L);
+//                } catch (InterruptedException e) {
+//                    e.printStackTrace();
+//                }
+//            }
+//        });
+//
+//        Thread.sleep(60000);
+//    }
+
+}
diff --git a/common/queue/src/main/resources/logback.xml b/common/queue/src/main/resources/logback.xml
new file mode 100644
index 0000000..dcfc930
--- /dev/null
+++ b/common/queue/src/main/resources/logback.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+
+    Copyright © 2016-2018 The Thingsboard Authors
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<!DOCTYPE configuration>
+<configuration scan="true" scanPeriod="10 seconds">
+
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{ISO8601} [%thread] %-5level %logger{36} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <logger name="org.thingsboard.server" level="INFO" />
+    <logger name="akka" level="INFO" />
+
+    <root level="INFO">
+        <appender-ref ref="STDOUT"/>
+    </root>
+
+</configuration>
\ No newline at end of file

pom.xml 27(+11 -16)

diff --git a/pom.xml b/pom.xml
index aa54e72..159dad1 100755
--- a/pom.xml
+++ b/pom.xml
@@ -67,7 +67,6 @@
         <netty.version>4.1.22.Final</netty.version>
         <os-maven-plugin.version>1.5.0</os-maven-plugin.version>
         <rabbitmq.version>3.6.5</rabbitmq.version>
-        <kafka.version>0.9.0.0</kafka.version>
         <surfire.version>2.19.1</surfire.version>
         <jar-plugin.version>3.0.2</jar-plugin.version>
         <springfox-swagger.version>2.6.1</springfox-swagger.version>
@@ -82,6 +81,7 @@
         </sonar.exclusions>
         <elasticsearch.version>5.0.2</elasticsearch.version>
         <delight-nashorn-sandbox.version>0.1.14</delight-nashorn-sandbox.version>
+        <kafka.version>2.0.0</kafka.version>
     </properties>
 
     <modules>
@@ -371,6 +371,11 @@
                 <version>${project.version}</version>
             </dependency>
             <dependency>
+                <groupId>org.thingsboard.common</groupId>
+                <artifactId>queue</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+            <dependency>
                 <groupId>org.thingsboard</groupId>
                 <artifactId>tools</artifactId>
                 <version>${project.version}</version>
@@ -415,6 +420,11 @@
                 <version>${spring-boot.version}</version>
             </dependency>
             <dependency>
+                <groupId>org.apache.kafka</groupId>
+                <artifactId>kafka-clients</artifactId>
+                <version>${kafka.version}</version>
+            </dependency>
+            <dependency>
                 <groupId>org.postgresql</groupId>
                 <artifactId>postgresql</artifactId>
                 <version>${postgresql.driver.version}</version>
@@ -691,21 +701,6 @@
                 <scope>provided</scope>
             </dependency>
             <dependency>
-                <groupId>org.apache.kafka</groupId>
-                <artifactId>kafka_2.10</artifactId>
-                <version>${kafka.version}</version>
-                <exclusions>
-                    <exclusion>
-                        <groupId>org.slf4j</groupId>
-                        <artifactId>slf4j-log4j12</artifactId>
-                    </exclusion>
-                    <exclusion>
-                        <groupId>log4j</groupId>
-                        <artifactId>log4j</artifactId>
-                    </exclusion>
-                </exclusions>
-            </dependency>
-            <dependency>
                 <groupId>org.eclipse.paho</groupId>
                 <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
                 <version>${paho.client.version}</version>
diff --git a/rule-engine/rule-engine-components/pom.xml b/rule-engine/rule-engine-components/pom.xml
index e52e44b..39f877c 100644
--- a/rule-engine/rule-engine-components/pom.xml
+++ b/rule-engine/rule-engine-components/pom.xml
@@ -78,7 +78,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka_2.10</artifactId>
+            <artifactId>kafka-clients</artifactId>
         </dependency>
         <dependency>
             <groupId>com.amazonaws</groupId>