thingsboard-aplcache

Tmp commit

2/21/2017 12:30:24 PM

Changes

Details

diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
index 70bb4f2..f9c4e6f 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
@@ -195,9 +195,8 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
     }
 
     void processAttributesUpdate(ActorContext context, DeviceAttributesEventNotificationMsg msg) {
-        //TODO: improve this procedure to fetch only changed attributes.
+        //TODO: improve this procedure to fetch only changed attributes and support attributes deletion
         refreshAttributes();
-        //TODO: support attributes deletion
         Set<AttributeKey> keys = msg.getKeys();
         if (attributeSubscriptions.size() > 0) {
             ToDeviceMsg notification = null;
diff --git a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
index 9474a62..98149c2 100644
--- a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2017 The Thingsboard Authors
- *
+ * <p>
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -56,6 +56,7 @@ import org.thingsboard.server.extensions.api.plugins.ws.PluginWebsocketSessionRe
 import org.thingsboard.server.extensions.api.plugins.ws.msg.PluginWebsocketMsg;
 
 import akka.actor.ActorRef;
+import org.w3c.dom.Attr;
 
 import javax.annotation.Nullable;
 
@@ -91,49 +92,86 @@ public final class PluginProcessingContext implements PluginContext {
     @Override
     public void saveAttributes(DeviceId deviceId, String scope, List<AttributeKvEntry> attributes, PluginCallback<Void> callback) {
         validate(deviceId);
-        Set<AttributeKey> keys = new HashSet<>();
-        for (AttributeKvEntry attribute : attributes) {
-            keys.add(new AttributeKey(scope, attribute.getKey()));
-        }
 
         ListenableFuture<List<ResultSet>> rsListFuture = pluginCtx.attributesService.save(deviceId, scope, attributes);
         Futures.addCallback(rsListFuture, getListCallback(callback, v -> {
-            onDeviceAttributesChanged(deviceId, keys);
+            onDeviceAttributesChanged(deviceId, scope, attributes);
             return null;
         }), executor);
     }
 
     @Override
-    public Optional<AttributeKvEntry> loadAttribute(DeviceId deviceId, String attributeType, String attributeKey) {
+    public void removeAttributes(DeviceId deviceId, String scope, List<String> keys, PluginCallback<Void> callback) {
         validate(deviceId);
-        AttributeKvEntry attribute = pluginCtx.attributesService.find(deviceId, attributeType, attributeKey);
-        return Optional.ofNullable(attribute);
+        ListenableFuture<List<ResultSet>> future = pluginCtx.attributesService.removeAll(deviceId, scope, keys);
+        Futures.addCallback(future, getCallback(callback, v -> null), executor);
+        onDeviceAttributesDeleted(tenantId, deviceId, keys.stream().map(key -> new AttributeKey(scope, key)).collect(Collectors.toSet()));
     }
 
     @Override
-    public List<AttributeKvEntry> loadAttributes(DeviceId deviceId, String attributeType, List<String> attributeKeys) {
+    public void saveAttributesByDevice(TenantId tenantId, DeviceId deviceId, String scope, List<AttributeKvEntry> attributes, PluginCallback<Void> callback) {
         validate(deviceId);
-        List<AttributeKvEntry> result = new ArrayList<>(attributeKeys.size());
-        for (String attributeKey : attributeKeys) {
-            AttributeKvEntry attribute = pluginCtx.attributesService.find(deviceId, attributeType, attributeKey);
-            if (attribute != null) {
-                result.add(attribute);
-            }
-        }
-        return result;
+
+        ListenableFuture<List<ResultSet>> rsListFuture = pluginCtx.attributesService.save(deviceId, scope, attributes);
+        Futures.addCallback(rsListFuture, getListCallback(callback, v -> {
+            onDeviceAttributesChanged(tenantId, deviceId, scope, attributes);
+            return null;
+        }), executor);
+    }
+
+    @Override
+    public void removeAttributesByDevice(TenantId tenantId, DeviceId deviceId, String scope, List<String> keys, PluginCallback<Void> callback) {
+        validate(deviceId);
+        ListenableFuture<List<ResultSet>> future = pluginCtx.attributesService.removeAll(deviceId, scope, keys);
+        Futures.addCallback(future, getCallback(callback, v -> null), executor);
+        onDeviceAttributesDeleted(tenantId, deviceId, keys.stream().map(key -> new AttributeKey(scope, key)).collect(Collectors.toSet()));
+    }
+
+    @Override
+    public void loadAttribute(DeviceId deviceId, String attributeType, String attributeKey, PluginCallback<Optional<AttributeKvEntry>> callback) {
+        validate(deviceId);
+        ListenableFuture<Optional<AttributeKvEntry>> future = pluginCtx.attributesService.find(deviceId, attributeType, attributeKey);
+        Futures.addCallback(future, getCallback(callback, v -> v), executor);
     }
 
     @Override
-    public List<AttributeKvEntry> loadAttributes(DeviceId deviceId, String attributeType) {
+    public void loadAttributes(DeviceId deviceId, String attributeType, Collection<String> attributeKeys, PluginCallback<List<AttributeKvEntry>> callback) {
         validate(deviceId);
-        return pluginCtx.attributesService.findAll(deviceId, attributeType);
+        ListenableFuture<List<AttributeKvEntry>> future = pluginCtx.attributesService.find(deviceId, attributeType, attributeKeys);
+        Futures.addCallback(future, getCallback(callback, v -> v), executor);
     }
 
     @Override
-    public void removeAttributes(DeviceId deviceId, String scope, List<String> keys) {
+    public void loadAttributes(DeviceId deviceId, String attributeType, PluginCallback<List<AttributeKvEntry>> callback) {
         validate(deviceId);
-        pluginCtx.attributesService.removeAll(deviceId, scope, keys);
-        onDeviceAttributesDeleted(deviceId, keys.stream().map(key -> new AttributeKey(scope, key)).collect(Collectors.toSet()));
+        ListenableFuture<List<AttributeKvEntry>> future = pluginCtx.attributesService.findAll(deviceId, attributeType);
+        Futures.addCallback(future, getCallback(callback, v -> v), executor);
+    }
+
+    @Override
+    public void loadAttributes(DeviceId deviceId, Collection<String> attributeTypes, PluginCallback<List<AttributeKvEntry>> callback) {
+        validate(deviceId);
+        List<ListenableFuture<List<AttributeKvEntry>>> futures = new ArrayList<>();
+        attributeTypes.forEach(attributeType -> futures.add(pluginCtx.attributesService.findAll(deviceId, attributeType)));
+        convertFuturesAndAddCallback(callback, futures);
+    }
+
+    @Override
+    public void loadAttributes(DeviceId deviceId, Collection<String> attributeTypes, Collection<String> attributeKeys, PluginCallback<List<AttributeKvEntry>> callback) {
+        validate(deviceId);
+        List<ListenableFuture<List<AttributeKvEntry>>> futures = new ArrayList<>();
+        attributeTypes.forEach(attributeType -> futures.add(pluginCtx.attributesService.find(deviceId, attributeType, attributeKeys)));
+        convertFuturesAndAddCallback(callback, futures);
+    }
+
+    private void convertFuturesAndAddCallback(PluginCallback<List<AttributeKvEntry>> callback, List<ListenableFuture<List<AttributeKvEntry>>> futures) {
+        ListenableFuture<List<AttributeKvEntry>> future = Futures.transform(Futures.successfulAsList(futures),
+                (Function<? super List<List<AttributeKvEntry>>, ? extends List<AttributeKvEntry>>) input -> {
+                    List<AttributeKvEntry> result = new ArrayList<>();
+                    input.forEach(r -> result.addAll(r));
+                    return result;
+                }, executor);
+        Futures.addCallback(future, getCallback(callback, v -> v), executor);
     }
 
     @Override
@@ -205,18 +243,12 @@ public final class PluginProcessingContext implements PluginContext {
         return securityCtx;
     }
 
-    private void onDeviceAttributesChanged(DeviceId deviceId, AttributeKey key) {
-        onDeviceAttributesChanged(deviceId, Collections.singleton(key));
+    private void onDeviceAttributesDeleted(TenantId tenantId, DeviceId deviceId, Set<AttributeKey> keys) {
+        pluginCtx.toDeviceActor(DeviceAttributesEventNotificationMsg.onDelete(tenantId, deviceId, keys));
     }
 
-    private void onDeviceAttributesDeleted(DeviceId deviceId, Set<AttributeKey> keys) {
-        Device device = pluginCtx.deviceService.findDeviceById(deviceId);
-        pluginCtx.toDeviceActor(DeviceAttributesEventNotificationMsg.onDelete(device.getTenantId(), deviceId, keys));
-    }
-
-    private void onDeviceAttributesChanged(DeviceId deviceId, Set<AttributeKey> keys) {
-        Device device = pluginCtx.deviceService.findDeviceById(deviceId);
-        pluginCtx.toDeviceActor(DeviceAttributesEventNotificationMsg.onUpdate(device.getTenantId(), deviceId, keys));
+    private void onDeviceAttributesChanged(TenantId tenantId, DeviceId deviceId, String scope, List<AttributeKvEntry> values) {
+        pluginCtx.toDeviceActor(DeviceAttributesEventNotificationMsg.onUpdate(tenantId, deviceId, scope, values));
     }
 
     private <T> FutureCallback<List<ResultSet>> getListCallback(final PluginCallback<T> callback, Function<List<ResultSet>, T> transformer) {
@@ -256,11 +288,12 @@ public final class PluginProcessingContext implements PluginContext {
     }
 
     // TODO: replace with our own exceptions
-    private boolean validate(DeviceId deviceId) {
+    private boolean validate(DeviceId deviceId, PluginCallback<Device> callback) {
         if (securityCtx.isPresent()) {
-            PluginApiCallSecurityContext ctx = securityCtx.get();
+            final PluginApiCallSecurityContext ctx = securityCtx.get();
             if (ctx.isTenantAdmin() || ctx.isCustomerUser()) {
-                Device device = pluginCtx.deviceService.findDeviceById(deviceId);
+                ListenableFuture<Device> device = pluginCtx.deviceService.findDeviceById(deviceId);
+                Futures.addCallback(device, );
                 if (device == null) {
                     throw new IllegalStateException("Device not found!");
                 } else {
diff --git a/dao/src/main/java/org/thingsboard/server/dao/AbstractAsyncDao.java b/dao/src/main/java/org/thingsboard/server/dao/AbstractAsyncDao.java
new file mode 100644
index 0000000..9b9368d
--- /dev/null
+++ b/dao/src/main/java/org/thingsboard/server/dao/AbstractAsyncDao.java
@@ -0,0 +1,42 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.dao;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Created by ashvayka on 21.02.17.
+ */
+public abstract class AbstractAsyncDao extends AbstractDao {
+
+    protected ExecutorService readResultsProcessingExecutor;
+
+    @PostConstruct
+    public void startExecutor() {
+        readResultsProcessingExecutor = Executors.newCachedThreadPool();
+    }
+
+    @PreDestroy
+    public void stopExecutor() {
+        if (readResultsProcessingExecutor != null) {
+            readResultsProcessingExecutor.shutdownNow();
+        }
+    }
+
+}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesDao.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesDao.java
index ead2c04..ae58d4d 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesDao.java
@@ -15,23 +15,28 @@
  */
 package org.thingsboard.server.dao.attributes;
 
+import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.ResultSetFuture;
+import com.google.common.util.concurrent.ListenableFuture;
 import org.thingsboard.server.common.data.id.EntityId;
 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
 
+import java.util.Collection;
 import java.util.List;
-import java.util.UUID;
+import java.util.Optional;
 
 /**
  * @author Andrew Shvayka
  */
 public interface AttributesDao {
 
-    AttributeKvEntry find(EntityId entityId, String attributeType, String attributeKey);
+    ListenableFuture<Optional<AttributeKvEntry>> find(EntityId entityId, String attributeType, String attributeKey);
 
-    List<AttributeKvEntry> findAll(EntityId entityId, String attributeType);
+    ListenableFuture<List<AttributeKvEntry>> find(EntityId entityId, String attributeType, Collection<String> attributeKey);
+
+    ListenableFuture<List<AttributeKvEntry>> findAll(EntityId entityId, String attributeType);
 
     ResultSetFuture save(EntityId entityId, String attributeType, AttributeKvEntry attribute);
 
-    void removeAll(EntityId entityId, String scope, List<String> keys);
+    ListenableFuture<List<ResultSet>> removeAll(EntityId entityId, String scope, List<String> keys);
 }
diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesService.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesService.java
index 5a1fd70..6bf9fb2 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesService.java
@@ -23,18 +23,22 @@ import org.thingsboard.server.common.data.id.EntityId;
 import org.thingsboard.server.common.data.id.UUIDBased;
 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
 
+import java.util.Collection;
 import java.util.List;
+import java.util.Optional;
 
 /**
  * @author Andrew Shvayka
  */
 public interface AttributesService {
 
-    AttributeKvEntry find(EntityId entityId, String scope, String attributeKey);
+    ListenableFuture<Optional<AttributeKvEntry>> find(EntityId entityId, String scope, String attributeKey);
 
-    List<AttributeKvEntry> findAll(EntityId entityId, String scope);
+    ListenableFuture<List<AttributeKvEntry>> find(EntityId entityId, String scope, Collection<String> attributeKeys);
+
+    ListenableFuture<List<AttributeKvEntry>> findAll(EntityId entityId, String scope);
 
     ListenableFuture<List<ResultSet>> save(EntityId entityId, String scope, List<AttributeKvEntry> attributes);
 
-    void removeAll(EntityId entityId, String scope, List<String> attributeKeys);
+    ListenableFuture<List<ResultSet>> removeAll(EntityId entityId, String scope, List<String> attributeKeys);
 }
diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesDao.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesDao.java
index 5148a12..262d15d 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesDao.java
@@ -18,19 +18,24 @@ package org.thingsboard.server.dao.attributes;
 import com.datastax.driver.core.*;
 import com.datastax.driver.core.querybuilder.QueryBuilder;
 import com.datastax.driver.core.querybuilder.Select;
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Component;
 import org.thingsboard.server.common.data.id.EntityId;
-import org.thingsboard.server.common.data.kv.DataType;
-import org.thingsboard.server.dao.AbstractDao;
+import org.thingsboard.server.dao.AbstractAsyncDao;
 import org.thingsboard.server.dao.model.ModelConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.thingsboard.server.common.data.kv.*;
 import org.thingsboard.server.dao.timeseries.BaseTimeseriesDao;
 
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
 
 import static org.thingsboard.server.dao.model.ModelConstants.*;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.*;
@@ -40,29 +45,55 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.*;
  */
 @Component
 @Slf4j
-public class BaseAttributesDao extends AbstractDao implements AttributesDao {
-    
+public class BaseAttributesDao extends AbstractAsyncDao implements AttributesDao {
+
     private PreparedStatement saveStmt;
 
+    @PostConstruct
+    public void init() {
+        super.startExecutor();
+    }
+
+    @PreDestroy
+    public void stop() {
+        super.stopExecutor();
+    }
+
     @Override
-    public AttributeKvEntry find(EntityId entityId, String attributeType, String attributeKey) {
+    public ListenableFuture<Optional<AttributeKvEntry>> find(EntityId entityId, String attributeType, String attributeKey) {
         Select.Where select = select().from(ATTRIBUTES_KV_CF)
                 .where(eq(ENTITY_TYPE_COLUMN, entityId.getEntityType()))
                 .and(eq(ENTITY_ID_COLUMN, entityId.getId()))
                 .and(eq(ATTRIBUTE_TYPE_COLUMN, attributeType))
                 .and(eq(ATTRIBUTE_KEY_COLUMN, attributeKey));
         log.trace("Generated query [{}] for entityId {} and key {}", select, entityId, attributeKey);
-        return convertResultToAttributesKvEntry(attributeKey, executeRead(select).one());
+        return Futures.transform(executeAsyncRead(select), (Function<? super ResultSet, ? extends Optional<AttributeKvEntry>>) input ->
+                        Optional.of(convertResultToAttributesKvEntry(attributeKey, input.one()))
+                , readResultsProcessingExecutor);
     }
 
     @Override
-    public List<AttributeKvEntry> findAll(EntityId entityId, String attributeType) {
+    public ListenableFuture<List<AttributeKvEntry>> find(EntityId entityId, String attributeType, Collection<String> attributeKeys) {
+        List<ListenableFuture<Optional<AttributeKvEntry>>> entries = new ArrayList<>();
+        attributeKeys.forEach(attributeKey -> entries.add(find(entityId, attributeType, attributeKey)));
+        return Futures.transform(Futures.allAsList(entries), (Function<List<Optional<AttributeKvEntry>>, ? extends List<AttributeKvEntry>>) input -> {
+            List<AttributeKvEntry> result = new ArrayList<>();
+            input.stream().filter(opt -> opt.isPresent()).forEach(opt -> result.add(opt.get()));
+            return result;
+        }, readResultsProcessingExecutor);
+    }
+
+
+    @Override
+    public ListenableFuture<List<AttributeKvEntry>> findAll(EntityId entityId, String attributeType) {
         Select.Where select = select().from(ATTRIBUTES_KV_CF)
                 .where(eq(ENTITY_TYPE_COLUMN, entityId.getEntityType()))
                 .and(eq(ENTITY_ID_COLUMN, entityId.getId()))
                 .and(eq(ATTRIBUTE_TYPE_COLUMN, attributeType));
         log.trace("Generated query [{}] for entityId {} and attributeType {}", select, entityId, attributeType);
-        return convertResultToAttributesKvEntryList(executeRead(select));
+        return Futures.transform(executeAsyncRead(select), (Function<? super ResultSet, ? extends List<AttributeKvEntry>>) input ->
+                        convertResultToAttributesKvEntryList(input)
+                , readResultsProcessingExecutor);
     }
 
     @Override
@@ -93,20 +124,19 @@ public class BaseAttributesDao extends AbstractDao implements AttributesDao {
     }
 
     @Override
-    public void removeAll(EntityId entityId, String attributeType, List<String> keys) {
-        for (String key : keys) {
-            delete(entityId, attributeType, key);
-        }
+    public ListenableFuture<List<ResultSet>> removeAll(EntityId entityId, String attributeType, List<String> keys) {
+        List<ResultSetFuture> futures = keys.stream().map(key -> delete(entityId, attributeType, key)).collect(Collectors.toList());
+        return Futures.allAsList(futures);
     }
 
-    private void delete(EntityId entityId, String attributeType, String key) {
+    private ResultSetFuture delete(EntityId entityId, String attributeType, String key) {
         Statement delete = QueryBuilder.delete().all().from(ModelConstants.ATTRIBUTES_KV_CF)
                 .where(eq(ENTITY_TYPE_COLUMN, entityId.getEntityType()))
                 .and(eq(ENTITY_ID_COLUMN, entityId.getId()))
                 .and(eq(ATTRIBUTE_TYPE_COLUMN, attributeType))
                 .and(eq(ATTRIBUTE_KEY_COLUMN, key));
         log.debug("Remove request: {}", delete.toString());
-        getSession().execute(delete);
+        return getSession().executeAsync(delete);
     }
 
     private PreparedStatement getSaveStmt() {
@@ -150,5 +180,4 @@ public class BaseAttributesDao extends AbstractDao implements AttributesDao {
         }
         return entries;
     }
-
 }
diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java
index d3a1cb3..4361241 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java
@@ -27,7 +27,9 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 import org.thingsboard.server.dao.service.Validator;
 
+import java.util.Collection;
 import java.util.List;
+import java.util.Optional;
 
 /**
  * @author Andrew Shvayka
@@ -39,14 +41,21 @@ public class BaseAttributesService implements AttributesService {
     private AttributesDao attributesDao;
 
     @Override
-    public AttributeKvEntry find(EntityId entityId, String scope, String attributeKey) {
+    public ListenableFuture<Optional<AttributeKvEntry>> find(EntityId entityId, String scope, String attributeKey) {
         validate(entityId, scope);
         Validator.validateString(attributeKey, "Incorrect attribute key " + attributeKey);
         return attributesDao.find(entityId, scope, attributeKey);
     }
 
     @Override
-    public List<AttributeKvEntry> findAll(EntityId entityId, String scope) {
+    public ListenableFuture<List<AttributeKvEntry>> find(EntityId entityId, String scope, Collection<String> attributeKeys) {
+        validate(entityId, scope);
+        attributeKeys.forEach(attributeKey -> Validator.validateString(attributeKey, "Incorrect attribute key " + attributeKey));
+        return attributesDao.find(entityId, scope, attributeKeys);
+    }
+
+    @Override
+    public ListenableFuture<List<AttributeKvEntry>> findAll(EntityId entityId, String scope) {
         validate(entityId, scope);
         return attributesDao.findAll(entityId, scope);
     }
@@ -56,16 +65,16 @@ public class BaseAttributesService implements AttributesService {
         validate(entityId, scope);
         attributes.forEach(attribute -> validate(attribute));
         List<ResultSetFuture> futures = Lists.newArrayListWithExpectedSize(attributes.size());
-        for(AttributeKvEntry attribute : attributes) {
+        for (AttributeKvEntry attribute : attributes) {
             futures.add(attributesDao.save(entityId, scope, attribute));
         }
         return Futures.allAsList(futures);
     }
 
     @Override
-    public void removeAll(EntityId entityId, String scope, List<String> keys) {
+    public ListenableFuture<List<ResultSet>> removeAll(EntityId entityId, String scope, List<String> keys) {
         validate(entityId, scope);
-        attributesDao.removeAll(entityId, scope, keys);
+        return attributesDao.removeAll(entityId, scope, keys);
     }
 
     private static void validate(EntityId id, String scope) {
diff --git a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceService.java b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceService.java
index 8d780b6..b760f55 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceService.java
@@ -15,6 +15,7 @@
  */
 package org.thingsboard.server.dao.device;
 
+import com.google.common.util.concurrent.ListenableFuture;
 import org.thingsboard.server.common.data.Device;
 import org.thingsboard.server.common.data.id.CustomerId;
 import org.thingsboard.server.common.data.id.DeviceId;
@@ -28,6 +29,8 @@ public interface DeviceService {
     
     Device findDeviceById(DeviceId deviceId);
 
+    ListenableFuture<Device> findDeviceByIdAsync(DeviceId deviceId);
+
     Optional<Device> findDeviceByTenantIdAndName(TenantId tenantId, String name);
 
     Device saveDevice(Device device);
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java
index 81bbdf9..42fede4 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java
@@ -28,6 +28,7 @@ import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
 import org.thingsboard.server.common.data.kv.*;
 import org.thingsboard.server.common.data.kv.DataType;
+import org.thingsboard.server.dao.AbstractAsyncDao;
 import org.thingsboard.server.dao.AbstractDao;
 import org.thingsboard.server.dao.model.ModelConstants;
 
@@ -50,7 +51,7 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
  */
 @Component
 @Slf4j
-public class BaseTimeseriesDao extends AbstractDao implements TimeseriesDao {
+public class BaseTimeseriesDao extends AbstractAsyncDao implements TimeseriesDao {
 
     @Value("${cassandra.query.min_aggregation_step_ms}")
     private int minAggregationStepMs;
@@ -60,8 +61,6 @@ public class BaseTimeseriesDao extends AbstractDao implements TimeseriesDao {
 
     private TsPartitionDate tsFormat;
 
-    private ExecutorService readResultsProcessingExecutor;
-
     private PreparedStatement partitionInsertStmt;
     private PreparedStatement[] latestInsertStmts;
     private PreparedStatement[] saveStmts;
@@ -71,8 +70,8 @@ public class BaseTimeseriesDao extends AbstractDao implements TimeseriesDao {
 
     @PostConstruct
     public void init() {
+        super.startExecutor();
         getFetchStmt(Aggregation.NONE);
-        readResultsProcessingExecutor = Executors.newCachedThreadPool();
         Optional<TsPartitionDate> partition = TsPartitionDate.parse(partitioning);
         if (partition.isPresent()) {
             tsFormat = partition.get();
@@ -84,9 +83,7 @@ public class BaseTimeseriesDao extends AbstractDao implements TimeseriesDao {
 
     @PreDestroy
     public void stop() {
-        if (readResultsProcessingExecutor != null) {
-            readResultsProcessingExecutor.shutdownNow();
-        }
+        super.stopExecutor();
     }
 
     @Override
diff --git a/dao/src/test/java/org/thingsboard/server/dao/attributes/BaseAttributesServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/attributes/BaseAttributesServiceTest.java
index 8fb5d86..559a313 100644
--- a/dao/src/test/java/org/thingsboard/server/dao/attributes/BaseAttributesServiceTest.java
+++ b/dao/src/test/java/org/thingsboard/server/dao/attributes/BaseAttributesServiceTest.java
@@ -32,6 +32,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 
 import static org.thingsboard.server.common.data.DataConstants.CLIENT_SCOPE;
 import static org.thingsboard.server.common.data.DataConstants.DEVICE;
@@ -54,8 +55,9 @@ public class BaseAttributesServiceTest extends AbstractServiceTest {
         KvEntry attrValue = new StringDataEntry("attribute1", "value1");
         AttributeKvEntry attr = new BaseAttributeKvEntry(attrValue, 42L);
         attributesService.save(deviceId, DataConstants.CLIENT_SCOPE, Collections.singletonList(attr)).get();
-        AttributeKvEntry saved = attributesService.find(deviceId, DataConstants.CLIENT_SCOPE, attr.getKey());
-        Assert.assertEquals(attr, saved);
+        Optional<AttributeKvEntry> saved = attributesService.find(deviceId, DataConstants.CLIENT_SCOPE, attr.getKey()).get();
+        Assert.assertTrue(saved.isPresent());
+        Assert.assertEquals(attr, saved.get());
     }
 
     @Test
@@ -65,15 +67,17 @@ public class BaseAttributesServiceTest extends AbstractServiceTest {
         AttributeKvEntry attrOld = new BaseAttributeKvEntry(attrOldValue, 42L);
 
         attributesService.save(deviceId, DataConstants.CLIENT_SCOPE, Collections.singletonList(attrOld)).get();
-        AttributeKvEntry saved = attributesService.find(deviceId, DataConstants.CLIENT_SCOPE, attrOld.getKey());
-        Assert.assertEquals(attrOld, saved);
+        Optional<AttributeKvEntry> saved = attributesService.find(deviceId, DataConstants.CLIENT_SCOPE, attrOld.getKey()).get();
+
+        Assert.assertTrue(saved.isPresent());
+        Assert.assertEquals(attrOld, saved.get());
 
         KvEntry attrNewValue = new StringDataEntry("attribute1", "value2");
         AttributeKvEntry attrNew = new BaseAttributeKvEntry(attrNewValue, 73L);
         attributesService.save(deviceId, DataConstants.CLIENT_SCOPE, Collections.singletonList(attrNew)).get();
 
-        saved = attributesService.find(deviceId, DataConstants.CLIENT_SCOPE, attrOld.getKey());
-        Assert.assertEquals(attrNew, saved);
+        saved = attributesService.find(deviceId, DataConstants.CLIENT_SCOPE, attrOld.getKey()).get();
+        Assert.assertEquals(attrNew, saved.get());
     }
 
     @Test
@@ -91,7 +95,7 @@ public class BaseAttributesServiceTest extends AbstractServiceTest {
         attributesService.save(deviceId, DataConstants.CLIENT_SCOPE, Collections.singletonList(attrANew)).get();
         attributesService.save(deviceId, DataConstants.CLIENT_SCOPE, Collections.singletonList(attrBNew)).get();
 
-        List<AttributeKvEntry> saved = attributesService.findAll(deviceId, DataConstants.CLIENT_SCOPE);
+        List<AttributeKvEntry> saved = attributesService.findAll(deviceId, DataConstants.CLIENT_SCOPE).get();
 
         Assert.assertNotNull(saved);
         Assert.assertEquals(2, saved.size());
diff --git a/dao/src/test/java/org/thingsboard/server/dao/timeseries/TimeseriesServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/timeseries/TimeseriesServiceTest.java
index 13c25c3..fd16b75 100644
--- a/dao/src/test/java/org/thingsboard/server/dao/timeseries/TimeseriesServiceTest.java
+++ b/dao/src/test/java/org/thingsboard/server/dao/timeseries/TimeseriesServiceTest.java
@@ -114,8 +114,8 @@ public class TimeseriesServiceTest extends AbstractServiceTest {
         entries.add(save(deviceId, 45000, 500));
         entries.add(save(deviceId, 55000, 600));
 
-        List<TsKvEntry> list = tsService.findAll(DataConstants.DEVICE, deviceId, new BaseTsKvQuery(LONG_KEY, 0,
-                60000, 3, Aggregation.NONE)).get();
+        List<TsKvEntry> list = tsService.findAll(DataConstants.DEVICE, deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
+                60000, 3, Aggregation.NONE))).get();
         assertEquals(3, list.size());
         assertEquals(55000, list.get(0).getTs());
         assertEquals(java.util.Optional.of(600L), list.get(0).getLongValue());
@@ -126,8 +126,8 @@ public class TimeseriesServiceTest extends AbstractServiceTest {
         assertEquals(35000, list.get(2).getTs());
         assertEquals(java.util.Optional.of(400L), list.get(2).getLongValue());
 
-        list = tsService.findAll(DataConstants.DEVICE, deviceId, new BaseTsKvQuery(LONG_KEY, 0,
-                60000, 3, Aggregation.AVG)).get();
+        list = tsService.findAll(DataConstants.DEVICE, deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
+                60000, 3, Aggregation.AVG))).get();
         assertEquals(3, list.size());
         assertEquals(10000, list.get(0).getTs());
         assertEquals(java.util.Optional.of(150L), list.get(0).getLongValue());
@@ -138,8 +138,8 @@ public class TimeseriesServiceTest extends AbstractServiceTest {
         assertEquals(50000, list.get(2).getTs());
         assertEquals(java.util.Optional.of(550L), list.get(2).getLongValue());
 
-        list = tsService.findAll(DataConstants.DEVICE, deviceId, new BaseTsKvQuery(LONG_KEY, 0,
-                60000, 3, Aggregation.SUM)).get();
+        list = tsService.findAll(DataConstants.DEVICE, deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
+                60000, 3, Aggregation.SUM))).get();
 
         assertEquals(3, list.size());
         assertEquals(10000, list.get(0).getTs());
@@ -151,8 +151,8 @@ public class TimeseriesServiceTest extends AbstractServiceTest {
         assertEquals(50000, list.get(2).getTs());
         assertEquals(java.util.Optional.of(1100L), list.get(2).getLongValue());
 
-        list = tsService.findAll(DataConstants.DEVICE, deviceId, new BaseTsKvQuery(LONG_KEY, 0,
-                60000, 3, Aggregation.MIN)).get();
+        list = tsService.findAll(DataConstants.DEVICE, deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
+                60000, 3, Aggregation.MIN))).get();
 
         assertEquals(3, list.size());
         assertEquals(10000, list.get(0).getTs());
@@ -164,8 +164,8 @@ public class TimeseriesServiceTest extends AbstractServiceTest {
         assertEquals(50000, list.get(2).getTs());
         assertEquals(java.util.Optional.of(500L), list.get(2).getLongValue());
 
-        list = tsService.findAll(DataConstants.DEVICE, deviceId, new BaseTsKvQuery(LONG_KEY, 0,
-                60000, 3, Aggregation.MAX)).get();
+        list = tsService.findAll(DataConstants.DEVICE, deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
+                60000, 3, Aggregation.MAX))).get();
 
         assertEquals(3, list.size());
         assertEquals(10000, list.get(0).getTs());
@@ -177,8 +177,8 @@ public class TimeseriesServiceTest extends AbstractServiceTest {
         assertEquals(50000, list.get(2).getTs());
         assertEquals(java.util.Optional.of(600L), list.get(2).getLongValue());
 
-        list = tsService.findAll(DataConstants.DEVICE, deviceId, new BaseTsKvQuery(LONG_KEY, 0,
-                60000, 3, Aggregation.COUNT)).get();
+        list = tsService.findAll(DataConstants.DEVICE, deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
+                60000, 3, Aggregation.COUNT))).get();
 
         assertEquals(3, list.size());
         assertEquals(10000, list.get(0).getTs());
diff --git a/dao/src/test/resources/cassandra-test.properties b/dao/src/test/resources/cassandra-test.properties
index 210d2c0..a4eb3d0 100644
--- a/dao/src/test/resources/cassandra-test.properties
+++ b/dao/src/test/resources/cassandra-test.properties
@@ -2,7 +2,7 @@ cassandra.cluster_name=Thingsboard Cluster
 
 cassandra.keyspace_name=thingsboard
 
-cassandra.url=127.0.0.1:9042
+cassandra.url=127.0.0.1:9142
 
 cassandra.ssl=false
 
diff --git a/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java b/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java
index c2c5587..64d4d53 100644
--- a/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java
+++ b/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java
@@ -94,13 +94,21 @@ public interface PluginContext {
 
     void saveAttributes(DeviceId deviceId, String attributeType, List<AttributeKvEntry> attributes, PluginCallback<Void> callback);
 
-    Optional<AttributeKvEntry> loadAttribute(DeviceId deviceId, String attributeType, String attributeKey);
+    void removeAttributes(DeviceId deviceId, String scope, List<String> attributeKeys, PluginCallback<Void> callback);
 
-    List<AttributeKvEntry> loadAttributes(DeviceId deviceId, String attributeType, List<String> attributeKeys);
+    void saveAttributesByDevice(TenantId tenantId, DeviceId deviceId, String attributeType, List<AttributeKvEntry> attributes, PluginCallback<Void> callback);
 
-    List<AttributeKvEntry> loadAttributes(DeviceId deviceId, String attributeType);
+    void removeAttributesByDevice(TenantId tenantId, DeviceId deviceId, String scope, List<String> attributeKeys, PluginCallback<Void> callback);
 
-    void removeAttributes(DeviceId deviceId, String scope, List<String> attributeKeys);
+    void loadAttribute(DeviceId deviceId, String attributeType, String attributeKey, PluginCallback<Optional<AttributeKvEntry>> callback);
+
+    void loadAttributes(DeviceId deviceId, String attributeType, Collection<String> attributeKeys, PluginCallback<List<AttributeKvEntry>> callback);
+
+    void loadAttributes(DeviceId deviceId, String attributeType, PluginCallback<List<AttributeKvEntry>> callback);
+
+    void loadAttributes(DeviceId deviceId, Collection<String> attributeTypes, PluginCallback<List<AttributeKvEntry>> callback);
+
+    void loadAttributes(DeviceId deviceId, Collection<String> attributeTypes, Collection<String> attributeKeys, PluginCallback<List<AttributeKvEntry>> callback);
 
     void getCustomerDevices(TenantId tenantId, CustomerId customerId, int limit, PluginCallback<List<Device>> callback);
 }
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/BiPluginCallBack.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/BiPluginCallBack.java
new file mode 100644
index 0000000..bc5285c
--- /dev/null
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/BiPluginCallBack.java
@@ -0,0 +1,74 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.extensions.core.plugin.telemetry.handlers;
+
+import lombok.extern.slf4j.Slf4j;
+import org.thingsboard.server.extensions.api.plugins.PluginCallback;
+import org.thingsboard.server.extensions.api.plugins.PluginContext;
+
+/**
+ * Created by ashvayka on 21.02.17.
+ */
+@Slf4j
+public abstract class BiPluginCallBack<V1, V2> {
+
+    private V1 v1;
+    private V2 v2;
+
+    public PluginCallback<V1> getV1Callback() {
+        return new PluginCallback<V1>() {
+            @Override
+            public void onSuccess(PluginContext ctx, V1 value) {
+                synchronized (BiPluginCallBack.this) {
+                    BiPluginCallBack.this.v1 = value;
+                    if (v2 != null) {
+                        BiPluginCallBack.this.onSuccess(ctx, v1, v2);
+                    }
+                }
+            }
+
+            @Override
+            public void onFailure(PluginContext ctx, Exception e) {
+                BiPluginCallBack.this.onFailure(ctx, e);
+            }
+        };
+    }
+
+    public PluginCallback<V2> getV2Callback() {
+        return new PluginCallback<V2>() {
+            @Override
+            public void onSuccess(PluginContext ctx, V2 value) {
+                synchronized (BiPluginCallBack.this) {
+                    BiPluginCallBack.this.v2 = value;
+                    if (v1 != null) {
+                        BiPluginCallBack.this.onSuccess(ctx, v1, v2);
+                    }
+                }
+
+            }
+
+            @Override
+            public void onFailure(PluginContext ctx, Exception e) {
+                BiPluginCallBack.this.onFailure(ctx, e);
+            }
+        };
+    }
+
+    abstract public void onSuccess(PluginContext ctx, V1 v1, V2 v2);
+
+    abstract public void onFailure(PluginContext ctx, Exception e);
+
+}
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java
index e93f0b5..28d8b7c 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java
@@ -77,41 +77,58 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
                         }
                     });
                 } else if (entity.equals("attributes")) {
-                    List<AttributeKvEntry> attributes;
+                    PluginCallback<List<AttributeKvEntry>> callback = getAttributeKeysPluginCallback(msg);
                     if (!StringUtils.isEmpty(scope)) {
-                        attributes = ctx.loadAttributes(deviceId, scope);
+                        ctx.loadAttributes(deviceId, scope, callback);
                     } else {
-                        attributes = new ArrayList<>();
-                        Arrays.stream(DataConstants.ALL_SCOPES).forEach(s -> attributes.addAll(ctx.loadAttributes(deviceId, s)));
+                        ctx.loadAttributes(deviceId, Arrays.asList(DataConstants.ALL_SCOPES), callback);
                     }
-                    List<String> keys = attributes.stream().map(attrKv -> attrKv.getKey()).collect(Collectors.toList());
-                    msg.getResponseHolder().setResult(new ResponseEntity<>(keys, HttpStatus.OK));
                 }
             } else if (method.equals("values")) {
                 if ("timeseries".equals(entity)) {
-                    String keys = request.getParameter("keys");
+                    String keysStr = request.getParameter("keys");
                     Optional<Long> startTs = request.getLongParamValue("startTs");
                     Optional<Long> endTs = request.getLongParamValue("endTs");
                     Optional<Integer> limit = request.getIntParamValue("limit");
-                    Map<String, List<TsData>> data = new LinkedHashMap<>();
-                    for (String key : keys.split(",")) {
-                        //TODO: refactoring
-//                        List<TsKvEntry> entries = ctx.loadTimeseries(deviceId, new BaseTsKvQuery(key, startTs, endTs, limit));
-//                        data.put(key, entries.stream().map(v -> new TsData(v.getTs(), v.getValueAsString())).collect(Collectors.toList()));
-                    }
-                    msg.getResponseHolder().setResult(new ResponseEntity<>(data, HttpStatus.OK));
+                    Aggregation agg = Aggregation.valueOf(request.getParameter("agg", Aggregation.NONE.name()));
+
+                    List<String> keys = Arrays.asList(keysStr.split(","));
+                    List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, startTs.get(), endTs.get(), limit.get(), agg)).collect(Collectors.toList());
+                    ctx.loadTimeseries(deviceId, queries, new PluginCallback<List<TsKvEntry>>() {
+                        @Override
+                        public void onSuccess(PluginContext ctx, List<TsKvEntry> data) {
+                            Map<String, List<TsData>> result = new LinkedHashMap<>();
+                            for (TsKvEntry entry : data) {
+                                result.put(entry.getKey(), data.stream().map(v -> new TsData(v.getTs(), v.getValueAsString())).collect(Collectors.toList()));
+                            }
+                            msg.getResponseHolder().setResult(new ResponseEntity<>(data, HttpStatus.OK));
+                        }
+
+                        @Override
+                        public void onFailure(PluginContext ctx, Exception e) {
+                            log.error("Failed to fetch historical data", e);
+                            msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR));
+                        }
+                    });
                 } else if ("attributes".equals(entity)) {
                     String keys = request.getParameter("keys", "");
-                    List<AttributeKvEntry> attributes;
+
+                    PluginCallback<List<AttributeKvEntry>> callback = getAttributeValuesPluginCallback(msg);
                     if (!StringUtils.isEmpty(scope)) {
-                        attributes = getAttributeKvEntries(ctx, scope, deviceId, keys);
+                        if (!StringUtils.isEmpty(keys)) {
+                            List<String> keyList = Arrays.asList(keys.split(","));
+                            ctx.loadAttributes(deviceId, scope, keyList, callback);
+                        } else {
+                            ctx.loadAttributes(deviceId, scope, callback);
+                        }
                     } else {
-                        attributes = new ArrayList<>();
-                        Arrays.stream(DataConstants.ALL_SCOPES).forEach(s -> attributes.addAll(getAttributeKvEntries(ctx, s, deviceId, keys)));
+                        if (!StringUtils.isEmpty(keys)) {
+                            List<String> keyList = Arrays.asList(keys.split(","));
+                            ctx.loadAttributes(deviceId, Arrays.asList(DataConstants.ALL_SCOPES), keyList, callback);
+                        } else {
+                            ctx.loadAttributes(deviceId, Arrays.asList(DataConstants.ALL_SCOPES), callback);
+                        }
                     }
-                    List<AttributeData> values = attributes.stream().map(attribute -> new AttributeData(attribute.getLastUpdateTs(),
-                            attribute.getKey(), attribute.getValue())).collect(Collectors.toList());
-                    msg.getResponseHolder().setResult(new ResponseEntity<>(values, HttpStatus.OK));
                 }
             }
         } else {
@@ -156,6 +173,7 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
 
                                 @Override
                                 public void onFailure(PluginContext ctx, Exception e) {
+                                    log.error("Failed to save attributes", e);
                                     msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR));
                                 }
                             });
@@ -184,8 +202,18 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
                     String keysParam = request.getParameter("keys");
                     if (!StringUtils.isEmpty(keysParam)) {
                         String[] keys = keysParam.split(",");
-                        ctx.removeAttributes(deviceId, scope, Arrays.asList(keys));
-                        msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.OK));
+                        ctx.removeAttributes(deviceId, scope, Arrays.asList(keys), new PluginCallback<Void>() {
+                            @Override
+                            public void onSuccess(PluginContext ctx, Void value) {
+                                msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.OK));
+                            }
+
+                            @Override
+                            public void onFailure(PluginContext ctx, Exception e) {
+                                log.error("Failed to remove attributes", e);
+                                msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR));
+                            }
+                        });
                         return;
                     }
                 }
@@ -196,14 +224,37 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
         msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
     }
 
-    private List<AttributeKvEntry> getAttributeKvEntries(PluginContext ctx, String scope, DeviceId deviceId, String keysParam) {
-        List<AttributeKvEntry> attributes;
-        if (!StringUtils.isEmpty(keysParam)) {
-            String[] keys = keysParam.split(",");
-            attributes = ctx.loadAttributes(deviceId, scope, Arrays.asList(keys));
-        } else {
-            attributes = ctx.loadAttributes(deviceId, scope);
-        }
-        return attributes;
+
+    private PluginCallback<List<AttributeKvEntry>> getAttributeKeysPluginCallback(final PluginRestMsg msg) {
+        return new PluginCallback<List<AttributeKvEntry>>() {
+            @Override
+            public void onSuccess(PluginContext ctx, List<AttributeKvEntry> attributes) {
+                List<String> keys = attributes.stream().map(attrKv -> attrKv.getKey()).collect(Collectors.toList());
+                msg.getResponseHolder().setResult(new ResponseEntity<>(keys, HttpStatus.OK));
+            }
+
+            @Override
+            public void onFailure(PluginContext ctx, Exception e) {
+                log.error("Failed to fetch attributes", e);
+                msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR));
+            }
+        };
+    }
+
+    private PluginCallback<List<AttributeKvEntry>> getAttributeValuesPluginCallback(final PluginRestMsg msg) {
+        return new PluginCallback<List<AttributeKvEntry>>() {
+            @Override
+            public void onSuccess(PluginContext ctx, List<AttributeKvEntry> attributes) {
+                List<AttributeData> values = attributes.stream().map(attribute -> new AttributeData(attribute.getLastUpdateTs(),
+                        attribute.getKey(), attribute.getValue())).collect(Collectors.toList());
+                msg.getResponseHolder().setResult(new ResponseEntity<>(values, HttpStatus.OK));
+            }
+
+            @Override
+            public void onFailure(PluginContext ctx, Exception e) {
+                log.error("Failed to fetch attributes", e);
+                msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR));
+            }
+        };
     }
 }
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRuleMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRuleMsgHandler.java
index 85a25da..d9bfba0 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRuleMsgHandler.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRuleMsgHandler.java
@@ -15,6 +15,7 @@
  */
 package org.thingsboard.server.extensions.core.plugin.telemetry.handlers;
 
