thingsboard-memoizeit
Changes
application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java 236(+116 -120)
extensions-api/src/main/java/org/thingsboard/server/extensions/api/exception/UnauthorizedException.java 22(+22 -0)
extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java 10(+3 -7)
extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/rpc/handlers/RpcRestMsgHandler.java 42(+25 -17)
extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java 8(+4 -4)
Details
diff --git a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
index 98149c2..b102226 100644
--- a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2017 The Thingsboard Authors
- * <p>
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -90,133 +90,115 @@ public final class PluginProcessingContext implements PluginContext {
}
@Override
- public void saveAttributes(DeviceId deviceId, String scope, List<AttributeKvEntry> attributes, PluginCallback<Void> callback) {
- validate(deviceId);
-
- ListenableFuture<List<ResultSet>> rsListFuture = pluginCtx.attributesService.save(deviceId, scope, attributes);
- Futures.addCallback(rsListFuture, getListCallback(callback, v -> {
- onDeviceAttributesChanged(deviceId, scope, attributes);
- return null;
- }), executor);
+ public void saveAttributes(final TenantId tenantId, final DeviceId deviceId, final String scope, final List<AttributeKvEntry> attributes, final PluginCallback<Void> callback) {
+ validate(deviceId, new ValidationCallback(callback, ctx -> {
+ ListenableFuture<List<ResultSet>> rsListFuture = pluginCtx.attributesService.save(deviceId, scope, attributes);
+ Futures.addCallback(rsListFuture, getListCallback(callback, v -> {
+ onDeviceAttributesChanged(tenantId, deviceId, scope, attributes);
+ return null;
+ }), executor);
+ }));
}
@Override
- public void removeAttributes(DeviceId deviceId, String scope, List<String> keys, PluginCallback<Void> callback) {
- validate(deviceId);
- ListenableFuture<List<ResultSet>> future = pluginCtx.attributesService.removeAll(deviceId, scope, keys);
- Futures.addCallback(future, getCallback(callback, v -> null), executor);
- onDeviceAttributesDeleted(tenantId, deviceId, keys.stream().map(key -> new AttributeKey(scope, key)).collect(Collectors.toSet()));
+ public void removeAttributes(final TenantId tenantId, final DeviceId deviceId, final String scope, final List<String> keys, final PluginCallback<Void> callback) {
+ validate(deviceId, new ValidationCallback(callback, ctx -> {
+ ListenableFuture<List<ResultSet>> future = pluginCtx.attributesService.removeAll(deviceId, scope, keys);
+ Futures.addCallback(future, getCallback(callback, v -> null), executor);
+ onDeviceAttributesDeleted(tenantId, deviceId, keys.stream().map(key -> new AttributeKey(scope, key)).collect(Collectors.toSet()));
+ }));
}
@Override
- public void saveAttributesByDevice(TenantId tenantId, DeviceId deviceId, String scope, List<AttributeKvEntry> attributes, PluginCallback<Void> callback) {
- validate(deviceId);
-
- ListenableFuture<List<ResultSet>> rsListFuture = pluginCtx.attributesService.save(deviceId, scope, attributes);
- Futures.addCallback(rsListFuture, getListCallback(callback, v -> {
- onDeviceAttributesChanged(tenantId, deviceId, scope, attributes);
- return null;
- }), executor);
+ public void loadAttribute(DeviceId deviceId, String attributeType, String attributeKey, final PluginCallback<Optional<AttributeKvEntry>> callback) {
+ validate(deviceId, new ValidationCallback(callback, ctx -> {
+ ListenableFuture<Optional<AttributeKvEntry>> future = pluginCtx.attributesService.find(deviceId, attributeType, attributeKey);
+ Futures.addCallback(future, getCallback(callback, v -> v), executor);
+ }));
}
@Override
- public void removeAttributesByDevice(TenantId tenantId, DeviceId deviceId, String scope, List<String> keys, PluginCallback<Void> callback) {
- validate(deviceId);
- ListenableFuture<List<ResultSet>> future = pluginCtx.attributesService.removeAll(deviceId, scope, keys);
- Futures.addCallback(future, getCallback(callback, v -> null), executor);
- onDeviceAttributesDeleted(tenantId, deviceId, keys.stream().map(key -> new AttributeKey(scope, key)).collect(Collectors.toSet()));
- }
-
- @Override
- public void loadAttribute(DeviceId deviceId, String attributeType, String attributeKey, PluginCallback<Optional<AttributeKvEntry>> callback) {
- validate(deviceId);
- ListenableFuture<Optional<AttributeKvEntry>> future = pluginCtx.attributesService.find(deviceId, attributeType, attributeKey);
- Futures.addCallback(future, getCallback(callback, v -> v), executor);
- }
-
- @Override
- public void loadAttributes(DeviceId deviceId, String attributeType, Collection<String> attributeKeys, PluginCallback<List<AttributeKvEntry>> callback) {
- validate(deviceId);
- ListenableFuture<List<AttributeKvEntry>> future = pluginCtx.attributesService.find(deviceId, attributeType, attributeKeys);
- Futures.addCallback(future, getCallback(callback, v -> v), executor);
+ public void loadAttributes(DeviceId deviceId, String attributeType, Collection<String> attributeKeys, final PluginCallback<List<AttributeKvEntry>> callback) {
+ validate(deviceId, new ValidationCallback(callback, ctx -> {
+ ListenableFuture<List<AttributeKvEntry>> future = pluginCtx.attributesService.find(deviceId, attributeType, attributeKeys);
+ Futures.addCallback(future, getCallback(callback, v -> v), executor);
+ }));
}
@Override
public void loadAttributes(DeviceId deviceId, String attributeType, PluginCallback<List<AttributeKvEntry>> callback) {
- validate(deviceId);
- ListenableFuture<List<AttributeKvEntry>> future = pluginCtx.attributesService.findAll(deviceId, attributeType);
- Futures.addCallback(future, getCallback(callback, v -> v), executor);
+ validate(deviceId, new ValidationCallback(callback, ctx -> {
+ ListenableFuture<List<AttributeKvEntry>> future = pluginCtx.attributesService.findAll(deviceId, attributeType);
+ Futures.addCallback(future, getCallback(callback, v -> v), executor);
+ }));
}
@Override
- public void loadAttributes(DeviceId deviceId, Collection<String> attributeTypes, PluginCallback<List<AttributeKvEntry>> callback) {
- validate(deviceId);
- List<ListenableFuture<List<AttributeKvEntry>>> futures = new ArrayList<>();
- attributeTypes.forEach(attributeType -> futures.add(pluginCtx.attributesService.findAll(deviceId, attributeType)));
- convertFuturesAndAddCallback(callback, futures);
+ public void loadAttributes(final DeviceId deviceId, final Collection<String> attributeTypes, final PluginCallback<List<AttributeKvEntry>> callback) {
+ validate(deviceId, new ValidationCallback(callback, ctx -> {
+ List<ListenableFuture<List<AttributeKvEntry>>> futures = new ArrayList<>();
+ attributeTypes.forEach(attributeType -> futures.add(pluginCtx.attributesService.findAll(deviceId, attributeType)));
+ convertFuturesAndAddCallback(callback, futures);
+ }));
}
@Override
- public void loadAttributes(DeviceId deviceId, Collection<String> attributeTypes, Collection<String> attributeKeys, PluginCallback<List<AttributeKvEntry>> callback) {
- validate(deviceId);
- List<ListenableFuture<List<AttributeKvEntry>>> futures = new ArrayList<>();
- attributeTypes.forEach(attributeType -> futures.add(pluginCtx.attributesService.find(deviceId, attributeType, attributeKeys)));
- convertFuturesAndAddCallback(callback, futures);
- }
-
- private void convertFuturesAndAddCallback(PluginCallback<List<AttributeKvEntry>> callback, List<ListenableFuture<List<AttributeKvEntry>>> futures) {
- ListenableFuture<List<AttributeKvEntry>> future = Futures.transform(Futures.successfulAsList(futures),
- (Function<? super List<List<AttributeKvEntry>>, ? extends List<AttributeKvEntry>>) input -> {
- List<AttributeKvEntry> result = new ArrayList<>();
- input.forEach(r -> result.addAll(r));
- return result;
- }, executor);
- Futures.addCallback(future, getCallback(callback, v -> v), executor);
+ public void loadAttributes(final DeviceId deviceId, final Collection<String> attributeTypes, final Collection<String> attributeKeys, final PluginCallback<List<AttributeKvEntry>> callback) {
+ validate(deviceId, new ValidationCallback(callback, ctx -> {
+ List<ListenableFuture<List<AttributeKvEntry>>> futures = new ArrayList<>();
+ attributeTypes.forEach(attributeType -> futures.add(pluginCtx.attributesService.find(deviceId, attributeType, attributeKeys)));
+ convertFuturesAndAddCallback(callback, futures);
+ }));
}
@Override
- public void saveTsData(DeviceId deviceId, TsKvEntry entry, PluginCallback<Void> callback) {
- validate(deviceId);
- ListenableFuture<List<ResultSet>> rsListFuture = pluginCtx.tsService.save(DataConstants.DEVICE, deviceId, entry);
- Futures.addCallback(rsListFuture, getListCallback(callback, v -> null), executor);
+ public void saveTsData(final DeviceId deviceId, final TsKvEntry entry, final PluginCallback<Void> callback) {
+ validate(deviceId, new ValidationCallback(callback, ctx -> {
+ ListenableFuture<List<ResultSet>> rsListFuture = pluginCtx.tsService.save(DataConstants.DEVICE, deviceId, entry);
+ Futures.addCallback(rsListFuture, getListCallback(callback, v -> null), executor);
+ }));
}
@Override
- public void saveTsData(DeviceId deviceId, List<TsKvEntry> entries, PluginCallback<Void> callback) {
- validate(deviceId);
- ListenableFuture<List<ResultSet>> rsListFuture = pluginCtx.tsService.save(DataConstants.DEVICE, deviceId, entries);
- Futures.addCallback(rsListFuture, getListCallback(callback, v -> null), executor);
+ public void saveTsData(final DeviceId deviceId, final List<TsKvEntry> entries, final PluginCallback<Void> callback) {
+ validate(deviceId, new ValidationCallback(callback, ctx -> {
+ ListenableFuture<List<ResultSet>> rsListFuture = pluginCtx.tsService.save(DataConstants.DEVICE, deviceId, entries);
+ Futures.addCallback(rsListFuture, getListCallback(callback, v -> null), executor);
+ }));
}
@Override
- public void loadTimeseries(DeviceId deviceId, List<TsKvQuery> queries, PluginCallback<List<TsKvEntry>> callback) {
- validate(deviceId);
- ListenableFuture<List<TsKvEntry>> future = pluginCtx.tsService.findAll(DataConstants.DEVICE, deviceId, queries);
- Futures.addCallback(future, getCallback(callback, v -> v), executor);
+ public void loadTimeseries(final DeviceId deviceId, final List<TsKvQuery> queries, final PluginCallback<List<TsKvEntry>> callback) {
+ validate(deviceId, new ValidationCallback(callback, ctx -> {
+ ListenableFuture<List<TsKvEntry>> future = pluginCtx.tsService.findAll(DataConstants.DEVICE, deviceId, queries);
+ Futures.addCallback(future, getCallback(callback, v -> v), executor);
+ }));
}
@Override
- public void loadLatestTimeseries(DeviceId deviceId, PluginCallback<List<TsKvEntry>> callback) {
- validate(deviceId);
- ResultSetFuture future = pluginCtx.tsService.findAllLatest(DataConstants.DEVICE, deviceId);
- Futures.addCallback(future, getCallback(callback, pluginCtx.tsService::convertResultSetToTsKvEntryList), executor);
+ public void loadLatestTimeseries(final DeviceId deviceId, final PluginCallback<List<TsKvEntry>> callback) {
+ validate(deviceId, new ValidationCallback(callback, ctx -> {
+ ResultSetFuture future = pluginCtx.tsService.findAllLatest(DataConstants.DEVICE, deviceId);
+ Futures.addCallback(future, getCallback(callback, pluginCtx.tsService::convertResultSetToTsKvEntryList), executor);
+ }));
}
@Override
- public void loadLatestTimeseries(DeviceId deviceId, Collection<String> keys, PluginCallback<List<TsKvEntry>> callback) {
- validate(deviceId);
- ListenableFuture<List<ResultSet>> rsListFuture = pluginCtx.tsService.findLatest(DataConstants.DEVICE, deviceId, keys);
- Futures.addCallback(rsListFuture, getListCallback(callback, rsList ->
- {
- List<TsKvEntry> result = new ArrayList<>();
- for (ResultSet rs : rsList) {
- Row row = rs.one();
- if (row != null) {
- result.add(pluginCtx.tsService.convertResultToTsKvEntry(row));
+ public void loadLatestTimeseries(final DeviceId deviceId, final Collection<String> keys, final PluginCallback<List<TsKvEntry>> callback) {
+ validate(deviceId, new ValidationCallback(callback, ctx -> {
+ ListenableFuture<List<ResultSet>> rsListFuture = pluginCtx.tsService.findLatest(DataConstants.DEVICE, deviceId, keys);
+ Futures.addCallback(rsListFuture, getListCallback(callback, rsList ->
+ {
+ List<TsKvEntry> result = new ArrayList<>();
+ for (ResultSet rs : rsList) {
+ Row row = rs.one();
+ if (row != null) {
+ result.add(pluginCtx.tsService.convertResultToTsKvEntry(row));
+ }
}
- }
- return result;
- }), executor);
+ return result;
+ }), executor);
+ }));
}
@Override
@@ -225,15 +207,6 @@ public final class PluginProcessingContext implements PluginContext {
}
@Override
- public boolean checkAccess(DeviceId deviceId) {
- try {
- return validate(deviceId);
- } catch (IllegalStateException | IllegalArgumentException e) {
- return false;
- }
- }
-
- @Override
public PluginId getPluginId() {
return pluginCtx.pluginId;
}
@@ -273,7 +246,11 @@ public final class PluginProcessingContext implements PluginContext {
return new FutureCallback<R>() {
@Override
public void onSuccess(@Nullable R result) {
- pluginCtx.self().tell(PluginCallbackMessage.onSuccess(callback, transformer.apply(result)), ActorRef.noSender());
+ try {
+ pluginCtx.self().tell(PluginCallbackMessage.onSuccess(callback, transformer.apply(result)), ActorRef.noSender());
+ } catch (Exception e) {
+ pluginCtx.self().tell(PluginCallbackMessage.onError(callback, e), ActorRef.noSender());
+ }
}
@Override
@@ -287,27 +264,35 @@ public final class PluginProcessingContext implements PluginContext {
};
}
- // TODO: replace with our own exceptions
- private boolean validate(DeviceId deviceId, PluginCallback<Device> callback) {
+ @Override
+ public void checkAccess(DeviceId deviceId, PluginCallback<Void> callback) {
+ validate(deviceId, new ValidationCallback(callback, ctx -> callback.onSuccess(ctx, null)));
+ }
+
+ private void validate(DeviceId deviceId, ValidationCallback callback) {
if (securityCtx.isPresent()) {
final PluginApiCallSecurityContext ctx = securityCtx.get();
if (ctx.isTenantAdmin() || ctx.isCustomerUser()) {
- ListenableFuture<Device> device = pluginCtx.deviceService.findDeviceById(deviceId);
- Futures.addCallback(device, );
- if (device == null) {
- throw new IllegalStateException("Device not found!");
- } else {
- if (!device.getTenantId().equals(ctx.getTenantId())) {
- throw new IllegalArgumentException("Device belongs to different tenant!");
- } else if (ctx.isCustomerUser() && !device.getCustomerId().equals(ctx.getCustomerId())) {
- throw new IllegalArgumentException("Device belongs to different customer!");
+ ListenableFuture<Device> deviceFuture = pluginCtx.deviceService.findDeviceByIdAsync(deviceId);
+ Futures.addCallback(deviceFuture, getCallback(callback, device -> {
+ if (device == null) {
+ return Boolean.FALSE;
+ } else {
+ if (!device.getTenantId().equals(ctx.getTenantId())) {
+ return Boolean.FALSE;
+ } else if (ctx.isCustomerUser() && !device.getCustomerId().equals(ctx.getCustomerId())) {
+ return Boolean.FALSE;
+ } else {
+ return Boolean.TRUE;
+ }
}
- }
+ }));
} else {
- return false;
+ callback.onSuccess(this, Boolean.FALSE);
}
+ } else {
+ callback.onSuccess(this, Boolean.TRUE);
}
- return true;
}
@Override
@@ -338,4 +323,15 @@ public final class PluginProcessingContext implements PluginContext {
public void scheduleTimeoutMsg(TimeoutMsg msg) {
pluginCtx.scheduleTimeoutMsg(msg);
}
+
+
+ private void convertFuturesAndAddCallback(PluginCallback<List<AttributeKvEntry>> callback, List<ListenableFuture<List<AttributeKvEntry>>> futures) {
+ ListenableFuture<List<AttributeKvEntry>> future = Futures.transform(Futures.successfulAsList(futures),
+ (Function<? super List<List<AttributeKvEntry>>, ? extends List<AttributeKvEntry>>) input -> {
+ List<AttributeKvEntry> result = new ArrayList<>();
+ input.forEach(r -> result.addAll(r));
+ return result;
+ }, executor);
+ Futures.addCallback(future, getCallback(callback, v -> v), executor);
+ }
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/plugin/ValidationCallback.java b/application/src/main/java/org/thingsboard/server/actors/plugin/ValidationCallback.java
new file mode 100644
index 0000000..707afa5
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/actors/plugin/ValidationCallback.java
@@ -0,0 +1,49 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.actors.plugin;
+
+import com.hazelcast.util.function.Consumer;
+import org.thingsboard.server.extensions.api.exception.UnauthorizedException;
+import org.thingsboard.server.extensions.api.plugins.PluginCallback;
+import org.thingsboard.server.extensions.api.plugins.PluginContext;
+
+/**
+ * Created by ashvayka on 21.02.17.
+ */
+public class ValidationCallback implements PluginCallback<Boolean> {
+
+ private final PluginCallback<?> callback;
+ private final Consumer<PluginContext> action;
+
+ public ValidationCallback(PluginCallback<?> callback, Consumer<PluginContext> action) {
+ this.callback = callback;
+ this.action = action;
+ }
+
+ @Override
+ public void onSuccess(PluginContext ctx, Boolean value) {
+ if (value) {
+ action.accept(ctx);
+ } else {
+ onFailure(ctx, new UnauthorizedException());
+ }
+ }
+
+ @Override
+ public void onFailure(PluginContext ctx, Exception e) {
+ callback.onFailure(ctx, e);
+ }
+}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/AbstractModelDao.java b/dao/src/main/java/org/thingsboard/server/dao/AbstractModelDao.java
index 73a5537..60c833b 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/AbstractModelDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/AbstractModelDao.java
@@ -16,17 +16,22 @@
package org.thingsboard.server.dao;
import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.datastax.driver.core.querybuilder.Select;
import com.datastax.driver.core.utils.UUIDs;
import com.datastax.driver.mapping.Mapper;
import com.datastax.driver.mapping.Result;
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.dao.model.BaseEntity;
import org.thingsboard.server.dao.model.wrapper.EntityResultSet;
import org.thingsboard.server.dao.model.ModelConstants;
+import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
@@ -72,6 +77,27 @@ public abstract class AbstractModelDao<T extends BaseEntity<?>> extends Abstract
return object;
}
+ protected ListenableFuture<T> findOneByStatementAsync(Statement statement) {
+ if (statement != null) {
+ statement.setConsistencyLevel(cluster.getDefaultReadConsistencyLevel());
+ ResultSetFuture resultSetFuture = getSession().executeAsync(statement);
+ ListenableFuture<T> result = Futures.transform(resultSetFuture, new Function<ResultSet, T>() {
+ @Nullable
+ @Override
+ public T apply(@Nullable ResultSet resultSet) {
+ Result<T> result = getMapper().map(resultSet);
+ if (result != null) {
+ return result.one();
+ } else {
+ return null;
+ }
+ }
+ });
+ return result;
+ }
+ return Futures.immediateFuture(null);
+ }
+
protected Statement getSaveQuery(T dto) {
return getMapper().saveQuery(dto);
}
@@ -100,6 +126,14 @@ public abstract class AbstractModelDao<T extends BaseEntity<?>> extends Abstract
return findOneByStatement(query);
}
+ public ListenableFuture<T> findByIdAsync(UUID key) {
+ log.debug("Get entity by key {}", key);
+ Select.Where query = select().from(getColumnFamilyName()).where(eq(ModelConstants.ID_PROPERTY, key));
+ log.trace("Execute query {}", query);
+ return findOneByStatementAsync(query);
+ }
+
+
public ResultSet removeById(UUID key) {
Statement delete = QueryBuilder.delete().all().from(getColumnFamilyName()).where(eq(ModelConstants.ID_PROPERTY, key));
log.debug("Remove request: {}", delete.toString());
diff --git a/dao/src/main/java/org/thingsboard/server/dao/Dao.java b/dao/src/main/java/org/thingsboard/server/dao/Dao.java
index 7aa35e7..2703cdc 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/Dao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/Dao.java
@@ -16,6 +16,7 @@
package org.thingsboard.server.dao;
import com.datastax.driver.core.ResultSet;
+import com.google.common.util.concurrent.ListenableFuture;
import java.util.List;
import java.util.UUID;
@@ -26,6 +27,8 @@ public interface Dao<T> {
T findById(UUID id);
+ ListenableFuture<T> findByIdAsync(UUID id);
+
T save(T t);
ResultSet removeById(UUID id);
diff --git a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java
index 681188e..214d37c 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java
@@ -15,6 +15,9 @@
*/
package org.thingsboard.server.dao.device;
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;
import org.springframework.beans.factory.annotation.Autowired;
@@ -71,6 +74,14 @@ public class DeviceServiceImpl implements DeviceService {
}
@Override
+ public ListenableFuture<Device> findDeviceByIdAsync(DeviceId deviceId) {
+ log.trace("Executing findDeviceById [{}]", deviceId);
+ validateId(deviceId, "Incorrect deviceId " + deviceId);
+ ListenableFuture<DeviceEntity> deviceEntity = deviceDao.findByIdAsync(deviceId.getId());
+ return Futures.transform(deviceEntity, (Function<? super DeviceEntity, ? extends Device>) input -> getData(input));
+ }
+
+ @Override
public Optional<Device> findDeviceByTenantIdAndName(TenantId tenantId, String name) {
log.trace("Executing findDeviceByTenantIdAndName [{}][{}]", tenantId, name);
validateId(tenantId, "Incorrect tenantId " + tenantId);
diff --git a/extensions-api/src/main/java/org/thingsboard/server/extensions/api/exception/UnauthorizedException.java b/extensions-api/src/main/java/org/thingsboard/server/extensions/api/exception/UnauthorizedException.java
new file mode 100644
index 0000000..7b7d0ec
--- /dev/null
+++ b/extensions-api/src/main/java/org/thingsboard/server/extensions/api/exception/UnauthorizedException.java
@@ -0,0 +1,22 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.extensions.api.exception;
+
+/**
+ * Created by ashvayka on 21.02.17.
+ */
+public class UnauthorizedException extends Exception {
+}
diff --git a/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java b/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java
index 64d4d53..d25c8db 100644
--- a/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java
+++ b/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java
@@ -42,7 +42,7 @@ public interface PluginContext {
void reply(PluginToRuleMsg<?> msg);
- boolean checkAccess(DeviceId deviceId);
+ void checkAccess(DeviceId deviceId, PluginCallback<Void> callback);
Optional<PluginApiCallSecurityContext> getSecurityCtx();
@@ -92,13 +92,9 @@ public interface PluginContext {
Attributes API
*/
- void saveAttributes(DeviceId deviceId, String attributeType, List<AttributeKvEntry> attributes, PluginCallback<Void> callback);
+ void saveAttributes(TenantId tenantId, DeviceId deviceId, String attributeType, List<AttributeKvEntry> attributes, PluginCallback<Void> callback);
- void removeAttributes(DeviceId deviceId, String scope, List<String> attributeKeys, PluginCallback<Void> callback);
-
- void saveAttributesByDevice(TenantId tenantId, DeviceId deviceId, String attributeType, List<AttributeKvEntry> attributes, PluginCallback<Void> callback);
-
- void removeAttributesByDevice(TenantId tenantId, DeviceId deviceId, String scope, List<String> attributeKeys, PluginCallback<Void> callback);
+ void removeAttributes(TenantId tenantId, DeviceId deviceId, String scope, List<String> attributeKeys, PluginCallback<Void> callback);
void loadAttribute(DeviceId deviceId, String attributeType, String attributeKey, PluginCallback<Optional<AttributeKvEntry>> callback);
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/rpc/handlers/RpcRestMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/rpc/handlers/RpcRestMsgHandler.java
index 4601916..f60f2d8 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/rpc/handlers/RpcRestMsgHandler.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/rpc/handlers/RpcRestMsgHandler.java
@@ -25,6 +25,7 @@ import org.springframework.util.StringUtils;
import org.springframework.web.context.request.async.DeferredResult;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.extensions.api.plugins.PluginCallback;
import org.thingsboard.server.extensions.api.plugins.PluginContext;
import org.thingsboard.server.extensions.api.plugins.handlers.DefaultRestMsgHandler;
import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse;
@@ -62,27 +63,34 @@ public class RpcRestMsgHandler extends DefaultRestMsgHandler {
String method = pathParams[0].toUpperCase();
if (DataConstants.ONEWAY.equals(method) || DataConstants.TWOWAY.equals(method)) {
DeviceId deviceId = DeviceId.fromString(pathParams[1]);
- if (!ctx.checkAccess(deviceId)) {
- msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
- return;
- }
JsonNode rpcRequestBody = jsonMapper.readTree(request.getRequestBody());
RpcRequest cmd = new RpcRequest(rpcRequestBody.get("method").asText(),
jsonMapper.writeValueAsString(rpcRequestBody.get("params")));
- if (rpcRequestBody.has("timeout")) {
- cmd.setTimeout(rpcRequestBody.get("timeout").asLong());
- }
- long timeout = cmd.getTimeout() != null ? cmd.getTimeout() : defaultTimeout;
- ToDeviceRpcRequestBody body = new ToDeviceRpcRequestBody(cmd.getMethodName(), cmd.getRequestData());
- ToDeviceRpcRequest rpcRequest = new ToDeviceRpcRequest(UUID.randomUUID(),
- ctx.getSecurityCtx().orElseThrow(() -> new IllegalStateException("Security context is empty!")).getTenantId(),
- deviceId,
- DataConstants.ONEWAY.equals(method),
- System.currentTimeMillis() + timeout,
- body
- );
- rpcManager.process(ctx, new LocalRequestMetaData(rpcRequest, msg.getResponseHolder()));
+
+ ctx.checkAccess(deviceId, new PluginCallback<Void>() {
+ @Override
+ public void onSuccess(PluginContext ctx, Void value) {
+ if (rpcRequestBody.has("timeout")) {
+ cmd.setTimeout(rpcRequestBody.get("timeout").asLong());
+ }
+ long timeout = cmd.getTimeout() != null ? cmd.getTimeout() : defaultTimeout;
+ ToDeviceRpcRequestBody body = new ToDeviceRpcRequestBody(cmd.getMethodName(), cmd.getRequestData());
+ ToDeviceRpcRequest rpcRequest = new ToDeviceRpcRequest(UUID.randomUUID(),
+ ctx.getSecurityCtx().orElseThrow(() -> new IllegalStateException("Security context is empty!")).getTenantId(),
+ deviceId,
+ DataConstants.ONEWAY.equals(method),
+ System.currentTimeMillis() + timeout,
+ body
+ );
+ rpcManager.process(ctx, new LocalRequestMetaData(rpcRequest, msg.getResponseHolder()));
+ }
+
+ @Override
+ public void onFailure(PluginContext ctx, Exception e) {
+ msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
+ }
+ });
valid = true;
}
}
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java
index 28d8b7c..2bd17aa 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java
@@ -164,7 +164,7 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
}
});
if (attributes.size() > 0) {
- ctx.saveAttributes(deviceId, scope, attributes, new PluginCallback<Void>() {
+ ctx.saveAttributes(ctx.getSecurityCtx().orElseThrow(() -> new IllegalArgumentException()).getTenantId(), deviceId, scope, attributes, new PluginCallback<Void>() {
@Override
public void onSuccess(PluginContext ctx, Void value) {
msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.OK));
@@ -182,8 +182,8 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
}
}
}
- } catch (IOException e) {
- log.debug("Failed to process POST request due to IO exception", e);
+ } catch (IOException | RuntimeException e) {
+ log.debug("Failed to process POST request due to exception", e);
}
msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
}
@@ -202,7 +202,7 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
String keysParam = request.getParameter("keys");
if (!StringUtils.isEmpty(keysParam)) {
String[] keys = keysParam.split(",");
- ctx.removeAttributes(deviceId, scope, Arrays.asList(keys), new PluginCallback<Void>() {
+ ctx.removeAttributes(ctx.getSecurityCtx().orElseThrow(() -> new IllegalArgumentException()).getTenantId(), deviceId, scope, Arrays.asList(keys), new PluginCallback<Void>() {
@Override
public void onSuccess(PluginContext ctx, Void value) {
msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.OK));
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRuleMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRuleMsgHandler.java
index d9bfba0..1ce797f 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRuleMsgHandler.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRuleMsgHandler.java
@@ -120,7 +120,7 @@ public class TelemetryRuleMsgHandler extends DefaultRuleMsgHandler {
@Override
public void handleUpdateAttributesRequest(PluginContext ctx, TenantId tenantId, RuleId ruleId, UpdateAttributesRequestRuleToPluginMsg msg) {
UpdateAttributesRequest request = msg.getPayload();
- ctx.saveAttributes(msg.getDeviceId(), DataConstants.CLIENT_SCOPE, request.getAttributes().stream().collect(Collectors.toList()),
+ ctx.saveAttributes(msg.getTenantId(), msg.getDeviceId(), DataConstants.CLIENT_SCOPE, request.getAttributes().stream().collect(Collectors.toList()),
new PluginCallback<Void>() {
@Override
public void onSuccess(PluginContext ctx, Void value) {
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java
index dd30f99..fbfacd3 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java
@@ -21,6 +21,7 @@ import org.springframework.util.StringUtils;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.kv.*;
+import org.thingsboard.server.extensions.api.exception.UnauthorizedException;
import org.thingsboard.server.extensions.api.plugins.PluginCallback;
import org.thingsboard.server.extensions.api.plugins.PluginContext;
import org.thingsboard.server.extensions.api.plugins.handlers.DefaultWebsocketMsgHandler;
@@ -122,8 +123,14 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
@Override
public void onFailure(PluginContext ctx, Exception e) {
log.error("Failed to fetch attributes!", e);
- SubscriptionUpdate update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR,
- "Failed to fetch attributes!");
+ SubscriptionUpdate update;
+ if (UnauthorizedException.class.isInstance(e)) {
+ update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.UNAUTHORIZED,
+ SubscriptionErrorCode.UNAUTHORIZED.getDefaultMsg());
+ } else {
+ update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR,
+ "Failed to fetch attributes!");
+ }
sendWsMsg(ctx, sessionRef, update);
}
};
@@ -207,8 +214,14 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
@Override
public void onFailure(PluginContext ctx, Exception e) {
- SubscriptionUpdate update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR,
- "Failed to fetch data!");
+ SubscriptionUpdate update;
+ if (UnauthorizedException.class.isInstance(e)) {
+ update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.UNAUTHORIZED,
+ SubscriptionErrorCode.UNAUTHORIZED.getDefaultMsg());
+ } else {
+ update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR,
+ "Failed to fetch data!");
+ }
sendWsMsg(ctx, sessionRef, update);
}
});
@@ -263,12 +276,6 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
return;
}
DeviceId deviceId = DeviceId.fromString(cmd.getDeviceId());
- if (!ctx.checkAccess(deviceId)) {
- SubscriptionUpdate update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.UNAUTHORIZED,
- SubscriptionErrorCode.UNAUTHORIZED.getDefaultMsg());
- sendWsMsg(ctx, sessionRef, update);
- return;
- }
List<String> keys = new ArrayList<>(getKeys(cmd).orElse(Collections.emptySet()));
List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, cmd.getStartTs(), cmd.getEndTs(), cmd.getLimit(), Aggregation.valueOf(cmd.getAgg()))).collect(Collectors.toList());
ctx.loadTimeseries(deviceId, queries, new PluginCallback<List<TsKvEntry>>() {
@@ -279,8 +286,15 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
@Override
public void onFailure(PluginContext ctx, Exception e) {
- sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR,
- "Failed to fetch data!"));
+ SubscriptionUpdate update;
+ if (UnauthorizedException.class.isInstance(e)) {
+ update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.UNAUTHORIZED,
+ SubscriptionErrorCode.UNAUTHORIZED.getDefaultMsg());
+ } else {
+ update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR,
+ "Failed to fetch data!");
+ }
+ sendWsMsg(ctx, sessionRef, update);
}
});
}
@@ -313,13 +327,6 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
sendWsMsg(ctx, sessionRef, update);
return false;
}
- DeviceId deviceId = DeviceId.fromString(cmd.getDeviceId());
- if (!ctx.checkAccess(deviceId)) {
- SubscriptionUpdate update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.UNAUTHORIZED,
- SubscriptionErrorCode.UNAUTHORIZED.getDefaultMsg());
- sendWsMsg(ctx, sessionRef, update);
- return false;
- }
return true;
}