thingsboard-memoizeit

Refactored attributes dao

6/9/2017 1:07:30 PM

Details

diff --git a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
index 7d986a1..701f585 100644
--- a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
@@ -93,8 +93,8 @@ public final class PluginProcessingContext implements PluginContext {
     @Override
     public void saveAttributes(final TenantId tenantId, final EntityId entityId, final String scope, final List<AttributeKvEntry> attributes, final PluginCallback<Void> callback) {
         validate(entityId, new ValidationCallback(callback, ctx -> {
-            ListenableFuture<List<ResultSet>> rsListFuture = pluginCtx.attributesService.save(entityId, scope, attributes);
-            Futures.addCallback(rsListFuture, getListCallback(callback, v -> {
+            ListenableFuture<List<Void>> attrKvListFuture = pluginCtx.attributesService.save(entityId, scope, attributes);
+            Futures.addCallback(attrKvListFuture, getListCallback(callback, v -> {
                 if (entityId.getEntityType() == EntityType.DEVICE) {
                     onDeviceAttributesChanged(tenantId, new DeviceId(entityId.getId()), scope, attributes);
                 }
@@ -106,7 +106,7 @@ public final class PluginProcessingContext implements PluginContext {
     @Override
     public void removeAttributes(final TenantId tenantId, final EntityId entityId, final String scope, final List<String> keys, final PluginCallback<Void> callback) {
         validate(entityId, new ValidationCallback(callback, ctx -> {
-            ListenableFuture<List<ResultSet>> future = pluginCtx.attributesService.removeAll(entityId, scope, keys);
+            ListenableFuture<List<Void>> future = pluginCtx.attributesService.removeAll(entityId, scope, keys);
             Futures.addCallback(future, getCallback(callback, v -> null), executor);
             if (entityId.getEntityType() == EntityType.DEVICE) {
                 onDeviceAttributesDeleted(tenantId, new DeviceId(entityId.getId()), keys.stream().map(key -> new AttributeKey(scope, key)).collect(Collectors.toSet()));
@@ -235,10 +235,10 @@ public final class PluginProcessingContext implements PluginContext {
         pluginCtx.toDeviceActor(DeviceAttributesEventNotificationMsg.onUpdate(tenantId, deviceId, scope, values));
     }
 
-    private <T> FutureCallback<List<ResultSet>> getListCallback(final PluginCallback<T> callback, Function<List<ResultSet>, T> transformer) {
-        return new FutureCallback<List<ResultSet>>() {
+    private <T, R> FutureCallback<List<T>> getListCallback(final PluginCallback<R> callback, Function<List<T>, R> transformer) {
+        return new FutureCallback<List<T>>() {
             @Override
-            public void onSuccess(@Nullable List<ResultSet> result) {
+            public void onSuccess(@Nullable List<T> result) {
                 pluginCtx.self().tell(PluginCallbackMessage.onSuccess(callback, transformer.apply(result)), ActorRef.noSender());
             }
 
diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesDao.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesDao.java
index ae58d4d..4f0af49 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesDao.java
@@ -15,8 +15,6 @@
  */
 package org.thingsboard.server.dao.attributes;
 
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.ResultSetFuture;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.thingsboard.server.common.data.id.EntityId;
 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
@@ -36,7 +34,7 @@ public interface AttributesDao {
 
     ListenableFuture<List<AttributeKvEntry>> findAll(EntityId entityId, String attributeType);
 
-    ResultSetFuture save(EntityId entityId, String attributeType, AttributeKvEntry attribute);
+    ListenableFuture<Void> save(EntityId entityId, String attributeType, AttributeKvEntry attribute);
 
-    ListenableFuture<List<ResultSet>> removeAll(EntityId entityId, String scope, List<String> keys);
+    ListenableFuture<List<Void>> removeAll(EntityId entityId, String scope, List<String> keys);
 }
diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesService.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesService.java
index 6bf9fb2..6090fa9 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesService.java
@@ -15,12 +15,8 @@
  */
 package org.thingsboard.server.dao.attributes;
 
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.ResultSetFuture;
 import com.google.common.util.concurrent.ListenableFuture;
-import org.thingsboard.server.common.data.id.DeviceId;
 import org.thingsboard.server.common.data.id.EntityId;
-import org.thingsboard.server.common.data.id.UUIDBased;
 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
 
 import java.util.Collection;
@@ -38,7 +34,7 @@ public interface AttributesService {
 
     ListenableFuture<List<AttributeKvEntry>> findAll(EntityId entityId, String scope);
 
-    ListenableFuture<List<ResultSet>> save(EntityId entityId, String scope, List<AttributeKvEntry> attributes);
+    ListenableFuture<List<Void>> save(EntityId entityId, String scope, List<AttributeKvEntry> attributes);
 
-    ListenableFuture<List<ResultSet>> removeAll(EntityId entityId, String scope, List<String> attributeKeys);
+    ListenableFuture<List<Void>> removeAll(EntityId entityId, String scope, List<String> attributeKeys);
 }
diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java
index da46119..bef2faa 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java
@@ -61,10 +61,10 @@ public class BaseAttributesService implements AttributesService {
     }
 
     @Override
-    public ListenableFuture<List<ResultSet>> save(EntityId entityId, String scope, List<AttributeKvEntry> attributes) {
+    public ListenableFuture<List<Void>> save(EntityId entityId, String scope, List<AttributeKvEntry> attributes) {
         validate(entityId, scope);
         attributes.forEach(attribute -> validate(attribute));
-        List<ResultSetFuture> futures = Lists.newArrayListWithExpectedSize(attributes.size());
+        List<ListenableFuture<Void>> futures = Lists.newArrayListWithExpectedSize(attributes.size());
         for (AttributeKvEntry attribute : attributes) {
             futures.add(attributesDao.save(entityId, scope, attribute));
         }
@@ -72,7 +72,7 @@ public class BaseAttributesService implements AttributesService {
     }
 
     @Override
-    public ListenableFuture<List<ResultSet>> removeAll(EntityId entityId, String scope, List<String> keys) {
+    public ListenableFuture<List<Void>> removeAll(EntityId entityId, String scope, List<String> keys) {
         validate(entityId, scope);
         return attributesDao.removeAll(entityId, scope, keys);
     }
diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/CassandraBaseAttributesDao.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/CassandraBaseAttributesDao.java
index db5babe..95f9270 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/attributes/CassandraBaseAttributesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/CassandraBaseAttributesDao.java
@@ -101,7 +101,7 @@ public class CassandraBaseAttributesDao extends CassandraAbstractAsyncDao implem
     }
 
     @Override
-    public ResultSetFuture save(EntityId entityId, String attributeType, AttributeKvEntry attribute) {
+    public ListenableFuture<Void> save(EntityId entityId, String attributeType, AttributeKvEntry attribute) {
         BoundStatement stmt = getSaveStmt().bind();
         stmt.setString(0, entityId.getEntityType().name());
         stmt.setUUID(1, entityId.getId());
@@ -124,23 +124,31 @@ public class CassandraBaseAttributesDao extends CassandraAbstractAsyncDao implem
         } else {
             stmt.setToNull(8);
         }
-        return executeAsyncWrite(stmt);
+        log.trace("Generated save stmt [{}] for entityId {} and attributeType {} and attribute", stmt, entityId, attributeType, attribute);
+        return getFuture(executeAsyncWrite(stmt), rs -> getAttributeKvEntryFromRs(rs));
+    }
+
+    private Void getAttributeKvEntryFromRs(ResultSet rs) {
+        return null;
     }
 
     @Override
-    public ListenableFuture<List<ResultSet>> removeAll(EntityId entityId, String attributeType, List<String> keys) {
-        List<ResultSetFuture> futures = keys.stream().map(key -> delete(entityId, attributeType, key)).collect(Collectors.toList());
+    public ListenableFuture<List<Void>> removeAll(EntityId entityId, String attributeType, List<String> keys) {
+        List<ListenableFuture<Void>> futures = keys
+                .stream()
+                .map(key -> delete(entityId, attributeType, key))
+                .collect(Collectors.toList());
         return Futures.allAsList(futures);
     }
 
-    private ResultSetFuture delete(EntityId entityId, String attributeType, String key) {
+    private ListenableFuture<Void> delete(EntityId entityId, String attributeType, String key) {
         Statement delete = QueryBuilder.delete().all().from(ModelConstants.ATTRIBUTES_KV_CF)
                 .where(eq(ENTITY_TYPE_COLUMN, entityId.getEntityType()))
                 .and(eq(ENTITY_ID_COLUMN, entityId.getId()))
                 .and(eq(ATTRIBUTE_TYPE_COLUMN, attributeType))
                 .and(eq(ATTRIBUTE_KEY_COLUMN, key));
         log.debug("Remove request: {}", delete.toString());
-        return getSession().executeAsync(delete);
+        return getFuture(getSession().executeAsync(delete), rs -> getAttributeKvEntryFromRs(rs));
     }
 
     private PreparedStatement getSaveStmt() {
diff --git a/dao/src/main/java/org/thingsboard/server/dao/CassandraAbstractAsyncDao.java b/dao/src/main/java/org/thingsboard/server/dao/CassandraAbstractAsyncDao.java
index f241f2c..d61218f 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/CassandraAbstractAsyncDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/CassandraAbstractAsyncDao.java
@@ -15,6 +15,13 @@
  */
 package org.thingsboard.server.dao;
 
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import javax.annotation.Nullable;
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
 import java.util.concurrent.ExecutorService;
@@ -39,4 +46,13 @@ public abstract class CassandraAbstractAsyncDao extends CassandraAbstractDao {
         }
     }
 
+    protected <T> ListenableFuture<T> getFuture(ResultSetFuture future, java.util.function.Function<ResultSet, T> transformer) {
+        return Futures.transform(future, new Function<ResultSet, T>() {
+            @Nullable
+            @Override
+            public T apply(@Nullable ResultSet input) {
+                return transformer.apply(input);
+            }
+        }, readResultsProcessingExecutor);
+    }
 }
diff --git a/dao/src/main/java/org/thingsboard/server/dao/CassandraAbstractDao.java b/dao/src/main/java/org/thingsboard/server/dao/CassandraAbstractDao.java
index bd7346a..8bb44bf 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/CassandraAbstractDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/CassandraAbstractDao.java
@@ -66,7 +66,6 @@ public abstract class CassandraAbstractDao {
         return execute(statement, defaultWriteLevel);
     }
 
-
     protected ResultSetFuture executeAsyncRead(Statement statement) {
         return executeAsync(statement, defaultReadLevel);
     }
diff --git a/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationDao.java b/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationDao.java
index 8415bc9..0fdd6ca 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationDao.java
@@ -298,16 +298,6 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati
         return getFuture(rsFuture, rs -> rs != null ? rs.wasApplied() : false);
     }
 
-    private <T> ListenableFuture<T> getFuture(ResultSetFuture future, java.util.function.Function<ResultSet, T> transformer) {
-        return Futures.transform(future, new Function<ResultSet, T>() {
-            @Nullable
-            @Override
-            public T apply(@Nullable ResultSet input) {
-                return transformer.apply(input);
-            }
-        }, readResultsProcessingExecutor);
-    }
-
     private List<EntityRelation> getEntityRelations(ResultSet rs) {
         List<Row> rows = rs.all();
         List<EntityRelation> entries = new ArrayList<>(rows.size());
diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributesDao.java
index eb4a639..d3a111f 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributesDao.java
@@ -15,8 +15,6 @@
  */
 package org.thingsboard.server.dao.sql.attributes;
 
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.ResultSetFuture;
 import com.google.common.util.concurrent.ListenableFuture;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
@@ -50,12 +48,12 @@ public class JpaAttributesDao implements AttributesDao {
     }
 
     @Override
-    public ResultSetFuture save(EntityId entityId, String attributeType, AttributeKvEntry attribute) {
+    public ListenableFuture<Void> save(EntityId entityId, String attributeType, AttributeKvEntry attribute) {
         return null;
     }
 
     @Override
-    public ListenableFuture<List<ResultSet>> removeAll(EntityId entityId, String scope, List<String> keys) {
+    public ListenableFuture<List<Void>> removeAll(EntityId entityId, String scope, List<String> keys) {
         return null;
     }
 }
diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java
index 8590876..ae7573e 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java
@@ -54,12 +54,12 @@ public class JpaTimeseriesDao implements TimeseriesDao {
     }
 
     @Override
-    public ResultSetFuture save(EntityId entityId, long partition, TsKvEntry tsKvEntry) {
+    public ResultSetFuture save(EntityId entityId, long partition, TsKvEntry tsKvEntry, long ttl) {
         return null;
     }
 
     @Override
-    public ResultSetFuture savePartition(EntityId entityId, long partition, String key) {
+    public ResultSetFuture savePartition(EntityId entityId, long partition, String key, long ttl) {
         return null;
     }