+import lombok.extern.slf4j.Slf4j;
 import org.thingsboard.server.common.data.DataConstants;
 import org.thingsboard.server.common.data.id.DeviceId;
 import org.thingsboard.server.common.data.id.RuleId;
@@ -38,6 +39,7 @@ import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionT
 import java.util.*;
 import java.util.stream.Collectors;
 
+@Slf4j
 public class TelemetryRuleMsgHandler extends DefaultRuleMsgHandler {
     private final SubscriptionManager subscriptionManager;
 
@@ -49,27 +51,36 @@ public class TelemetryRuleMsgHandler extends DefaultRuleMsgHandler {
     public void handleGetAttributesRequest(PluginContext ctx, TenantId tenantId, RuleId ruleId, GetAttributesRequestRuleToPluginMsg msg) {
         GetAttributesRequest request = msg.getPayload();
 
-        List<AttributeKvEntry> clientAttributes = getAttributeKvEntries(ctx, msg.getDeviceId(), DataConstants.CLIENT_SCOPE, request.getClientAttributeNames());
-        List<AttributeKvEntry> sharedAttributes = getAttributeKvEntries(ctx, msg.getDeviceId(), DataConstants.SHARED_SCOPE, request.getSharedAttributeNames());
+        BiPluginCallBack<List<AttributeKvEntry>, List<AttributeKvEntry>> callback = new BiPluginCallBack<List<AttributeKvEntry>, List<AttributeKvEntry>>() {
 
-        BasicGetAttributesResponse response = BasicGetAttributesResponse.onSuccess(request.getMsgType(),
-                request.getRequestId(), BasicAttributeKVMsg.from(clientAttributes, sharedAttributes));
+            @Override
+            public void onSuccess(PluginContext ctx, List<AttributeKvEntry> clientAttributes, List<AttributeKvEntry> sharedAttributes) {
+                BasicGetAttributesResponse response = BasicGetAttributesResponse.onSuccess(request.getMsgType(),
+                        request.getRequestId(), BasicAttributeKVMsg.from(clientAttributes, sharedAttributes));
+                ctx.reply(new ResponsePluginToRuleMsg(msg.getUid(), tenantId, ruleId, response));
+            }
+
+            @Override
+            public void onFailure(PluginContext ctx, Exception e) {
+                log.error("Failed to process get attributes request", e);
+                ctx.reply(new ResponsePluginToRuleMsg(msg.getUid(), tenantId, ruleId, BasicStatusCodeResponse.onError(request.getMsgType(), request.getRequestId(), e)));
+            }
+        };
 
-        ctx.reply(new ResponsePluginToRuleMsg(msg.getUid(), tenantId, ruleId, response));
+        getAttributeKvEntries(ctx, msg.getDeviceId(), DataConstants.CLIENT_SCOPE, request.getClientAttributeNames(), callback.getV1Callback());
+        getAttributeKvEntries(ctx, msg.getDeviceId(), DataConstants.SHARED_SCOPE, request.getSharedAttributeNames(), callback.getV2Callback());
     }
 
-    private List<AttributeKvEntry> getAttributeKvEntries(PluginContext ctx, DeviceId deviceId, String scope, Optional<Set<String>> names) {
-        List<AttributeKvEntry> attributes;
+    private void getAttributeKvEntries(PluginContext ctx, DeviceId deviceId, String scope, Optional<Set<String>> names, PluginCallback<List<AttributeKvEntry>> callback) {
         if (names.isPresent()) {
             if (!names.get().isEmpty()) {
-                attributes = ctx.loadAttributes(deviceId, scope, new ArrayList<>(names.get()));
+                ctx.loadAttributes(deviceId, scope, new ArrayList<>(names.get()), callback);
             } else {
-                attributes = ctx.loadAttributes(deviceId, scope);
+                ctx.loadAttributes(deviceId, scope, callback);
             }
         } else {
-            attributes = Collections.emptyList();
+            callback.onSuccess(ctx, Collections.emptyList());
         }
-        return attributes;
     }
 
     @Override
@@ -100,6 +111,7 @@ public class TelemetryRuleMsgHandler extends DefaultRuleMsgHandler {
 
             @Override
             public void onFailure(PluginContext ctx, Exception e) {
+                log.error("Failed to process telemetry upload request", e);
                 ctx.reply(new ResponsePluginToRuleMsg(msg.getUid(), tenantId, ruleId, BasicStatusCodeResponse.onError(request.getMsgType(), request.getRequestId(), e)));
             }
         });
@@ -127,6 +139,7 @@ public class TelemetryRuleMsgHandler extends DefaultRuleMsgHandler {
 
                     @Override
                     public void onFailure(PluginContext ctx, Exception e) {
+                        log.error("Failed to process attributes update request", e);
                         ctx.reply(new ResponsePluginToRuleMsg(msg.getUid(), tenantId, ruleId, BasicStatusCodeResponse.onError(request.getMsgType(), request.getRequestId(), e)));
                     }
                 });
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java
index 739bedf..dd30f99 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java
@@ -104,37 +104,64 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
                 SubscriptionState sub;
                 if (keysOptional.isPresent()) {
                     List<String> keys = new ArrayList<>(keysOptional.get());
-                    List<AttributeKvEntry> data = new ArrayList<>();
+
+                    PluginCallback<List<AttributeKvEntry>> callback = new PluginCallback<List<AttributeKvEntry>>() {
+                        @Override
+                        public void onSuccess(PluginContext ctx, List<AttributeKvEntry> data) {
+                            List<TsKvEntry> attributesData = data.stream().map(d -> new BasicTsKvEntry(d.getLastUpdateTs(), d)).collect(Collectors.toList());
+                            sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), attributesData));
+
+                            Map<String, Long> subState = new HashMap<>(keys.size());
+                            keys.forEach(key -> subState.put(key, 0L));
+                            attributesData.forEach(v -> subState.put(v.getKey(), v.getTs()));
+
+                            SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), deviceId, SubscriptionType.ATTRIBUTES, false, subState);
+                            subscriptionManager.addLocalWsSubscription(ctx, sessionId, deviceId, sub);
+                        }
+
+                        @Override
+                        public void onFailure(PluginContext ctx, Exception e) {
+                            log.error("Failed to fetch attributes!", e);
+                            SubscriptionUpdate update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR,
+                                    "Failed to fetch attributes!");
+                            sendWsMsg(ctx, sessionRef, update);
+                        }
+                    };
+
                     if (StringUtils.isEmpty(cmd.getScope())) {
-                        Arrays.stream(DataConstants.ALL_SCOPES).forEach(s -> data.addAll(ctx.loadAttributes(deviceId, s, keys)));
+                        ctx.loadAttributes(deviceId, Arrays.asList(DataConstants.ALL_SCOPES), keys, callback);
                     } else {
-                        data.addAll(ctx.loadAttributes(deviceId, cmd.getScope(), keys));
+                        ctx.loadAttributes(deviceId, cmd.getScope(), keys, callback);
                     }
