thingsboard-aplcache
Changes
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java 26(+19 -7)
application/src/main/java/org/thingsboard/server/service/install/CassandraDatabaseUpgradeService.java 2(+1 -1)
application/src/main/java/org/thingsboard/server/service/install/DefaultDataUpdateService.java 2(+1 -1)
application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java 2(+1 -1)
application/src/main/resources/thingsboard.yml 20(+10 -10)
application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java 6(+6 -0)
application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java 40(+23 -17)
application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java 20(+12 -8)
dao/src/main/java/org/thingsboard/server/dao/queue/db/nosql/repository/CassandraAckRepository.java 68(+0 -68)
dao/src/main/java/org/thingsboard/server/dao/queue/db/nosql/repository/CassandraMsgRepository.java 67(+0 -67)
dao/src/main/java/org/thingsboard/server/dao/queue/db/nosql/repository/CassandraProcessedPartitionRepository.java 64(+0 -64)
dao/src/main/java/org/thingsboard/server/dao/queue/db/repository/ProcessedPartitionRepository.java 29(+0 -29)
dao/src/test/java/org/thingsboard/server/dao/queue/db/nosql/repository/CassandraAckRepositoryTest.java 82(+0 -82)
dao/src/test/java/org/thingsboard/server/dao/queue/db/nosql/repository/CassandraMsgRepositoryTest.java 87(+0 -87)
dao/src/test/java/org/thingsboard/server/dao/queue/db/nosql/repository/CassandraProcessedPartitionRepositoryTest.java 83(+0 -83)
dao/src/test/java/org/thingsboard/server/dao/queue/db/nosql/UnprocessedMsgFilterTest.java 47(+0 -47)
docker/tb/run-application.sh 11(+5 -6)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbAbstractGetAttributesNode.java 25(+19 -6)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java 6(+2 -4)
transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java 2(+1 -1)
ui/src/app/locale/locale.constant-zh.js 32(+32 -0)
Details
diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
index f7c7f1a..4840f2a 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
@@ -235,6 +235,10 @@ public class ActorSystemContext {
@Getter
private boolean tenantComponentsInitEnabled;
+ @Value("${actors.rule.allow_system_mail_service}")
+ @Getter
+ private boolean allowSystemMailService;
+
@Getter
@Setter
private ActorSystem actorSystem;
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
index b888bc3..70509fb 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
@@ -209,7 +209,11 @@ class DefaultTbContext implements TbContext {
@Override
public MailService getMailService() {
- return mainCtx.getMailService();
+ if (mainCtx.isAllowSystemMailService()) {
+ return mainCtx.getMailService();
+ } else {
+ throw new RuntimeException("Access to System Mail Service is forbidden!");
+ }
}
@Override
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
index ac902a7..7d560db 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
@@ -183,23 +183,35 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg envelope) {
checkActive();
- putToQueue(enrichWithRuleChainId(envelope.getTbMsg()), msg -> pushMsgToNode(firstNode, msg, ""));
+ if (firstNode != null) {
+ putToQueue(enrichWithRuleChainId(envelope.getTbMsg()), msg -> pushMsgToNode(firstNode, msg, ""));
+ }
}
void onDeviceActorToRuleEngineMsg(DeviceActorToRuleEngineMsg envelope) {
checkActive();
- putToQueue(enrichWithRuleChainId(envelope.getTbMsg()), msg -> {
- pushMsgToNode(firstNode, msg, "");
- envelope.getCallbackRef().tell(new RuleEngineQueuePutAckMsg(msg.getId()), self);
- });
+ if (firstNode != null) {
+ putToQueue(enrichWithRuleChainId(envelope.getTbMsg()), msg -> {
+ pushMsgToNode(firstNode, msg, "");
+ envelope.getCallbackRef().tell(new RuleEngineQueuePutAckMsg(msg.getId()), self);
+ });
+ }
}
void onRuleChainToRuleChainMsg(RuleChainToRuleChainMsg envelope) {
checkActive();
if (envelope.isEnqueue()) {
- putToQueue(enrichWithRuleChainId(envelope.getMsg()), msg -> pushMsgToNode(firstNode, msg, envelope.getFromRelationType()));
+ if (firstNode != null) {
+ putToQueue(enrichWithRuleChainId(envelope.getMsg()), msg -> pushMsgToNode(firstNode, msg, envelope.getFromRelationType()));
+ }
} else {
- pushMsgToNode(firstNode, envelope.getMsg(), envelope.getFromRelationType());
+ if (firstNode != null) {
+ pushMsgToNode(firstNode, envelope.getMsg(), envelope.getFromRelationType());
+ } else {
+ TbMsg msg = envelope.getMsg();
+ EntityId ackId = msg.getRuleNodeId() != null ? msg.getRuleNodeId() : msg.getRuleChainId();
+ queue.ack(tenantId, envelope.getMsg(), ackId.getId(), msg.getClusterPartition());
+ }
}
}
diff --git a/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java b/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java
index d2ae77c..f863d0b 100644
--- a/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java
+++ b/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java
@@ -82,7 +82,7 @@ public class ThingsboardInstallService {
databaseUpgradeService.upgradeDatabase("1.3.1");
case "1.4.0":
- log.info("Upgrading ThingsBoard from version 1.4.0 to 1.5.0 ...");
+ log.info("Upgrading ThingsBoard from version 1.4.0 to 2.0.0 ...");
databaseUpgradeService.upgradeDatabase("1.4.0");
diff --git a/application/src/main/java/org/thingsboard/server/service/install/CassandraDatabaseUpgradeService.java b/application/src/main/java/org/thingsboard/server/service/install/CassandraDatabaseUpgradeService.java
index d38cddd..4d2adea 100644
--- a/application/src/main/java/org/thingsboard/server/service/install/CassandraDatabaseUpgradeService.java
+++ b/application/src/main/java/org/thingsboard/server/service/install/CassandraDatabaseUpgradeService.java
@@ -198,7 +198,7 @@ public class CassandraDatabaseUpgradeService implements DatabaseUpgradeService {
case "1.4.0":
log.info("Updating schema ...");
- schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "1.5.0", SCHEMA_UPDATE_CQL);
+ schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "2.0.0", SCHEMA_UPDATE_CQL);
loadCql(schemaUpdateFile);
log.info("Schema updated.");
diff --git a/application/src/main/java/org/thingsboard/server/service/install/DefaultDataUpdateService.java b/application/src/main/java/org/thingsboard/server/service/install/DefaultDataUpdateService.java
index 37e5b30..b372368 100644
--- a/application/src/main/java/org/thingsboard/server/service/install/DefaultDataUpdateService.java
+++ b/application/src/main/java/org/thingsboard/server/service/install/DefaultDataUpdateService.java
@@ -47,7 +47,7 @@ public class DefaultDataUpdateService implements DataUpdateService {
public void updateData(String fromVersion) throws Exception {
switch (fromVersion) {
case "1.4.0":
- log.info("Updating data from version 1.4.0 to 1.5.0 ...");
+ log.info("Updating data from version 1.4.0 to 2.0.0 ...");
tenantsDefaultRuleChainUpdater.updateEntities(null);
break;
default:
diff --git a/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java b/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java
index ecb6070..29d5c65 100644
--- a/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java
+++ b/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java
@@ -104,7 +104,7 @@ public class SqlDatabaseUpgradeService implements DatabaseUpgradeService {
case "1.4.0":
try (Connection conn = DriverManager.getConnection(dbUrl, dbUserName, dbPassword)) {
log.info("Updating schema ...");
- schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "1.5.0", SCHEMA_UPDATE_SQL);
+ schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "2.0.0", SCHEMA_UPDATE_SQL);
String sql = new String(Files.readAllBytes(schemaUpdateFile), Charset.forName("UTF-8"));
conn.createStatement().execute(sql); //NOSONAR, ignoring because method used to execute thingsboard database upgrade script
log.info("Schema updated.");
diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultMsgQueueService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultMsgQueueService.java
index a4558eb..9275847 100644
--- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultMsgQueueService.java
+++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultMsgQueueService.java
@@ -40,10 +40,10 @@ import java.util.concurrent.atomic.AtomicLong;
@Slf4j
public class DefaultMsgQueueService implements MsgQueueService {
- @Value("${rule.queue.max_size}")
+ @Value("${actors.rule.queue.max_size}")
private long queueMaxSize;
- @Value("${rule.queue.cleanup_period}")
+ @Value("${actors.rule.queue.cleanup_period}")
private long queueCleanUpPeriod;
@Autowired
application/src/main/resources/thingsboard.yml 20(+10 -10)
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index 9a10895..188291a 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -203,6 +203,7 @@ cassandra:
default_fetch_size: "${CASSANDRA_DEFAULT_FETCH_SIZE:2000}"
# Specify partitioning size for timestamp key-value storage. Example MINUTES, HOURS, DAYS, MONTHS
ts_key_value_partitioning: "${TS_KV_PARTITIONING:MONTHS}"
+ ts_key_value_ttl: "${TS_KV_TTL:0}"
buffer_size: "${CASSANDRA_QUERY_BUFFER_SIZE:200000}"
concurrent_limit: "${CASSANDRA_QUERY_CONCURRENT_LIMIT:1000}"
permit_max_wait_time: "${PERMIT_MAX_WAIT_TIME:120000}"
@@ -236,6 +237,8 @@ actors:
js_thread_pool_size: "${ACTORS_RULE_JS_THREAD_POOL_SIZE:10}"
# Specify thread pool size for mail sender executor service
mail_thread_pool_size: "${ACTORS_RULE_MAIL_THREAD_POOL_SIZE:10}"
+ # Whether to allow usage of system mail service for rules
+ allow_system_mail_service: "${ACTORS_RULE_ALLOW_SYSTEM_MAIL_SERVICE:true}"
# Specify thread pool size for external call service
external_call_thread_pool_size: "${ACTORS_RULE_EXTERNAL_CALL_THREAD_POOL_SIZE:10}"
js_sandbox:
@@ -253,6 +256,13 @@ actors:
node:
# Errors for particular actor are persisted once per specified amount of milliseconds
error_persist_frequency: "${ACTORS_RULE_NODE_ERROR_FREQUENCY:3000}"
+ queue:
+ # Message queue type
+ type: "${ACTORS_RULE_QUEUE_TYPE:memory}"
+ # Message queue maximum size (per tenant)
+ max_size: "${ACTORS_RULE_QUEUE_MAX_SIZE:100}"
+ # Message queue cleanup period in seconds
+ cleanup_period: "${ACTORS_RULE_QUEUE_CLEANUP_PERIOD:3600}"
statistics:
# Enable/disable actor statistics
enabled: "${ACTORS_STATISTICS_ENABLED:true}"
@@ -333,16 +343,6 @@ spring:
username: "${SPRING_DATASOURCE_USERNAME:sa}"
password: "${SPRING_DATASOURCE_PASSWORD:}"
-rule:
- queue:
- #Message queue type (memory or db)
- type: "${RULE_QUEUE_TYPE:memory}"
- #Message queue maximum size (per tenant)
- max_size: "${RULE_QUEUE_MAX_SIZE:100}"
- #Message queue cleanup period in seconds
- cleanup_period: "${RULE_QUEUE_CLEANUP_PERIOD:3600}"
-
-
# PostgreSQL DAO Configuration
#spring:
# data:
diff --git a/application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java
index 6c2d3db..1b042a8 100644
--- a/application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java
+++ b/application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java
@@ -32,6 +32,7 @@ import org.thingsboard.server.dao.rule.RuleChainService;
import org.thingsboard.server.service.queue.MsgQueueService;
import java.io.IOException;
+import java.util.function.Predicate;
/**
* Created by ashvayka on 20.03.18.
@@ -75,4 +76,9 @@ public class AbstractRuleEngineControllerTest extends AbstractControllerTest {
throw new RuntimeException(e);
}
}
+
+ protected Predicate<Event> filterByCustomEvent() {
+ return event -> event.getBody().get("msgType").textValue().equals("CUSTOM");
+ }
+
}
diff --git a/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java b/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java
index 356dfee..c86d496 100644
--- a/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java
+++ b/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java
@@ -47,6 +47,8 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
@@ -157,15 +159,15 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule
Thread.sleep(3000);
- TimePageData<Event> events = getDebugEvents(savedTenant.getId(), ruleChain.getFirstRuleNodeId(), 1000);
+ TimePageData<Event> eventsPage = getDebugEvents(savedTenant.getId(), ruleChain.getFirstRuleNodeId(), 1000);
+ List<Event> events = eventsPage.getData().stream().filter(filterByCustomEvent()).collect(Collectors.toList());
+ Assert.assertEquals(2, events.size());
- Assert.assertEquals(2, events.getData().size());
-
- Event inEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
+ Event inEvent = events.stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
Assert.assertEquals(ruleChain.getFirstRuleNodeId(), inEvent.getEntityId());
Assert.assertEquals(device.getId().getId().toString(), inEvent.getBody().get("entityId").asText());
- Event outEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
+ Event outEvent = events.stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
Assert.assertEquals(ruleChain.getFirstRuleNodeId(), outEvent.getEntityId());
Assert.assertEquals(device.getId().getId().toString(), outEvent.getBody().get("entityId").asText());
@@ -174,15 +176,16 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule
RuleChain finalRuleChain = ruleChain;
RuleNode lastRuleNode = metaData.getNodes().stream().filter(node -> !node.getId().equals(finalRuleChain.getFirstRuleNodeId())).findFirst().get();
- events = getDebugEvents(savedTenant.getId(), lastRuleNode.getId(), 1000);
+ eventsPage = getDebugEvents(savedTenant.getId(), lastRuleNode.getId(), 1000);
+ events = eventsPage.getData().stream().filter(filterByCustomEvent()).collect(Collectors.toList());
- Assert.assertEquals(2, events.getData().size());
+ Assert.assertEquals(2, events.size());
- inEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
+ inEvent = events.stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
Assert.assertEquals(lastRuleNode.getId(), inEvent.getEntityId());
Assert.assertEquals(device.getId().getId().toString(), inEvent.getBody().get("entityId").asText());
- outEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
+ outEvent = events.stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
Assert.assertEquals(lastRuleNode.getId(), outEvent.getEntityId());
Assert.assertEquals(device.getId().getId().toString(), outEvent.getBody().get("entityId").asText());
@@ -274,15 +277,16 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule
Thread.sleep(3000);
- TimePageData<Event> events = getDebugEvents(savedTenant.getId(), rootRuleChain.getFirstRuleNodeId(), 1000);
+ TimePageData<Event> eventsPage = getDebugEvents(savedTenant.getId(), rootRuleChain.getFirstRuleNodeId(), 1000);
+ List<Event> events = eventsPage.getData().stream().filter(filterByCustomEvent()).collect(Collectors.toList());
- Assert.assertEquals(2, events.getData().size());
+ Assert.assertEquals(2, events.size());
- Event inEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
+ Event inEvent = events.stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
Assert.assertEquals(rootRuleChain.getFirstRuleNodeId(), inEvent.getEntityId());
Assert.assertEquals(device.getId().getId().toString(), inEvent.getBody().get("entityId").asText());
- Event outEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
+ Event outEvent = events.stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
Assert.assertEquals(rootRuleChain.getFirstRuleNodeId(), outEvent.getEntityId());
Assert.assertEquals(device.getId().getId().toString(), outEvent.getBody().get("entityId").asText());
@@ -291,15 +295,17 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule
RuleChain finalRuleChain = rootRuleChain;
RuleNode lastRuleNode = secondaryMetaData.getNodes().stream().filter(node -> !node.getId().equals(finalRuleChain.getFirstRuleNodeId())).findFirst().get();
- events = getDebugEvents(savedTenant.getId(), lastRuleNode.getId(), 1000);
+ eventsPage = getDebugEvents(savedTenant.getId(), lastRuleNode.getId(), 1000);
+ events = eventsPage.getData().stream().filter(filterByCustomEvent()).collect(Collectors.toList());
+
- Assert.assertEquals(2, events.getData().size());
+ Assert.assertEquals(2, events.size());
- inEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
+ inEvent = events.stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
Assert.assertEquals(lastRuleNode.getId(), inEvent.getEntityId());
Assert.assertEquals(device.getId().getId().toString(), inEvent.getBody().get("entityId").asText());
- outEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
+ outEvent = events.stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
Assert.assertEquals(lastRuleNode.getId(), outEvent.getEntityId());
Assert.assertEquals(device.getId().getId().toString(), outEvent.getBody().get("entityId").asText());
diff --git a/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java b/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java
index 2f25b97..7ac0789 100644
--- a/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java
+++ b/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java
@@ -45,6 +45,8 @@ import org.thingsboard.server.dao.attributes.AttributesService;
import java.io.IOException;
import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
@@ -144,15 +146,16 @@ public abstract class AbstractRuleEngineLifecycleIntegrationTest extends Abstrac
Thread.sleep(3000);
- TimePageData<Event> events = getDebugEvents(savedTenant.getId(), ruleChain.getFirstRuleNodeId(), 1000);
+ TimePageData<Event> eventsPage = getDebugEvents(savedTenant.getId(), ruleChain.getFirstRuleNodeId(), 1000);
+ List<Event> events = eventsPage.getData().stream().filter(filterByCustomEvent()).collect(Collectors.toList());
- Assert.assertEquals(2, events.getData().size());
+ Assert.assertEquals(2, events.size());
- Event inEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
+ Event inEvent = events.stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
Assert.assertEquals(ruleChain.getFirstRuleNodeId(), inEvent.getEntityId());
Assert.assertEquals(device.getId().getId().toString(), inEvent.getBody().get("entityId").asText());
- Event outEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
+ Event outEvent = events.stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
Assert.assertEquals(ruleChain.getFirstRuleNodeId(), outEvent.getEntityId());
Assert.assertEquals(device.getId().getId().toString(), outEvent.getBody().get("entityId").asText());
@@ -212,15 +215,16 @@ public abstract class AbstractRuleEngineLifecycleIntegrationTest extends Abstrac
Thread.sleep(3000);
- TimePageData<Event> events = getDebugEvents(savedTenant.getId(), ruleChain.getFirstRuleNodeId(), 1000);
+ TimePageData<Event> eventsPage = getDebugEvents(savedTenant.getId(), ruleChain.getFirstRuleNodeId(), 1000);
+ List<Event> events = eventsPage.getData().stream().filter(filterByCustomEvent()).collect(Collectors.toList());
- Assert.assertEquals(2, events.getData().size());
+ Assert.assertEquals(2, events.size());
- Event inEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
+ Event inEvent = events.stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
Assert.assertEquals(ruleChain.getFirstRuleNodeId(), inEvent.getEntityId());
Assert.assertEquals(device.getId().getId().toString(), inEvent.getBody().get("entityId").asText());
- Event outEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
+ Event outEvent = events.stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
Assert.assertEquals(ruleChain.getFirstRuleNodeId(), outEvent.getEntityId());
Assert.assertEquals(device.getId().getId().toString(), outEvent.getBody().get("entityId").asText());
diff --git a/dao/src/main/java/org/thingsboard/server/dao/queue/memory/InMemoryMsgQueue.java b/dao/src/main/java/org/thingsboard/server/dao/queue/memory/InMemoryMsgQueue.java
index 4532e02..9305778 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/queue/memory/InMemoryMsgQueue.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/queue/memory/InMemoryMsgQueue.java
@@ -40,7 +40,7 @@ import java.util.concurrent.Executors;
* Created by ashvayka on 27.04.18.
*/
@Component
-@ConditionalOnProperty(prefix = "rule.queue", value = "type", havingValue = "memory", matchIfMissing = true)
+@ConditionalOnProperty(prefix = "actors.rule.queue", value = "type", havingValue = "memory", matchIfMissing = true)
@Slf4j
public class InMemoryMsgQueue implements MsgQueue {
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
index 0fa9653..7aa317c 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
@@ -82,6 +82,9 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
@Value("${cassandra.query.ts_key_value_partitioning}")
private String partitioning;
+ @Value("${cassandra.query.ts_key_value_ttl}")
+ private long systemTtl;
+
private TsPartitionDate tsFormat;
private PreparedStatement partitionInsertStmt;
@@ -287,6 +290,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
@Override
public ListenableFuture<Void> save(EntityId entityId, TsKvEntry tsKvEntry, long ttl) {
+ ttl = computeTtl(ttl);
long partition = toPartitionTs(tsKvEntry.getTs());
DataType type = tsKvEntry.getDataType();
BoundStatement stmt = (ttl == 0 ? getSaveStmt(type) : getSaveTtlStmt(type)).bind();
@@ -304,6 +308,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
@Override
public ListenableFuture<Void> savePartition(EntityId entityId, long tsKvEntryTs, String key, long ttl) {
+ ttl = computeTtl(ttl);
long partition = toPartitionTs(tsKvEntryTs);
log.debug("Saving partition {} for the entity [{}-{}] and key {}", partition, entityId.getEntityType(), entityId.getId(), key);
BoundStatement stmt = (ttl == 0 ? getPartitionInsertStmt() : getPartitionInsertTtlStmt()).bind();
@@ -317,6 +322,17 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
return getFuture(executeAsyncWrite(stmt), rs -> null);
}
+ private long computeTtl(long ttl) {
+ if (systemTtl > 0) {
+ if (ttl == 0) {
+ ttl = systemTtl;
+ } else {
+ ttl = Math.min(systemTtl, ttl);
+ }
+ }
+ return ttl;
+ }
+
@Override
public ListenableFuture<Void> saveLatest(EntityId entityId, TsKvEntry tsKvEntry) {
BoundStatement stmt = getLatestStmt().bind()
diff --git a/dao/src/test/resources/cassandra-test.properties b/dao/src/test/resources/cassandra-test.properties
index 737687f..cf07b22 100644
--- a/dao/src/test/resources/cassandra-test.properties
+++ b/dao/src/test/resources/cassandra-test.properties
@@ -46,6 +46,8 @@ cassandra.query.default_fetch_size=2000
cassandra.query.ts_key_value_partitioning=HOURS
+cassandra.query.ts_key_value_ttl=0
+
cassandra.query.max_limit_per_request=1000
cassandra.query.buffer_size=100000
cassandra.query.concurrent_limit=1000
docker/tb/run-application.sh 11(+5 -6)
diff --git a/docker/tb/run-application.sh b/docker/tb/run-application.sh
index e297335..a2a1e2b 100755
--- a/docker/tb/run-application.sh
+++ b/docker/tb/run-application.sh
@@ -18,6 +18,11 @@
dpkg -i /thingsboard.deb
+# Copying env variables into conf files
+printenv | awk -F "=" '{print "export " $1 "='\''" $2 "'\''"}' >> /usr/share/thingsboard/conf/thingsboard.conf
+
+cat /usr/share/thingsboard/conf/thingsboard.conf
+
if [ "$DATABASE_TYPE" == "cassandra" ]; then
until nmap $CASSANDRA_HOST -p $CASSANDRA_PORT | grep "$CASSANDRA_PORT/tcp open\|filtered"
do
@@ -46,12 +51,6 @@ if [ "$ADD_SCHEMA_AND_SYSTEM_DATA" == "true" ]; then
fi
fi
-
-# Copying env variables into conf files
-printenv | awk -F "=" '{print "export " $1 "='\''" $2 "'\''"}' >> /usr/share/thingsboard/conf/thingsboard.conf
-
-cat /usr/share/thingsboard/conf/thingsboard.conf
-
echo "Starting 'Thingsboard' service..."
service thingsboard start
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbAbstractGetAttributesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbAbstractGetAttributesNode.java
index e9fa8f6..af8201f 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbAbstractGetAttributesNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbAbstractGetAttributesNode.java
@@ -15,7 +15,6 @@
*/
package org.thingsboard.rule.engine.metadata;
-import com.google.common.base.Function;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.commons.collections.CollectionUtils;
@@ -33,9 +32,11 @@ import java.util.List;
import static org.thingsboard.rule.engine.DonAsynchron.withCallback;
import static org.thingsboard.rule.engine.api.TbRelationTypes.FAILURE;
import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
-import static org.thingsboard.server.common.data.DataConstants.*;
+import static org.thingsboard.server.common.data.DataConstants.CLIENT_SCOPE;
+import static org.thingsboard.server.common.data.DataConstants.SERVER_SCOPE;
+import static org.thingsboard.server.common.data.DataConstants.SHARED_SCOPE;
-public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeConfiguration, T extends EntityId> implements TbNode {
+public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeConfiguration, T extends EntityId> implements TbNode {
protected C config;
@@ -59,7 +60,7 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC
}
private void safePutAttributes(TbContext ctx, TbMsg msg, T entityId) {
- if(entityId == null || entityId.isNullUid()) {
+ if (entityId == null || entityId.isNullUid()) {
ctx.tellNext(msg, FAILURE);
return;
}
@@ -78,7 +79,13 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC
}
ListenableFuture<List<AttributeKvEntry>> latest = ctx.getAttributesService().find(entityId, scope, keys);
return Futures.transform(latest, l -> {
- l.forEach(r -> msg.getMetaData().putValue(prefix + r.getKey(), r.getValueAsString()));
+ l.forEach(r -> {
+ if (r.getValue() != null) {
+ msg.getMetaData().putValue(prefix + r.getKey(), r.getValueAsString());
+ } else {
+ throw new RuntimeException("[" + scope + "][" + r.getKey() + "] attribute value is not present in the DB!");
+ }
+ });
return null;
});
}
@@ -89,7 +96,13 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC
}
ListenableFuture<List<TsKvEntry>> latest = ctx.getTimeseriesService().findLatest(entityId, keys);
return Futures.transform(latest, l -> {
- l.forEach(r -> msg.getMetaData().putValue(r.getKey(), r.getValueAsString()));
+ l.forEach(r -> {
+ if (r.getValue() != null) {
+ msg.getMetaData().putValue(r.getKey(), r.getValueAsString());
+ } else {
+ throw new RuntimeException("[" + r.getKey() + "] telemetry value is not present in the DB!");
+ }
+ });
return null;
});
}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java
index ea4446d..dc0fcdd 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java
@@ -69,10 +69,8 @@ public class TbMsgTimeseriesNode implements TbNode {
try {
ts = Long.parseLong(tsStr);
} catch (NumberFormatException e) {}
- }
- if (ts == -1) {
- ctx.tellFailure(msg, new IllegalArgumentException("Msg metadata doesn't contain valid ts value: " + msg.getMetaData()));
- return;
+ } else {
+ ts = System.currentTimeMillis();
}
String src = msg.getData();
TelemetryUploadRequest telemetryUploadRequest = JsonConverter.convertToTelemetry(new JsonParser().parse(src), ts);
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java
index 1b37ed4..bb8d4ad 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java
@@ -119,8 +119,8 @@ public class MqttTransportService {
try {
serverChannel.close().sync();
} finally {
- bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
+ bossGroup.shutdownGracefully();
}
log.info("MQTT transport stopped!");
}
ui/src/app/locale/locale.constant-zh.js 32(+32 -0)
diff --git a/ui/src/app/locale/locale.constant-zh.js b/ui/src/app/locale/locale.constant-zh.js
index d94141c..c25291a 100644
--- a/ui/src/app/locale/locale.constant-zh.js
+++ b/ui/src/app/locale/locale.constant-zh.js
@@ -280,6 +280,38 @@ export default function addLocaleChinese(locales) {
"selected-attributes": "{ count, select, 1 {1 属性} other {# 属性} } 被选中",
"selected-telemetry": "{ count, select, 1 {1 遥测} other {# 遥测} } 被选中"
},
+ "audit-log": {
+ "audit": "审计",
+ "audit-logs": "审计日志",
+ "timestamp": "时间戳",
+ "entity-type": "实体类型",
+ "entity-name": "实体名称",
+ "user": "用户",
+ "type": "类型",
+ "status": "状态",
+ "details": "详情",
+ "type-added": "添加",
+ "type-deleted": "删除",
+ "type-updated": "更新",
+ "type-attributes-updated": "更新属性",
+ "type-attributes-deleted": "删除属性",
+ "type-rpc-call": "RPC调用",
+ "type-credentials-updated": "更新凭证",
+ "type-assigned-to-customer": "分配给客户",
+ "type-unassigned-from-customer": "未分配给客户",
+ "type-activated": "激活",
+ "type-suspended": "暂停",
+ "type-credentials-read": "读取凭证",
+ "type-attributes-read": "读取属性",
+ "status-success": "成功",
+ "status-failure": "失败",
+ "audit-log-details": "审计日志详情",
+ "no-audit-logs-prompt": "找不到日志",
+ "action-data": "活动数据",
+ "failure-details": "失败详情",
+ "search": "查找审计日志",
+ "clear-search": "清空查找"
+ },
"confirm-on-exit": {
"message": "您有未保存的更改。确定要离开此页吗?",
"html-message": "您有未保存的更改。<br/> 确定要离开此页面吗?",
diff --git a/ui/src/app/widget/widget-editor.controller.js b/ui/src/app/widget/widget-editor.controller.js
index 301fc5a..6f0ad87 100644
--- a/ui/src/app/widget/widget-editor.controller.js
+++ b/ui/src/app/widget/widget-editor.controller.js
@@ -20,6 +20,7 @@ import 'brace/mode/javascript';
import 'brace/mode/html';
import 'brace/mode/css';
import 'brace/mode/json';
+import 'ace-builds/src-min-noconflict/ace';
import 'ace-builds/src-min-noconflict/snippets/javascript';
import 'ace-builds/src-min-noconflict/snippets/text';
import 'ace-builds/src-min-noconflict/snippets/html';
@@ -662,4 +663,4 @@ export default function WidgetEditorController(widgetService, userService, types
}
-/* eslint-enable angular/angularelement */
\ No newline at end of file
+/* eslint-enable angular/angularelement */