thingsboard-aplcache

Details

diff --git a/application/src/main/java/org/thingsboard/server/ThingsboardServerApplication.java b/application/src/main/java/org/thingsboard/server/ThingsboardServerApplication.java
index 2994810..9463b08 100644
--- a/application/src/main/java/org/thingsboard/server/ThingsboardServerApplication.java
+++ b/application/src/main/java/org/thingsboard/server/ThingsboardServerApplication.java
@@ -18,11 +18,13 @@ package org.thingsboard.server;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.SpringBootConfiguration;
 import org.springframework.context.annotation.ComponentScan;
+import org.springframework.scheduling.annotation.EnableAsync;
 import springfox.documentation.swagger2.annotations.EnableSwagger2;
 
 import java.util.Arrays;
 
 @SpringBootConfiguration
+@EnableAsync
 @EnableSwagger2
 @ComponentScan({"org.thingsboard.server"})
 public class ThingsboardServerApplication {
diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sql/TsKvEntity.java b/dao/src/main/java/org/thingsboard/server/dao/model/sql/TsKvEntity.java
index c5739f1..74356ad 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/model/sql/TsKvEntity.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/model/sql/TsKvEntity.java
@@ -34,7 +34,9 @@ public final class TsKvEntity implements ToData<TsKvEntry> {
     }
 
     public TsKvEntity(Double avgLongValue, Double avgDoubleValue) {
-        this.longValue = avgLongValue.longValue();
+        if(avgLongValue != null) {
+            this.longValue = avgLongValue.longValue();
+        }
         this.doubleValue = avgDoubleValue;
     }
 
@@ -104,4 +106,8 @@ public final class TsKvEntity implements ToData<TsKvEntry> {
         }
         return new BasicTsKvEntry(ts, kvEntry);
     }
+
+    public boolean isNotEmpty() {
+        return strValue != null || longValue != null || doubleValue != null || booleanValue != null;
+    }
 }
diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java
index 2b77d08..3219d58 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java
@@ -19,6 +19,7 @@ import com.google.common.base.Function;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.data.domain.PageRequest;
@@ -39,6 +40,8 @@ import org.thingsboard.server.dao.util.SqlDao;
 import javax.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 
 import static org.thingsboard.server.common.data.UUIDConverter.fromTimeUUID;
@@ -80,7 +83,7 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp
             return findAllAsyncWithLimit(entityId, query);
         } else {
             long stepTs = query.getStartTs();
-            List<ListenableFuture<TsKvEntry>> futures = new ArrayList<>();
+            List<ListenableFuture<Optional<TsKvEntry>>> futures = new ArrayList<>();
             while (stepTs < query.getEndTs()) {
                 long startTs = stepTs;
                 long endTs = stepTs + query.getInterval();
@@ -88,16 +91,30 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp
                 futures.add(findAndAggregateAsync(entityId, query.getKey(), startTs, endTs, ts, query.getAggregation()));
                 stepTs = endTs;
             }
-            return Futures.allAsList(futures);
+            ListenableFuture<List<Optional<TsKvEntry>>> future = Futures.allAsList(futures);
+            return Futures.transform(future, new Function<List<Optional<TsKvEntry>>, List<TsKvEntry>>() {
+                @Nullable
+                @Override
+                public List<TsKvEntry> apply(@Nullable List<Optional<TsKvEntry>> results) {
+                    if (results == null || results.isEmpty()) {
+                        return null;
+                    }
+                    return results.stream()
+                            .filter(Optional::isPresent)
+                            .map(Optional::get)
+                            .collect(Collectors.toList());
+                }
+            }, service);
         }
     }
 