+                } else {
+                    PluginCallback<List<AttributeKvEntry>> callback = new PluginCallback<List<AttributeKvEntry>>() {
+                        @Override
+                        public void onSuccess(PluginContext ctx, List<AttributeKvEntry> data) {
+                            List<TsKvEntry> attributesData = data.stream().map(d -> new BasicTsKvEntry(d.getLastUpdateTs(), d)).collect(Collectors.toList());
+                            sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), attributesData));
 
-                    List<TsKvEntry> attributesData = data.stream().map(d -> new BasicTsKvEntry(d.getLastUpdateTs(), d)).collect(Collectors.toList());
-                    sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), attributesData));
+                            Map<String, Long> subState = new HashMap<>(attributesData.size());
+                            attributesData.forEach(v -> subState.put(v.getKey(), v.getTs()));
 
-                    Map<String, Long> subState = new HashMap<>(keys.size());
-                    keys.forEach(key -> subState.put(key, 0L));
-                    attributesData.forEach(v -> subState.put(v.getKey(), v.getTs()));
+                            SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), deviceId, SubscriptionType.ATTRIBUTES, true, subState);
+                            subscriptionManager.addLocalWsSubscription(ctx, sessionId, deviceId, sub);
+                        }
+
+                        @Override
+                        public void onFailure(PluginContext ctx, Exception e) {
+                            log.error("Failed to fetch attributes!", e);
+                            SubscriptionUpdate update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR,
+                                    "Failed to fetch attributes!");
+                            sendWsMsg(ctx, sessionRef, update);
+                        }
+                    };
 
