thingsboard-memoizeit
Changes
application/src/main/java/org/thingsboard/server/service/script/AbstractNashornJsSandboxService.java 218(+145 -73)
application/src/main/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngine.java 13(+8 -5)
Details
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
index 7279347..d5260da 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
@@ -154,7 +154,7 @@ class DefaultTbContext implements TbContext {
@Override
public ScriptEngine createJsScriptEngine(String script, String... argNames) {
- return new RuleNodeJsScriptEngine(mainCtx.getJsSandbox(), script, argNames);
+ return new RuleNodeJsScriptEngine(mainCtx.getJsSandbox(), nodeCtx.getSelf().getId(), script, argNames);
}
@Override
diff --git a/application/src/main/java/org/thingsboard/server/controller/RuleChainController.java b/application/src/main/java/org/thingsboard/server/controller/RuleChainController.java
index 86b8fda..82ab170 100644
--- a/application/src/main/java/org/thingsboard/server/controller/RuleChainController.java
+++ b/application/src/main/java/org/thingsboard/server/controller/RuleChainController.java
@@ -276,7 +276,7 @@ public class RuleChainController extends BaseController {
String errorText = "";
ScriptEngine engine = null;
try {
- engine = new RuleNodeJsScriptEngine(jsSandboxService, script, argNames);
+ engine = new RuleNodeJsScriptEngine(jsSandboxService, getCurrentUser().getId(), script, argNames);
TbMsg inMsg = new TbMsg(UUIDs.timeBased(), msgType, null, new TbMsgMetaData(metadata), data, null, null, 0L);
switch (scriptType) {
case "update":
diff --git a/application/src/main/java/org/thingsboard/server/service/script/AbstractNashornJsSandboxService.java b/application/src/main/java/org/thingsboard/server/service/script/AbstractNashornJsSandboxService.java
index 724f652..1d89c9d 100644
--- a/application/src/main/java/org/thingsboard/server/service/script/AbstractNashornJsSandboxService.java
+++ b/application/src/main/java/org/thingsboard/server/service/script/AbstractNashornJsSandboxService.java
@@ -13,7 +13,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.thingsboard.server.service.script;
import com.google.common.util.concurrent.Futures;
@@ -21,8 +20,12 @@ import com.google.common.util.concurrent.ListenableFuture;
import delight.nashornsandbox.NashornSandbox;
import delight.nashornsandbox.NashornSandboxes;
import jdk.nashorn.api.scripting.NashornScriptEngineFactory;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.tuple.Pair;
+import org.thingsboard.server.common.data.id.EntityId;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
@@ -44,9 +47,10 @@ public abstract class AbstractNashornJsSandboxService implements JsSandboxServic
private ExecutorService monitorExecutorService;
private final Map<UUID, String> functionsMap = new ConcurrentHashMap<>();
- private final Map<UUID,AtomicInteger> blackListedFunctions = new ConcurrentHashMap<>();
- private final Map<String, Pair<UUID, AtomicInteger>> scriptToId = new ConcurrentHashMap<>();
- private final Map<UUID, AtomicInteger> scriptIdToCount = new ConcurrentHashMap<>();
+ private final Map<BlackListKey, BlackListInfo> blackListedFunctions = new ConcurrentHashMap<>();
+
+ private final Map<String, ScriptInfo> scriptKeyToInfo = new ConcurrentHashMap<>();
+ private final Map<UUID, ScriptInfo> scriptIdToInfo = new ConcurrentHashMap<>();
@PostConstruct
public void init() {
@@ -65,7 +69,7 @@ public abstract class AbstractNashornJsSandboxService implements JsSandboxServic
@PreDestroy
public void stop() {
- if (monitorExecutorService != null) {
+ if (monitorExecutorService != null) {
monitorExecutorService.shutdownNow();
}
}
@@ -80,90 +84,107 @@ public abstract class AbstractNashornJsSandboxService implements JsSandboxServic
@Override
public ListenableFuture<UUID> eval(JsScriptType scriptType, String scriptBody, String... argNames) {
- Pair<UUID, AtomicInteger> deduplicated = deduplicate(scriptType, scriptBody);
- UUID scriptId = deduplicated.getLeft();
- AtomicInteger duplicateCount = deduplicated.getRight();
-
- if(duplicateCount.compareAndSet(0, 1)) {
- String functionName = "invokeInternal_" + scriptId.toString().replace('-', '_');
- String jsScript = generateJsScript(scriptType, functionName, scriptBody, argNames);
- try {
- if (useJsSandbox()) {
- sandbox.eval(jsScript);
- } else {
- engine.eval(jsScript);
+ ScriptInfo scriptInfo = deduplicate(scriptType, scriptBody);
+ UUID scriptId = scriptInfo.getId();
+ AtomicInteger duplicateCount = scriptInfo.getCount();
+
+ synchronized (scriptInfo.getLock()) {
+ if (duplicateCount.compareAndSet(0, 1)) {
+ try {
+ evaluate(scriptId, scriptType, scriptBody, argNames);
+ } catch (Exception e) {
+ duplicateCount.decrementAndGet();
+ log.warn("Failed to compile JS script: {}", e.getMessage(), e);
+ return Futures.immediateFailedFuture(e);
}
- functionsMap.put(scriptId, functionName);
- } catch (Exception e) {
- duplicateCount.decrementAndGet();
- log.warn("Failed to compile JS script: {}", e.getMessage(), e);
- return Futures.immediateFailedFuture(e);
+ } else {
+ duplicateCount.incrementAndGet();
}
- } else {
- duplicateCount.incrementAndGet();
}
return Futures.immediateFuture(scriptId);
}
+ private void evaluate(UUID scriptId, JsScriptType scriptType, String scriptBody, String... argNames) throws ScriptException {
+ String functionName = "invokeInternal_" + scriptId.toString().replace('-', '_');
+ String jsScript = generateJsScript(scriptType, functionName, scriptBody, argNames);
+ if (useJsSandbox()) {
+ sandbox.eval(jsScript);
+ } else {
+ engine.eval(jsScript);
+ }
+ functionsMap.put(scriptId, functionName);
+ }
+
@Override
- public ListenableFuture<Object> invokeFunction(UUID scriptId, Object... args) {
+ public ListenableFuture<Object> invokeFunction(UUID scriptId, EntityId entityId, Object... args) {
String functionName = functionsMap.get(scriptId);
if (functionName == null) {
- return Futures.immediateFailedFuture(new RuntimeException("No compiled script found for scriptId: [" + scriptId + "]!"));
- }
- if (!isBlackListed(scriptId)) {
- try {
- Object result;
- if (useJsSandbox()) {
- result = sandbox.getSandboxedInvocable().invokeFunction(functionName, args);
- } else {
- result = ((Invocable)engine).invokeFunction(functionName, args);
- }
- return Futures.immediateFuture(result);
- } catch (Exception e) {
- blackListedFunctions.computeIfAbsent(scriptId, key -> new AtomicInteger(0)).incrementAndGet();
- return Futures.immediateFailedFuture(e);
- }
+ String message = "No compiled script found for scriptId: [" + scriptId + "]!";
+ log.warn(message);
+ return Futures.immediateFailedFuture(new RuntimeException(message));
+ }
+
+ BlackListInfo blackListInfo = blackListedFunctions.get(new BlackListKey(scriptId, entityId));
+ if (blackListInfo != null && blackListInfo.getCount() >= getMaxErrors()) {
+ RuntimeException throwable = new RuntimeException("Script is blacklisted due to maximum error count " + getMaxErrors() + "!", blackListInfo.getCause());
+ throwable.printStackTrace();
+ return Futures.immediateFailedFuture(throwable);
+ }
+
+ try {
+ return invoke(functionName, args);
+ } catch (Exception e) {
+ BlackListKey blackListKey = new BlackListKey(scriptId, entityId);
+ blackListedFunctions.computeIfAbsent(blackListKey, key -> new BlackListInfo()).incrementWithReason(e);
+ return Futures.immediateFailedFuture(e);
+ }
+ }
+
+ private ListenableFuture<Object> invoke(String functionName, Object... args) throws ScriptException, NoSuchMethodException {
+ Object result;
+ if (useJsSandbox()) {
+ result = sandbox.getSandboxedInvocable().invokeFunction(functionName, args);
} else {
- return Futures.immediateFailedFuture(
- new RuntimeException("Script is blacklisted due to maximum error count " + getMaxErrors() + "!"));
+ result = ((Invocable) engine).invokeFunction(functionName, args);
}
+ return Futures.immediateFuture(result);
}
@Override
- public ListenableFuture<Void> release(UUID scriptId) {
- AtomicInteger count = scriptIdToCount.get(scriptId);
- if(count != null) {
- if(count.decrementAndGet() > 0) {
+ public ListenableFuture<Void> release(UUID scriptId, EntityId entityId) {
+ ScriptInfo scriptInfo = scriptIdToInfo.get(scriptId);
+ if (scriptInfo == null) {
+ log.warn("Script release called for not existing script id [{}]", scriptId);
+ return Futures.immediateFuture(null);
+ }
+
+ synchronized (scriptInfo.getLock()) {
+ int remainingDuplicates = scriptInfo.getCount().decrementAndGet();
+ if (remainingDuplicates > 0) {
return Futures.immediateFuture(null);
}
- }
- String functionName = functionsMap.get(scriptId);
- if (functionName != null) {
- try {
- if (useJsSandbox()) {
- sandbox.eval(functionName + " = undefined;");
- } else {
- engine.eval(functionName + " = undefined;");
+ String functionName = functionsMap.get(scriptId);
+ if (functionName != null) {
+ try {
+ if (useJsSandbox()) {
+ sandbox.eval(functionName + " = undefined;");
+ } else {
+ engine.eval(functionName + " = undefined;");
+ }
+ functionsMap.remove(scriptId);
+ blackListedFunctions.remove(new BlackListKey(scriptId, entityId));
+ } catch (ScriptException e) {
+ log.error("Could not release script [{}] [{}]", scriptId, remainingDuplicates);
+ return Futures.immediateFailedFuture(e);
}
- functionsMap.remove(scriptId);
- blackListedFunctions.remove(scriptId);
- } catch (ScriptException e) {
- return Futures.immediateFailedFuture(e);
+ } else {
+ log.warn("Function name do not exist for script [{}] [{}]", scriptId, remainingDuplicates);
}
}
return Futures.immediateFuture(null);
}
- private boolean isBlackListed(UUID scriptId) {
- if (blackListedFunctions.containsKey(scriptId)) {
- AtomicInteger errorCount = blackListedFunctions.get(scriptId);
- return errorCount.get() >= getMaxErrors();
- } else {
- return false;
- }
- }
private String generateJsScript(JsScriptType scriptType, String functionName, String scriptBody, String... argNames) {
switch (scriptType) {
@@ -174,15 +195,66 @@ public abstract class AbstractNashornJsSandboxService implements JsSandboxServic
}
}
- private Pair<UUID, AtomicInteger> deduplicate(JsScriptType scriptType, String scriptBody) {
- Pair<UUID, AtomicInteger> precomputed = Pair.of(UUID.randomUUID(), new AtomicInteger());
-
- Pair<UUID, AtomicInteger> pair = scriptToId.computeIfAbsent(deduplicateKey(scriptType, scriptBody), i -> precomputed);
- AtomicInteger duplicateCount = scriptIdToCount.computeIfAbsent(pair.getLeft(), i -> pair.getRight());
- return Pair.of(pair.getLeft(), duplicateCount);
+ private ScriptInfo deduplicate(JsScriptType scriptType, String scriptBody) {
+ ScriptInfo meta = ScriptInfo.preInit();
+ String key = deduplicateKey(scriptType, scriptBody);
+ ScriptInfo latestMeta = scriptKeyToInfo.computeIfAbsent(key, i -> meta);
+ return scriptIdToInfo.computeIfAbsent(latestMeta.getId(), i -> latestMeta);
}
private String deduplicateKey(JsScriptType scriptType, String scriptBody) {
return scriptType + "_" + scriptBody;
}
+
+ @Getter
+ private static class ScriptInfo {
+ private final UUID id;
+ private final Object lock;
+ private final AtomicInteger count;
+
+ ScriptInfo(UUID id, Object lock, AtomicInteger count) {
+ this.id = id;
+ this.lock = lock;
+ this.count = count;
+ }
+
+ static ScriptInfo preInit() {
+ UUID preId = UUID.randomUUID();
+ AtomicInteger preCount = new AtomicInteger();
+ Object preLock = new Object();
+ return new ScriptInfo(preId, preLock, preCount);
+ }
+ }
+
+ @EqualsAndHashCode
+ @Getter
+ @RequiredArgsConstructor
+ private static class BlackListKey {
+ private final UUID scriptId;
+ private final EntityId entityId;
+
+ }
+
+ @Data
+ private static class BlackListInfo {
+ private final AtomicInteger count;
+ private Exception ex;
+
+ BlackListInfo() {
+ this.count = new AtomicInteger(0);
+ }
+
+ void incrementWithReason(Exception e) {
+ count.incrementAndGet();
+ ex = e;
+ }
+
+ int getCount() {
+ return count.get();
+ }
+
+ Exception getCause() {
+ return ex;
+ }
+ }
}
diff --git a/application/src/main/java/org/thingsboard/server/service/script/JsSandboxService.java b/application/src/main/java/org/thingsboard/server/service/script/JsSandboxService.java
index ee86c62..5e1c676 100644
--- a/application/src/main/java/org/thingsboard/server/service/script/JsSandboxService.java
+++ b/application/src/main/java/org/thingsboard/server/service/script/JsSandboxService.java
@@ -17,6 +17,7 @@
package org.thingsboard.server.service.script;
import com.google.common.util.concurrent.ListenableFuture;
+import org.thingsboard.server.common.data.id.EntityId;
import java.util.UUID;
@@ -24,8 +25,8 @@ public interface JsSandboxService {
ListenableFuture<UUID> eval(JsScriptType scriptType, String scriptBody, String... argNames);
- ListenableFuture<Object> invokeFunction(UUID scriptId, Object... args);
+ ListenableFuture<Object> invokeFunction(UUID scriptId, EntityId entityId, Object... args);
- ListenableFuture<Void> release(UUID scriptId);
+ ListenableFuture<Void> release(UUID scriptId, EntityId entityId);
}
diff --git a/application/src/main/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngine.java b/application/src/main/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngine.java
index 2ba87ec..767dc05 100644
--- a/application/src/main/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngine.java
+++ b/application/src/main/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngine.java
@@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
+import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
@@ -39,9 +40,11 @@ public class RuleNodeJsScriptEngine implements org.thingsboard.rule.engine.api.S
private final JsSandboxService sandboxService;
private final UUID scriptId;
+ private final EntityId entityId;
- public RuleNodeJsScriptEngine(JsSandboxService sandboxService, String script, String... argNames) {
+ public RuleNodeJsScriptEngine(JsSandboxService sandboxService, EntityId entityId, String script, String... argNames) {
this.sandboxService = sandboxService;
+ this.entityId = entityId;
try {
this.scriptId = this.sandboxService.eval(JsScriptType.RULE_NODE_SCRIPT, script, argNames).get();
} catch (Exception e) {
@@ -162,20 +165,20 @@ public class RuleNodeJsScriptEngine implements org.thingsboard.rule.engine.api.S
private JsonNode executeScript(TbMsg msg) throws ScriptException {
try {
String[] inArgs = prepareArgs(msg);
- String eval = sandboxService.invokeFunction(this.scriptId, inArgs[0], inArgs[1], inArgs[2]).get().toString();
+ String eval = sandboxService.invokeFunction(this.scriptId, this.entityId, inArgs[0], inArgs[1], inArgs[2]).get().toString();
return mapper.readTree(eval);
} catch (ExecutionException e) {
if (e.getCause() instanceof ScriptException) {
throw (ScriptException)e.getCause();
} else {
- throw new ScriptException("Failed to execute js script: " + e.getMessage());
+ throw new ScriptException(e);
}
} catch (Exception e) {
- throw new ScriptException("Failed to execute js script: " + e.getMessage());
+ throw new ScriptException(e);
}
}
public void destroy() {
- sandboxService.release(this.scriptId);
+ sandboxService.release(this.scriptId, this.entityId);
}
}
diff --git a/application/src/main/resources/logback.xml b/application/src/main/resources/logback.xml
index 978a570..dcfc930 100644
--- a/application/src/main/resources/logback.xml
+++ b/application/src/main/resources/logback.xml
@@ -17,7 +17,7 @@
-->
<!DOCTYPE configuration>
-<configuration>
+<configuration scan="true" scanPeriod="10 seconds">
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
diff --git a/application/src/test/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngineTest.java b/application/src/test/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngineTest.java
index ea70384..88961dc 100644
--- a/application/src/test/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngineTest.java
+++ b/application/src/test/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngineTest.java
@@ -21,12 +21,18 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.thingsboard.rule.engine.api.ScriptEngine;
+import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import javax.script.ScriptException;
-
+import java.util.Map;
import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.*;
@@ -35,6 +41,8 @@ public class RuleNodeJsScriptEngineTest {
private ScriptEngine scriptEngine;
private TestNashornJsSandboxService jsSandboxService;
+ private EntityId ruleNodeId = new RuleNodeId(UUIDs.timeBased());
+
@Before
public void beforeTest() throws Exception {
jsSandboxService = new TestNashornJsSandboxService(false, 1, 100, 3);
@@ -48,7 +56,7 @@ public class RuleNodeJsScriptEngineTest {
@Test
public void msgCanBeUpdated() throws ScriptException {
String function = "metadata.temp = metadata.temp * 10; return {metadata: metadata};";
- scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, function);
+ scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, ruleNodeId, function);
TbMsgMetaData metaData = new TbMsgMetaData();
metaData.putValue("temp", "7");
@@ -65,7 +73,7 @@ public class RuleNodeJsScriptEngineTest {
@Test
public void newAttributesCanBeAddedInMsg() throws ScriptException {
String function = "metadata.newAttr = metadata.humidity - msg.passed; return {metadata: metadata};";
- scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, function);
+ scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, ruleNodeId, function);
TbMsgMetaData metaData = new TbMsgMetaData();
metaData.putValue("temp", "7");
metaData.putValue("humidity", "99");
@@ -81,7 +89,7 @@ public class RuleNodeJsScriptEngineTest {
@Test
public void payloadCanBeUpdated() throws ScriptException {
String function = "msg.passed = msg.passed * metadata.temp; msg.bigObj.newProp = 'Ukraine'; return {msg: msg};";
- scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, function);
+ scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, ruleNodeId, function);
TbMsgMetaData metaData = new TbMsgMetaData();
metaData.putValue("temp", "7");
metaData.putValue("humidity", "99");
@@ -99,7 +107,7 @@ public class RuleNodeJsScriptEngineTest {
@Test
public void metadataAccessibleForFilter() throws ScriptException {
String function = "return metadata.humidity < 15;";
- scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, function);
+ scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, ruleNodeId, function);
TbMsgMetaData metaData = new TbMsgMetaData();
metaData.putValue("temp", "7");
metaData.putValue("humidity", "99");
@@ -113,7 +121,7 @@ public class RuleNodeJsScriptEngineTest {
@Test
public void dataAccessibleForFilter() throws ScriptException {
String function = "return msg.passed < 15 && msg.name === 'Vit' && metadata.temp == 7 && msg.bigObj.prop == 42;";
- scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, function);
+ scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, ruleNodeId, function);
TbMsgMetaData metaData = new TbMsgMetaData();
metaData.putValue("temp", "7");
metaData.putValue("humidity", "99");
@@ -134,7 +142,7 @@ public class RuleNodeJsScriptEngineTest {
"};\n" +
"\n" +
"return nextRelation(metadata, msg);";
- scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, jsCode);
+ scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, ruleNodeId, jsCode);
TbMsgMetaData metaData = new TbMsgMetaData();
metaData.putValue("temp", "10");
metaData.putValue("humidity", "99");
@@ -156,7 +164,7 @@ public class RuleNodeJsScriptEngineTest {
"};\n" +
"\n" +
"return nextRelation(metadata, msg);";
- scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, jsCode);
+ scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, ruleNodeId, jsCode);
TbMsgMetaData metaData = new TbMsgMetaData();
metaData.putValue("temp", "10");
metaData.putValue("humidity", "99");
@@ -168,4 +176,75 @@ public class RuleNodeJsScriptEngineTest {
scriptEngine.destroy();
}
+ @Test
+ public void concurrentReleasedCorrectly() throws InterruptedException, ExecutionException {
+ String code = "metadata.temp = metadata.temp * 10; return {metadata: metadata};";
+
+ int repeat = 1000;
+ ExecutorService service = Executors.newFixedThreadPool(repeat);
+ Map<UUID, Object> scriptIds = new ConcurrentHashMap<>();
+ CountDownLatch startLatch = new CountDownLatch(repeat);
+ CountDownLatch finishLatch = new CountDownLatch(repeat);
+ AtomicInteger failedCount = new AtomicInteger(0);
+
+ for (int i = 0; i < repeat; i++) {
+ service.submit(() -> runScript(startLatch, finishLatch, failedCount, scriptIds, code));
+ }
+
+ finishLatch.await();
+ assertTrue(scriptIds.size() == 1);
+ assertTrue(failedCount.get() == 0);
+
+ CountDownLatch nextStart = new CountDownLatch(repeat);
+ CountDownLatch nextFinish = new CountDownLatch(repeat);
+ for (int i = 0; i < repeat; i++) {
+ service.submit(() -> runScript(nextStart, nextFinish, failedCount, scriptIds, code));
+ }
+
+ nextFinish.await();
+ assertTrue(scriptIds.size() == 1);
+ assertTrue(failedCount.get() == 0);
+ service.shutdownNow();
+ }
+
+ @Test
+ public void concurrentFailedEvaluationShouldThrowException() throws InterruptedException {
+ String code = "metadata.temp = metadata.temp * 10; urn {metadata: metadata};";
+
+ int repeat = 10000;
+ ExecutorService service = Executors.newFixedThreadPool(repeat);
+ Map<UUID, Object> scriptIds = new ConcurrentHashMap<>();
+ CountDownLatch startLatch = new CountDownLatch(repeat);
+ CountDownLatch finishLatch = new CountDownLatch(repeat);
+ AtomicInteger failedCount = new AtomicInteger(0);
+ for (int i = 0; i < repeat; i++) {
+ service.submit(() -> {
+ service.submit(() -> runScript(startLatch, finishLatch, failedCount, scriptIds, code));
+ });
+ }
+
+ finishLatch.await();
+ assertTrue(scriptIds.isEmpty());
+ assertEquals(repeat, failedCount.get());
+ service.shutdownNow();
+ }
+
+ private void runScript(CountDownLatch startLatch, CountDownLatch finishLatch, AtomicInteger failedCount,
+ Map<UUID, Object> scriptIds, String code) {
+ try {
+ for (int k = 0; k < 10; k++) {
+ startLatch.countDown();
+ startLatch.await();
+ UUID scriptId = jsSandboxService.eval(JsScriptType.RULE_NODE_SCRIPT, code).get();
+ scriptIds.put(scriptId, new Object());
+ jsSandboxService.invokeFunction(scriptId, ruleNodeId, "{}", "{}", "TEXT").get();
+ jsSandboxService.release(scriptId, ruleNodeId).get();
+ }
+ } catch (Throwable th) {
+ failedCount.incrementAndGet();
+ } finally {
+ finishLatch.countDown();
+ }
+ }
+
}
\ No newline at end of file
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNode.java
index fe35ef6..260669f 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNode.java
@@ -26,10 +26,8 @@ import org.thingsboard.rule.engine.api.*;
import org.thingsboard.rule.engine.api.util.DonAsynchron;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
-import org.thingsboard.server.common.data.kv.BaseTsKvQuery;
import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
import org.thingsboard.server.common.data.kv.TsKvEntry;
-import org.thingsboard.server.common.data.kv.TsKvQuery;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.TbMsg;
@@ -39,8 +37,7 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
-import static org.thingsboard.rule.engine.metadata.TbGetTelemetryNodeConfiguration.FETCH_MODE_ALL;
-import static org.thingsboard.rule.engine.metadata.TbGetTelemetryNodeConfiguration.MAX_FETCH_SIZE;
+import static org.thingsboard.rule.engine.metadata.TbGetTelemetryNodeConfiguration.*;
import static org.thingsboard.server.common.data.kv.Aggregation.NONE;
/**
@@ -64,6 +61,7 @@ public class TbGetTelemetryNode implements TbNode {
private long endTsOffset;
private int limit;
private ObjectMapper mapper;
+ private String fetchMode;
@Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
@@ -72,6 +70,7 @@ public class TbGetTelemetryNode implements TbNode {
startTsOffset = TimeUnit.valueOf(config.getStartIntervalTimeUnit()).toMillis(config.getStartInterval());
endTsOffset = TimeUnit.valueOf(config.getEndIntervalTimeUnit()).toMillis(config.getEndInterval());
limit = config.getFetchMode().equals(FETCH_MODE_ALL) ? MAX_FETCH_SIZE : 1;
+ fetchMode = config.getFetchMode();
mapper = new ObjectMapper();
mapper.configure(JsonGenerator.Feature.QUOTE_FIELD_NAMES, false);
mapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
@@ -96,14 +95,18 @@ public class TbGetTelemetryNode implements TbNode {
}
}
- //TODO: handle direction;
private List<ReadTsKvQuery> buildQueries() {
long ts = System.currentTimeMillis();
long startTs = ts - startTsOffset;
long endTs = ts - endTsOffset;
-
+ String orderBy;
+ if (fetchMode.equals(FETCH_MODE_FIRST) || fetchMode.equals(FETCH_MODE_ALL)) {
+ orderBy = "ASC";
+ } else {
+ orderBy = "DESC";
+ }
return tsKeyNames.stream()
- .map(key -> new BaseReadTsKvQuery(key, startTs, endTs, 1, limit, NONE))
+ .map(key -> new BaseReadTsKvQuery(key, startTs, endTs, 1, limit, NONE, orderBy))
.collect(Collectors.toList());
}
@@ -116,7 +119,7 @@ public class TbGetTelemetryNode implements TbNode {
}
for (String key : tsKeyNames) {
- if(resultNode.has(key)){
+ if (resultNode.has(key)) {
msg.getMetaData().putValue(key, resultNode.get(key).toString());
}
}
@@ -127,11 +130,11 @@ public class TbGetTelemetryNode implements TbNode {
}
private void processArray(ObjectNode node, TsKvEntry entry) {
- if(node.has(entry.getKey())){
+ if (node.has(entry.getKey())) {
ArrayNode arrayNode = (ArrayNode) node.get(entry.getKey());
ObjectNode obj = buildNode(entry);
arrayNode.add(obj);
- }else {
+ } else {
ArrayNode arrayNode = mapper.createArrayNode();
ObjectNode obj = buildNode(entry);
arrayNode.add(obj);