thingsboard-memoizeit

Details

diff --git a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
index 98149c2..b102226 100644
--- a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2017 The Thingsboard Authors
- * <p>
+ *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -90,133 +90,115 @@ public final class PluginProcessingContext implements PluginContext {
     }
 
     @Override
-    public void saveAttributes(DeviceId deviceId, String scope, List<AttributeKvEntry> attributes, PluginCallback<Void> callback) {
-        validate(deviceId);
-
-        ListenableFuture<List<ResultSet>> rsListFuture = pluginCtx.attributesService.save(deviceId, scope, attributes);
-        Futures.addCallback(rsListFuture, getListCallback(callback, v -> {
-            onDeviceAttributesChanged(deviceId, scope, attributes);
-            return null;
-        }), executor);
+    public void saveAttributes(final TenantId tenantId, final DeviceId deviceId, final String scope, final List<AttributeKvEntry> attributes, final PluginCallback<Void> callback) {
+        validate(deviceId, new ValidationCallback(callback, ctx -> {
+            ListenableFuture<List<ResultSet>> rsListFuture = pluginCtx.attributesService.save(deviceId, scope, attributes);
+            Futures.addCallback(rsListFuture, getListCallback(callback, v -> {
+                onDeviceAttributesChanged(tenantId, deviceId, scope, attributes);
+                return null;
+            }), executor);
+        }));
     }
 
     @Override
-    public void removeAttributes(DeviceId deviceId, String scope, List<String> keys, PluginCallback<Void> callback) {
-        validate(deviceId);
-        ListenableFuture<List<ResultSet>> future = pluginCtx.attributesService.removeAll(deviceId, scope, keys);
-        Futures.addCallback(future, getCallback(callback, v -> null), executor);
-        onDeviceAttributesDeleted(tenantId, deviceId, keys.stream().map(key -> new AttributeKey(scope, key)).collect(Collectors.toSet()));
+    public void removeAttributes(final TenantId tenantId, final DeviceId deviceId, final String scope, final List<String> keys, final PluginCallback<Void> callback) {
+        validate(deviceId, new ValidationCallback(callback, ctx -> {
+            ListenableFuture<List<ResultSet>> future = pluginCtx.attributesService.removeAll(deviceId, scope, keys);
+            Futures.addCallback(future, getCallback(callback, v -> null), executor);
+            onDeviceAttributesDeleted(tenantId, deviceId, keys.stream().map(key -> new AttributeKey(scope, key)).collect(Collectors.toSet()));
+        }));
     }
 
     @Override
-    public void saveAttributesByDevice(TenantId tenantId, DeviceId deviceId, String scope, List<AttributeKvEntry> attributes, PluginCallback<Void> callback) {
-        validate(deviceId);
-
-        ListenableFuture<List<ResultSet>> rsListFuture = pluginCtx.attributesService.save(deviceId, scope, attributes);
-        Futures.addCallback(rsListFuture, getListCallback(callback, v -> {
-            onDeviceAttributesChanged(tenantId, deviceId, scope, attributes);
-            return null;
-        }), executor);
+    public void loadAttribute(DeviceId deviceId, String attributeType, String attributeKey, final PluginCallback<Optional<AttributeKvEntry>> callback) {
+        validate(deviceId, new ValidationCallback(callback, ctx -> {
+            ListenableFuture<Optional<AttributeKvEntry>> future = pluginCtx.attributesService.find(deviceId, attributeType, attributeKey);
+            Futures.addCallback(future, getCallback(callback, v -> v), executor);
+        }));
     }
 
     @Override
-    public void removeAttributesByDevice(TenantId tenantId, DeviceId deviceId, String scope, List<String> keys, PluginCallback<Void> callback) {
-        validate(deviceId);
-        ListenableFuture<List<ResultSet>> future = pluginCtx.attributesService.removeAll(deviceId, scope, keys);
-        Futures.addCallback(future, getCallback(callback, v -> null), executor);
-        onDeviceAttributesDeleted(tenantId, deviceId, keys.stream().map(key -> new AttributeKey(scope, key)).collect(Collectors.toSet()));
-    }
-
-    @Override
-    public void loadAttribute(DeviceId deviceId, String attributeType, String attributeKey, PluginCallback<Optional<AttributeKvEntry>> callback) {
-        validate(deviceId);
-        ListenableFuture<Optional<AttributeKvEntry>> future = pluginCtx.attributesService.find(deviceId, attributeType, attributeKey);
-        Futures.addCallback(future, getCallback(callback, v -> v), executor);
-    }
-
-    @Override
-    public void loadAttributes(DeviceId deviceId, String attributeType, Collection<String> attributeKeys, PluginCallback<List<AttributeKvEntry>> callback) {
-        validate(deviceId);
-        ListenableFuture<List<AttributeKvEntry>> future = pluginCtx.attributesService.find(deviceId, attributeType, attributeKeys);
-        Futures.addCallback(future, getCallback(callback, v -> v), executor);
+    public void loadAttributes(DeviceId deviceId, String attributeType, Collection<String> attributeKeys, final PluginCallback<List<AttributeKvEntry>> callback) {
+        validate(deviceId, new ValidationCallback(callback, ctx -> {
+            ListenableFuture<List<AttributeKvEntry>> future = pluginCtx.attributesService.find(deviceId, attributeType, attributeKeys);
+            Futures.addCallback(future, getCallback(callback, v -> v), executor);
+        }));
     }
 
     @Override
     public void loadAttributes(DeviceId deviceId, String attributeType, PluginCallback<List<AttributeKvEntry>> callback) {
-        validate(deviceId);
-        ListenableFuture<List<AttributeKvEntry>> future = pluginCtx.attributesService.findAll(deviceId, attributeType);
-        Futures.addCallback(future, getCallback(callback, v -> v), executor);
+        validate(deviceId, new ValidationCallback(callback, ctx -> {
+            ListenableFuture<List<AttributeKvEntry>> future = pluginCtx.attributesService.findAll(deviceId, attributeType);
+            Futures.addCallback(future, getCallback(callback, v -> v), executor);
+        }));
     }
 
     @Override
-    public void loadAttributes(DeviceId deviceId, Collection<String> attributeTypes, PluginCallback<List<AttributeKvEntry>> callback) {
-        validate(deviceId);
-        List<ListenableFuture<List<AttributeKvEntry>>> futures = new ArrayList<>();
-        attributeTypes.forEach(attributeType -> futures.add(pluginCtx.attributesService.findAll(deviceId, attributeType)));
-        convertFuturesAndAddCallback(callback, futures);
+    public void loadAttributes(final DeviceId deviceId, final Collection<String> attributeTypes, final PluginCallback<List<AttributeKvEntry>> callback) {
+        validate(deviceId, new ValidationCallback(callback, ctx -> {
+            List<ListenableFuture<List<AttributeKvEntry>>> futures = new ArrayList<>();
+            attributeTypes.forEach(attributeType -> futures.add(pluginCtx.attributesService.findAll(deviceId, attributeType)));
+            convertFuturesAndAddCallback(callback, futures);
+        }));
     }
 
     @Override
-    public void loadAttributes(DeviceId deviceId, Collection<String> attributeTypes, Collection<String> attributeKeys, PluginCallback<List<AttributeKvEntry>> callback) {
-        validate(deviceId);
-        List<ListenableFuture<List<AttributeKvEntry>>> futures = new ArrayList<>();
-        attributeTypes.forEach(attributeType -> futures.add(pluginCtx.attributesService.find(deviceId, attributeType, attributeKeys)));
-        convertFuturesAndAddCallback(callback, futures);
-    }
-
-    private void convertFuturesAndAddCallback(PluginCallback<List<AttributeKvEntry>> callback, List<ListenableFuture<List<AttributeKvEntry>>> futures) {
-        ListenableFuture<List<AttributeKvEntry>> future = Futures.transform(Futures.successfulAsList(futures),
-                (Function<? super List<List<AttributeKvEntry>>, ? extends List<AttributeKvEntry>>) input -> {
-                    List<AttributeKvEntry> result = new ArrayList<>();
-                    input.forEach(r -> result.addAll(r));
-                    return result;
-                }, executor);
-        Futures.addCallback(future, getCallback(callback, v -> v), executor);
+    public void loadAttributes(final DeviceId deviceId, final Collection<String> attributeTypes, final Collection<String> attributeKeys, final PluginCallback<List<AttributeKvEntry>> callback) {
+        validate(deviceId, new ValidationCallback(callback, ctx -> {
+            List<ListenableFuture<List<AttributeKvEntry>>> futures = new ArrayList<>();
+            attributeTypes.forEach(attributeType -> futures.add(pluginCtx.attributesService.find(deviceId, attributeType, attributeKeys)));
+            convertFuturesAndAddCallback(callback, futures);
+        }));
     }
 
     @Override
-    public void saveTsData(DeviceId deviceId, TsKvEntry entry, PluginCallback<Void> callback) {
-        validate(deviceId);
-        ListenableFuture<List<ResultSet>> rsListFuture = pluginCtx.tsService.save(DataConstants.DEVICE, deviceId, entry);
-        Futures.addCallback(rsListFuture, getListCallback(callback, v -> null), executor);
+    public void saveTsData(final DeviceId deviceId, final TsKvEntry entry, final PluginCallback<Void> callback) {
+        validate(deviceId, new ValidationCallback(callback, ctx -> {
+            ListenableFuture<List<ResultSet>> rsListFuture = pluginCtx.tsService.save(DataConstants.DEVICE, deviceId, entry);
+            Futures.addCallback(rsListFuture, getListCallback(callback, v -> null), executor);
+        }));
     }
 
     @Override
-    public void saveTsData(DeviceId deviceId, List<TsKvEntry> entries, PluginCallback<Void> callback) {
-        validate(deviceId);
-        ListenableFuture<List<ResultSet>> rsListFuture = pluginCtx.tsService.save(DataConstants.DEVICE, deviceId, entries);
-        Futures.addCallback(rsListFuture, getListCallback(callback, v -> null), executor);
+    public void saveTsData(final DeviceId deviceId, final List<TsKvEntry> entries, final PluginCallback<Void> callback) {
+        validate(deviceId, new ValidationCallback(callback, ctx -> {
+            ListenableFuture<List<ResultSet>> rsListFuture = pluginCtx.tsService.save(DataConstants.DEVICE, deviceId, entries);
+            Futures.addCallback(rsListFuture, getListCallback(callback, v -> null), executor);
+        }));
     }
 
     @Override
-    public void loadTimeseries(DeviceId deviceId, List<TsKvQuery> queries, PluginCallback<List<TsKvEntry>> callback) {
-        validate(deviceId);
-        ListenableFuture<List<TsKvEntry>> future = pluginCtx.tsService.findAll(DataConstants.DEVICE, deviceId, queries);
-        Futures.addCallback(future, getCallback(callback, v -> v), executor);
+    public void loadTimeseries(final DeviceId deviceId, final List<TsKvQuery> queries, final PluginCallback<List<TsKvEntry>> callback) {
+        validate(deviceId, new ValidationCallback(callback, ctx -> {
+            ListenableFuture<List<TsKvEntry>> future = pluginCtx.tsService.findAll(DataConstants.DEVICE, deviceId, queries);
+            Futures.addCallback(future, getCallback(callback, v -> v), executor);
+        }));
     }
 
     @Override
-    public void loadLatestTimeseries(DeviceId deviceId, PluginCallback<List<TsKvEntry>> callback) {
-        validate(deviceId);
-        ResultSetFuture future = pluginCtx.tsService.findAllLatest(DataConstants.DEVICE, deviceId);
-        Futures.addCallback(future, getCallback(callback, pluginCtx.tsService::convertResultSetToTsKvEntryList), executor);
+    public void loadLatestTimeseries(final DeviceId deviceId, final PluginCallback<List<TsKvEntry>> callback) {
+        validate(deviceId, new ValidationCallback(callback, ctx -> {
+            ResultSetFuture future = pluginCtx.tsService.findAllLatest(DataConstants.DEVICE, deviceId);
+            Futures.addCallback(future, getCallback(callback, pluginCtx.tsService::convertResultSetToTsKvEntryList), executor);
+        }));
     }
 
     @Override
-    public void loadLatestTimeseries(DeviceId deviceId, Collection<String> keys, PluginCallback<List<TsKvEntry>> callback) {
-        validate(deviceId);
-        ListenableFuture<List<ResultSet>> rsListFuture = pluginCtx.tsService.findLatest(DataConstants.DEVICE, deviceId, keys);
-        Futures.addCallback(rsListFuture, getListCallback(callback, rsList ->
-        {
-            List<TsKvEntry> result = new ArrayList<>();
-            for (ResultSet rs : rsList) {
-                Row row = rs.one();
-                if (row != null) {
-                    result.add(pluginCtx.tsService.convertResultToTsKvEntry(row));
+    public void loadLatestTimeseries(final DeviceId deviceId, final Collection<String> keys, final PluginCallback<List<TsKvEntry>> callback) {
+        validate(deviceId, new ValidationCallback(callback, ctx -> {
+            ListenableFuture<List<ResultSet>> rsListFuture = pluginCtx.tsService.findLatest(DataConstants.DEVICE, deviceId, keys);
+            Futures.addCallback(rsListFuture, getListCallback(callback, rsList ->
+            {
+                List<TsKvEntry> result = new ArrayList<>();
+                for (ResultSet rs : rsList) {
+                    Row row = rs.one();
+                    if (row != null) {
+                        result.add(pluginCtx.tsService.convertResultToTsKvEntry(row));
+                    }
                 }
-            }
-            return result;
-        }), executor);
+                return result;
+            }), executor);
+        }));
     }
 
     @Override
@@ -225,15 +207,6 @@ public final class PluginProcessingContext implements PluginContext {
     }
 
     @Override
-    public boolean checkAccess(DeviceId deviceId) {
-        try {
-            return validate(deviceId);
-        } catch (IllegalStateException | IllegalArgumentException e) {
-            return false;
-        }
-    }
-
-    @Override
     public PluginId getPluginId() {
         return pluginCtx.pluginId;
     }
@@ -273,7 +246,11 @@ public final class PluginProcessingContext implements PluginContext {
         return new FutureCallback<R>() {
             @Override
             public void onSuccess(@Nullable R result) {
-                pluginCtx.self().tell(PluginCallbackMessage.onSuccess(callback, transformer.apply(result)), ActorRef.noSender());
+                try {
+                    pluginCtx.self().tell(PluginCallbackMessage.onSuccess(callback, transformer.apply(result)), ActorRef.noSender());
+                } catch (Exception e) {
+                    pluginCtx.self().tell(PluginCallbackMessage.onError(callback, e), ActorRef.noSender());
+                }
             }
 
             @Override
@@ -287,27 +264,35 @@ public final class PluginProcessingContext implements PluginContext {
         };
     }
 
-    // TODO: replace with our own exceptions
-    private boolean validate(DeviceId deviceId, PluginCallback<Device> callback) {
+    @Override
+    public void checkAccess(DeviceId deviceId, PluginCallback<Void> callback) {
+        validate(deviceId, new ValidationCallback(callback, ctx -> callback.onSuccess(ctx, null)));
+    }
+
+    private void validate(DeviceId deviceId, ValidationCallback callback) {
         if (securityCtx.isPresent()) {
             final PluginApiCallSecurityContext ctx = securityCtx.get();
             if (ctx.isTenantAdmin() || ctx.isCustomerUser()) {
-                ListenableFuture<Device> device = pluginCtx.deviceService.findDeviceById(deviceId);
-                Futures.addCallback(device, );
-                if (device == null) {
-                    throw new IllegalStateException("Device not found!");
-                } else {
-                    if (!device.getTenantId().equals(ctx.getTenantId())) {
-                        throw new IllegalArgumentException("Device belongs to different tenant!");
-                    } else if (ctx.isCustomerUser() && !device.getCustomerId().equals(ctx.getCustomerId())) {
-                        throw new IllegalArgumentException("Device belongs to different customer!");
+                ListenableFuture<Device> deviceFuture = pluginCtx.deviceService.findDeviceByIdAsync(deviceId);
+                Futures.addCallback(deviceFuture, getCallback(callback, device -> {
+                    if (device == null) {
+                        return Boolean.FALSE;
+                    } else {
+                        if (!device.getTenantId().equals(ctx.getTenantId())) {
+                            return Boolean.FALSE;
+                        } else if (ctx.isCustomerUser() && !device.getCustomerId().equals(ctx.getCustomerId())) {
+                            return Boolean.FALSE;
+                        } else {
+                            return Boolean.TRUE;
+                        }
                     }
-                }
+                }));
             } else {
-                return false;
+                callback.onSuccess(this, Boolean.FALSE);
             }
+        } else {
+            callback.onSuccess(this, Boolean.TRUE);
         }
-        return true;
     }
 
     @Override
@@ -338,4 +323,15 @@ public final class PluginProcessingContext implements PluginContext {
     public void scheduleTimeoutMsg(TimeoutMsg msg) {
         pluginCtx.scheduleTimeoutMsg(msg);
     }
+
+
+    private void convertFuturesAndAddCallback(PluginCallback<List<AttributeKvEntry>> callback, List<ListenableFuture<List<AttributeKvEntry>>> futures) {
+        ListenableFuture<List<AttributeKvEntry>> future = Futures.transform(Futures.successfulAsList(futures),
+                (Function<? super List<List<AttributeKvEntry>>, ? extends List<AttributeKvEntry>>) input -> {
+                    List<AttributeKvEntry> result = new ArrayList<>();
+                    input.forEach(r -> result.addAll(r));
+                    return result;
+                }, executor);
+        Futures.addCallback(future, getCallback(callback, v -> v), executor);
+    }
 }
diff --git a/application/src/main/java/org/thingsboard/server/actors/plugin/ValidationCallback.java b/application/src/main/java/org/thingsboard/server/actors/plugin/ValidationCallback.java
new file mode 100644
index 0000000..707afa5
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/actors/plugin/ValidationCallback.java
@@ -0,0 +1,49 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.actors.plugin;
+
+import com.hazelcast.util.function.Consumer;
+import org.thingsboard.server.extensions.api.exception.UnauthorizedException;
+import org.thingsboard.server.extensions.api.plugins.PluginCallback;
+import org.thingsboard.server.extensions.api.plugins.PluginContext;
+
+/**
+ * Created by ashvayka on 21.02.17.
+ */
+public class ValidationCallback implements PluginCallback<Boolean> {
+
+    private final PluginCallback<?> callback;
+    private final Consumer<PluginContext> action;
+
+    public ValidationCallback(PluginCallback<?> callback, Consumer<PluginContext> action) {
+        this.callback = callback;
+        this.action = action;
+    }
+
+    @Override
+    public void onSuccess(PluginContext ctx, Boolean value) {
+        if (value) {
+            action.accept(ctx);
+        } else {
+            onFailure(ctx, new UnauthorizedException());
+        }
+    }
+
+    @Override
+    public void onFailure(PluginContext ctx, Exception e) {
+        callback.onFailure(ctx, e);
+    }
+}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/AbstractModelDao.java b/dao/src/main/java/org/thingsboard/server/dao/AbstractModelDao.java
index 73a5537..60c833b 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/AbstractModelDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/AbstractModelDao.java
@@ -16,17 +16,22 @@
 package org.thingsboard.server.dao;
 
 import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
 import com.datastax.driver.core.Statement;
 import com.datastax.driver.core.querybuilder.QueryBuilder;
 import com.datastax.driver.core.querybuilder.Select;
 import com.datastax.driver.core.utils.UUIDs;
 import com.datastax.driver.mapping.Mapper;
 import com.datastax.driver.mapping.Result;
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import lombok.extern.slf4j.Slf4j;
 import org.thingsboard.server.dao.model.BaseEntity;
 import org.thingsboard.server.dao.model.wrapper.EntityResultSet;
 import org.thingsboard.server.dao.model.ModelConstants;
 
+import javax.annotation.Nullable;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
@@ -72,6 +77,27 @@ public abstract class AbstractModelDao<T extends BaseEntity<?>> extends Abstract
         return object;
     }
 
+    protected ListenableFuture<T> findOneByStatementAsync(Statement statement) {
+        if (statement != null) {
+            statement.setConsistencyLevel(cluster.getDefaultReadConsistencyLevel());
+            ResultSetFuture resultSetFuture = getSession().executeAsync(statement);
+            ListenableFuture<T> result = Futures.transform(resultSetFuture, new Function<ResultSet, T>() {
+                @Nullable
+                @Override
+                public T apply(@Nullable ResultSet resultSet) {
+                    Result<T> result = getMapper().map(resultSet);
+                    if (result != null) {
+                        return result.one();
+                    } else {
+                        return null;
+                    }
+                }
+            });
+            return result;
+        }
+        return Futures.immediateFuture(null);
+    }
+
     protected Statement getSaveQuery(T dto) {
         return getMapper().saveQuery(dto);
     }
@@ -100,6 +126,14 @@ public abstract class AbstractModelDao<T extends BaseEntity<?>> extends Abstract
         return findOneByStatement(query);
     }
 
+    public ListenableFuture<T> findByIdAsync(UUID key) {
+        log.debug("Get entity by key {}", key);
+        Select.Where query = select().from(getColumnFamilyName()).where(eq(ModelConstants.ID_PROPERTY, key));
+        log.trace("Execute query {}", query);
+        return findOneByStatementAsync(query);
+    }
+
+
     public ResultSet removeById(UUID key) {
         Statement delete = QueryBuilder.delete().all().from(getColumnFamilyName()).where(eq(ModelConstants.ID_PROPERTY, key));
         log.debug("Remove request: {}", delete.toString());
diff --git a/dao/src/main/java/org/thingsboard/server/dao/Dao.java b/dao/src/main/java/org/thingsboard/server/dao/Dao.java
index 7aa35e7..2703cdc 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/Dao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/Dao.java
@@ -16,6 +16,7 @@
 package org.thingsboard.server.dao;
 
 import com.datastax.driver.core.ResultSet;
+import com.google.common.util.concurrent.ListenableFuture;
 
 import java.util.List;
 import java.util.UUID;
@@ -26,6 +27,8 @@ public interface Dao<T> {
 
     T findById(UUID id);
 
+    ListenableFuture<T> findByIdAsync(UUID id);
+
     T save(T t);
 
     ResultSet removeById(UUID id);
diff --git a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java
index 681188e..214d37c 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java
@@ -15,6 +15,9 @@
  */
 package org.thingsboard.server.dao.device;
 
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -71,6 +74,14 @@ public class DeviceServiceImpl implements DeviceService {
     }
 
     @Override
+    public ListenableFuture<Device> findDeviceByIdAsync(DeviceId deviceId) {
+        log.trace("Executing findDeviceById [{}]", deviceId);
+        validateId(deviceId, "Incorrect deviceId " + deviceId);
+        ListenableFuture<DeviceEntity> deviceEntity = deviceDao.findByIdAsync(deviceId.getId());
+        return Futures.transform(deviceEntity, (Function<? super DeviceEntity, ? extends Device>) input -> getData(input));
+    }
+
+    @Override
     public Optional<Device> findDeviceByTenantIdAndName(TenantId tenantId, String name) {
         log.trace("Executing findDeviceByTenantIdAndName [{}][{}]", tenantId, name);
         validateId(tenantId, "Incorrect tenantId " + tenantId);
diff --git a/extensions-api/src/main/java/org/thingsboard/server/extensions/api/exception/UnauthorizedException.java b/extensions-api/src/main/java/org/thingsboard/server/extensions/api/exception/UnauthorizedException.java
new file mode 100644
index 0000000..7b7d0ec
--- /dev/null
+++ b/extensions-api/src/main/java/org/thingsboard/server/extensions/api/exception/UnauthorizedException.java
@@ -0,0 +1,22 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.extensions.api.exception;
+
+/**
+ * Created by ashvayka on 21.02.17.
+ */
+public class UnauthorizedException extends Exception {
+}
diff --git a/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java b/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java
index 64d4d53..d25c8db 100644
--- a/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java
+++ b/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java
@@ -42,7 +42,7 @@ public interface PluginContext {
 
     void reply(PluginToRuleMsg<?> msg);
 
-    boolean checkAccess(DeviceId deviceId);
+    void checkAccess(DeviceId deviceId, PluginCallback<Void> callback);
 
     Optional<PluginApiCallSecurityContext> getSecurityCtx();
 
@@ -92,13 +92,9 @@ public interface PluginContext {
         Attributes API
      */
 
-    void saveAttributes(DeviceId deviceId, String attributeType, List<AttributeKvEntry> attributes, PluginCallback<Void> callback);
+    void saveAttributes(TenantId tenantId, DeviceId deviceId, String attributeType, List<AttributeKvEntry> attributes, PluginCallback<Void> callback);
 
-    void removeAttributes(DeviceId deviceId, String scope, List<String> attributeKeys, PluginCallback<Void> callback);
-
-    void saveAttributesByDevice(TenantId tenantId, DeviceId deviceId, String attributeType, List<AttributeKvEntry> attributes, PluginCallback<Void> callback);
-
-    void removeAttributesByDevice(TenantId tenantId, DeviceId deviceId, String scope, List<String> attributeKeys, PluginCallback<Void> callback);
+    void removeAttributes(TenantId tenantId, DeviceId deviceId, String scope, List<String> attributeKeys, PluginCallback<Void> callback);
 
     void loadAttribute(DeviceId deviceId, String attributeType, String attributeKey, PluginCallback<Optional<AttributeKvEntry>> callback);
 
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/rpc/handlers/RpcRestMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/rpc/handlers/RpcRestMsgHandler.java
index 4601916..f60f2d8 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/rpc/handlers/RpcRestMsgHandler.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/rpc/handlers/RpcRestMsgHandler.java
@@ -25,6 +25,7 @@ import org.springframework.util.StringUtils;
 import org.springframework.web.context.request.async.DeferredResult;
 import org.thingsboard.server.common.data.DataConstants;
 import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.extensions.api.plugins.PluginCallback;
 import org.thingsboard.server.extensions.api.plugins.PluginContext;
 import org.thingsboard.server.extensions.api.plugins.handlers.DefaultRestMsgHandler;
 import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse;
@@ -62,27 +63,34 @@ public class RpcRestMsgHandler extends DefaultRestMsgHandler {
                 String method = pathParams[0].toUpperCase();
                 if (DataConstants.ONEWAY.equals(method) || DataConstants.TWOWAY.equals(method)) {
                     DeviceId deviceId = DeviceId.fromString(pathParams[1]);
-                    if (!ctx.checkAccess(deviceId)) {
-                        msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
-                        return;
-                    }
                     JsonNode rpcRequestBody = jsonMapper.readTree(request.getRequestBody());
 
                     RpcRequest cmd = new RpcRequest(rpcRequestBody.get("method").asText(),
                             jsonMapper.writeValueAsString(rpcRequestBody.get("params")));
-                    if (rpcRequestBody.has("timeout")) {
-                        cmd.setTimeout(rpcRequestBody.get("timeout").asLong());
-                    }
-                    long timeout = cmd.getTimeout() != null ? cmd.getTimeout() : defaultTimeout;
-                    ToDeviceRpcRequestBody body = new ToDeviceRpcRequestBody(cmd.getMethodName(), cmd.getRequestData());
-                    ToDeviceRpcRequest rpcRequest = new ToDeviceRpcRequest(UUID.randomUUID(),
-                            ctx.getSecurityCtx().orElseThrow(() -> new IllegalStateException("Security context is empty!")).getTenantId(),
-                            deviceId,
-                            DataConstants.ONEWAY.equals(method),
-                            System.currentTimeMillis() + timeout,
-                            body
-                    );
-                    rpcManager.process(ctx, new LocalRequestMetaData(rpcRequest, msg.getResponseHolder()));
+
+                    ctx.checkAccess(deviceId, new PluginCallback<Void>() {
+                        @Override
+                        public void onSuccess(PluginContext ctx, Void value) {
+                            if (rpcRequestBody.has("timeout")) {
+                                cmd.setTimeout(rpcRequestBody.get("timeout").asLong());
+                            }
+                            long timeout = cmd.getTimeout() != null ? cmd.getTimeout() : defaultTimeout;
+                            ToDeviceRpcRequestBody body = new ToDeviceRpcRequestBody(cmd.getMethodName(), cmd.getRequestData());
+                            ToDeviceRpcRequest rpcRequest = new ToDeviceRpcRequest(UUID.randomUUID(),
+                                    ctx.getSecurityCtx().orElseThrow(() -> new IllegalStateException("Security context is empty!")).getTenantId(),
+                                    deviceId,
+                                    DataConstants.ONEWAY.equals(method),
+                                    System.currentTimeMillis() + timeout,
+                                    body
+                            );
+                            rpcManager.process(ctx, new LocalRequestMetaData(rpcRequest, msg.getResponseHolder()));
+                        }
+
+                        @Override
+                        public void onFailure(PluginContext ctx, Exception e) {
+                            msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
+                        }
+                    });
                     valid = true;
                 }
             }
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java
index 28d8b7c..2bd17aa 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java
@@ -164,7 +164,7 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
                             }
                         });
                         if (attributes.size() > 0) {
-                            ctx.saveAttributes(deviceId, scope, attributes, new PluginCallback<Void>() {
+                            ctx.saveAttributes(ctx.getSecurityCtx().orElseThrow(() -> new IllegalArgumentException()).getTenantId(), deviceId, scope, attributes, new PluginCallback<Void>() {
                                 @Override
                                 public void onSuccess(PluginContext ctx, Void value) {
                                     msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.OK));
@@ -182,8 +182,8 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
                     }
                 }
             }
-        } catch (IOException e) {
-            log.debug("Failed to process POST request due to IO exception", e);
+        } catch (IOException | RuntimeException e) {
+            log.debug("Failed to process POST request due to exception", e);
         }
         msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
     }
@@ -202,7 +202,7 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
                     String keysParam = request.getParameter("keys");
                     if (!StringUtils.isEmpty(keysParam)) {
                         String[] keys = keysParam.split(",");
-                        ctx.removeAttributes(deviceId, scope, Arrays.asList(keys), new PluginCallback<Void>() {
+                        ctx.removeAttributes(ctx.getSecurityCtx().orElseThrow(() -> new IllegalArgumentException()).getTenantId(), deviceId, scope, Arrays.asList(keys), new PluginCallback<Void>() {
                             @Override
                             public void onSuccess(PluginContext ctx, Void value) {
                                 msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.OK));
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRuleMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRuleMsgHandler.java
index d9bfba0..1ce797f 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRuleMsgHandler.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRuleMsgHandler.java
@@ -120,7 +120,7 @@ public class TelemetryRuleMsgHandler extends DefaultRuleMsgHandler {
     @Override
     public void handleUpdateAttributesRequest(PluginContext ctx, TenantId tenantId, RuleId ruleId, UpdateAttributesRequestRuleToPluginMsg msg) {
         UpdateAttributesRequest request = msg.getPayload();
-        ctx.saveAttributes(msg.getDeviceId(), DataConstants.CLIENT_SCOPE, request.getAttributes().stream().collect(Collectors.toList()),
+        ctx.saveAttributes(msg.getTenantId(), msg.getDeviceId(), DataConstants.CLIENT_SCOPE, request.getAttributes().stream().collect(Collectors.toList()),
                 new PluginCallback<Void>() {
                     @Override
                     public void onSuccess(PluginContext ctx, Void value) {
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java
index dd30f99..fbfacd3 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java
@@ -21,6 +21,7 @@ import org.springframework.util.StringUtils;
 import org.thingsboard.server.common.data.DataConstants;
 import org.thingsboard.server.common.data.id.DeviceId;
 import org.thingsboard.server.common.data.kv.*;
+import org.thingsboard.server.extensions.api.exception.UnauthorizedException;
 import org.thingsboard.server.extensions.api.plugins.PluginCallback;
 import org.thingsboard.server.extensions.api.plugins.PluginContext;
 import org.thingsboard.server.extensions.api.plugins.handlers.DefaultWebsocketMsgHandler;
@@ -122,8 +123,14 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
                         @Override
                         public void onFailure(PluginContext ctx, Exception e) {
                             log.error("Failed to fetch attributes!", e);
-                            SubscriptionUpdate update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR,
-                                    "Failed to fetch attributes!");
+                            SubscriptionUpdate update;
+                            if (UnauthorizedException.class.isInstance(e)) {
+                                update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.UNAUTHORIZED,
+                                        SubscriptionErrorCode.UNAUTHORIZED.getDefaultMsg());
+                            } else {
+                                update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR,
+                                        "Failed to fetch attributes!");
+                            }
                             sendWsMsg(ctx, sessionRef, update);
                         }
                     };
@@ -207,8 +214,14 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
 
                         @Override
                         public void onFailure(PluginContext ctx, Exception e) {
-                            SubscriptionUpdate update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR,
-                                    "Failed to fetch data!");
+                            SubscriptionUpdate update;
+                            if (UnauthorizedException.class.isInstance(e)) {
+                                update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.UNAUTHORIZED,
+                                        SubscriptionErrorCode.UNAUTHORIZED.getDefaultMsg());
+                            } else {
+                                update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR,
+                                        "Failed to fetch data!");
+                            }
                             sendWsMsg(ctx, sessionRef, update);
                         }
                     });
@@ -263,12 +276,6 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
             return;
         }
         DeviceId deviceId = DeviceId.fromString(cmd.getDeviceId());
-        if (!ctx.checkAccess(deviceId)) {
-            SubscriptionUpdate update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.UNAUTHORIZED,
-                    SubscriptionErrorCode.UNAUTHORIZED.getDefaultMsg());
-            sendWsMsg(ctx, sessionRef, update);
-            return;
-        }
         List<String> keys = new ArrayList<>(getKeys(cmd).orElse(Collections.emptySet()));
         List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, cmd.getStartTs(), cmd.getEndTs(), cmd.getLimit(), Aggregation.valueOf(cmd.getAgg()))).collect(Collectors.toList());
         ctx.loadTimeseries(deviceId, queries, new PluginCallback<List<TsKvEntry>>() {
@@ -279,8 +286,15 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
 
             @Override
             public void onFailure(PluginContext ctx, Exception e) {
-                sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR,
-                        "Failed to fetch data!"));
+                SubscriptionUpdate update;
+                if (UnauthorizedException.class.isInstance(e)) {
+                    update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.UNAUTHORIZED,
+                            SubscriptionErrorCode.UNAUTHORIZED.getDefaultMsg());
+                } else {
+                    update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR,
+                            "Failed to fetch data!");
+                }
+                sendWsMsg(ctx, sessionRef, update);
             }
         });
     }
@@ -313,13 +327,6 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
             sendWsMsg(ctx, sessionRef, update);
             return false;
         }
-        DeviceId deviceId = DeviceId.fromString(cmd.getDeviceId());
-        if (!ctx.checkAccess(deviceId)) {
-            SubscriptionUpdate update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.UNAUTHORIZED,
-                    SubscriptionErrorCode.UNAUTHORIZED.getDefaultMsg());
-            sendWsMsg(ctx, sessionRef, update);
-            return false;
-        }
         return true;
     }