-                    sub = new SubscriptionState(sessionId, cmd.getCmdId(), deviceId, SubscriptionType.ATTRIBUTES, false, subState);
-                } else {
-                    List<AttributeKvEntry> data = new ArrayList<>();
                     if (StringUtils.isEmpty(cmd.getScope())) {
-                        Arrays.stream(DataConstants.ALL_SCOPES).forEach(s -> data.addAll(ctx.loadAttributes(deviceId, s)));
+                        ctx.loadAttributes(deviceId, Arrays.asList(DataConstants.ALL_SCOPES), callback);
                     } else {
-                        data.addAll(ctx.loadAttributes(deviceId, cmd.getScope()));
+                        ctx.loadAttributes(deviceId, cmd.getScope(), callback);
                     }
-                    List<TsKvEntry> attributesData = data.stream().map(d -> new BasicTsKvEntry(d.getLastUpdateTs(), d)).collect(Collectors.toList());
-                    sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), attributesData));
-
-                    Map<String, Long> subState = new HashMap<>(attributesData.size());
-                    attributesData.forEach(v -> subState.put(v.getKey(), v.getTs()));
-
-                    sub = new SubscriptionState(sessionId, cmd.getCmdId(), deviceId, SubscriptionType.ATTRIBUTES, true, subState);
                 }