-    private ListenableFuture<TsKvEntry> findAndAggregateAsync(EntityId entityId, String key, long startTs, long endTs, long ts, Aggregation aggregation) {
-        TsKvEntity entity;
+    private ListenableFuture<Optional<TsKvEntry>> findAndAggregateAsync(EntityId entityId, String key, long startTs, long endTs, long ts, Aggregation aggregation) {
+        CompletableFuture<TsKvEntity> entity;
+        String entityIdStr = fromTimeUUID(entityId.getId());
         switch (aggregation) {
             case AVG:
                 entity = tsKvRepository.findAvg(
-                        fromTimeUUID(entityId.getId()),
+                        entityIdStr,
                         entityId.getEntityType(),
                         key,
                         startTs,
@@ -106,7 +123,7 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp
                 break;
             case MAX:
                 entity = tsKvRepository.findMax(
-                        fromTimeUUID(entityId.getId()),
+                        entityIdStr,
                         entityId.getEntityType(),
                         key,
                         startTs,
@@ -115,7 +132,7 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp
                 break;
             case MIN:
                 entity = tsKvRepository.findMin(
-                        fromTimeUUID(entityId.getId()),
+                        entityIdStr,
                         entityId.getEntityType(),
                         key,
                         startTs,
@@ -124,7 +141,7 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp
                 break;
             case SUM:
                 entity = tsKvRepository.findSum(
-                        fromTimeUUID(entityId.getId()),
+                        entityIdStr,
                         entityId.getEntityType(),
                         key,
                         startTs,
@@ -133,7 +150,7 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp
                 break;
             case COUNT:
                 entity = tsKvRepository.findCount(
-                        fromTimeUUID(entityId.getId()),
+                        entityIdStr,
                         entityId.getEntityType(),
                         key,
                         startTs,
@@ -141,12 +158,32 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp
 
                 break;
             default:
-                entity = null;
+                throw new IllegalArgumentException("Not supported aggregation type: " + aggregation);
         }
-        if (entity != null) {
-            entity.setTs(ts);
-        }
-        return service.submit(() -> DaoUtil.getData(entity));
+
+        SettableFuture<TsKvEntity> listenableFuture = SettableFuture.create();
+        entity.whenComplete((tsKvEntity, throwable) -> {
+            if (throwable != null) {
+                listenableFuture.setException(throwable);
+            } else {
+                listenableFuture.set(tsKvEntity);
+            }
+        });
+        return Futures.transform(listenableFuture, new Function<TsKvEntity, Optional<TsKvEntry>>() {
+            @Nullable
+            @Override
+            public Optional<TsKvEntry> apply(@Nullable TsKvEntity entity) {
+                if (entity != null && entity.isNotEmpty()) {
+                    entity.setEntityId(entityIdStr);
+                    entity.setEntityType(entityId.getEntityType());
+                    entity.setKey(key);
+                    entity.setTs(ts);
+                    return Optional.of(DaoUtil.getData(entity));
+                } else {
+                    return Optional.empty();
+                }
+            }
+        });
     }
 
     private ListenableFuture<List<TsKvEntry>> findAllAsyncWithLimit(EntityId entityId, TsKvQuery query) {
diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/TsKvRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/TsKvRepository.java
index 2c7751d..d1e206e 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/TsKvRepository.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/TsKvRepository.java
@@ -19,12 +19,14 @@ import org.springframework.data.domain.Pageable;
 import org.springframework.data.jpa.repository.Query;
 import org.springframework.data.repository.CrudRepository;
 import org.springframework.data.repository.query.Param;
+import org.springframework.scheduling.annotation.Async;
 import org.thingsboard.server.common.data.EntityType;
 import org.thingsboard.server.dao.model.sql.TsKvCompositeKey;
 import org.thingsboard.server.dao.model.sql.TsKvEntity;
 import org.thingsboard.server.dao.util.SqlDao;
 
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 
 @SqlDao
 public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvCompositeKey> {
@@ -39,49 +41,52 @@ public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvComposite
                                       @Param("endTs") long endTs,
                                       Pageable pageable);
 
+    @Async
     @Query("SELECT new TsKvEntity(MAX(tskv.strValue), MAX(tskv.longValue), MAX(tskv.doubleValue)) FROM TsKvEntity tskv " +
             "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
             "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
-    TsKvEntity findMax(@Param("entityId") String entityId,
-                       @Param("entityType") EntityType entityType,
-                       @Param("entityKey") String entityKey,
-                       @Param("startTs") long startTs,
-                       @Param("endTs") long endTs);
+    CompletableFuture<TsKvEntity> findMax(@Param("entityId") String entityId,
+                                          @Param("entityType") EntityType entityType,
+                                          @Param("entityKey") String entityKey,
+                                          @Param("startTs") long startTs,
+                                          @Param("endTs") long endTs);
 
+    @Async
     @Query("SELECT new TsKvEntity(MIN(tskv.strValue), MIN(tskv.longValue), MIN(tskv.doubleValue)) FROM TsKvEntity tskv " +
             "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
             "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
-    TsKvEntity findMin(@Param("entityId") String entityId,
+    CompletableFuture<TsKvEntity> findMin(@Param("entityId") String entityId,
                        @Param("entityType") EntityType entityType,
                        @Param("entityKey") String entityKey,
                        @Param("startTs") long startTs,
                        @Param("endTs") long endTs);
 
-
+    @Async
     @Query("SELECT new TsKvEntity(COUNT(tskv.booleanValue), COUNT(tskv.strValue), COUNT(tskv.longValue), COUNT(tskv.doubleValue)) FROM TsKvEntity tskv " +
             "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
             "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
-    TsKvEntity findCount(@Param("entityId") String entityId,
-                         @Param("entityType") EntityType entityType,
-                         @Param("entityKey") String entityKey,
-                         @Param("startTs") long startTs,
-                         @Param("endTs") long endTs);
-
+    CompletableFuture<TsKvEntity> findCount(@Param("entityId") String entityId,
+                                 @Param("entityType") EntityType entityType,
+                                 @Param("entityKey") String entityKey,
+                                 @Param("startTs") long startTs,
+                                 @Param("endTs") long endTs);
 
+    @Async
     @Query("SELECT new TsKvEntity(AVG(tskv.longValue), AVG(tskv.doubleValue)) FROM TsKvEntity tskv " +
             "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
             "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
-    TsKvEntity findAvg(@Param("entityId") String entityId,
+    CompletableFuture<TsKvEntity> findAvg(@Param("entityId") String entityId,
                        @Param("entityType") EntityType entityType,
                        @Param("entityKey") String entityKey,
                        @Param("startTs") long startTs,
                        @Param("endTs") long endTs);
 
 
+    @Async
     @Query("SELECT new TsKvEntity(SUM(tskv.longValue), SUM(tskv.doubleValue)) FROM TsKvEntity tskv " +
             "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
             "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
-    TsKvEntity findSum(@Param("entityId") String entityId,
+    CompletableFuture<TsKvEntity> findSum(@Param("entityId") String entityId,
                        @Param("entityType") EntityType entityType,
                        @Param("entityKey") String entityKey,
                        @Param("startTs") long startTs,
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/sub/SubscriptionUpdate.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/sub/SubscriptionUpdate.java
index 4d8cf53..618ddc7 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/sub/SubscriptionUpdate.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/sub/SubscriptionUpdate.java
@@ -32,11 +32,7 @@ public class SubscriptionUpdate {
         this.subscriptionId = subscriptionId;
         this.data = new TreeMap<>();
         for (TsKvEntry tsEntry : data) {
-            List<Object> values = this.data.get(tsEntry.getKey());
-            if (values == null) {
-                values = new ArrayList<>();
-                this.data.put(tsEntry.getKey(), values);
-            }
+            List<Object> values = this.data.computeIfAbsent(tsEntry.getKey(), k -> new ArrayList<>());
             Object[] value = new Object[2];
             value[0] = tsEntry.getTs();
             value[1] = tsEntry.getValueAsString();
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java
index bf033dc..36769d7 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java
@@ -132,7 +132,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
                 break;
             case TO_SERVER_RPC_RESPONSE:
                 ToServerRpcResponseMsg rpcResponse = (ToServerRpcResponseMsg) msg;
-                result = createMqttPublishMsg(ctx, MqttTopics.DEVICE_RPC_REQUESTS_TOPIC + rpcResponse.getRequestId(),
+                result = createMqttPublishMsg(ctx, MqttTopics.DEVICE_RPC_RESPONSE_TOPIC + rpcResponse.getRequestId(),
                         rpcResponse);
                 break;
             case RULE_ENGINE_ERROR: