thingsboard-aplcache

Changes

dao/src/main/java/org/thingsboard/server/dao/queue/db/MsgAck.java 32(+0 -32)

dao/src/main/java/org/thingsboard/server/dao/queue/db/nosql/CassandraMsgQueue.java 87(+0 -87)

dao/src/main/java/org/thingsboard/server/dao/queue/db/nosql/QueuePartitioner.java 86(+0 -86)

dao/src/main/java/org/thingsboard/server/dao/queue/db/nosql/repository/CassandraAckRepository.java 68(+0 -68)

dao/src/main/java/org/thingsboard/server/dao/queue/db/nosql/repository/CassandraMsgRepository.java 67(+0 -67)

dao/src/main/java/org/thingsboard/server/dao/queue/db/nosql/repository/CassandraProcessedPartitionRepository.java 64(+0 -64)

dao/src/main/java/org/thingsboard/server/dao/queue/db/repository/AckRepository.java 29(+0 -29)

dao/src/main/java/org/thingsboard/server/dao/queue/db/repository/MsgRepository.java 30(+0 -30)

dao/src/main/java/org/thingsboard/server/dao/queue/db/repository/ProcessedPartitionRepository.java 29(+0 -29)

dao/src/main/java/org/thingsboard/server/dao/queue/db/sql/SqlMsgQueue.java 20(+0 -20)

dao/src/main/java/org/thingsboard/server/dao/queue/db/UnprocessedMsgFilter.java 35(+0 -35)

dao/src/test/java/org/thingsboard/server/dao/queue/db/nosql/QueuePartitionerTest.java 81(+0 -81)

dao/src/test/java/org/thingsboard/server/dao/queue/db/nosql/repository/CassandraAckRepositoryTest.java 82(+0 -82)

dao/src/test/java/org/thingsboard/server/dao/queue/db/nosql/repository/CassandraMsgRepositoryTest.java 87(+0 -87)

dao/src/test/java/org/thingsboard/server/dao/queue/db/nosql/repository/CassandraProcessedPartitionRepositoryTest.java 83(+0 -83)

dao/src/test/java/org/thingsboard/server/dao/queue/db/nosql/UnprocessedMsgFilterTest.java 47(+0 -47)

Details

diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
index f7c7f1a..4840f2a 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
@@ -235,6 +235,10 @@ public class ActorSystemContext {
     @Getter
     private boolean tenantComponentsInitEnabled;
 
+    @Value("${actors.rule.allow_system_mail_service}")
+    @Getter
+    private boolean allowSystemMailService;
+
     @Getter
     @Setter
     private ActorSystem actorSystem;
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
index b888bc3..70509fb 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
@@ -209,7 +209,11 @@ class DefaultTbContext implements TbContext {
 
     @Override
     public MailService getMailService() {
-        return mainCtx.getMailService();
+        if (mainCtx.isAllowSystemMailService()) {
+            return mainCtx.getMailService();
+        } else {
+            throw new RuntimeException("Access to System Mail Service is forbidden!");
+        }
     }
 
     @Override
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
index ac902a7..7d560db 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
@@ -183,23 +183,35 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
 
     void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg envelope) {
         checkActive();
-        putToQueue(enrichWithRuleChainId(envelope.getTbMsg()), msg -> pushMsgToNode(firstNode, msg, ""));
+        if (firstNode != null) {
+            putToQueue(enrichWithRuleChainId(envelope.getTbMsg()), msg -> pushMsgToNode(firstNode, msg, ""));
+        }
     }
 
     void onDeviceActorToRuleEngineMsg(DeviceActorToRuleEngineMsg envelope) {
         checkActive();
-        putToQueue(enrichWithRuleChainId(envelope.getTbMsg()), msg -> {
-            pushMsgToNode(firstNode, msg, "");
-            envelope.getCallbackRef().tell(new RuleEngineQueuePutAckMsg(msg.getId()), self);
-        });
+        if (firstNode != null) {
+            putToQueue(enrichWithRuleChainId(envelope.getTbMsg()), msg -> {
+                pushMsgToNode(firstNode, msg, "");
+                envelope.getCallbackRef().tell(new RuleEngineQueuePutAckMsg(msg.getId()), self);
+            });
+        }
     }
 
     void onRuleChainToRuleChainMsg(RuleChainToRuleChainMsg envelope) {
         checkActive();
         if (envelope.isEnqueue()) {
-            putToQueue(enrichWithRuleChainId(envelope.getMsg()), msg -> pushMsgToNode(firstNode, msg, envelope.getFromRelationType()));
+            if (firstNode != null) {
+                putToQueue(enrichWithRuleChainId(envelope.getMsg()), msg -> pushMsgToNode(firstNode, msg, envelope.getFromRelationType()));
+            }
         } else {
-            pushMsgToNode(firstNode, envelope.getMsg(), envelope.getFromRelationType());
+            if (firstNode != null) {
+                pushMsgToNode(firstNode, envelope.getMsg(), envelope.getFromRelationType());
+            } else {
+                TbMsg msg = envelope.getMsg();
+                EntityId ackId = msg.getRuleNodeId() != null ? msg.getRuleNodeId() : msg.getRuleChainId();
+                queue.ack(tenantId, envelope.getMsg(), ackId.getId(), msg.getClusterPartition());
+            }
         }
     }
 
diff --git a/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java b/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java
index d2ae77c..f863d0b 100644
--- a/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java
+++ b/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java
@@ -82,7 +82,7 @@ public class ThingsboardInstallService {
                         databaseUpgradeService.upgradeDatabase("1.3.1");
 
                     case "1.4.0":
-                        log.info("Upgrading ThingsBoard from version 1.4.0 to 1.5.0 ...");
+                        log.info("Upgrading ThingsBoard from version 1.4.0 to 2.0.0 ...");
 
                         databaseUpgradeService.upgradeDatabase("1.4.0");
 
diff --git a/application/src/main/java/org/thingsboard/server/service/install/CassandraDatabaseUpgradeService.java b/application/src/main/java/org/thingsboard/server/service/install/CassandraDatabaseUpgradeService.java
index d38cddd..4d2adea 100644
--- a/application/src/main/java/org/thingsboard/server/service/install/CassandraDatabaseUpgradeService.java
+++ b/application/src/main/java/org/thingsboard/server/service/install/CassandraDatabaseUpgradeService.java
@@ -198,7 +198,7 @@ public class CassandraDatabaseUpgradeService implements DatabaseUpgradeService {
             case "1.4.0":
 
                 log.info("Updating schema ...");
-                schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "1.5.0", SCHEMA_UPDATE_CQL);
+                schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "2.0.0", SCHEMA_UPDATE_CQL);
                 loadCql(schemaUpdateFile);
                 log.info("Schema updated.");
 
diff --git a/application/src/main/java/org/thingsboard/server/service/install/DefaultDataUpdateService.java b/application/src/main/java/org/thingsboard/server/service/install/DefaultDataUpdateService.java
index 37e5b30..b372368 100644
--- a/application/src/main/java/org/thingsboard/server/service/install/DefaultDataUpdateService.java
+++ b/application/src/main/java/org/thingsboard/server/service/install/DefaultDataUpdateService.java
@@ -47,7 +47,7 @@ public class DefaultDataUpdateService implements DataUpdateService {
     public void updateData(String fromVersion) throws Exception {
         switch (fromVersion) {
             case "1.4.0":
-                log.info("Updating data from version 1.4.0 to 1.5.0 ...");
+                log.info("Updating data from version 1.4.0 to 2.0.0 ...");
                 tenantsDefaultRuleChainUpdater.updateEntities(null);
                 break;
             default:
diff --git a/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java b/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java
index ecb6070..29d5c65 100644
--- a/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java
+++ b/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java
@@ -104,7 +104,7 @@ public class SqlDatabaseUpgradeService implements DatabaseUpgradeService {
             case "1.4.0":
                 try (Connection conn = DriverManager.getConnection(dbUrl, dbUserName, dbPassword)) {
                     log.info("Updating schema ...");
-                    schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "1.5.0", SCHEMA_UPDATE_SQL);
+                    schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "2.0.0", SCHEMA_UPDATE_SQL);
                     String sql = new String(Files.readAllBytes(schemaUpdateFile), Charset.forName("UTF-8"));
                     conn.createStatement().execute(sql); //NOSONAR, ignoring because method used to execute thingsboard database upgrade script
                     log.info("Schema updated.");
diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultMsgQueueService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultMsgQueueService.java
index a4558eb..9275847 100644
--- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultMsgQueueService.java
+++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultMsgQueueService.java
@@ -40,10 +40,10 @@ import java.util.concurrent.atomic.AtomicLong;
 @Slf4j
 public class DefaultMsgQueueService implements MsgQueueService {
 
-    @Value("${rule.queue.max_size}")
+    @Value("${actors.rule.queue.max_size}")
     private long queueMaxSize;
 
-    @Value("${rule.queue.cleanup_period}")
+    @Value("${actors.rule.queue.cleanup_period}")
     private long queueCleanUpPeriod;
 
     @Autowired
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index 9a10895..188291a 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -203,6 +203,7 @@ cassandra:
     default_fetch_size: "${CASSANDRA_DEFAULT_FETCH_SIZE:2000}"
     # Specify partitioning size for timestamp key-value storage. Example MINUTES, HOURS, DAYS, MONTHS
     ts_key_value_partitioning: "${TS_KV_PARTITIONING:MONTHS}"
+    ts_key_value_ttl: "${TS_KV_TTL:0}"
     buffer_size: "${CASSANDRA_QUERY_BUFFER_SIZE:200000}"
     concurrent_limit: "${CASSANDRA_QUERY_CONCURRENT_LIMIT:1000}"
     permit_max_wait_time: "${PERMIT_MAX_WAIT_TIME:120000}"
@@ -236,6 +237,8 @@ actors:
     js_thread_pool_size: "${ACTORS_RULE_JS_THREAD_POOL_SIZE:10}"
     # Specify thread pool size for mail sender executor service
     mail_thread_pool_size: "${ACTORS_RULE_MAIL_THREAD_POOL_SIZE:10}"
+    # Whether to allow usage of system mail service for rules
+    allow_system_mail_service: "${ACTORS_RULE_ALLOW_SYSTEM_MAIL_SERVICE:true}"
     # Specify thread pool size for external call service
     external_call_thread_pool_size: "${ACTORS_RULE_EXTERNAL_CALL_THREAD_POOL_SIZE:10}"
     js_sandbox:
@@ -253,6 +256,13 @@ actors:
     node:
       # Errors for particular actor are persisted once per specified amount of milliseconds
       error_persist_frequency: "${ACTORS_RULE_NODE_ERROR_FREQUENCY:3000}"
+    queue:
+      # Message queue type
+      type: "${ACTORS_RULE_QUEUE_TYPE:memory}"
+      # Message queue maximum size (per tenant)
+      max_size: "${ACTORS_RULE_QUEUE_MAX_SIZE:100}"
+      # Message queue cleanup period in seconds
+      cleanup_period: "${ACTORS_RULE_QUEUE_CLEANUP_PERIOD:3600}"
   statistics:
     # Enable/disable actor statistics
     enabled: "${ACTORS_STATISTICS_ENABLED:true}"
@@ -333,16 +343,6 @@ spring:
     username: "${SPRING_DATASOURCE_USERNAME:sa}"
     password: "${SPRING_DATASOURCE_PASSWORD:}"
 
-rule:
-  queue:
-    #Message queue type (memory or db)
-    type: "${RULE_QUEUE_TYPE:memory}"
-    #Message queue maximum size (per tenant)
-    max_size: "${RULE_QUEUE_MAX_SIZE:100}"
-    #Message queue cleanup period in seconds
-    cleanup_period: "${RULE_QUEUE_CLEANUP_PERIOD:3600}"
-
-
 # PostgreSQL DAO Configuration
 #spring:
 #  data:
diff --git a/application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java
index 6c2d3db..1b042a8 100644
--- a/application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java
+++ b/application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java
@@ -32,6 +32,7 @@ import org.thingsboard.server.dao.rule.RuleChainService;
 import org.thingsboard.server.service.queue.MsgQueueService;
 
 import java.io.IOException;
+import java.util.function.Predicate;
 
 /**
  * Created by ashvayka on 20.03.18.
@@ -75,4 +76,9 @@ public class AbstractRuleEngineControllerTest extends AbstractControllerTest {
             throw new RuntimeException(e);
         }
     }
+
+    protected Predicate<Event> filterByCustomEvent() {
+        return event -> event.getBody().get("msgType").textValue().equals("CUSTOM");
+    }
+
 }
diff --git a/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java b/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java
index 356dfee..c86d496 100644
--- a/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java
+++ b/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java
@@ -47,6 +47,8 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
 
 import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
 
@@ -157,15 +159,15 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule
 
         Thread.sleep(3000);
 
-        TimePageData<Event> events = getDebugEvents(savedTenant.getId(), ruleChain.getFirstRuleNodeId(), 1000);
+        TimePageData<Event> eventsPage = getDebugEvents(savedTenant.getId(), ruleChain.getFirstRuleNodeId(), 1000);
+        List<Event> events = eventsPage.getData().stream().filter(filterByCustomEvent()).collect(Collectors.toList());
+        Assert.assertEquals(2, events.size());
 
-        Assert.assertEquals(2, events.getData().size());
-
-        Event inEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
+        Event inEvent = events.stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
         Assert.assertEquals(ruleChain.getFirstRuleNodeId(), inEvent.getEntityId());
         Assert.assertEquals(device.getId().getId().toString(), inEvent.getBody().get("entityId").asText());
 
-        Event outEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
+        Event outEvent = events.stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
         Assert.assertEquals(ruleChain.getFirstRuleNodeId(), outEvent.getEntityId());
         Assert.assertEquals(device.getId().getId().toString(), outEvent.getBody().get("entityId").asText());
 
@@ -174,15 +176,16 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule
         RuleChain finalRuleChain = ruleChain;
         RuleNode lastRuleNode = metaData.getNodes().stream().filter(node -> !node.getId().equals(finalRuleChain.getFirstRuleNodeId())).findFirst().get();
 
-        events = getDebugEvents(savedTenant.getId(), lastRuleNode.getId(), 1000);
+        eventsPage = getDebugEvents(savedTenant.getId(), lastRuleNode.getId(), 1000);
+        events = eventsPage.getData().stream().filter(filterByCustomEvent()).collect(Collectors.toList());
 
-        Assert.assertEquals(2, events.getData().size());
+        Assert.assertEquals(2, events.size());
 
-        inEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
+        inEvent = events.stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
         Assert.assertEquals(lastRuleNode.getId(), inEvent.getEntityId());
         Assert.assertEquals(device.getId().getId().toString(), inEvent.getBody().get("entityId").asText());
 
-        outEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
+        outEvent = events.stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
         Assert.assertEquals(lastRuleNode.getId(), outEvent.getEntityId());
         Assert.assertEquals(device.getId().getId().toString(), outEvent.getBody().get("entityId").asText());
 
@@ -274,15 +277,16 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule
 
         Thread.sleep(3000);
 
-        TimePageData<Event> events = getDebugEvents(savedTenant.getId(), rootRuleChain.getFirstRuleNodeId(), 1000);
+        TimePageData<Event> eventsPage = getDebugEvents(savedTenant.getId(), rootRuleChain.getFirstRuleNodeId(), 1000);
+        List<Event> events = eventsPage.getData().stream().filter(filterByCustomEvent()).collect(Collectors.toList());
 
-        Assert.assertEquals(2, events.getData().size());
+        Assert.assertEquals(2, events.size());
 
-        Event inEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
+        Event inEvent = events.stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
         Assert.assertEquals(rootRuleChain.getFirstRuleNodeId(), inEvent.getEntityId());
         Assert.assertEquals(device.getId().getId().toString(), inEvent.getBody().get("entityId").asText());
 
-        Event outEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
+        Event outEvent = events.stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
         Assert.assertEquals(rootRuleChain.getFirstRuleNodeId(), outEvent.getEntityId());
         Assert.assertEquals(device.getId().getId().toString(), outEvent.getBody().get("entityId").asText());
 
@@ -291,15 +295,17 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule
         RuleChain finalRuleChain = rootRuleChain;
         RuleNode lastRuleNode = secondaryMetaData.getNodes().stream().filter(node -> !node.getId().equals(finalRuleChain.getFirstRuleNodeId())).findFirst().get();
 
-        events = getDebugEvents(savedTenant.getId(), lastRuleNode.getId(), 1000);
+        eventsPage = getDebugEvents(savedTenant.getId(), lastRuleNode.getId(), 1000);
+        events = eventsPage.getData().stream().filter(filterByCustomEvent()).collect(Collectors.toList());
+
 
-        Assert.assertEquals(2, events.getData().size());
+        Assert.assertEquals(2, events.size());
 
-        inEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
+        inEvent = events.stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
         Assert.assertEquals(lastRuleNode.getId(), inEvent.getEntityId());
         Assert.assertEquals(device.getId().getId().toString(), inEvent.getBody().get("entityId").asText());
 
-        outEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
+        outEvent = events.stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
         Assert.assertEquals(lastRuleNode.getId(), outEvent.getEntityId());
         Assert.assertEquals(device.getId().getId().toString(), outEvent.getBody().get("entityId").asText());
 
diff --git a/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java b/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java
index 2f25b97..7ac0789 100644
--- a/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java
+++ b/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java
@@ -45,6 +45,8 @@ import org.thingsboard.server.dao.attributes.AttributesService;
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
 
 import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
 
@@ -144,15 +146,16 @@ public abstract class AbstractRuleEngineLifecycleIntegrationTest extends Abstrac
 
         Thread.sleep(3000);
 
-        TimePageData<Event> events = getDebugEvents(savedTenant.getId(), ruleChain.getFirstRuleNodeId(), 1000);
+        TimePageData<Event> eventsPage = getDebugEvents(savedTenant.getId(), ruleChain.getFirstRuleNodeId(), 1000);
+        List<Event> events = eventsPage.getData().stream().filter(filterByCustomEvent()).collect(Collectors.toList());
 
-        Assert.assertEquals(2, events.getData().size());
+        Assert.assertEquals(2, events.size());
 
-        Event inEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
+        Event inEvent = events.stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
         Assert.assertEquals(ruleChain.getFirstRuleNodeId(), inEvent.getEntityId());
         Assert.assertEquals(device.getId().getId().toString(), inEvent.getBody().get("entityId").asText());
 
-        Event outEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
+        Event outEvent = events.stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
         Assert.assertEquals(ruleChain.getFirstRuleNodeId(), outEvent.getEntityId());
         Assert.assertEquals(device.getId().getId().toString(), outEvent.getBody().get("entityId").asText());
 
@@ -212,15 +215,16 @@ public abstract class AbstractRuleEngineLifecycleIntegrationTest extends Abstrac
 
         Thread.sleep(3000);
 
-        TimePageData<Event> events = getDebugEvents(savedTenant.getId(), ruleChain.getFirstRuleNodeId(), 1000);
+        TimePageData<Event> eventsPage = getDebugEvents(savedTenant.getId(), ruleChain.getFirstRuleNodeId(), 1000);
+        List<Event> events = eventsPage.getData().stream().filter(filterByCustomEvent()).collect(Collectors.toList());
 
-        Assert.assertEquals(2, events.getData().size());
+        Assert.assertEquals(2, events.size());
 
-        Event inEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
+        Event inEvent = events.stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
         Assert.assertEquals(ruleChain.getFirstRuleNodeId(), inEvent.getEntityId());
         Assert.assertEquals(device.getId().getId().toString(), inEvent.getBody().get("entityId").asText());
 
-        Event outEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
+        Event outEvent = events.stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
         Assert.assertEquals(ruleChain.getFirstRuleNodeId(), outEvent.getEntityId());
         Assert.assertEquals(device.getId().getId().toString(), outEvent.getBody().get("entityId").asText());
 
diff --git a/dao/src/main/java/org/thingsboard/server/dao/queue/memory/InMemoryMsgQueue.java b/dao/src/main/java/org/thingsboard/server/dao/queue/memory/InMemoryMsgQueue.java
index 4532e02..9305778 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/queue/memory/InMemoryMsgQueue.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/queue/memory/InMemoryMsgQueue.java
@@ -40,7 +40,7 @@ import java.util.concurrent.Executors;
  * Created by ashvayka on 27.04.18.
  */
 @Component
-@ConditionalOnProperty(prefix = "rule.queue", value = "type", havingValue = "memory", matchIfMissing = true)
+@ConditionalOnProperty(prefix = "actors.rule.queue", value = "type", havingValue = "memory", matchIfMissing = true)
 @Slf4j
 public class InMemoryMsgQueue implements MsgQueue {
 
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
index 0fa9653..7aa317c 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
@@ -82,6 +82,9 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
     @Value("${cassandra.query.ts_key_value_partitioning}")
     private String partitioning;
 
+    @Value("${cassandra.query.ts_key_value_ttl}")
+    private long systemTtl;
+
     private TsPartitionDate tsFormat;
 
     private PreparedStatement partitionInsertStmt;
@@ -287,6 +290,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
 
     @Override
     public ListenableFuture<Void> save(EntityId entityId, TsKvEntry tsKvEntry, long ttl) {
+        ttl = computeTtl(ttl);
         long partition = toPartitionTs(tsKvEntry.getTs());
         DataType type = tsKvEntry.getDataType();
         BoundStatement stmt = (ttl == 0 ? getSaveStmt(type) : getSaveTtlStmt(type)).bind();
@@ -304,6 +308,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
 
     @Override
     public ListenableFuture<Void> savePartition(EntityId entityId, long tsKvEntryTs, String key, long ttl) {
+        ttl = computeTtl(ttl);
         long partition = toPartitionTs(tsKvEntryTs);
         log.debug("Saving partition {} for the entity [{}-{}] and key {}", partition, entityId.getEntityType(), entityId.getId(), key);
         BoundStatement stmt = (ttl == 0 ? getPartitionInsertStmt() : getPartitionInsertTtlStmt()).bind();
@@ -317,6 +322,17 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
         return getFuture(executeAsyncWrite(stmt), rs -> null);
     }
 
+    private long computeTtl(long ttl) {
+        if (systemTtl > 0) {
+            if (ttl == 0) {
+                ttl = systemTtl;
+            } else {
+                ttl = Math.min(systemTtl, ttl);
+            }
+        }
+        return ttl;
+    }
+
     @Override
     public ListenableFuture<Void> saveLatest(EntityId entityId, TsKvEntry tsKvEntry) {
         BoundStatement stmt = getLatestStmt().bind()
diff --git a/dao/src/test/resources/cassandra-test.properties b/dao/src/test/resources/cassandra-test.properties
index 737687f..cf07b22 100644
--- a/dao/src/test/resources/cassandra-test.properties
+++ b/dao/src/test/resources/cassandra-test.properties
@@ -46,6 +46,8 @@ cassandra.query.default_fetch_size=2000
 
 cassandra.query.ts_key_value_partitioning=HOURS
 
+cassandra.query.ts_key_value_ttl=0
+
 cassandra.query.max_limit_per_request=1000
 cassandra.query.buffer_size=100000
 cassandra.query.concurrent_limit=1000
diff --git a/docker/tb/run-application.sh b/docker/tb/run-application.sh
index e297335..a2a1e2b 100755
--- a/docker/tb/run-application.sh
+++ b/docker/tb/run-application.sh
@@ -18,6 +18,11 @@
 
 dpkg -i /thingsboard.deb
 
+# Copying env variables into conf files
+printenv | awk -F "=" '{print "export " $1 "='\''" $2 "'\''"}' >> /usr/share/thingsboard/conf/thingsboard.conf
+
+cat /usr/share/thingsboard/conf/thingsboard.conf
+
 if [ "$DATABASE_TYPE" == "cassandra" ]; then
     until nmap $CASSANDRA_HOST -p $CASSANDRA_PORT | grep "$CASSANDRA_PORT/tcp open\|filtered"
     do
@@ -46,12 +51,6 @@ if [ "$ADD_SCHEMA_AND_SYSTEM_DATA" == "true" ]; then
     fi
 fi
 
-
-# Copying env variables into conf files
-printenv | awk -F "=" '{print "export " $1 "='\''" $2 "'\''"}' >> /usr/share/thingsboard/conf/thingsboard.conf
-
-cat /usr/share/thingsboard/conf/thingsboard.conf
-
 echo "Starting 'Thingsboard' service..."
 service thingsboard start
 
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbAbstractGetAttributesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbAbstractGetAttributesNode.java
index e9fa8f6..af8201f 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbAbstractGetAttributesNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbAbstractGetAttributesNode.java
@@ -15,7 +15,6 @@
  */
 package org.thingsboard.rule.engine.metadata;
 
-import com.google.common.base.Function;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.commons.collections.CollectionUtils;
@@ -33,9 +32,11 @@ import java.util.List;
 import static org.thingsboard.rule.engine.DonAsynchron.withCallback;
 import static org.thingsboard.rule.engine.api.TbRelationTypes.FAILURE;
 import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
-import static org.thingsboard.server.common.data.DataConstants.*;
+import static org.thingsboard.server.common.data.DataConstants.CLIENT_SCOPE;
+import static org.thingsboard.server.common.data.DataConstants.SERVER_SCOPE;
+import static org.thingsboard.server.common.data.DataConstants.SHARED_SCOPE;
 
-public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeConfiguration, T extends EntityId>  implements TbNode {
+public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeConfiguration, T extends EntityId> implements TbNode {
 
     protected C config;
 
@@ -59,7 +60,7 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC
     }
 
     private void safePutAttributes(TbContext ctx, TbMsg msg, T entityId) {
-        if(entityId == null || entityId.isNullUid()) {
+        if (entityId == null || entityId.isNullUid()) {
             ctx.tellNext(msg, FAILURE);
             return;
         }
@@ -78,7 +79,13 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC
         }
         ListenableFuture<List<AttributeKvEntry>> latest = ctx.getAttributesService().find(entityId, scope, keys);
         return Futures.transform(latest, l -> {
-            l.forEach(r -> msg.getMetaData().putValue(prefix + r.getKey(), r.getValueAsString()));
+            l.forEach(r -> {
+                if (r.getValue() != null) {
+                    msg.getMetaData().putValue(prefix + r.getKey(), r.getValueAsString());
+                } else {
+                    throw new RuntimeException("[" + scope + "][" + r.getKey() + "] attribute value is not present in the DB!");
+                }
+            });
             return null;
         });
     }
@@ -89,7 +96,13 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC
         }
         ListenableFuture<List<TsKvEntry>> latest = ctx.getTimeseriesService().findLatest(entityId, keys);
         return Futures.transform(latest, l -> {
-            l.forEach(r -> msg.getMetaData().putValue(r.getKey(), r.getValueAsString()));
+            l.forEach(r -> {
+                if (r.getValue() != null) {
+                    msg.getMetaData().putValue(r.getKey(), r.getValueAsString());
+                } else {
+                    throw new RuntimeException("[" + r.getKey() + "] telemetry value is not present in the DB!");
+                }
+            });
             return null;
         });
     }
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java
index ea4446d..dc0fcdd 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java
@@ -69,10 +69,8 @@ public class TbMsgTimeseriesNode implements TbNode {
             try {
                 ts = Long.parseLong(tsStr);
             } catch (NumberFormatException e) {}
-        }
-        if (ts == -1) {
-            ctx.tellFailure(msg, new IllegalArgumentException("Msg metadata doesn't contain valid ts value: " + msg.getMetaData()));
-            return;
+        } else {
+            ts = System.currentTimeMillis();
         }
         String src = msg.getData();
         TelemetryUploadRequest telemetryUploadRequest = JsonConverter.convertToTelemetry(new JsonParser().parse(src), ts);
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java
index 1b37ed4..bb8d4ad 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java
@@ -119,8 +119,8 @@ public class MqttTransportService {
         try {
             serverChannel.close().sync();
         } finally {
-            bossGroup.shutdownGracefully();
             workerGroup.shutdownGracefully();
+            bossGroup.shutdownGracefully();
         }
         log.info("MQTT transport stopped!");
     }
diff --git a/ui/src/app/locale/locale.constant-zh.js b/ui/src/app/locale/locale.constant-zh.js
index d94141c..c25291a 100644
--- a/ui/src/app/locale/locale.constant-zh.js
+++ b/ui/src/app/locale/locale.constant-zh.js
@@ -280,6 +280,38 @@ export default function addLocaleChinese(locales) {
             "selected-attributes": "{ count, select, 1 {1 属性} other {# 属性} } 被选中",
             "selected-telemetry": "{ count, select, 1 {1 遥测} other {# 遥测} } 被选中"
         },
+        "audit-log": {
+            "audit": "审计",
+            "audit-logs": "审计日志",
+            "timestamp": "时间戳",
+            "entity-type": "实体类型",
+            "entity-name": "实体名称",
+            "user": "用户",
+            "type": "类型",
+            "status": "状态",
+            "details": "详情",
+            "type-added": "添加",
+            "type-deleted": "删除",
+            "type-updated": "更新",
+            "type-attributes-updated": "更新属性",
+            "type-attributes-deleted": "删除属性",
+            "type-rpc-call": "RPC调用",
+            "type-credentials-updated": "更新凭证",
+            "type-assigned-to-customer": "分配给客户",
+            "type-unassigned-from-customer": "未分配给客户",
+            "type-activated": "激活",
+            "type-suspended": "暂停",
+            "type-credentials-read": "读取凭证",
+            "type-attributes-read": "读取属性",
+            "status-success": "成功",
+            "status-failure": "失败",
+            "audit-log-details": "审计日志详情",
+            "no-audit-logs-prompt": "找不到日志",
+            "action-data": "活动数据",
+            "failure-details": "失败详情",
+            "search": "查找审计日志",
+            "clear-search": "清空查找"
+        },
         "confirm-on-exit": {
             "message": "您有未保存的更改。确定要离开此页吗?",
             "html-message": "您有未保存的更改。<br/> 确定要离开此页面吗?",
diff --git a/ui/src/app/widget/widget-editor.controller.js b/ui/src/app/widget/widget-editor.controller.js
index 301fc5a..6f0ad87 100644
--- a/ui/src/app/widget/widget-editor.controller.js
+++ b/ui/src/app/widget/widget-editor.controller.js
@@ -20,6 +20,7 @@ import 'brace/mode/javascript';
 import 'brace/mode/html';
 import 'brace/mode/css';
 import 'brace/mode/json';
+import 'ace-builds/src-min-noconflict/ace';
 import 'ace-builds/src-min-noconflict/snippets/javascript';
 import 'ace-builds/src-min-noconflict/snippets/text';
 import 'ace-builds/src-min-noconflict/snippets/html';
@@ -662,4 +663,4 @@ export default function WidgetEditorController(widgetService, userService, types
 
 }
 
-/* eslint-enable angular/angularelement */
\ No newline at end of file
+/* eslint-enable angular/angularelement */