-                subscriptionManager.addLocalWsSubscription(ctx, sessionId, deviceId, sub);
             }
         }
     }
@@ -205,6 +232,7 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
 
             @Override
             public void onFailure(PluginContext ctx, Exception e) {
+                log.error("Failed to fetch data!", e);
                 SubscriptionUpdate update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR,
                         "Failed to fetch data!");
                 sendWsMsg(ctx, sessionRef, update);
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/SubscriptionManager.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/SubscriptionManager.java
index 0624876..ea7a185 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/SubscriptionManager.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/SubscriptionManager.java
@@ -15,12 +15,14 @@
  */
 package org.thingsboard.server.extensions.core.plugin.telemetry;
 
+import com.sun.javafx.collections.MappingChange;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 import org.thingsboard.server.common.data.DataConstants;
 import org.thingsboard.server.common.data.id.DeviceId;
 import org.thingsboard.server.common.data.kv.*;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.extensions.api.plugins.PluginCallback;
 import org.thingsboard.server.extensions.api.plugins.PluginContext;
 import org.thingsboard.server.extensions.core.plugin.telemetry.handlers.TelemetryRpcMsgHandler;
 import org.thingsboard.server.extensions.core.plugin.telemetry.handlers.TelemetryWebsocketMsgHandler;
@@ -66,28 +68,49 @@ public class SubscriptionManager {
         DeviceId deviceId = subscription.getDeviceId();
         log.trace("[{}] Registering remote subscription [{}] for device [{}] to [{}]", sessionId, subscription.getSubscriptionId(), deviceId, address);
         registerSubscription(sessionId, deviceId, subscription);
-        List<TsKvEntry> missedUpdates = new ArrayList<>();
         if (subscription.getType() == SubscriptionType.ATTRIBUTES) {
-            subscription.getKeyStates().entrySet().forEach(e -> {
-                        Optional<AttributeKvEntry> latestOpt = ctx.loadAttribute(deviceId, DataConstants.CLIENT_SCOPE, e.getKey());
-                        if (latestOpt.isPresent()) {
-                            AttributeKvEntry latestEntry = latestOpt.get();
-                            if (latestEntry.getLastUpdateTs() > e.getValue()) {
-                                missedUpdates.add(new BasicTsKvEntry(latestEntry.getLastUpdateTs(), latestEntry));
-                            }
+            final Map<String, Long> keyStates = subscription.getKeyStates();
+            ctx.loadAttributes(deviceId, DataConstants.CLIENT_SCOPE, keyStates.keySet(), new PluginCallback<List<AttributeKvEntry>>() {
+                @Override
+                public void onSuccess(PluginContext ctx, List<AttributeKvEntry> values) {
+                    List<TsKvEntry> missedUpdates = new ArrayList<>();
+                    values.forEach(latestEntry -> {
+                        if (latestEntry.getLastUpdateTs() > keyStates.get(latestEntry.getKey())) {
+                            missedUpdates.add(new BasicTsKvEntry(latestEntry.getLastUpdateTs(), latestEntry));
                         }
+                    });
+                    if (!missedUpdates.isEmpty()) {
+                        rpcHandler.onSubscriptionUpdate(ctx, address, sessionId, new SubscriptionUpdate(subscription.getSubscriptionId(), missedUpdates));
                     }
-            );
+                }
+
+                @Override
+                public void onFailure(PluginContext ctx, Exception e) {
+                    log.error("Failed to fetch missed updates.", e);
+                }
+            });
         } else if (subscription.getType() == SubscriptionType.TIMESERIES) {
             long curTs = System.currentTimeMillis();
+            List<TsKvQuery> queries = new ArrayList<>();
             subscription.getKeyStates().entrySet().forEach(e -> {
-                TsKvQuery query = new BaseTsKvQuery(e.getKey(), e.getValue() + 1L, curTs);
-                missedUpdates.addAll(ctx.loadTimeseries(deviceId, query));
+                queries.add(new BaseTsKvQuery(e.getKey(), e.getValue() + 1L, curTs));
+            });
+
+            ctx.loadTimeseries(deviceId, queries, new PluginCallback<List<TsKvEntry>>() {
+                @Override
+                public void onSuccess(PluginContext ctx, List<TsKvEntry> missedUpdates) {
+                    if (!missedUpdates.isEmpty()) {
+                        rpcHandler.onSubscriptionUpdate(ctx, address, sessionId, new SubscriptionUpdate(subscription.getSubscriptionId(), missedUpdates));
+                    }
+                }
+
+                @Override
+                public void onFailure(PluginContext ctx, Exception e) {
+                    log.error("Failed to fetch missed updates.", e);
+                }
             });
         }
-        if (!missedUpdates.isEmpty()) {
-            rpcHandler.onSubscriptionUpdate(ctx, address, sessionId, new SubscriptionUpdate(subscription.getSubscriptionId(), missedUpdates));
-        }
+
     }
 
     private void registerSubscription(String sessionId, DeviceId deviceId, Subscription subscription) {