thingsboard-memoizeit
Changes
application/pom.xml 4(+4 -0)
application/src/main/java/org/thingsboard/server/service/cluster/discovery/DiscoveryService.java 2(+2 -0)
application/src/main/java/org/thingsboard/server/service/cluster/discovery/DummyDiscoveryService.java 5(+5 -0)
application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java 10(+9 -1)
application/src/main/java/org/thingsboard/server/service/script/AbstractJsInvokeService.java 102(+102 -0)
application/src/main/java/org/thingsboard/server/service/script/AbstractNashornJsInvokeService.java 195(+22 -173)
application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java 158(+149 -9)
application/src/main/java/org/thingsboard/server/service/script/RemoteJsRequestEncoder.java 29(+29 -0)
application/src/main/java/org/thingsboard/server/service/script/RemoteJsResponseDecoder.java 32(+32 -0)
application/src/main/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngine.java 4(+2 -2)
application/src/main/proto/jsinvoke.proto 62(+53 -9)
application/src/test/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngineTest.java 4(+2 -2)
common/pom.xml 3(+2 -1)
common/queue/pom.xml 95(+95 -0)
common/queue/src/main/resources/logback.xml 35(+35 -0)
pom.xml 27(+11 -16)
Details
application/pom.xml 4(+4 -0)
diff --git a/application/pom.xml b/application/pom.xml
index 3d7cbe1..eadcf7b 100644
--- a/application/pom.xml
+++ b/application/pom.xml
@@ -77,6 +77,10 @@
<artifactId>dao</artifactId>
</dependency>
<dependency>
+ <groupId>org.thingsboard.common</groupId>
+ <artifactId>queue</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.thingsboard</groupId>
<artifactId>dao</artifactId>
<type>test-jar</type>
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/DiscoveryService.java b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/DiscoveryService.java
index f9caafa..04c4ced 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/DiscoveryService.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/DiscoveryService.java
@@ -26,6 +26,8 @@ public interface DiscoveryService {
void unpublishCurrentServer();
+ String getNodeId();
+
ServerInstance getCurrentServer();
List<ServerInstance> getOtherServers();
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/DummyDiscoveryService.java b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/DummyDiscoveryService.java
index fd91643..64d1456 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/DummyDiscoveryService.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/DummyDiscoveryService.java
@@ -43,6 +43,11 @@ public class DummyDiscoveryService implements DiscoveryService {
}
@Override
+ public String getNodeId() {
+ return null;
+ }
+
+ @Override
public void publishCurrentServer() {
//Do nothing
}
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java
index 14196a6..b4829f2 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java
@@ -16,6 +16,7 @@
package org.thingsboard.server.service.cluster.discovery;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.SerializationException;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.curator.framework.CuratorFramework;
@@ -93,11 +94,13 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
private CuratorFramework client;
private PathChildrenCache cache;
private String nodePath;
-
+ //TODO: make persistent?
+ private String nodeId;
@PostConstruct
public void init() {
log.info("Initializing...");
+ this.nodeId = RandomStringUtils.randomAlphabetic(10);
Assert.hasLength(zkUrl, MiscUtils.missingProperty("zk.url"));
Assert.notNull(zkRetryInterval, MiscUtils.missingProperty("zk.retry_interval_ms"));
Assert.notNull(zkConnectionTimeout, MiscUtils.missingProperty("zk.connection_timeout_ms"));
@@ -181,6 +184,11 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
}
@Override
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ @Override
public ServerInstance getCurrentServer() {
return serverInstance.getSelf();
}
diff --git a/application/src/main/java/org/thingsboard/server/service/script/AbstractJsInvokeService.java b/application/src/main/java/org/thingsboard/server/service/script/AbstractJsInvokeService.java
new file mode 100644
index 0000000..461b894
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/script/AbstractJsInvokeService.java
@@ -0,0 +1,102 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.service.script;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Created by ashvayka on 26.09.18.
+ */
+@Slf4j
+public abstract class AbstractJsInvokeService implements JsInvokeService {
+
+ protected Map<UUID, String> scriptIdToNameMap = new ConcurrentHashMap<>();
+ protected Map<UUID, AtomicInteger> blackListedFunctions = new ConcurrentHashMap<>();
+
+ @Override
+ public ListenableFuture<UUID> eval(JsScriptType scriptType, String scriptBody, String... argNames) {
+ UUID scriptId = UUID.randomUUID();
+ String functionName = "invokeInternal_" + scriptId.toString().replace('-', '_');
+ String jsScript = generateJsScript(scriptType, functionName, scriptBody, argNames);
+ return doEval(scriptId, functionName, jsScript);
+ }
+
+ @Override
+ public ListenableFuture<Object> invokeFunction(UUID scriptId, Object... args) {
+ String functionName = scriptIdToNameMap.get(scriptId);
+ if (functionName == null) {
+ return Futures.immediateFailedFuture(new RuntimeException("No compiled script found for scriptId: [" + scriptId + "]!"));
+ }
+ if (!isBlackListed(scriptId)) {
+ return doInvokeFunction(scriptId, functionName, args);
+ } else {
+ return Futures.immediateFailedFuture(
+ new RuntimeException("Script is blacklisted due to maximum error count " + getMaxErrors() + "!"));
+ }
+ }
+
+ @Override
+ public ListenableFuture<Void> release(UUID scriptId) {
+ String functionName = scriptIdToNameMap.get(scriptId);
+ if (functionName != null) {
+ try {
+ scriptIdToNameMap.remove(scriptId);
+ blackListedFunctions.remove(scriptId);
+ doRelease(scriptId, functionName);
+ } catch (Exception e) {
+ return Futures.immediateFailedFuture(e);
+ }
+ }
+ return Futures.immediateFuture(null);
+ }
+
+ protected abstract ListenableFuture<UUID> doEval(UUID scriptId, String functionName, String scriptBody);
+
+ protected abstract ListenableFuture<Object> doInvokeFunction(UUID scriptId, String functionName, Object[] args);
+
+ protected abstract void doRelease(UUID scriptId, String functionName) throws Exception;
+
+ protected abstract int getMaxErrors();
+
+ protected void onScriptExecutionError(UUID scriptId) {
+ blackListedFunctions.computeIfAbsent(scriptId, key -> new AtomicInteger(0)).incrementAndGet();
+ }
+
+ private String generateJsScript(JsScriptType scriptType, String functionName, String scriptBody, String... argNames) {
+ switch (scriptType) {
+ case RULE_NODE_SCRIPT:
+ return RuleNodeScriptFactory.generateRuleNodeScript(functionName, scriptBody, argNames);
+ default:
+ throw new RuntimeException("No script factory implemented for scriptType: " + scriptType);
+ }
+ }
+
+ private boolean isBlackListed(UUID scriptId) {
+ if (blackListedFunctions.containsKey(scriptId)) {
+ AtomicInteger errorCount = blackListedFunctions.get(scriptId);
+ return errorCount.get() >= getMaxErrors();
+ } else {
+ return false;
+ }
+ }
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/script/AbstractNashornJsInvokeService.java b/application/src/main/java/org/thingsboard/server/service/script/AbstractNashornJsInvokeService.java
index 4acef05..7c8eb2a 100644
--- a/application/src/main/java/org/thingsboard/server/service/script/AbstractNashornJsInvokeService.java
+++ b/application/src/main/java/org/thingsboard/server/service/script/AbstractNashornJsInvokeService.java
@@ -20,38 +20,24 @@ import com.google.common.util.concurrent.ListenableFuture;
import delight.nashornsandbox.NashornSandbox;
import delight.nashornsandbox.NashornSandboxes;
import jdk.nashorn.api.scripting.NashornScriptEngineFactory;
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.thingsboard.server.common.data.id.EntityId;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.script.Invocable;
import javax.script.ScriptEngine;
import javax.script.ScriptException;
-import java.util.Map;
import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
-public abstract class AbstractNashornJsInvokeService implements JsInvokeService {
+public abstract class AbstractNashornJsInvokeService extends AbstractJsInvokeService {
private NashornSandbox sandbox;
private ScriptEngine engine;
private ExecutorService monitorExecutorService;
- private final Map<UUID, String> functionsMap = new ConcurrentHashMap<>();
- private final Map<BlackListKey, BlackListInfo> blackListedFunctions = new ConcurrentHashMap<>();
-
- private final Map<String, ScriptInfo> scriptKeyToInfo = new ConcurrentHashMap<>();
- private final Map<UUID, ScriptInfo> scriptIdToInfo = new ConcurrentHashMap<>();
-
@PostConstruct
public void init() {
if (useJsSandbox()) {
@@ -80,181 +66,44 @@ public abstract class AbstractNashornJsInvokeService implements JsInvokeService
protected abstract long getMaxCpuTime();
- protected abstract int getMaxErrors();
-
@Override
- public ListenableFuture<UUID> eval(JsScriptType scriptType, String scriptBody, String... argNames) {
- ScriptInfo scriptInfo = deduplicate(scriptType, scriptBody);
- UUID scriptId = scriptInfo.getId();
- AtomicInteger duplicateCount = scriptInfo.getCount();
-
- synchronized (scriptInfo.getLock()) {
- if (duplicateCount.compareAndSet(0, 1)) {
- try {
- evaluate(scriptId, scriptType, scriptBody, argNames);
- } catch (Exception e) {
- duplicateCount.decrementAndGet();
- log.warn("Failed to compile JS script: {}", e.getMessage(), e);
- return Futures.immediateFailedFuture(e);
- }
+ protected ListenableFuture<UUID> doEval(UUID scriptId, String functionName, String jsScript) {
+ try {
+ if (useJsSandbox()) {
+ sandbox.eval(jsScript);
} else {
- duplicateCount.incrementAndGet();
+ engine.eval(jsScript);
}
+ scriptIdToNameMap.put(scriptId, functionName);
+ } catch (Exception e) {
+ log.warn("Failed to compile JS script: {}", e.getMessage(), e);
+ return Futures.immediateFailedFuture(e);
}
return Futures.immediateFuture(scriptId);
}
- private void evaluate(UUID scriptId, JsScriptType scriptType, String scriptBody, String... argNames) throws ScriptException {
- String functionName = "invokeInternal_" + scriptId.toString().replace('-', '_');
- String jsScript = generateJsScript(scriptType, functionName, scriptBody, argNames);
- if (useJsSandbox()) {
- sandbox.eval(jsScript);
- } else {
- engine.eval(jsScript);
- }
- functionsMap.put(scriptId, functionName);
- }
-
@Override
- public ListenableFuture<Object> invokeFunction(UUID scriptId, EntityId entityId, Object... args) {
- String functionName = functionsMap.get(scriptId);
- if (functionName == null) {
- String message = "No compiled script found for scriptId: [" + scriptId + "]!";
- log.warn(message);
- return Futures.immediateFailedFuture(new RuntimeException(message));
- }
-
- BlackListInfo blackListInfo = blackListedFunctions.get(new BlackListKey(scriptId, entityId));
- if (blackListInfo != null && blackListInfo.getCount() >= getMaxErrors()) {
- RuntimeException throwable = new RuntimeException("Script is blacklisted due to maximum error count " + getMaxErrors() + "!", blackListInfo.getCause());
- throwable.printStackTrace();
- return Futures.immediateFailedFuture(throwable);
- }
-
+ protected ListenableFuture<Object> doInvokeFunction(UUID scriptId, String functionName, Object[] args) {
try {
- return invoke(functionName, args);
+ Object result;
+ if (useJsSandbox()) {
+ result = sandbox.getSandboxedInvocable().invokeFunction(functionName, args);
+ } else {
+ result = ((Invocable) engine).invokeFunction(functionName, args);
+ }
+ return Futures.immediateFuture(result);
} catch (Exception e) {
- BlackListKey blackListKey = new BlackListKey(scriptId, entityId);
- blackListedFunctions.computeIfAbsent(blackListKey, key -> new BlackListInfo()).incrementWithReason(e);
+ onScriptExecutionError(scriptId);
return Futures.immediateFailedFuture(e);
}
}
- private ListenableFuture<Object> invoke(String functionName, Object... args) throws ScriptException, NoSuchMethodException {
- Object result;
+ protected void doRelease(UUID scriptId, String functionName) throws ScriptException {
if (useJsSandbox()) {
- result = sandbox.getSandboxedInvocable().invokeFunction(functionName, args);
+ sandbox.eval(functionName + " = undefined;");
} else {
- result = ((Invocable) engine).invokeFunction(functionName, args);
- }
- return Futures.immediateFuture(result);
- }
-
- @Override
- public ListenableFuture<Void> release(UUID scriptId, EntityId entityId) {
- ScriptInfo scriptInfo = scriptIdToInfo.get(scriptId);
- if (scriptInfo == null) {
- log.warn("Script release called for not existing script id [{}]", scriptId);
- return Futures.immediateFuture(null);
- }
-
- synchronized (scriptInfo.getLock()) {
- int remainingDuplicates = scriptInfo.getCount().decrementAndGet();
- if (remainingDuplicates > 0) {
- return Futures.immediateFuture(null);
- }
-
- String functionName = functionsMap.get(scriptId);
- if (functionName != null) {
- try {
- if (useJsSandbox()) {
- sandbox.eval(functionName + " = undefined;");
- } else {
- engine.eval(functionName + " = undefined;");
- }
- functionsMap.remove(scriptId);
- blackListedFunctions.remove(new BlackListKey(scriptId, entityId));
- } catch (ScriptException e) {
- log.error("Could not release script [{}] [{}]", scriptId, remainingDuplicates);
- return Futures.immediateFailedFuture(e);
- }
- } else {
- log.warn("Function name do not exist for script [{}] [{}]", scriptId, remainingDuplicates);
- }
+ engine.eval(functionName + " = undefined;");
}
- return Futures.immediateFuture(null);
- }
-
-
- private String generateJsScript(JsScriptType scriptType, String functionName, String scriptBody, String... argNames) {
- switch (scriptType) {
- case RULE_NODE_SCRIPT:
- return RuleNodeScriptFactory.generateRuleNodeScript(functionName, scriptBody, argNames);
- default:
- throw new RuntimeException("No script factory implemented for scriptType: " + scriptType);
- }
- }
-
- private ScriptInfo deduplicate(JsScriptType scriptType, String scriptBody) {
- ScriptInfo meta = ScriptInfo.preInit();
- String key = deduplicateKey(scriptType, scriptBody);
- ScriptInfo latestMeta = scriptKeyToInfo.computeIfAbsent(key, i -> meta);
- return scriptIdToInfo.computeIfAbsent(latestMeta.getId(), i -> latestMeta);
}
- private String deduplicateKey(JsScriptType scriptType, String scriptBody) {
- return scriptType + "_" + scriptBody;
- }
-
- @Getter
- private static class ScriptInfo {
- private final UUID id;
- private final Object lock;
- private final AtomicInteger count;
-
- ScriptInfo(UUID id, Object lock, AtomicInteger count) {
- this.id = id;
- this.lock = lock;
- this.count = count;
- }
-
- static ScriptInfo preInit() {
- UUID preId = UUID.randomUUID();
- AtomicInteger preCount = new AtomicInteger();
- Object preLock = new Object();
- return new ScriptInfo(preId, preLock, preCount);
- }
- }
-
- @EqualsAndHashCode
- @Getter
- @RequiredArgsConstructor
- private static class BlackListKey {
- private final UUID scriptId;
- private final EntityId entityId;
-
- }
-
- @Data
- private static class BlackListInfo {
- private final AtomicInteger count;
- private Exception ex;
-
- BlackListInfo() {
- this.count = new AtomicInteger(0);
- }
-
- void incrementWithReason(Exception e) {
- count.incrementAndGet();
- ex = e;
- }
-
- int getCount() {
- return count.get();
- }
-
- Exception getCause() {
- return ex;
- }
- }
}
diff --git a/application/src/main/java/org/thingsboard/server/service/script/JsInvokeRequest.java b/application/src/main/java/org/thingsboard/server/service/script/JsInvokeRequest.java
new file mode 100644
index 0000000..1041a26
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/script/JsInvokeRequest.java
@@ -0,0 +1,29 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.service.script;
+
+import java.util.List;
+
+/**
+ * Created by ashvayka on 25.09.18.
+ */
+public class JsInvokeRequest {
+
+ private String scriptId;
+ private String scriptBody;
+ private List<String> args;
+
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/script/JsInvokeResponse.java b/application/src/main/java/org/thingsboard/server/service/script/JsInvokeResponse.java
new file mode 100644
index 0000000..1638de8
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/script/JsInvokeResponse.java
@@ -0,0 +1,29 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.service.script;
+
+import java.util.List;
+
+/**
+ * Created by ashvayka on 25.09.18.
+ */
+public class JsInvokeResponse {
+
+ private String scriptId;
+ private String scriptBody;
+ private List<String> args;
+
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/script/JsInvokeService.java b/application/src/main/java/org/thingsboard/server/service/script/JsInvokeService.java
index 1da9859..031a5ee 100644
--- a/application/src/main/java/org/thingsboard/server/service/script/JsInvokeService.java
+++ b/application/src/main/java/org/thingsboard/server/service/script/JsInvokeService.java
@@ -25,8 +25,8 @@ public interface JsInvokeService {
ListenableFuture<UUID> eval(JsScriptType scriptType, String scriptBody, String... argNames);
- ListenableFuture<Object> invokeFunction(UUID scriptId, EntityId entityId, Object... args);
+ ListenableFuture<Object> invokeFunction(UUID scriptId, Object... args);
- ListenableFuture<Void> release(UUID scriptId, EntityId entityId);
+ ListenableFuture<Void> release(UUID scriptId);
}
diff --git a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java
index 3a85cc5..da8d62a 100644
--- a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java
+++ b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java
@@ -15,32 +15,172 @@
*/
package org.thingsboard.server.service.script;
+import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
-import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.gen.js.JsInvokeProtos;
+import org.thingsboard.server.kafka.TBKafkaConsumerTemplate;
+import org.thingsboard.server.kafka.TBKafkaProducerTemplate;
+import org.thingsboard.server.kafka.TbKafkaRequestTemplate;
+import org.thingsboard.server.kafka.TbKafkaSettings;
+import org.thingsboard.server.service.cluster.discovery.DiscoveryService;
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
@Slf4j
-@ConditionalOnProperty(prefix = "js", value = "evaluator", havingValue = "local", matchIfMissing = true)
+@ConditionalOnProperty(prefix = "js", value = "evaluator", havingValue = "remote", matchIfMissing = true)
@Service
-public class RemoteJsInvokeService implements JsInvokeService {
+public class RemoteJsInvokeService extends AbstractJsInvokeService {
+
+ @Autowired
+ private DiscoveryService discoveryService;
+
+ @Autowired
+ private TbKafkaSettings kafkaSettings;
+
+ @Value("${js.remote.use_js_sandbox}")
+ private boolean useJsSandbox;
+
+ @Value("${js.remote.request_topic}")
+ private String requestTopic;
+
+ @Value("${js.remote.response_topic_prefix}")
+ private String responseTopicPrefix;
+
+ @Value("${js.remote.max_pending_requests}")
+ private long maxPendingRequests;
+
+ @Value("${js.remote.max_requests_timeout}")
+ private long maxRequestsTimeout;
+
+ @Value("${js.remote.response_poll_duration}")
+ private long responsePollDuration;
+
+ @Getter
+ @Value("${js.remote.max_errors}")
+ private int maxErrors;
+
+ private TbKafkaRequestTemplate<JsInvokeProtos.RemoteJsRequest, JsInvokeProtos.RemoteJsResponse> kafkaTemplate;
+ protected Map<UUID, String> scriptIdToBodysMap = new ConcurrentHashMap<>();
+
+ @PostConstruct
+ public void init() {
+ TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<JsInvokeProtos.RemoteJsRequest> requestBuilder = TBKafkaProducerTemplate.builder();
+ requestBuilder.settings(kafkaSettings);
+ requestBuilder.defaultTopic(requestTopic);
+ requestBuilder.encoder(new RemoteJsRequestEncoder());
+
+ TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder<JsInvokeProtos.RemoteJsResponse> responseBuilder = TBKafkaConsumerTemplate.builder();
+ responseBuilder.settings(kafkaSettings);
+ responseBuilder.topic(responseTopicPrefix + "." + discoveryService.getNodeId());
+ responseBuilder.clientId(discoveryService.getNodeId());
+ responseBuilder.groupId("rule-engine-node");
+ responseBuilder.autoCommit(true);
+ responseBuilder.autoCommitIntervalMs(100);
+ responseBuilder.decoder(new RemoteJsResponseDecoder());
+
+ TbKafkaRequestTemplate.TbKafkaRequestTemplateBuilder
+ <JsInvokeProtos.RemoteJsRequest, JsInvokeProtos.RemoteJsResponse> builder = TbKafkaRequestTemplate.builder();
+ builder.requestTemplate(requestBuilder.build());
+ builder.responseTemplate(responseBuilder.build());
+ builder.maxPendingRequests(maxPendingRequests);
+ builder.maxRequestTimeout(maxRequestsTimeout);
+ builder.pollInterval(responsePollDuration);
+ kafkaTemplate = builder.build();
+ }
+
+ @PreDestroy
+ public void destroy(){
+ if(kafkaTemplate != null){
+ kafkaTemplate.stop();
+ }
+ }
@Override
- public ListenableFuture<UUID> eval(JsScriptType scriptType, String scriptBody, String... argNames) {
- return null;
+ protected ListenableFuture<UUID> doEval(UUID scriptId, String functionName, String scriptBody) {
+ JsInvokeProtos.JsCompileRequest jsRequest = JsInvokeProtos.JsCompileRequest.newBuilder()
+ .setScriptIdMSB(scriptId.getMostSignificantBits())
+ .setScriptIdLSB(scriptId.getLeastSignificantBits())
+ .setFunctionName(functionName)
+ .setScriptBody(scriptBody).build();
+
+ JsInvokeProtos.RemoteJsRequest jsRequestWrapper = JsInvokeProtos.RemoteJsRequest.newBuilder()
+ .setCompileRequest(jsRequest)
+ .build();
+
+ ListenableFuture<JsInvokeProtos.RemoteJsResponse> future = kafkaTemplate.post(scriptId.toString(), jsRequestWrapper);
+ return Futures.transform(future, response -> {
+ JsInvokeProtos.JsCompileResponse compilationResult = response.getCompileResponse();
+ UUID compiledScriptId = new UUID(compilationResult.getScriptIdMSB(), compilationResult.getScriptIdLSB());
+ if (compilationResult.getSuccess()) {
+ scriptIdToNameMap.put(scriptId, functionName);
+ scriptIdToBodysMap.put(scriptId, scriptBody);
+ return compiledScriptId;
+ } else {
+ log.debug("[{}] Failed to compile script due to [{}]: {}", compiledScriptId, compilationResult.getErrorCode().name(), compilationResult.getErrorDetails());
+ throw new RuntimeException(compilationResult.getErrorCode().name());
+ }
+ });
}
@Override
- public ListenableFuture<Object> invokeFunction(UUID scriptId, EntityId entityId, Object... args) {
- return null;
+ protected ListenableFuture<Object> doInvokeFunction(UUID scriptId, String functionName, Object[] args) {
+ String scriptBody = scriptIdToBodysMap.get(scriptId);
+ if (scriptBody == null) {
+ return Futures.immediateFailedFuture(new RuntimeException("No script body found for scriptId: [" + scriptId + "]!"));
+ }
+ JsInvokeProtos.JsInvokeRequest jsRequest = JsInvokeProtos.JsInvokeRequest.newBuilder()
+ .setScriptIdMSB(scriptId.getMostSignificantBits())
+ .setScriptIdLSB(scriptId.getLeastSignificantBits())
+ .setFunctionName(functionName)
+ .setScriptBody(scriptIdToBodysMap.get(scriptId)).build();
+
+ JsInvokeProtos.RemoteJsRequest jsRequestWrapper = JsInvokeProtos.RemoteJsRequest.newBuilder()
+ .setInvokeRequest(jsRequest)
+ .build();
+
+ ListenableFuture<JsInvokeProtos.RemoteJsResponse> future = kafkaTemplate.post(scriptId.toString(), jsRequestWrapper);
+ return Futures.transform(future, response -> {
+ JsInvokeProtos.JsInvokeResponse invokeResult = response.getInvokeResponse();
+ if (invokeResult.getSuccess()) {
+ return invokeResult.getResult();
+ } else {
+ log.debug("[{}] Failed to compile script due to [{}]: {}", scriptId, invokeResult.getErrorCode().name(), invokeResult.getErrorDetails());
+ throw new RuntimeException(invokeResult.getErrorCode().name());
+ }
+ });
}
@Override
- public ListenableFuture<Void> release(UUID scriptId, EntityId entityId) {
- return null;
+ protected void doRelease(UUID scriptId, String functionName) throws Exception {
+ JsInvokeProtos.JsReleaseRequest jsRequest = JsInvokeProtos.JsReleaseRequest.newBuilder()
+ .setScriptIdMSB(scriptId.getMostSignificantBits())
+ .setScriptIdLSB(scriptId.getLeastSignificantBits())
+ .setFunctionName(functionName).build();
+
+ JsInvokeProtos.RemoteJsRequest jsRequestWrapper = JsInvokeProtos.RemoteJsRequest.newBuilder()
+ .setReleaseRequest(jsRequest)
+ .build();
+
+ ListenableFuture<JsInvokeProtos.RemoteJsResponse> future = kafkaTemplate.post(scriptId.toString(), jsRequestWrapper);
+ JsInvokeProtos.RemoteJsResponse response = future.get();
+
+ JsInvokeProtos.JsReleaseResponse compilationResult = response.getReleaseResponse();
+ UUID compiledScriptId = new UUID(compilationResult.getScriptIdMSB(), compilationResult.getScriptIdLSB());
+ if (compilationResult.getSuccess()) {
+ scriptIdToBodysMap.remove(scriptId);
+ } else {
+ log.debug("[{}] Failed to release script due", compiledScriptId);
+ }
}
}
diff --git a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsRequestEncoder.java b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsRequestEncoder.java
new file mode 100644
index 0000000..7d5d547
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsRequestEncoder.java
@@ -0,0 +1,29 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.service.script;
+
+import org.thingsboard.server.gen.js.JsInvokeProtos;
+import org.thingsboard.server.kafka.TbKafkaEncoder;
+
+/**
+ * Created by ashvayka on 25.09.18.
+ */
+public class RemoteJsRequestEncoder implements TbKafkaEncoder<JsInvokeProtos.RemoteJsRequest> {
+ @Override
+ public byte[] encode(JsInvokeProtos.RemoteJsRequest value) {
+ return value.toByteArray();
+ }
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsResponseDecoder.java b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsResponseDecoder.java
new file mode 100644
index 0000000..1975988
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsResponseDecoder.java
@@ -0,0 +1,32 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.service.script;
+
+import org.thingsboard.server.gen.js.JsInvokeProtos;
+import org.thingsboard.server.kafka.TbKafkaDecoder;
+
+import java.io.IOException;
+
+/**
+ * Created by ashvayka on 25.09.18.
+ */
+public class RemoteJsResponseDecoder implements TbKafkaDecoder<JsInvokeProtos.RemoteJsResponse> {
+
+ @Override
+ public JsInvokeProtos.RemoteJsResponse decode(byte[] data) throws IOException {
+ return JsInvokeProtos.RemoteJsResponse.parseFrom(data);
+ }
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngine.java b/application/src/main/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngine.java
index c001a1a..021e6da 100644
--- a/application/src/main/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngine.java
+++ b/application/src/main/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngine.java
@@ -165,7 +165,7 @@ public class RuleNodeJsScriptEngine implements org.thingsboard.rule.engine.api.S
private JsonNode executeScript(TbMsg msg) throws ScriptException {
try {
String[] inArgs = prepareArgs(msg);
- String eval = sandboxService.invokeFunction(this.scriptId, this.entityId, inArgs[0], inArgs[1], inArgs[2]).get().toString();
+ String eval = sandboxService.invokeFunction(this.scriptId, inArgs[0], inArgs[1], inArgs[2]).get().toString();
return mapper.readTree(eval);
} catch (ExecutionException e) {
if (e.getCause() instanceof ScriptException) {
@@ -179,6 +179,6 @@ public class RuleNodeJsScriptEngine implements org.thingsboard.rule.engine.api.S
}
public void destroy() {
- sandboxService.release(this.scriptId, this.entityId);
+ sandboxService.release(this.scriptId);
}
}
application/src/main/proto/jsinvoke.proto 62(+53 -9)
diff --git a/application/src/main/proto/jsinvoke.proto b/application/src/main/proto/jsinvoke.proto
index c465309..475c7bd 100644
--- a/application/src/main/proto/jsinvoke.proto
+++ b/application/src/main/proto/jsinvoke.proto
@@ -14,24 +14,68 @@
* limitations under the License.
*/
syntax = "proto3";
-package cluster;
+package js;
option java_package = "org.thingsboard.server.gen.js";
option java_outer_classname = "JsInvokeProtos";
-service JsInvokeRpcService {
- rpc handleMsgs(stream JsInvokeRequest) returns (stream JsInvokeResponse) {}
+enum JsInvokeErrorCode{
+ COMPILATION_ERROR = 0;
+ RUNTIME_ERROR = 1;
+ CPU_USAGE_ERROR = 2;
+}
+
+message RemoteJsRequest {
+ JsCompileRequest compileRequest = 1;
+ JsInvokeRequest invokeRequest = 2;
+ JsReleaseRequest releaseRequest = 3;
+}
+
+message RemoteJsResponse {
+ JsCompileResponse compileResponse = 1;
+ JsInvokeResponse invokeResponse = 2;
+ JsReleaseResponse releaseResponse = 3;
+}
+
+message JsCompileRequest {
+ int64 scriptIdMSB = 1;
+ int64 scriptIdLSB = 2;
+ string functionName = 3;
+ string scriptBody = 4;
+}
+
+message JsReleaseRequest {
+ int64 scriptIdMSB = 1;
+ int64 scriptIdLSB = 2;
+ string functionName = 3;
+}
+
+message JsReleaseResponse {
+ bool success = 1;
+ int64 scriptIdMSB = 2;
+ int64 scriptIdLSB = 3;
+}
+
+message JsCompileResponse {
+ bool success = 1;
+ int64 scriptIdMSB = 2;
+ int64 scriptIdLSB = 3;
+ JsInvokeErrorCode errorCode = 4;
+ string errorDetails = 5;
}
message JsInvokeRequest {
- string scriptId = 1;
- string scriptBody = 2;
- repeated string args = 3;
+ int64 scriptIdMSB = 1;
+ int64 scriptIdLSB = 2;
+ string functionName = 3;
+ string scriptBody = 4;
+ repeated string args = 5;
}
message JsInvokeResponse {
- string result = 1;
- string errorName = 2;
- string errorDetails = 3;
+ bool success = 1;
+ string result = 2;
+ JsInvokeErrorCode errorCode = 3;
+ string errorDetails = 4;
}
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index 6851b58..eb40ac5 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -405,8 +405,17 @@ state:
defaultInactivityTimeoutInSec: 10
defaultStateCheckIntervalInSec: 10
+kafka:
+ enabled: true
+ bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}"
+ acks: "${TB_KAFKA_ACKS:all}"
+ retries: "${TB_KAFKA_RETRIES:1}"
+ batch.size: "${TB_KAFKA_BATCH_SIZE:16384}"
+ linger.ms: "${TB_KAFKA_LINGER_MS:1}"
+ buffer.memory: "${TB_BUFFER_MEMORY:33554432}"
+
js:
- evaluator: "${JS_EVALUATOR:external}" # local/external
+ evaluator: "${JS_EVALUATOR:local}" # local/external
# Built-in JVM JavaScript environment properties
local:
# Use Sandboxed (secured) JVM JavaScript environment
@@ -421,3 +430,15 @@ js:
remote:
# Use Sandboxed (secured) JVM JavaScript environment
use_js_sandbox: "${USE_REMOTE_JS_SANDBOX:true}"
+ # JS Eval request topic
+ request_topic: "${REMOTE_JS_EVAL_REQUEST_TOPIC:js.eval.requests}"
+ # JS Eval responses topic prefix that is combined with node id
+ response_topic_prefix: "${REMOTE_JS_EVAL_REQUEST_TOPIC:js.eval.responses}"
+ # JS Eval max pending requests
+ max_pending_requests: "${REMOTE_JS_MAX_PENDING_REQUESTS:10000}"
+ # JS Eval max request timeout
+ max_requests_timeout: "${REMOTE_JS_MAX_REQUEST_TIMEOUT:20000}"
+ # JS response poll interval
+ response_poll_interval: "${REMOTE_JS_RESPONSE_POLL_INTERVAL_MS:25}"
+ # Maximum allowed JavaScript execution errors before JavaScript will be blacklisted
+ max_errors: "${REMOTE_JS_SANDBOX_MAX_ERRORS:3}"
diff --git a/application/src/test/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngineTest.java b/application/src/test/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngineTest.java
index 730121f..fe0f381 100644
--- a/application/src/test/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngineTest.java
+++ b/application/src/test/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngineTest.java
@@ -236,8 +236,8 @@ public class RuleNodeJsScriptEngineTest {
startLatch.await();
UUID scriptId = jsSandboxService.eval(JsScriptType.RULE_NODE_SCRIPT, code).get();
scriptIds.put(scriptId, new Object());
- jsSandboxService.invokeFunction(scriptId, ruleNodeId, "{}", "{}", "TEXT").get();
- jsSandboxService.release(scriptId, ruleNodeId).get();
+ jsSandboxService.invokeFunction(scriptId, "{}", "{}", "TEXT").get();
+ jsSandboxService.release(scriptId).get();
}
} catch (Throwable th) {
failedCount.incrementAndGet();
common/pom.xml 3(+2 -1)
diff --git a/common/pom.xml b/common/pom.xml
index fbff206..9cac6a7 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -37,7 +37,8 @@
<modules>
<module>data</module>
<module>message</module>
- <module>transport</module>
+ <module>transport</module>
+ <module>queue</module>
</modules>
</project>
common/queue/pom.xml 95(+95 -0)
diff --git a/common/queue/pom.xml b/common/queue/pom.xml
new file mode 100644
index 0000000..eb72264
--- /dev/null
+++ b/common/queue/pom.xml
@@ -0,0 +1,95 @@
+<!--
+
+ Copyright © 2016-2018 The Thingsboard Authors
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.thingsboard</groupId>
+ <version>2.1.1-SNAPSHOT</version>
+ <artifactId>common</artifactId>
+ </parent>
+ <groupId>org.thingsboard.common</groupId>
+ <artifactId>queue</artifactId>
+ <packaging>jar</packaging>
+
+ <name>Thingsboard Server Queue components</name>
+ <url>https://thingsboard.io</url>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <main.dir>${basedir}/../..</main.dir>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.thingsboard.common</groupId>
+ <artifactId>data</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.thingsboard.common</groupId>
+ <artifactId>message</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-context-support</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-autoconfigure</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>log4j-over-slf4j</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+</project>
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TbJsEvaluator.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TbJsEvaluator.java
new file mode 100644
index 0000000..92e46b5
--- /dev/null
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TbJsEvaluator.java
@@ -0,0 +1,68 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.kafka;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.Header;
+
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.LongAdder;
+
+/**
+ * Created by ashvayka on 24.09.18.
+ */
+@Slf4j
+public class TbJsEvaluator {
+
+// public static void main(String[] args) {
+// ExecutorService executorService = Executors.newCachedThreadPool();
+//
+// TBKafkaConsumerTemplate requestConsumer = new TBKafkaConsumerTemplate();
+// requestConsumer.subscribe("requests");
+//
+// LongAdder responseCounter = new LongAdder();
+// TBKafkaProducerTemplate responseProducer = new TBKafkaProducerTemplate();
+// executorService.submit((Runnable) () -> {
+// while (true) {
+// ConsumerRecords<String, String> requests = requestConsumer.poll(100);
+// requests.forEach(request -> {
+// Header header = request.headers().lastHeader("responseTopic");
+// ProducerRecord<String, String> response = new ProducerRecord<>(new String(header.value(), StandardCharsets.UTF_8),
+// request.key(), request.value());
+// responseProducer.send(response);
+// responseCounter.add(1);
+// });
+// }
+// });
+//
+// executorService.submit((Runnable) () -> {
+// while (true) {
+// log.warn("Requests: [{}], Responses: [{}]", responseCounter.longValue(), responseCounter.longValue());
+// try {
+// Thread.sleep(1000L);
+// } catch (InterruptedException e) {
+// e.printStackTrace();
+// }
+// }
+// });
+//
+// }
+
+}
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaAdmin.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaAdmin.java
new file mode 100644
index 0000000..7ea40cd
--- /dev/null
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaAdmin.java
@@ -0,0 +1,49 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.kafka;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Properties;
+
+/**
+ * Created by ashvayka on 24.09.18.
+ */
+public class TBKafkaAdmin {
+
+ AdminClient client;
+
+ public TBKafkaAdmin() {
+ Properties props = new Properties();
+ props.put("bootstrap.servers", "localhost:9092");
+ props.put("group.id", "test");
+ props.put("enable.auto.commit", "true");
+ props.put("auto.commit.interval.ms", "1000");
+ props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ client = AdminClient.create(props);
+ }
+
+ public CreateTopicsResult createTopic(NewTopic topic){
+ return client.createTopics(Collections.singletonList(topic));
+ }
+}
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaConsumerTemplate.java
new file mode 100644
index 0000000..1b1bb5b
--- /dev/null
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaConsumerTemplate.java
@@ -0,0 +1,71 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.kafka;
+
+import lombok.Builder;
+import lombok.Getter;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Properties;
+
+/**
+ * Created by ashvayka on 24.09.18.
+ */
+public class TBKafkaConsumerTemplate<T> {
+
+ private final KafkaConsumer<String, byte[]> consumer;
+ private final TbKafkaDecoder<T> decoder;
+ @Getter
+ private final String topic;
+
+ @Builder
+ private TBKafkaConsumerTemplate(TbKafkaSettings settings, TbKafkaDecoder<T> decoder,
+ String clientId, String groupId, String topic,
+ boolean autoCommit, long autoCommitIntervalMs) {
+ Properties props = settings.toProps();
+ props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
+ props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+ this.consumer = new KafkaConsumer<>(props);
+ this.decoder = decoder;
+ this.topic = topic;
+ }
+
+ public void subscribe() {
+ consumer.subscribe(Collections.singletonList(topic));
+ }
+
+ public void unsubscribe() {
+ consumer.unsubscribe();
+ }
+
+ public ConsumerRecords<String, byte[]> poll(Duration duration) {
+ return consumer.poll(duration);
+ }
+
+ public T decode(ConsumerRecord<String, byte[]> record) throws IOException {
+ return decoder.decode(record.value());
+ }
+}
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaDecoder.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaDecoder.java
new file mode 100644
index 0000000..746dfda
--- /dev/null
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaDecoder.java
@@ -0,0 +1,27 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.kafka;
+
+import java.io.IOException;
+
+/**
+ * Created by ashvayka on 25.09.18.
+ */
+public interface TbKafkaDecoder<T> {
+
+ T decode(byte[] data) throws IOException;
+
+}
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaEncoder.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaEncoder.java
new file mode 100644
index 0000000..a3616cb
--- /dev/null
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaEncoder.java
@@ -0,0 +1,25 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.kafka;
+
+/**
+ * Created by ashvayka on 25.09.18.
+ */
+public interface TbKafkaEncoder<T> {
+
+ byte[] encode(T value);
+
+}
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaPartitioner.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaPartitioner.java
new file mode 100644
index 0000000..2d6e0d2
--- /dev/null
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaPartitioner.java
@@ -0,0 +1,30 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.kafka;
+
+import org.apache.kafka.clients.producer.Partitioner;
+import org.apache.kafka.common.PartitionInfo;
+
+import java.util.List;
+
+/**
+ * Created by ashvayka on 25.09.18.
+ */
+public interface TbKafkaPartitioner<T> extends Partitioner {
+
+ int partition(String topic, String key, T value, byte[] encodedValue, List<PartitionInfo> partitions);
+
+}
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java
new file mode 100644
index 0000000..b39e63e
--- /dev/null
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java
@@ -0,0 +1,83 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.kafka;
+
+import lombok.Builder;
+import lombok.Getter;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.header.Header;
+
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.Future;
+
+/**
+ * Created by ashvayka on 24.09.18.
+ */
+public class TBKafkaProducerTemplate<T> {
+
+ private final KafkaProducer<String, byte[]> producer;
+ private final TbKafkaEncoder<T> encoder;
+ private final TbKafkaPartitioner<T> partitioner;
+ private final List<PartitionInfo> partitionInfoList;
+ @Getter
+ private final String defaultTopic;
+
+ @Builder
+ private TBKafkaProducerTemplate(TbKafkaSettings settings, TbKafkaEncoder<T> encoder, TbKafkaPartitioner<T> partitioner, String defaultTopic) {
+ Properties props = settings.toProps();
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+ this.producer = new KafkaProducer<>(props);
+ //Maybe this should not be cached, but we don't plan to change size of partitions
+ this.partitionInfoList = producer.partitionsFor(defaultTopic);
+ this.encoder = encoder;
+ this.partitioner = partitioner;
+ this.defaultTopic = defaultTopic;
+ }
+
+ public Future<RecordMetadata> send(String key, T value) {
+ return send(key, value, null, null);
+ }
+
+ public Future<RecordMetadata> send(String key, T value, Iterable<Header> headers) {
+ return send(key, value, null, headers);
+ }
+
+ public Future<RecordMetadata> send(String key, T value, Long timestamp, Iterable<Header> headers) {
+ return send(this.defaultTopic, key, value, timestamp, headers);
+ }
+
+ public Future<RecordMetadata> send(String topic, String key, T value, Long timestamp, Iterable<Header> headers) {
+ byte[] data = encoder.encode(value);
+ ProducerRecord<String, byte[]> record;
+ Integer partition = getPartition(topic, key, value, data);
+ record = new ProducerRecord<>(this.defaultTopic, partition, timestamp, key, data, headers);
+ return producer.send(record);
+ }
+
+ private Integer getPartition(String topic, String key, T value, byte[] data) {
+ if (partitioner == null) {
+ return null;
+ } else {
+ return partitioner.partition(this.defaultTopic, key, value, data, partitionInfoList);
+ }
+ }
+}
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaProperty.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaProperty.java
new file mode 100644
index 0000000..784a5c5
--- /dev/null
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaProperty.java
@@ -0,0 +1,32 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.kafka;
+
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.stereotype.Component;
+
+/**
+ * Created by ashvayka on 25.09.18.
+ */
+@Data
+public class TbKafkaProperty {
+
+ private String key;
+ private String value;
+}
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java
new file mode 100644
index 0000000..be8c087
--- /dev/null
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java
@@ -0,0 +1,173 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.kafka;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import lombok.Builder;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Created by ashvayka on 25.09.18.
+ */
+@Slf4j
+public class TbKafkaRequestTemplate<Request, Response> {
+
+ private final TBKafkaProducerTemplate<Request> requestTemplate;
+ private final TBKafkaConsumerTemplate<Response> responseTemplate;
+ private final ConcurrentMap<UUID, ResponseMetaData<Response>> pendingRequests;
+ private final ExecutorService executor;
+ private final long maxRequestTimeout;
+ private final long maxPendingRequests;
+ private final long pollInterval;
+ private volatile long tickTs = 0L;
+ private volatile long tickSize = 0L;
+ private volatile boolean stopped = false;
+
+ @Builder
+ public TbKafkaRequestTemplate(TBKafkaProducerTemplate<Request> requestTemplate, TBKafkaConsumerTemplate<Response> responseTemplate,
+ long maxRequestTimeout,
+ long maxPendingRequests,
+ long pollInterval,
+ ExecutorService executor) {
+ this.requestTemplate = requestTemplate;
+ this.responseTemplate = responseTemplate;
+ this.pendingRequests = new ConcurrentHashMap<>();
+ this.maxRequestTimeout = maxRequestTimeout;
+ this.maxPendingRequests = maxPendingRequests;
+ this.pollInterval = pollInterval;
+ if (executor != null) {
+ this.executor = executor;
+ } else {
+ this.executor = Executors.newSingleThreadExecutor();
+ }
+ }
+
+ public void init() {
+ try {
+ TBKafkaAdmin admin = new TBKafkaAdmin();
+ CreateTopicsResult result = admin.createTopic(new NewTopic(responseTemplate.getTopic(), 1, (short) 1));
+ result.all().get();
+ } catch (Exception e) {
+ log.trace("Failed to create topic: {}", e.getMessage(), e);
+ }
+ tickTs = System.currentTimeMillis();
+ responseTemplate.subscribe();
+ executor.submit(() -> {
+ long nextCleanupMs = 0L;
+ while (!stopped) {
+ ConsumerRecords<String, byte[]> responses = responseTemplate.poll(Duration.ofMillis(pollInterval));
+ responses.forEach(response -> {
+ Header requestIdHeader = response.headers().lastHeader(TbKafkaSettings.REQUEST_ID_HEADER);
+ if (requestIdHeader == null) {
+ log.error("[{}] Missing requestIdHeader", response);
+ }
+ UUID requestId = bytesToUuid(requestIdHeader.value());
+ ResponseMetaData<Response> expectedResponse = pendingRequests.remove(requestId);
+ if (expectedResponse == null) {
+ log.trace("[{}] Invalid or stale request", requestId);
+ } else {
+ try {
+ expectedResponse.future.set(responseTemplate.decode(response));
+ } catch (IOException e) {
+ expectedResponse.future.setException(e);
+ }
+ }
+ });
+ tickTs = System.currentTimeMillis();
+ tickSize = pendingRequests.size();
+ if (nextCleanupMs < tickTs) {
+ //cleanup;
+ pendingRequests.entrySet().forEach(kv -> {
+ if (kv.getValue().expTime < tickTs) {
+ ResponseMetaData<Response> staleRequest = pendingRequests.remove(kv.getKey());
+ if (staleRequest != null) {
+ staleRequest.future.setException(new TimeoutException());
+ }
+ }
+ });
+ nextCleanupMs = tickTs + maxRequestTimeout;
+ }
+ }
+ });
+ }
+
+ public void stop() {
+ stopped = true;
+ }
+
+ public ListenableFuture<Response> post(String key, Request request) {
+ if (tickSize > maxPendingRequests) {
+ return Futures.immediateFailedFuture(new RuntimeException("Pending request map is full!"));
+ }
+ UUID requestId = UUID.randomUUID();
+ List<Header> headers = new ArrayList<>(2);
+ headers.add(new RecordHeader(TbKafkaSettings.REQUEST_ID_HEADER, uuidToBytes(requestId)));
+ headers.add(new RecordHeader(TbKafkaSettings.RESPONSE_TOPIC_HEADER, stringToBytes(responseTemplate.getTopic())));
+ SettableFuture<Response> future = SettableFuture.create();
+ pendingRequests.putIfAbsent(requestId, new ResponseMetaData<>(tickTs + maxRequestTimeout, future));
+ requestTemplate.send(key, request, headers);
+ return future;
+ }
+
+ private byte[] uuidToBytes(UUID uuid) {
+ ByteBuffer buf = ByteBuffer.allocate(16);
+ buf.putLong(uuid.getMostSignificantBits());
+ buf.putLong(uuid.getLeastSignificantBits());
+ return buf.array();
+ }
+
+ private static UUID bytesToUuid(byte[] bytes) {
+ ByteBuffer bb = ByteBuffer.wrap(bytes);
+ long firstLong = bb.getLong();
+ long secondLong = bb.getLong();
+ return new UUID(firstLong, secondLong);
+ }
+
+ private byte[] stringToBytes(String string) {
+ return string.getBytes(StandardCharsets.UTF_8);
+ }
+
+ private static class ResponseMetaData<T> {
+ private final long expTime;
+ private final SettableFuture<T> future;
+
+ ResponseMetaData(long ts, SettableFuture<T> future) {
+ this.expTime = ts;
+ this.future = future;
+ }
+ }
+
+}
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaSettings.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaSettings.java
new file mode 100644
index 0000000..757902f
--- /dev/null
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaSettings.java
@@ -0,0 +1,73 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.kafka;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Created by ashvayka on 25.09.18.
+ */
+@Slf4j
+@ConditionalOnProperty(prefix = "kafka", value = "enabled", havingValue = "true", matchIfMissing = false)
+@Component
+public class TbKafkaSettings {
+
+ public static final String REQUEST_ID_HEADER = "requestId";
+ public static final String RESPONSE_TOPIC_HEADER = "responseTopic";
+
+
+ @Value("${kafka.bootstrap.server}")
+ private String servers;
+
+ @Value("${kafka.acks}")
+ private String acks;
+
+ @Value("${kafka.retries}")
+ private int retries;
+
+ @Value("${kafka.batch.size}")
+ private long batchSize;
+
+ @Value("${kafka.linger.ms}")
+ private long lingerMs;
+
+ @Value("${kafka.buffer.memory}")
+ private long bufferMemory;
+
+ @Value("${kafka.other:null}")
+ private List<TbKafkaProperty> other;
+
+ public Properties toProps() {
+ Properties props = new Properties();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
+ props.put(ProducerConfig.ACKS_CONFIG, acks);
+ props.put(ProducerConfig.RETRIES_CONFIG, retries);
+ props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
+ props.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs);
+ props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
+ if(other != null){
+ other.forEach(kv -> props.put(kv.getKey(), kv.getValue()));
+ }
+ return props;
+ }
+}
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TbRuleEngineEmulator.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TbRuleEngineEmulator.java
new file mode 100644
index 0000000..ecac2e6
--- /dev/null
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TbRuleEngineEmulator.java
@@ -0,0 +1,114 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.kafka;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.LongAdder;
+
+/**
+ * Created by ashvayka on 24.09.18.
+ */
+@Slf4j
+public class TbRuleEngineEmulator {
+//
+// public static void main(String[] args) throws InterruptedException, ExecutionException {
+// ConcurrentMap<String, String> pendingRequestsMap = new ConcurrentHashMap<>();
+//
+// ExecutorService executorService = Executors.newCachedThreadPool();
+//
+// String responseTopic = "server" + Math.abs((int) (5000.0 * Math.random()));
+// try {
+// TBKafkaAdmin admin = new TBKafkaAdmin();
+// CreateTopicsResult result = admin.createTopic(new NewTopic(responseTopic, 1, (short) 1));
+// result.all().get();
+// } catch (Exception e) {
+// log.warn("Failed to create topic: {}", e.getMessage(), e);
+// }
+//
+// List<Header> headers = Collections.singletonList(new RecordHeader("responseTopic", responseTopic.getBytes(StandardCharsets.UTF_8)));
+//
+// TBKafkaConsumerTemplate responseConsumer = new TBKafkaConsumerTemplate();
+// TBKafkaProducerTemplate requestProducer = new TBKafkaProducerTemplate();
+//
+// LongAdder requestCounter = new LongAdder();
+// LongAdder responseCounter = new LongAdder();
+//
+// responseConsumer.subscribe(responseTopic);
+// executorService.submit((Runnable) () -> {
+// while (true) {
+// ConsumerRecords<String, String> responses = responseConsumer.poll(100);
+// responses.forEach(response -> {
+// String expectedResponse = pendingRequestsMap.remove(response.key());
+// if (expectedResponse == null) {
+// log.error("[{}] Invalid request", response.key());
+// } else if (!expectedResponse.equals(response.value())) {
+// log.error("[{}] Invalid response: {} instead of {}", response.key(), response.value(), expectedResponse);
+// }
+// responseCounter.add(1);
+// });
+// }
+// });
+//
+// executorService.submit((Runnable) () -> {
+// int i = 0;
+// while (true) {
+// String requestId = UUID.randomUUID().toString();
+// String expectedResponse = UUID.randomUUID().toString();
+// pendingRequestsMap.put(requestId, expectedResponse);
+// requestProducer.send(new ProducerRecord<>("requests", null, requestId, expectedResponse, headers));
+// requestCounter.add(1);
+// i++;
+// if (i % 10000 == 0) {
+// try {
+// Thread.sleep(500L);
+// } catch (InterruptedException e) {
+// e.printStackTrace();
+// }
+// }
+// }
+// });
+//
+// executorService.submit((Runnable) () -> {
+// while (true) {
+// log.warn("Requests: [{}], Responses: [{}]", requestCounter.longValue(), responseCounter.longValue());
+// try {
+// Thread.sleep(1000L);
+// } catch (InterruptedException e) {
+// e.printStackTrace();
+// }
+// }
+// });
+//
+// Thread.sleep(60000);
+// }
+
+}
common/queue/src/main/resources/logback.xml 35(+35 -0)
diff --git a/common/queue/src/main/resources/logback.xml b/common/queue/src/main/resources/logback.xml
new file mode 100644
index 0000000..dcfc930
--- /dev/null
+++ b/common/queue/src/main/resources/logback.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+
+ Copyright © 2016-2018 The Thingsboard Authors
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<!DOCTYPE configuration>
+<configuration scan="true" scanPeriod="10 seconds">
+
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d{ISO8601} [%thread] %-5level %logger{36} - %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <logger name="org.thingsboard.server" level="INFO" />
+ <logger name="akka" level="INFO" />
+
+ <root level="INFO">
+ <appender-ref ref="STDOUT"/>
+ </root>
+
+</configuration>
\ No newline at end of file
pom.xml 27(+11 -16)
diff --git a/pom.xml b/pom.xml
index aa54e72..159dad1 100755
--- a/pom.xml
+++ b/pom.xml
@@ -67,7 +67,6 @@
<netty.version>4.1.22.Final</netty.version>
<os-maven-plugin.version>1.5.0</os-maven-plugin.version>
<rabbitmq.version>3.6.5</rabbitmq.version>
- <kafka.version>0.9.0.0</kafka.version>
<surfire.version>2.19.1</surfire.version>
<jar-plugin.version>3.0.2</jar-plugin.version>
<springfox-swagger.version>2.6.1</springfox-swagger.version>
@@ -82,6 +81,7 @@
</sonar.exclusions>
<elasticsearch.version>5.0.2</elasticsearch.version>
<delight-nashorn-sandbox.version>0.1.14</delight-nashorn-sandbox.version>
+ <kafka.version>2.0.0</kafka.version>
</properties>
<modules>
@@ -371,6 +371,11 @@
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>org.thingsboard.common</groupId>
+ <artifactId>queue</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.thingsboard</groupId>
<artifactId>tools</artifactId>
<version>${project.version}</version>
@@ -415,6 +420,11 @@
<version>${spring-boot.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>${kafka.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>${postgresql.driver.version}</version>
@@ -691,21 +701,6 @@
<scope>provided</scope>
</dependency>
<dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.10</artifactId>
- <version>${kafka.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>${paho.client.version}</version>
diff --git a/rule-engine/rule-engine-components/pom.xml b/rule-engine/rule-engine-components/pom.xml
index e52e44b..39f877c 100644
--- a/rule-engine/rule-engine-components/pom.xml
+++ b/rule-engine/rule-engine-components/pom.xml
@@ -78,7 +78,7 @@
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.10</artifactId>
+ <artifactId>kafka-clients</artifactId>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>