diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java
index 2b77d08..3219d58 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java
@@ -19,6 +19,7 @@ import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.PageRequest;
@@ -39,6 +40,8 @@ import org.thingsboard.server.dao.util.SqlDao;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import static org.thingsboard.server.common.data.UUIDConverter.fromTimeUUID;
@@ -80,7 +83,7 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp
return findAllAsyncWithLimit(entityId, query);
} else {
long stepTs = query.getStartTs();
- List<ListenableFuture<TsKvEntry>> futures = new ArrayList<>();
+ List<ListenableFuture<Optional<TsKvEntry>>> futures = new ArrayList<>();
while (stepTs < query.getEndTs()) {
long startTs = stepTs;
long endTs = stepTs + query.getInterval();
@@ -88,16 +91,30 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp
futures.add(findAndAggregateAsync(entityId, query.getKey(), startTs, endTs, ts, query.getAggregation()));
stepTs = endTs;
}
- return Futures.allAsList(futures);
+ ListenableFuture<List<Optional<TsKvEntry>>> future = Futures.allAsList(futures);
+ return Futures.transform(future, new Function<List<Optional<TsKvEntry>>, List<TsKvEntry>>() {
+ @Nullable
+ @Override
+ public List<TsKvEntry> apply(@Nullable List<Optional<TsKvEntry>> results) {
+ if (results == null || results.isEmpty()) {
+ return null;
+ }
+ return results.stream()
+ .filter(Optional::isPresent)
+ .map(Optional::get)
+ .collect(Collectors.toList());
+ }
+ }, service);
}
}
- private ListenableFuture<TsKvEntry> findAndAggregateAsync(EntityId entityId, String key, long startTs, long endTs, long ts, Aggregation aggregation) {
- TsKvEntity entity;
+ private ListenableFuture<Optional<TsKvEntry>> findAndAggregateAsync(EntityId entityId, String key, long startTs, long endTs, long ts, Aggregation aggregation) {
+ CompletableFuture<TsKvEntity> entity;
+ String entityIdStr = fromTimeUUID(entityId.getId());
switch (aggregation) {
case AVG:
entity = tsKvRepository.findAvg(
- fromTimeUUID(entityId.getId()),
+ entityIdStr,
entityId.getEntityType(),
key,
startTs,
@@ -106,7 +123,7 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp
break;
case MAX:
entity = tsKvRepository.findMax(
- fromTimeUUID(entityId.getId()),
+ entityIdStr,
entityId.getEntityType(),
key,
startTs,
@@ -115,7 +132,7 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp
break;
case MIN:
entity = tsKvRepository.findMin(
- fromTimeUUID(entityId.getId()),
+ entityIdStr,
entityId.getEntityType(),
key,
startTs,
@@ -124,7 +141,7 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp
break;
case SUM:
entity = tsKvRepository.findSum(
- fromTimeUUID(entityId.getId()),
+ entityIdStr,
entityId.getEntityType(),
key,
startTs,
@@ -133,7 +150,7 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp
break;
case COUNT:
entity = tsKvRepository.findCount(
- fromTimeUUID(entityId.getId()),
+ entityIdStr,
entityId.getEntityType(),
key,
startTs,
@@ -141,12 +158,32 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp
break;
default:
- entity = null;
+ throw new IllegalArgumentException("Not supported aggregation type: " + aggregation);
}
- if (entity != null) {
- entity.setTs(ts);
- }
- return service.submit(() -> DaoUtil.getData(entity));
+
+ SettableFuture<TsKvEntity> listenableFuture = SettableFuture.create();
+ entity.whenComplete((tsKvEntity, throwable) -> {
+ if (throwable != null) {
+ listenableFuture.setException(throwable);
+ } else {
+ listenableFuture.set(tsKvEntity);
+ }
+ });
+ return Futures.transform(listenableFuture, new Function<TsKvEntity, Optional<TsKvEntry>>() {
+ @Nullable
+ @Override
+ public Optional<TsKvEntry> apply(@Nullable TsKvEntity entity) {
+ if (entity != null && entity.isNotEmpty()) {
+ entity.setEntityId(entityIdStr);
+ entity.setEntityType(entityId.getEntityType());
+ entity.setKey(key);
+ entity.setTs(ts);
+ return Optional.of(DaoUtil.getData(entity));
+ } else {
+ return Optional.empty();
+ }
+ }
+ });
}
private ListenableFuture<List<TsKvEntry>> findAllAsyncWithLimit(EntityId entityId, TsKvQuery query) {
diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/TsKvRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/TsKvRepository.java
index b37d272..d1e206e 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/TsKvRepository.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/TsKvRepository.java
@@ -26,7 +26,7 @@ import org.thingsboard.server.dao.model.sql.TsKvEntity;
import org.thingsboard.server.dao.util.SqlDao;
import java.util.List;
-import java.util.concurrent.Future;
+import java.util.concurrent.CompletableFuture;
@SqlDao
public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvCompositeKey> {
@@ -45,17 +45,17 @@ public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvComposite
@Query("SELECT new TsKvEntity(MAX(tskv.strValue), MAX(tskv.longValue), MAX(tskv.doubleValue)) FROM TsKvEntity tskv " +
"WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
"AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
- Future<TsKvEntity> findMax(@Param("entityId") String entityId,
- @Param("entityType") EntityType entityType,
- @Param("entityKey") String entityKey,
- @Param("startTs") long startTs,
- @Param("endTs") long endTs);
+ CompletableFuture<TsKvEntity> findMax(@Param("entityId") String entityId,
+ @Param("entityType") EntityType entityType,
+ @Param("entityKey") String entityKey,
+ @Param("startTs") long startTs,
+ @Param("endTs") long endTs);
@Async
@Query("SELECT new TsKvEntity(MIN(tskv.strValue), MIN(tskv.longValue), MIN(tskv.doubleValue)) FROM TsKvEntity tskv " +
"WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
"AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
- Future<TsKvEntity> findMin(@Param("entityId") String entityId,
+ CompletableFuture<TsKvEntity> findMin(@Param("entityId") String entityId,
@Param("entityType") EntityType entityType,
@Param("entityKey") String entityKey,
@Param("startTs") long startTs,
@@ -65,7 +65,7 @@ public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvComposite
@Query("SELECT new TsKvEntity(COUNT(tskv.booleanValue), COUNT(tskv.strValue), COUNT(tskv.longValue), COUNT(tskv.doubleValue)) FROM TsKvEntity tskv " +
"WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
"AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
- Future<TsKvEntity> findCount(@Param("entityId") String entityId,
+ CompletableFuture<TsKvEntity> findCount(@Param("entityId") String entityId,
@Param("entityType") EntityType entityType,
@Param("entityKey") String entityKey,
@Param("startTs") long startTs,
@@ -75,7 +75,7 @@ public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvComposite
@Query("SELECT new TsKvEntity(AVG(tskv.longValue), AVG(tskv.doubleValue)) FROM TsKvEntity tskv " +
"WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
"AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
- Future<TsKvEntity> findAvg(@Param("entityId") String entityId,
+ CompletableFuture<TsKvEntity> findAvg(@Param("entityId") String entityId,
@Param("entityType") EntityType entityType,
@Param("entityKey") String entityKey,
@Param("startTs") long startTs,
@@ -86,7 +86,7 @@ public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvComposite
@Query("SELECT new TsKvEntity(SUM(tskv.longValue), SUM(tskv.doubleValue)) FROM TsKvEntity tskv " +
"WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
"AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
- Future<TsKvEntity> findSum(@Param("entityId") String entityId,
+ CompletableFuture<TsKvEntity> findSum(@Param("entityId") String entityId,
@Param("entityType") EntityType entityType,
@Param("entityKey") String entityKey,
@Param("startTs") long startTs,