Details
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index 83630ce..18e35c6 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -157,6 +157,13 @@ cassandra:
# Specify partitioning size for timestamp key-value storage. Example MINUTES, HOURS, DAYS, MONTHS
ts_key_value_partitioning: "${TS_KV_PARTITIONING:MONTHS}"
+# SQL configuration parameters
+sql:
+ # Specify executor service type used to perform timeseries insert tasks: SINGLE FIXED CACHED
+ ts_inserts_executor_type: "${SQL_TS_INSERTS_EXECUTOR_TYPE:fixed}"
+ # Specify thread pool size for FIXED executor service type
+ ts_inserts_fixed_thread_pool_size: "${SQL_TS_INSERTS_FIXED_THREAD_POOL_SIZE:10}"
+
# Actor system parameters
actors:
tenant:
diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java
index 170f9a6..d0acb33 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java
@@ -20,6 +20,7 @@ import com.google.common.collect.Lists;
import com.google.common.util.concurrent.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.domain.PageRequest;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.UUIDConverter;
@@ -31,14 +32,17 @@ import org.thingsboard.server.dao.model.sql.TsKvLatestCompositeKey;
import org.thingsboard.server.dao.model.sql.TsKvLatestEntity;
import org.thingsboard.server.dao.sql.JpaAbstractDaoListeningExecutorService;
import org.thingsboard.server.dao.timeseries.TimeseriesDao;
+import org.thingsboard.server.dao.timeseries.TsInsertExecutorType;
import org.thingsboard.server.dao.util.SqlDao;
import javax.annotation.Nullable;
+import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
@@ -50,7 +54,13 @@ import static org.thingsboard.server.common.data.UUIDConverter.fromTimeUUID;
@SqlDao
public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService implements TimeseriesDao {
- private ListeningExecutorService insertService = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
+ @Value("${sql.ts_inserts_executor_type}")
+ private String insertExecutorType;
+
+ @Value("${sql.ts_inserts_fixed_thread_pool_size}")
+ private int insertFixedThreadPoolSize;
+
+ private ListeningExecutorService insertService;
@Autowired
private TsKvRepository tsKvRepository;
@@ -58,6 +68,32 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp
@Autowired
private TsKvLatestRepository tsKvLatestRepository;
+ @PostConstruct
+ public void init() {
+ Optional<TsInsertExecutorType> executorTypeOptional = TsInsertExecutorType.parse(insertExecutorType);
+ TsInsertExecutorType executorType;
+ if (executorTypeOptional.isPresent()) {
+ executorType = executorTypeOptional.get();
+ } else {
+ executorType = TsInsertExecutorType.FIXED;
+ }
+ switch (executorType) {
+ case SINGLE:
+ insertService = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
+ break;
+ case FIXED:
+ int poolSize = insertFixedThreadPoolSize;
+ if (poolSize <= 0) {
+ poolSize = 10;
+ }
+ insertService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(poolSize));
+ break;
+ case CACHED:
+ insertService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
+ break;
+ }
+ }
+
@Override
public ListenableFuture<List<TsKvEntry>> findAllAsync(EntityId entityId, List<TsKvQuery> queries) {
List<ListenableFuture<List<TsKvEntry>>> futures = queries
@@ -265,7 +301,9 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp
@PreDestroy
void onDestroy() {
- insertService.shutdown();
+ if (insertService != null) {
+ insertService.shutdown();
+ }
}
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsInsertExecutorType.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsInsertExecutorType.java
new file mode 100644
index 0000000..6756f0f
--- /dev/null
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsInsertExecutorType.java
@@ -0,0 +1,37 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.dao.timeseries;
+
+import java.util.Optional;
+
+public enum TsInsertExecutorType {
+ SINGLE,
+ FIXED,
+ CACHED;
+
+ public static Optional<TsInsertExecutorType> parse(String name) {
+ TsInsertExecutorType executorType = null;
+ if (name != null) {
+ for (TsInsertExecutorType type : TsInsertExecutorType.values()) {
+ if (type.name().equalsIgnoreCase(name)) {
+ executorType = type;
+ break;
+ }
+ }
+ }
+ return Optional.of(executorType);
+ }
+}
diff --git a/dao/src/test/resources/sql-test.properties b/dao/src/test/resources/sql-test.properties
index 0bcc789..1f34b98 100644
--- a/dao/src/test/resources/sql-test.properties
+++ b/dao/src/test/resources/sql-test.properties
@@ -1,4 +1,7 @@
- database.type=sql
+database.type=sql
+
+sql.ts_inserts_executor_type=fixed
+sql.ts_inserts_fixed_thread_pool_size=10
spring.jpa.show-sql=false
spring.jpa.hibernate.ddl-auto=validate