thingsboard-aplcache

New system parameters: default cassandra ts key/val ttl; allow

5/23/2018 11:06:51 AM

Details

diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
index f7c7f1a..4840f2a 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
@@ -235,6 +235,10 @@ public class ActorSystemContext {
     @Getter
     private boolean tenantComponentsInitEnabled;
 
+    @Value("${actors.rule.allow_system_mail_service}")
+    @Getter
+    private boolean allowSystemMailService;
+
     @Getter
     @Setter
     private ActorSystem actorSystem;
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
index b888bc3..70509fb 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
@@ -209,7 +209,11 @@ class DefaultTbContext implements TbContext {
 
     @Override
     public MailService getMailService() {
-        return mainCtx.getMailService();
+        if (mainCtx.isAllowSystemMailService()) {
+            return mainCtx.getMailService();
+        } else {
+            throw new RuntimeException("Access to System Mail Service is forbidden!");
+        }
     }
 
     @Override
diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultMsgQueueService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultMsgQueueService.java
index a4558eb..9275847 100644
--- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultMsgQueueService.java
+++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultMsgQueueService.java
@@ -40,10 +40,10 @@ import java.util.concurrent.atomic.AtomicLong;
 @Slf4j
 public class DefaultMsgQueueService implements MsgQueueService {
 
-    @Value("${rule.queue.max_size}")
+    @Value("${actors.rule.queue.max_size}")
     private long queueMaxSize;
 
-    @Value("${rule.queue.cleanup_period}")
+    @Value("${actors.rule.queue.cleanup_period}")
     private long queueCleanUpPeriod;
 
     @Autowired
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index 9a10895..a10ef7d 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -203,6 +203,7 @@ cassandra:
     default_fetch_size: "${CASSANDRA_DEFAULT_FETCH_SIZE:2000}"
     # Specify partitioning size for timestamp key-value storage. Example MINUTES, HOURS, DAYS, MONTHS
     ts_key_value_partitioning: "${TS_KV_PARTITIONING:MONTHS}"
+    ts_key_value_ttl: "${TS_KV_TTL:0}"
     buffer_size: "${CASSANDRA_QUERY_BUFFER_SIZE:200000}"
     concurrent_limit: "${CASSANDRA_QUERY_CONCURRENT_LIMIT:1000}"
     permit_max_wait_time: "${PERMIT_MAX_WAIT_TIME:120000}"
@@ -236,6 +237,8 @@ actors:
     js_thread_pool_size: "${ACTORS_RULE_JS_THREAD_POOL_SIZE:10}"
     # Specify thread pool size for mail sender executor service
     mail_thread_pool_size: "${ACTORS_RULE_MAIL_THREAD_POOL_SIZE:10}"
+    # Whether to allow usage of system mail service for rules
+    allow_system_mail_service: "${ACTORS_RULE_ALLOW_SYSTEM_MAIL_SERVICE:true}"
     # Specify thread pool size for external call service
     external_call_thread_pool_size: "${ACTORS_RULE_EXTERNAL_CALL_THREAD_POOL_SIZE:10}"
     js_sandbox:
@@ -253,6 +256,13 @@ actors:
     node:
       # Errors for particular actor are persisted once per specified amount of milliseconds
       error_persist_frequency: "${ACTORS_RULE_NODE_ERROR_FREQUENCY:3000}"
+    queue:
+      # Message queue type (memory or db)
+      type: "${ACTORS_RULE_QUEUE_TYPE:memory}"
+      # Message queue maximum size (per tenant)
+      max_size: "${ACTORS_RULE_QUEUE_MAX_SIZE:100}"
+      # Message queue cleanup period in seconds
+      cleanup_period: "${ACTORS_RULE_QUEUE_CLEANUP_PERIOD:3600}"
   statistics:
     # Enable/disable actor statistics
     enabled: "${ACTORS_STATISTICS_ENABLED:true}"
@@ -333,16 +343,6 @@ spring:
     username: "${SPRING_DATASOURCE_USERNAME:sa}"
     password: "${SPRING_DATASOURCE_PASSWORD:}"
 
-rule:
-  queue:
-    #Message queue type (memory or db)
-    type: "${RULE_QUEUE_TYPE:memory}"
-    #Message queue maximum size (per tenant)
-    max_size: "${RULE_QUEUE_MAX_SIZE:100}"
-    #Message queue cleanup period in seconds
-    cleanup_period: "${RULE_QUEUE_CLEANUP_PERIOD:3600}"
-
-
 # PostgreSQL DAO Configuration
 #spring:
 #  data:
diff --git a/dao/src/main/java/org/thingsboard/server/dao/queue/db/nosql/CassandraMsgQueue.java b/dao/src/main/java/org/thingsboard/server/dao/queue/db/nosql/CassandraMsgQueue.java
index ce481b7..9cc87b4 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/queue/db/nosql/CassandraMsgQueue.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/queue/db/nosql/CassandraMsgQueue.java
@@ -36,7 +36,7 @@ import java.util.List;
 import java.util.UUID;
 
 @Component
-@ConditionalOnProperty(prefix = "rule.queue", value = "type", havingValue = "db")
+@ConditionalOnProperty(prefix = "actors.rule.queue", value = "type", havingValue = "db")
 @Slf4j
 @NoSqlDao
 public class CassandraMsgQueue implements MsgQueue {
diff --git a/dao/src/main/java/org/thingsboard/server/dao/queue/memory/InMemoryMsgQueue.java b/dao/src/main/java/org/thingsboard/server/dao/queue/memory/InMemoryMsgQueue.java
index 4532e02..9305778 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/queue/memory/InMemoryMsgQueue.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/queue/memory/InMemoryMsgQueue.java
@@ -40,7 +40,7 @@ import java.util.concurrent.Executors;
  * Created by ashvayka on 27.04.18.
  */
 @Component
-@ConditionalOnProperty(prefix = "rule.queue", value = "type", havingValue = "memory", matchIfMissing = true)
+@ConditionalOnProperty(prefix = "actors.rule.queue", value = "type", havingValue = "memory", matchIfMissing = true)
 @Slf4j
 public class InMemoryMsgQueue implements MsgQueue {
 
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
index 0fa9653..7aa317c 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
@@ -82,6 +82,9 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
     @Value("${cassandra.query.ts_key_value_partitioning}")
     private String partitioning;
 
+    @Value("${cassandra.query.ts_key_value_ttl}")
+    private long systemTtl;
+
     private TsPartitionDate tsFormat;
 
     private PreparedStatement partitionInsertStmt;
@@ -287,6 +290,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
 
     @Override
     public ListenableFuture<Void> save(EntityId entityId, TsKvEntry tsKvEntry, long ttl) {
+        ttl = computeTtl(ttl);
         long partition = toPartitionTs(tsKvEntry.getTs());
         DataType type = tsKvEntry.getDataType();
         BoundStatement stmt = (ttl == 0 ? getSaveStmt(type) : getSaveTtlStmt(type)).bind();
@@ -304,6 +308,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
 
     @Override
     public ListenableFuture<Void> savePartition(EntityId entityId, long tsKvEntryTs, String key, long ttl) {
+        ttl = computeTtl(ttl);
         long partition = toPartitionTs(tsKvEntryTs);
         log.debug("Saving partition {} for the entity [{}-{}] and key {}", partition, entityId.getEntityType(), entityId.getId(), key);
         BoundStatement stmt = (ttl == 0 ? getPartitionInsertStmt() : getPartitionInsertTtlStmt()).bind();
@@ -317,6 +322,17 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
         return getFuture(executeAsyncWrite(stmt), rs -> null);
     }
 
+    private long computeTtl(long ttl) {
+        if (systemTtl > 0) {
+            if (ttl == 0) {
+                ttl = systemTtl;
+            } else {
+                ttl = Math.min(systemTtl, ttl);
+            }
+        }
+        return ttl;
+    }
+
     @Override
     public ListenableFuture<Void> saveLatest(EntityId entityId, TsKvEntry tsKvEntry) {
         BoundStatement stmt = getLatestStmt().bind()
diff --git a/dao/src/test/resources/cassandra-test.properties b/dao/src/test/resources/cassandra-test.properties
index 737687f..cf07b22 100644
--- a/dao/src/test/resources/cassandra-test.properties
+++ b/dao/src/test/resources/cassandra-test.properties
@@ -46,6 +46,8 @@ cassandra.query.default_fetch_size=2000
 
 cassandra.query.ts_key_value_partitioning=HOURS
 
+cassandra.query.ts_key_value_ttl=0
+
 cassandra.query.max_limit_per_request=1000
 cassandra.query.buffer_size=100000
 cassandra.query.concurrent_limit=1000