thingsboard-memoizeit

Details

diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index 83630ce..18e35c6 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -157,6 +157,13 @@ cassandra:
     # Specify partitioning size for timestamp key-value storage. Example MINUTES, HOURS, DAYS, MONTHS
     ts_key_value_partitioning: "${TS_KV_PARTITIONING:MONTHS}"
 
+# SQL configuration parameters
+sql:
+    # Specify executor service type used to perform timeseries insert tasks: SINGLE FIXED CACHED
+    ts_inserts_executor_type: "${SQL_TS_INSERTS_EXECUTOR_TYPE:fixed}"
+    # Specify thread pool size for FIXED executor service type
+    ts_inserts_fixed_thread_pool_size: "${SQL_TS_INSERTS_FIXED_THREAD_POOL_SIZE:10}"
+
 # Actor system parameters
 actors:
   tenant:
diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java
index 170f9a6..d0acb33 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java
@@ -20,6 +20,7 @@ import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.*;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.data.domain.PageRequest;
 import org.springframework.stereotype.Component;
 import org.thingsboard.server.common.data.UUIDConverter;
@@ -31,14 +32,17 @@ import org.thingsboard.server.dao.model.sql.TsKvLatestCompositeKey;
 import org.thingsboard.server.dao.model.sql.TsKvLatestEntity;
 import org.thingsboard.server.dao.sql.JpaAbstractDaoListeningExecutorService;
 import org.thingsboard.server.dao.timeseries.TimeseriesDao;
+import org.thingsboard.server.dao.timeseries.TsInsertExecutorType;
 import org.thingsboard.server.dao.util.SqlDao;
 
 import javax.annotation.Nullable;
+import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
 
@@ -50,7 +54,13 @@ import static org.thingsboard.server.common.data.UUIDConverter.fromTimeUUID;
 @SqlDao
 public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService implements TimeseriesDao {
 
-    private ListeningExecutorService insertService = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
+    @Value("${sql.ts_inserts_executor_type}")
+    private String insertExecutorType;
+
+    @Value("${sql.ts_inserts_fixed_thread_pool_size}")
+    private int insertFixedThreadPoolSize;
+
+    private ListeningExecutorService insertService;
 
     @Autowired
     private TsKvRepository tsKvRepository;
@@ -58,6 +68,32 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp
     @Autowired
     private TsKvLatestRepository tsKvLatestRepository;
 
+    @PostConstruct
+    public void init() {
+        Optional<TsInsertExecutorType> executorTypeOptional = TsInsertExecutorType.parse(insertExecutorType);
+        TsInsertExecutorType executorType;
+        if (executorTypeOptional.isPresent()) {
+            executorType = executorTypeOptional.get();
+        } else {
+            executorType = TsInsertExecutorType.FIXED;
+        }
+        switch (executorType) {
+            case SINGLE:
+                insertService = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
+                break;
+            case FIXED:
+                int poolSize = insertFixedThreadPoolSize;
+                if (poolSize <= 0) {
+                    poolSize = 10;
+                }
+                insertService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(poolSize));
+                break;
+            case CACHED:
+                insertService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
+                break;
+        }
+    }
+
     @Override
     public ListenableFuture<List<TsKvEntry>> findAllAsync(EntityId entityId, List<TsKvQuery> queries) {
         List<ListenableFuture<List<TsKvEntry>>> futures = queries
@@ -265,7 +301,9 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp
 
     @PreDestroy
     void onDestroy() {
-        insertService.shutdown();
+        if (insertService != null) {
+            insertService.shutdown();
+        }
     }
 
 }
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsInsertExecutorType.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsInsertExecutorType.java
new file mode 100644
index 0000000..6756f0f
--- /dev/null
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsInsertExecutorType.java
@@ -0,0 +1,37 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.dao.timeseries;
+
+import java.util.Optional;
+
+public enum TsInsertExecutorType {
+    SINGLE,
+    FIXED,
+    CACHED;
+
+    public static Optional<TsInsertExecutorType> parse(String name) {
+        TsInsertExecutorType executorType = null;
+        if (name != null) {
+            for (TsInsertExecutorType type : TsInsertExecutorType.values()) {
+                if (type.name().equalsIgnoreCase(name)) {
+                    executorType = type;
+                    break;
+                }
+            }
+        }
+        return Optional.of(executorType);
+    }
+}
diff --git a/dao/src/test/resources/sql-test.properties b/dao/src/test/resources/sql-test.properties
index 0bcc789..1f34b98 100644
--- a/dao/src/test/resources/sql-test.properties
+++ b/dao/src/test/resources/sql-test.properties
@@ -1,4 +1,7 @@
- database.type=sql
+database.type=sql
+
+sql.ts_inserts_executor_type=fixed
+sql.ts_inserts_fixed_thread_pool_size=10
 
 spring.jpa.show-sql=false
 spring.jpa.hibernate.ddl-auto=validate