thingsboard-aplcache
Changes
application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java 3(+1 -2)
application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java 111(+72 -39)
extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java 16(+12 -4)
extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/BiPluginCallBack.java 74(+74 -0)
extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java 115(+83 -32)
extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRuleMsgHandler.java 35(+24 -11)
Details
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
index 70bb4f2..f9c4e6f 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
@@ -195,9 +195,8 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
}
void processAttributesUpdate(ActorContext context, DeviceAttributesEventNotificationMsg msg) {
- //TODO: improve this procedure to fetch only changed attributes.
+ //TODO: improve this procedure to fetch only changed attributes and support attributes deletion
refreshAttributes();
- //TODO: support attributes deletion
Set<AttributeKey> keys = msg.getKeys();
if (attributeSubscriptions.size() > 0) {
ToDeviceMsg notification = null;
diff --git a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
index 9474a62..98149c2 100644
--- a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2017 The Thingsboard Authors
- *
+ * <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -56,6 +56,7 @@ import org.thingsboard.server.extensions.api.plugins.ws.PluginWebsocketSessionRe
import org.thingsboard.server.extensions.api.plugins.ws.msg.PluginWebsocketMsg;
import akka.actor.ActorRef;
+import org.w3c.dom.Attr;
import javax.annotation.Nullable;
@@ -91,49 +92,86 @@ public final class PluginProcessingContext implements PluginContext {
@Override
public void saveAttributes(DeviceId deviceId, String scope, List<AttributeKvEntry> attributes, PluginCallback<Void> callback) {
validate(deviceId);
- Set<AttributeKey> keys = new HashSet<>();
- for (AttributeKvEntry attribute : attributes) {
- keys.add(new AttributeKey(scope, attribute.getKey()));
- }
ListenableFuture<List<ResultSet>> rsListFuture = pluginCtx.attributesService.save(deviceId, scope, attributes);
Futures.addCallback(rsListFuture, getListCallback(callback, v -> {
- onDeviceAttributesChanged(deviceId, keys);
+ onDeviceAttributesChanged(deviceId, scope, attributes);
return null;
}), executor);
}
@Override
- public Optional<AttributeKvEntry> loadAttribute(DeviceId deviceId, String attributeType, String attributeKey) {
+ public void removeAttributes(DeviceId deviceId, String scope, List<String> keys, PluginCallback<Void> callback) {
validate(deviceId);
- AttributeKvEntry attribute = pluginCtx.attributesService.find(deviceId, attributeType, attributeKey);
- return Optional.ofNullable(attribute);
+ ListenableFuture<List<ResultSet>> future = pluginCtx.attributesService.removeAll(deviceId, scope, keys);
+ Futures.addCallback(future, getCallback(callback, v -> null), executor);
+ onDeviceAttributesDeleted(tenantId, deviceId, keys.stream().map(key -> new AttributeKey(scope, key)).collect(Collectors.toSet()));
}
@Override
- public List<AttributeKvEntry> loadAttributes(DeviceId deviceId, String attributeType, List<String> attributeKeys) {
+ public void saveAttributesByDevice(TenantId tenantId, DeviceId deviceId, String scope, List<AttributeKvEntry> attributes, PluginCallback<Void> callback) {
validate(deviceId);
- List<AttributeKvEntry> result = new ArrayList<>(attributeKeys.size());
- for (String attributeKey : attributeKeys) {
- AttributeKvEntry attribute = pluginCtx.attributesService.find(deviceId, attributeType, attributeKey);
- if (attribute != null) {
- result.add(attribute);
- }
- }
- return result;
+
+ ListenableFuture<List<ResultSet>> rsListFuture = pluginCtx.attributesService.save(deviceId, scope, attributes);
+ Futures.addCallback(rsListFuture, getListCallback(callback, v -> {
+ onDeviceAttributesChanged(tenantId, deviceId, scope, attributes);
+ return null;
+ }), executor);
+ }
+
+ @Override
+ public void removeAttributesByDevice(TenantId tenantId, DeviceId deviceId, String scope, List<String> keys, PluginCallback<Void> callback) {
+ validate(deviceId);
+ ListenableFuture<List<ResultSet>> future = pluginCtx.attributesService.removeAll(deviceId, scope, keys);
+ Futures.addCallback(future, getCallback(callback, v -> null), executor);
+ onDeviceAttributesDeleted(tenantId, deviceId, keys.stream().map(key -> new AttributeKey(scope, key)).collect(Collectors.toSet()));
+ }
+
+ @Override
+ public void loadAttribute(DeviceId deviceId, String attributeType, String attributeKey, PluginCallback<Optional<AttributeKvEntry>> callback) {
+ validate(deviceId);
+ ListenableFuture<Optional<AttributeKvEntry>> future = pluginCtx.attributesService.find(deviceId, attributeType, attributeKey);
+ Futures.addCallback(future, getCallback(callback, v -> v), executor);
}
@Override
- public List<AttributeKvEntry> loadAttributes(DeviceId deviceId, String attributeType) {
+ public void loadAttributes(DeviceId deviceId, String attributeType, Collection<String> attributeKeys, PluginCallback<List<AttributeKvEntry>> callback) {
validate(deviceId);
- return pluginCtx.attributesService.findAll(deviceId, attributeType);
+ ListenableFuture<List<AttributeKvEntry>> future = pluginCtx.attributesService.find(deviceId, attributeType, attributeKeys);
+ Futures.addCallback(future, getCallback(callback, v -> v), executor);
}
@Override
- public void removeAttributes(DeviceId deviceId, String scope, List<String> keys) {
+ public void loadAttributes(DeviceId deviceId, String attributeType, PluginCallback<List<AttributeKvEntry>> callback) {
validate(deviceId);
- pluginCtx.attributesService.removeAll(deviceId, scope, keys);
- onDeviceAttributesDeleted(deviceId, keys.stream().map(key -> new AttributeKey(scope, key)).collect(Collectors.toSet()));
+ ListenableFuture<List<AttributeKvEntry>> future = pluginCtx.attributesService.findAll(deviceId, attributeType);
+ Futures.addCallback(future, getCallback(callback, v -> v), executor);
+ }
+
+ @Override
+ public void loadAttributes(DeviceId deviceId, Collection<String> attributeTypes, PluginCallback<List<AttributeKvEntry>> callback) {
+ validate(deviceId);
+ List<ListenableFuture<List<AttributeKvEntry>>> futures = new ArrayList<>();
+ attributeTypes.forEach(attributeType -> futures.add(pluginCtx.attributesService.findAll(deviceId, attributeType)));
+ convertFuturesAndAddCallback(callback, futures);
+ }
+
+ @Override
+ public void loadAttributes(DeviceId deviceId, Collection<String> attributeTypes, Collection<String> attributeKeys, PluginCallback<List<AttributeKvEntry>> callback) {
+ validate(deviceId);
+ List<ListenableFuture<List<AttributeKvEntry>>> futures = new ArrayList<>();
+ attributeTypes.forEach(attributeType -> futures.add(pluginCtx.attributesService.find(deviceId, attributeType, attributeKeys)));
+ convertFuturesAndAddCallback(callback, futures);
+ }
+
+ private void convertFuturesAndAddCallback(PluginCallback<List<AttributeKvEntry>> callback, List<ListenableFuture<List<AttributeKvEntry>>> futures) {
+ ListenableFuture<List<AttributeKvEntry>> future = Futures.transform(Futures.successfulAsList(futures),
+ (Function<? super List<List<AttributeKvEntry>>, ? extends List<AttributeKvEntry>>) input -> {
+ List<AttributeKvEntry> result = new ArrayList<>();
+ input.forEach(r -> result.addAll(r));
+ return result;
+ }, executor);
+ Futures.addCallback(future, getCallback(callback, v -> v), executor);
}
@Override
@@ -205,18 +243,12 @@ public final class PluginProcessingContext implements PluginContext {
return securityCtx;
}
- private void onDeviceAttributesChanged(DeviceId deviceId, AttributeKey key) {
- onDeviceAttributesChanged(deviceId, Collections.singleton(key));
+ private void onDeviceAttributesDeleted(TenantId tenantId, DeviceId deviceId, Set<AttributeKey> keys) {
+ pluginCtx.toDeviceActor(DeviceAttributesEventNotificationMsg.onDelete(tenantId, deviceId, keys));
}
- private void onDeviceAttributesDeleted(DeviceId deviceId, Set<AttributeKey> keys) {
- Device device = pluginCtx.deviceService.findDeviceById(deviceId);
- pluginCtx.toDeviceActor(DeviceAttributesEventNotificationMsg.onDelete(device.getTenantId(), deviceId, keys));
- }
-
- private void onDeviceAttributesChanged(DeviceId deviceId, Set<AttributeKey> keys) {
- Device device = pluginCtx.deviceService.findDeviceById(deviceId);
- pluginCtx.toDeviceActor(DeviceAttributesEventNotificationMsg.onUpdate(device.getTenantId(), deviceId, keys));
+ private void onDeviceAttributesChanged(TenantId tenantId, DeviceId deviceId, String scope, List<AttributeKvEntry> values) {
+ pluginCtx.toDeviceActor(DeviceAttributesEventNotificationMsg.onUpdate(tenantId, deviceId, scope, values));
}
private <T> FutureCallback<List<ResultSet>> getListCallback(final PluginCallback<T> callback, Function<List<ResultSet>, T> transformer) {
@@ -256,11 +288,12 @@ public final class PluginProcessingContext implements PluginContext {
}
// TODO: replace with our own exceptions
- private boolean validate(DeviceId deviceId) {
+ private boolean validate(DeviceId deviceId, PluginCallback<Device> callback) {
if (securityCtx.isPresent()) {
- PluginApiCallSecurityContext ctx = securityCtx.get();
+ final PluginApiCallSecurityContext ctx = securityCtx.get();
if (ctx.isTenantAdmin() || ctx.isCustomerUser()) {
- Device device = pluginCtx.deviceService.findDeviceById(deviceId);
+ ListenableFuture<Device> device = pluginCtx.deviceService.findDeviceById(deviceId);
+ Futures.addCallback(device, );
if (device == null) {
throw new IllegalStateException("Device not found!");
} else {
diff --git a/dao/src/main/java/org/thingsboard/server/dao/AbstractAsyncDao.java b/dao/src/main/java/org/thingsboard/server/dao/AbstractAsyncDao.java
new file mode 100644
index 0000000..9b9368d
--- /dev/null
+++ b/dao/src/main/java/org/thingsboard/server/dao/AbstractAsyncDao.java
@@ -0,0 +1,42 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.dao;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Created by ashvayka on 21.02.17.
+ */
+public abstract class AbstractAsyncDao extends AbstractDao {
+
+ protected ExecutorService readResultsProcessingExecutor;
+
+ @PostConstruct
+ public void startExecutor() {
+ readResultsProcessingExecutor = Executors.newCachedThreadPool();
+ }
+
+ @PreDestroy
+ public void stopExecutor() {
+ if (readResultsProcessingExecutor != null) {
+ readResultsProcessingExecutor.shutdownNow();
+ }
+ }
+
+}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesDao.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesDao.java
index ead2c04..ae58d4d 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesDao.java
@@ -15,23 +15,28 @@
*/
package org.thingsboard.server.dao.attributes;
+import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
+import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
+import java.util.Collection;
import java.util.List;
-import java.util.UUID;
+import java.util.Optional;
/**
* @author Andrew Shvayka
*/
public interface AttributesDao {
- AttributeKvEntry find(EntityId entityId, String attributeType, String attributeKey);
+ ListenableFuture<Optional<AttributeKvEntry>> find(EntityId entityId, String attributeType, String attributeKey);
- List<AttributeKvEntry> findAll(EntityId entityId, String attributeType);
+ ListenableFuture<List<AttributeKvEntry>> find(EntityId entityId, String attributeType, Collection<String> attributeKey);
+
+ ListenableFuture<List<AttributeKvEntry>> findAll(EntityId entityId, String attributeType);
ResultSetFuture save(EntityId entityId, String attributeType, AttributeKvEntry attribute);
- void removeAll(EntityId entityId, String scope, List<String> keys);
+ ListenableFuture<List<ResultSet>> removeAll(EntityId entityId, String scope, List<String> keys);
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesService.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesService.java
index 5a1fd70..6bf9fb2 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesService.java
@@ -23,18 +23,22 @@ import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.UUIDBased;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
+import java.util.Collection;
import java.util.List;
+import java.util.Optional;
/**
* @author Andrew Shvayka
*/
public interface AttributesService {
- AttributeKvEntry find(EntityId entityId, String scope, String attributeKey);
+ ListenableFuture<Optional<AttributeKvEntry>> find(EntityId entityId, String scope, String attributeKey);
- List<AttributeKvEntry> findAll(EntityId entityId, String scope);
+ ListenableFuture<List<AttributeKvEntry>> find(EntityId entityId, String scope, Collection<String> attributeKeys);
+
+ ListenableFuture<List<AttributeKvEntry>> findAll(EntityId entityId, String scope);
ListenableFuture<List<ResultSet>> save(EntityId entityId, String scope, List<AttributeKvEntry> attributes);
- void removeAll(EntityId entityId, String scope, List<String> attributeKeys);
+ ListenableFuture<List<ResultSet>> removeAll(EntityId entityId, String scope, List<String> attributeKeys);
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesDao.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesDao.java
index 5148a12..262d15d 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesDao.java
@@ -18,19 +18,24 @@ package org.thingsboard.server.dao.attributes;
import com.datastax.driver.core.*;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.datastax.driver.core.querybuilder.Select;
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.id.EntityId;
-import org.thingsboard.server.common.data.kv.DataType;
-import org.thingsboard.server.dao.AbstractDao;
+import org.thingsboard.server.dao.AbstractAsyncDao;
import org.thingsboard.server.dao.model.ModelConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.thingsboard.server.common.data.kv.*;
import org.thingsboard.server.dao.timeseries.BaseTimeseriesDao;
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
import static org.thingsboard.server.dao.model.ModelConstants.*;
import static com.datastax.driver.core.querybuilder.QueryBuilder.*;
@@ -40,29 +45,55 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.*;
*/
@Component
@Slf4j
-public class BaseAttributesDao extends AbstractDao implements AttributesDao {
-
+public class BaseAttributesDao extends AbstractAsyncDao implements AttributesDao {
+
private PreparedStatement saveStmt;
+ @PostConstruct
+ public void init() {
+ super.startExecutor();
+ }
+
+ @PreDestroy
+ public void stop() {
+ super.stopExecutor();
+ }
+
@Override
- public AttributeKvEntry find(EntityId entityId, String attributeType, String attributeKey) {
+ public ListenableFuture<Optional<AttributeKvEntry>> find(EntityId entityId, String attributeType, String attributeKey) {
Select.Where select = select().from(ATTRIBUTES_KV_CF)
.where(eq(ENTITY_TYPE_COLUMN, entityId.getEntityType()))
.and(eq(ENTITY_ID_COLUMN, entityId.getId()))
.and(eq(ATTRIBUTE_TYPE_COLUMN, attributeType))
.and(eq(ATTRIBUTE_KEY_COLUMN, attributeKey));
log.trace("Generated query [{}] for entityId {} and key {}", select, entityId, attributeKey);
- return convertResultToAttributesKvEntry(attributeKey, executeRead(select).one());
+ return Futures.transform(executeAsyncRead(select), (Function<? super ResultSet, ? extends Optional<AttributeKvEntry>>) input ->
+ Optional.of(convertResultToAttributesKvEntry(attributeKey, input.one()))
+ , readResultsProcessingExecutor);
}
@Override
- public List<AttributeKvEntry> findAll(EntityId entityId, String attributeType) {
+ public ListenableFuture<List<AttributeKvEntry>> find(EntityId entityId, String attributeType, Collection<String> attributeKeys) {
+ List<ListenableFuture<Optional<AttributeKvEntry>>> entries = new ArrayList<>();
+ attributeKeys.forEach(attributeKey -> entries.add(find(entityId, attributeType, attributeKey)));
+ return Futures.transform(Futures.allAsList(entries), (Function<List<Optional<AttributeKvEntry>>, ? extends List<AttributeKvEntry>>) input -> {
+ List<AttributeKvEntry> result = new ArrayList<>();
+ input.stream().filter(opt -> opt.isPresent()).forEach(opt -> result.add(opt.get()));
+ return result;
+ }, readResultsProcessingExecutor);
+ }
+
+
+ @Override
+ public ListenableFuture<List<AttributeKvEntry>> findAll(EntityId entityId, String attributeType) {
Select.Where select = select().from(ATTRIBUTES_KV_CF)
.where(eq(ENTITY_TYPE_COLUMN, entityId.getEntityType()))
.and(eq(ENTITY_ID_COLUMN, entityId.getId()))
.and(eq(ATTRIBUTE_TYPE_COLUMN, attributeType));
log.trace("Generated query [{}] for entityId {} and attributeType {}", select, entityId, attributeType);
- return convertResultToAttributesKvEntryList(executeRead(select));
+ return Futures.transform(executeAsyncRead(select), (Function<? super ResultSet, ? extends List<AttributeKvEntry>>) input ->
+ convertResultToAttributesKvEntryList(input)
+ , readResultsProcessingExecutor);
}
@Override
@@ -93,20 +124,19 @@ public class BaseAttributesDao extends AbstractDao implements AttributesDao {
}
@Override
- public void removeAll(EntityId entityId, String attributeType, List<String> keys) {
- for (String key : keys) {
- delete(entityId, attributeType, key);
- }
+ public ListenableFuture<List<ResultSet>> removeAll(EntityId entityId, String attributeType, List<String> keys) {
+ List<ResultSetFuture> futures = keys.stream().map(key -> delete(entityId, attributeType, key)).collect(Collectors.toList());
+ return Futures.allAsList(futures);
}
- private void delete(EntityId entityId, String attributeType, String key) {
+ private ResultSetFuture delete(EntityId entityId, String attributeType, String key) {
Statement delete = QueryBuilder.delete().all().from(ModelConstants.ATTRIBUTES_KV_CF)
.where(eq(ENTITY_TYPE_COLUMN, entityId.getEntityType()))
.and(eq(ENTITY_ID_COLUMN, entityId.getId()))
.and(eq(ATTRIBUTE_TYPE_COLUMN, attributeType))
.and(eq(ATTRIBUTE_KEY_COLUMN, key));
log.debug("Remove request: {}", delete.toString());
- getSession().execute(delete);
+ return getSession().executeAsync(delete);
}
private PreparedStatement getSaveStmt() {
@@ -150,5 +180,4 @@ public class BaseAttributesDao extends AbstractDao implements AttributesDao {
}
return entries;
}
-
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java
index d3a1cb3..4361241 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java
@@ -27,7 +27,9 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.thingsboard.server.dao.service.Validator;
+import java.util.Collection;
import java.util.List;
+import java.util.Optional;
/**
* @author Andrew Shvayka
@@ -39,14 +41,21 @@ public class BaseAttributesService implements AttributesService {
private AttributesDao attributesDao;
@Override
- public AttributeKvEntry find(EntityId entityId, String scope, String attributeKey) {
+ public ListenableFuture<Optional<AttributeKvEntry>> find(EntityId entityId, String scope, String attributeKey) {
validate(entityId, scope);
Validator.validateString(attributeKey, "Incorrect attribute key " + attributeKey);
return attributesDao.find(entityId, scope, attributeKey);
}
@Override
- public List<AttributeKvEntry> findAll(EntityId entityId, String scope) {
+ public ListenableFuture<List<AttributeKvEntry>> find(EntityId entityId, String scope, Collection<String> attributeKeys) {
+ validate(entityId, scope);
+ attributeKeys.forEach(attributeKey -> Validator.validateString(attributeKey, "Incorrect attribute key " + attributeKey));
+ return attributesDao.find(entityId, scope, attributeKeys);
+ }
+
+ @Override
+ public ListenableFuture<List<AttributeKvEntry>> findAll(EntityId entityId, String scope) {
validate(entityId, scope);
return attributesDao.findAll(entityId, scope);
}
@@ -56,16 +65,16 @@ public class BaseAttributesService implements AttributesService {
validate(entityId, scope);
attributes.forEach(attribute -> validate(attribute));
List<ResultSetFuture> futures = Lists.newArrayListWithExpectedSize(attributes.size());
- for(AttributeKvEntry attribute : attributes) {
+ for (AttributeKvEntry attribute : attributes) {
futures.add(attributesDao.save(entityId, scope, attribute));
}
return Futures.allAsList(futures);
}
@Override
- public void removeAll(EntityId entityId, String scope, List<String> keys) {
+ public ListenableFuture<List<ResultSet>> removeAll(EntityId entityId, String scope, List<String> keys) {
validate(entityId, scope);
- attributesDao.removeAll(entityId, scope, keys);
+ return attributesDao.removeAll(entityId, scope, keys);
}
private static void validate(EntityId id, String scope) {
diff --git a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceService.java b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceService.java
index 8d780b6..b760f55 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceService.java
@@ -15,6 +15,7 @@
*/
package org.thingsboard.server.dao.device;
+import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DeviceId;
@@ -28,6 +29,8 @@ public interface DeviceService {
Device findDeviceById(DeviceId deviceId);
+ ListenableFuture<Device> findDeviceByIdAsync(DeviceId deviceId);
+
Optional<Device> findDeviceByTenantIdAndName(TenantId tenantId, String name);
Device saveDevice(Device device);
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java
index 81bbdf9..42fede4 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java
@@ -28,6 +28,7 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.kv.*;
import org.thingsboard.server.common.data.kv.DataType;
+import org.thingsboard.server.dao.AbstractAsyncDao;
import org.thingsboard.server.dao.AbstractDao;
import org.thingsboard.server.dao.model.ModelConstants;
@@ -50,7 +51,7 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
*/
@Component
@Slf4j
-public class BaseTimeseriesDao extends AbstractDao implements TimeseriesDao {
+public class BaseTimeseriesDao extends AbstractAsyncDao implements TimeseriesDao {
@Value("${cassandra.query.min_aggregation_step_ms}")
private int minAggregationStepMs;
@@ -60,8 +61,6 @@ public class BaseTimeseriesDao extends AbstractDao implements TimeseriesDao {
private TsPartitionDate tsFormat;
- private ExecutorService readResultsProcessingExecutor;
-
private PreparedStatement partitionInsertStmt;
private PreparedStatement[] latestInsertStmts;
private PreparedStatement[] saveStmts;
@@ -71,8 +70,8 @@ public class BaseTimeseriesDao extends AbstractDao implements TimeseriesDao {
@PostConstruct
public void init() {
+ super.startExecutor();
getFetchStmt(Aggregation.NONE);
- readResultsProcessingExecutor = Executors.newCachedThreadPool();
Optional<TsPartitionDate> partition = TsPartitionDate.parse(partitioning);
if (partition.isPresent()) {
tsFormat = partition.get();
@@ -84,9 +83,7 @@ public class BaseTimeseriesDao extends AbstractDao implements TimeseriesDao {
@PreDestroy
public void stop() {
- if (readResultsProcessingExecutor != null) {
- readResultsProcessingExecutor.shutdownNow();
- }
+ super.stopExecutor();
}
@Override
diff --git a/dao/src/test/java/org/thingsboard/server/dao/attributes/BaseAttributesServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/attributes/BaseAttributesServiceTest.java
index 8fb5d86..559a313 100644
--- a/dao/src/test/java/org/thingsboard/server/dao/attributes/BaseAttributesServiceTest.java
+++ b/dao/src/test/java/org/thingsboard/server/dao/attributes/BaseAttributesServiceTest.java
@@ -32,6 +32,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
import static org.thingsboard.server.common.data.DataConstants.CLIENT_SCOPE;
import static org.thingsboard.server.common.data.DataConstants.DEVICE;
@@ -54,8 +55,9 @@ public class BaseAttributesServiceTest extends AbstractServiceTest {
KvEntry attrValue = new StringDataEntry("attribute1", "value1");
AttributeKvEntry attr = new BaseAttributeKvEntry(attrValue, 42L);
attributesService.save(deviceId, DataConstants.CLIENT_SCOPE, Collections.singletonList(attr)).get();
- AttributeKvEntry saved = attributesService.find(deviceId, DataConstants.CLIENT_SCOPE, attr.getKey());
- Assert.assertEquals(attr, saved);
+ Optional<AttributeKvEntry> saved = attributesService.find(deviceId, DataConstants.CLIENT_SCOPE, attr.getKey()).get();
+ Assert.assertTrue(saved.isPresent());
+ Assert.assertEquals(attr, saved.get());
}
@Test
@@ -65,15 +67,17 @@ public class BaseAttributesServiceTest extends AbstractServiceTest {
AttributeKvEntry attrOld = new BaseAttributeKvEntry(attrOldValue, 42L);
attributesService.save(deviceId, DataConstants.CLIENT_SCOPE, Collections.singletonList(attrOld)).get();
- AttributeKvEntry saved = attributesService.find(deviceId, DataConstants.CLIENT_SCOPE, attrOld.getKey());
- Assert.assertEquals(attrOld, saved);
+ Optional<AttributeKvEntry> saved = attributesService.find(deviceId, DataConstants.CLIENT_SCOPE, attrOld.getKey()).get();
+
+ Assert.assertTrue(saved.isPresent());
+ Assert.assertEquals(attrOld, saved.get());
KvEntry attrNewValue = new StringDataEntry("attribute1", "value2");
AttributeKvEntry attrNew = new BaseAttributeKvEntry(attrNewValue, 73L);
attributesService.save(deviceId, DataConstants.CLIENT_SCOPE, Collections.singletonList(attrNew)).get();
- saved = attributesService.find(deviceId, DataConstants.CLIENT_SCOPE, attrOld.getKey());
- Assert.assertEquals(attrNew, saved);
+ saved = attributesService.find(deviceId, DataConstants.CLIENT_SCOPE, attrOld.getKey()).get();
+ Assert.assertEquals(attrNew, saved.get());
}
@Test
@@ -91,7 +95,7 @@ public class BaseAttributesServiceTest extends AbstractServiceTest {
attributesService.save(deviceId, DataConstants.CLIENT_SCOPE, Collections.singletonList(attrANew)).get();
attributesService.save(deviceId, DataConstants.CLIENT_SCOPE, Collections.singletonList(attrBNew)).get();
- List<AttributeKvEntry> saved = attributesService.findAll(deviceId, DataConstants.CLIENT_SCOPE);
+ List<AttributeKvEntry> saved = attributesService.findAll(deviceId, DataConstants.CLIENT_SCOPE).get();
Assert.assertNotNull(saved);
Assert.assertEquals(2, saved.size());
diff --git a/dao/src/test/java/org/thingsboard/server/dao/timeseries/TimeseriesServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/timeseries/TimeseriesServiceTest.java
index 13c25c3..fd16b75 100644
--- a/dao/src/test/java/org/thingsboard/server/dao/timeseries/TimeseriesServiceTest.java
+++ b/dao/src/test/java/org/thingsboard/server/dao/timeseries/TimeseriesServiceTest.java
@@ -114,8 +114,8 @@ public class TimeseriesServiceTest extends AbstractServiceTest {
entries.add(save(deviceId, 45000, 500));
entries.add(save(deviceId, 55000, 600));
- List<TsKvEntry> list = tsService.findAll(DataConstants.DEVICE, deviceId, new BaseTsKvQuery(LONG_KEY, 0,
- 60000, 3, Aggregation.NONE)).get();
+ List<TsKvEntry> list = tsService.findAll(DataConstants.DEVICE, deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
+ 60000, 3, Aggregation.NONE))).get();
assertEquals(3, list.size());
assertEquals(55000, list.get(0).getTs());
assertEquals(java.util.Optional.of(600L), list.get(0).getLongValue());
@@ -126,8 +126,8 @@ public class TimeseriesServiceTest extends AbstractServiceTest {
assertEquals(35000, list.get(2).getTs());
assertEquals(java.util.Optional.of(400L), list.get(2).getLongValue());
- list = tsService.findAll(DataConstants.DEVICE, deviceId, new BaseTsKvQuery(LONG_KEY, 0,
- 60000, 3, Aggregation.AVG)).get();
+ list = tsService.findAll(DataConstants.DEVICE, deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
+ 60000, 3, Aggregation.AVG))).get();
assertEquals(3, list.size());
assertEquals(10000, list.get(0).getTs());
assertEquals(java.util.Optional.of(150L), list.get(0).getLongValue());
@@ -138,8 +138,8 @@ public class TimeseriesServiceTest extends AbstractServiceTest {
assertEquals(50000, list.get(2).getTs());
assertEquals(java.util.Optional.of(550L), list.get(2).getLongValue());
- list = tsService.findAll(DataConstants.DEVICE, deviceId, new BaseTsKvQuery(LONG_KEY, 0,
- 60000, 3, Aggregation.SUM)).get();
+ list = tsService.findAll(DataConstants.DEVICE, deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
+ 60000, 3, Aggregation.SUM))).get();
assertEquals(3, list.size());
assertEquals(10000, list.get(0).getTs());
@@ -151,8 +151,8 @@ public class TimeseriesServiceTest extends AbstractServiceTest {
assertEquals(50000, list.get(2).getTs());
assertEquals(java.util.Optional.of(1100L), list.get(2).getLongValue());
- list = tsService.findAll(DataConstants.DEVICE, deviceId, new BaseTsKvQuery(LONG_KEY, 0,
- 60000, 3, Aggregation.MIN)).get();
+ list = tsService.findAll(DataConstants.DEVICE, deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
+ 60000, 3, Aggregation.MIN))).get();
assertEquals(3, list.size());
assertEquals(10000, list.get(0).getTs());
@@ -164,8 +164,8 @@ public class TimeseriesServiceTest extends AbstractServiceTest {
assertEquals(50000, list.get(2).getTs());
assertEquals(java.util.Optional.of(500L), list.get(2).getLongValue());
- list = tsService.findAll(DataConstants.DEVICE, deviceId, new BaseTsKvQuery(LONG_KEY, 0,
- 60000, 3, Aggregation.MAX)).get();
+ list = tsService.findAll(DataConstants.DEVICE, deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
+ 60000, 3, Aggregation.MAX))).get();
assertEquals(3, list.size());
assertEquals(10000, list.get(0).getTs());
@@ -177,8 +177,8 @@ public class TimeseriesServiceTest extends AbstractServiceTest {
assertEquals(50000, list.get(2).getTs());
assertEquals(java.util.Optional.of(600L), list.get(2).getLongValue());
- list = tsService.findAll(DataConstants.DEVICE, deviceId, new BaseTsKvQuery(LONG_KEY, 0,
- 60000, 3, Aggregation.COUNT)).get();
+ list = tsService.findAll(DataConstants.DEVICE, deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
+ 60000, 3, Aggregation.COUNT))).get();
assertEquals(3, list.size());
assertEquals(10000, list.get(0).getTs());
diff --git a/dao/src/test/resources/cassandra-test.properties b/dao/src/test/resources/cassandra-test.properties
index 210d2c0..a4eb3d0 100644
--- a/dao/src/test/resources/cassandra-test.properties
+++ b/dao/src/test/resources/cassandra-test.properties
@@ -2,7 +2,7 @@ cassandra.cluster_name=Thingsboard Cluster
cassandra.keyspace_name=thingsboard
-cassandra.url=127.0.0.1:9042
+cassandra.url=127.0.0.1:9142
cassandra.ssl=false
diff --git a/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java b/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java
index c2c5587..64d4d53 100644
--- a/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java
+++ b/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java
@@ -94,13 +94,21 @@ public interface PluginContext {
void saveAttributes(DeviceId deviceId, String attributeType, List<AttributeKvEntry> attributes, PluginCallback<Void> callback);
- Optional<AttributeKvEntry> loadAttribute(DeviceId deviceId, String attributeType, String attributeKey);
+ void removeAttributes(DeviceId deviceId, String scope, List<String> attributeKeys, PluginCallback<Void> callback);
- List<AttributeKvEntry> loadAttributes(DeviceId deviceId, String attributeType, List<String> attributeKeys);
+ void saveAttributesByDevice(TenantId tenantId, DeviceId deviceId, String attributeType, List<AttributeKvEntry> attributes, PluginCallback<Void> callback);
- List<AttributeKvEntry> loadAttributes(DeviceId deviceId, String attributeType);
+ void removeAttributesByDevice(TenantId tenantId, DeviceId deviceId, String scope, List<String> attributeKeys, PluginCallback<Void> callback);
- void removeAttributes(DeviceId deviceId, String scope, List<String> attributeKeys);
+ void loadAttribute(DeviceId deviceId, String attributeType, String attributeKey, PluginCallback<Optional<AttributeKvEntry>> callback);
+
+ void loadAttributes(DeviceId deviceId, String attributeType, Collection<String> attributeKeys, PluginCallback<List<AttributeKvEntry>> callback);
+
+ void loadAttributes(DeviceId deviceId, String attributeType, PluginCallback<List<AttributeKvEntry>> callback);
+
+ void loadAttributes(DeviceId deviceId, Collection<String> attributeTypes, PluginCallback<List<AttributeKvEntry>> callback);
+
+ void loadAttributes(DeviceId deviceId, Collection<String> attributeTypes, Collection<String> attributeKeys, PluginCallback<List<AttributeKvEntry>> callback);
void getCustomerDevices(TenantId tenantId, CustomerId customerId, int limit, PluginCallback<List<Device>> callback);
}
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/BiPluginCallBack.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/BiPluginCallBack.java
new file mode 100644
index 0000000..bc5285c
--- /dev/null
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/BiPluginCallBack.java
@@ -0,0 +1,74 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.extensions.core.plugin.telemetry.handlers;
+
+import lombok.extern.slf4j.Slf4j;
+import org.thingsboard.server.extensions.api.plugins.PluginCallback;
+import org.thingsboard.server.extensions.api.plugins.PluginContext;
+
+/**
+ * Created by ashvayka on 21.02.17.
+ */
+@Slf4j
+public abstract class BiPluginCallBack<V1, V2> {
+
+ private V1 v1;
+ private V2 v2;
+
+ public PluginCallback<V1> getV1Callback() {
+ return new PluginCallback<V1>() {
+ @Override
+ public void onSuccess(PluginContext ctx, V1 value) {
+ synchronized (BiPluginCallBack.this) {
+ BiPluginCallBack.this.v1 = value;
+ if (v2 != null) {
+ BiPluginCallBack.this.onSuccess(ctx, v1, v2);
+ }
+ }
+ }
+
+ @Override
+ public void onFailure(PluginContext ctx, Exception e) {
+ BiPluginCallBack.this.onFailure(ctx, e);
+ }
+ };
+ }
+
+ public PluginCallback<V2> getV2Callback() {
+ return new PluginCallback<V2>() {
+ @Override
+ public void onSuccess(PluginContext ctx, V2 value) {
+ synchronized (BiPluginCallBack.this) {
+ BiPluginCallBack.this.v2 = value;
+ if (v1 != null) {
+ BiPluginCallBack.this.onSuccess(ctx, v1, v2);
+ }
+ }
+
+ }
+
+ @Override
+ public void onFailure(PluginContext ctx, Exception e) {
+ BiPluginCallBack.this.onFailure(ctx, e);
+ }
+ };
+ }
+
+ abstract public void onSuccess(PluginContext ctx, V1 v1, V2 v2);
+
+ abstract public void onFailure(PluginContext ctx, Exception e);
+
+}
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java
index e93f0b5..28d8b7c 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java
@@ -77,41 +77,58 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
}
});
} else if (entity.equals("attributes")) {
- List<AttributeKvEntry> attributes;
+ PluginCallback<List<AttributeKvEntry>> callback = getAttributeKeysPluginCallback(msg);
if (!StringUtils.isEmpty(scope)) {
- attributes = ctx.loadAttributes(deviceId, scope);
+ ctx.loadAttributes(deviceId, scope, callback);
} else {
- attributes = new ArrayList<>();
- Arrays.stream(DataConstants.ALL_SCOPES).forEach(s -> attributes.addAll(ctx.loadAttributes(deviceId, s)));
+ ctx.loadAttributes(deviceId, Arrays.asList(DataConstants.ALL_SCOPES), callback);
}
- List<String> keys = attributes.stream().map(attrKv -> attrKv.getKey()).collect(Collectors.toList());
- msg.getResponseHolder().setResult(new ResponseEntity<>(keys, HttpStatus.OK));
}
} else if (method.equals("values")) {
if ("timeseries".equals(entity)) {
- String keys = request.getParameter("keys");
+ String keysStr = request.getParameter("keys");
Optional<Long> startTs = request.getLongParamValue("startTs");
Optional<Long> endTs = request.getLongParamValue("endTs");
Optional<Integer> limit = request.getIntParamValue("limit");
- Map<String, List<TsData>> data = new LinkedHashMap<>();
- for (String key : keys.split(",")) {
- //TODO: refactoring
-// List<TsKvEntry> entries = ctx.loadTimeseries(deviceId, new BaseTsKvQuery(key, startTs, endTs, limit));
-// data.put(key, entries.stream().map(v -> new TsData(v.getTs(), v.getValueAsString())).collect(Collectors.toList()));
- }
- msg.getResponseHolder().setResult(new ResponseEntity<>(data, HttpStatus.OK));
+ Aggregation agg = Aggregation.valueOf(request.getParameter("agg", Aggregation.NONE.name()));
+
+ List<String> keys = Arrays.asList(keysStr.split(","));
+ List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, startTs.get(), endTs.get(), limit.get(), agg)).collect(Collectors.toList());
+ ctx.loadTimeseries(deviceId, queries, new PluginCallback<List<TsKvEntry>>() {
+ @Override
+ public void onSuccess(PluginContext ctx, List<TsKvEntry> data) {
+ Map<String, List<TsData>> result = new LinkedHashMap<>();
+ for (TsKvEntry entry : data) {
+ result.put(entry.getKey(), data.stream().map(v -> new TsData(v.getTs(), v.getValueAsString())).collect(Collectors.toList()));
+ }
+ msg.getResponseHolder().setResult(new ResponseEntity<>(data, HttpStatus.OK));
+ }
+
+ @Override
+ public void onFailure(PluginContext ctx, Exception e) {
+ log.error("Failed to fetch historical data", e);
+ msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR));
+ }
+ });
} else if ("attributes".equals(entity)) {
String keys = request.getParameter("keys", "");
- List<AttributeKvEntry> attributes;
+
+ PluginCallback<List<AttributeKvEntry>> callback = getAttributeValuesPluginCallback(msg);
if (!StringUtils.isEmpty(scope)) {
- attributes = getAttributeKvEntries(ctx, scope, deviceId, keys);
+ if (!StringUtils.isEmpty(keys)) {
+ List<String> keyList = Arrays.asList(keys.split(","));
+ ctx.loadAttributes(deviceId, scope, keyList, callback);
+ } else {
+ ctx.loadAttributes(deviceId, scope, callback);
+ }
} else {
- attributes = new ArrayList<>();
- Arrays.stream(DataConstants.ALL_SCOPES).forEach(s -> attributes.addAll(getAttributeKvEntries(ctx, s, deviceId, keys)));
+ if (!StringUtils.isEmpty(keys)) {
+ List<String> keyList = Arrays.asList(keys.split(","));
+ ctx.loadAttributes(deviceId, Arrays.asList(DataConstants.ALL_SCOPES), keyList, callback);
+ } else {
+ ctx.loadAttributes(deviceId, Arrays.asList(DataConstants.ALL_SCOPES), callback);
+ }
}
- List<AttributeData> values = attributes.stream().map(attribute -> new AttributeData(attribute.getLastUpdateTs(),
- attribute.getKey(), attribute.getValue())).collect(Collectors.toList());
- msg.getResponseHolder().setResult(new ResponseEntity<>(values, HttpStatus.OK));
}
}
} else {
@@ -156,6 +173,7 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
@Override
public void onFailure(PluginContext ctx, Exception e) {
+ log.error("Failed to save attributes", e);
msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR));
}
});
@@ -184,8 +202,18 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
String keysParam = request.getParameter("keys");
if (!StringUtils.isEmpty(keysParam)) {
String[] keys = keysParam.split(",");
- ctx.removeAttributes(deviceId, scope, Arrays.asList(keys));
- msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.OK));
+ ctx.removeAttributes(deviceId, scope, Arrays.asList(keys), new PluginCallback<Void>() {
+ @Override
+ public void onSuccess(PluginContext ctx, Void value) {
+ msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.OK));
+ }
+
+ @Override
+ public void onFailure(PluginContext ctx, Exception e) {
+ log.error("Failed to remove attributes", e);
+ msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR));
+ }
+ });
return;
}
}
@@ -196,14 +224,37 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
}
- private List<AttributeKvEntry> getAttributeKvEntries(PluginContext ctx, String scope, DeviceId deviceId, String keysParam) {
- List<AttributeKvEntry> attributes;
- if (!StringUtils.isEmpty(keysParam)) {
- String[] keys = keysParam.split(",");
- attributes = ctx.loadAttributes(deviceId, scope, Arrays.asList(keys));
- } else {
- attributes = ctx.loadAttributes(deviceId, scope);
- }
- return attributes;
+
+ private PluginCallback<List<AttributeKvEntry>> getAttributeKeysPluginCallback(final PluginRestMsg msg) {
+ return new PluginCallback<List<AttributeKvEntry>>() {
+ @Override
+ public void onSuccess(PluginContext ctx, List<AttributeKvEntry> attributes) {
+ List<String> keys = attributes.stream().map(attrKv -> attrKv.getKey()).collect(Collectors.toList());
+ msg.getResponseHolder().setResult(new ResponseEntity<>(keys, HttpStatus.OK));
+ }
+
+ @Override
+ public void onFailure(PluginContext ctx, Exception e) {
+ log.error("Failed to fetch attributes", e);
+ msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR));
+ }
+ };
+ }
+
+ private PluginCallback<List<AttributeKvEntry>> getAttributeValuesPluginCallback(final PluginRestMsg msg) {
+ return new PluginCallback<List<AttributeKvEntry>>() {
+ @Override
+ public void onSuccess(PluginContext ctx, List<AttributeKvEntry> attributes) {
+ List<AttributeData> values = attributes.stream().map(attribute -> new AttributeData(attribute.getLastUpdateTs(),
+ attribute.getKey(), attribute.getValue())).collect(Collectors.toList());
+ msg.getResponseHolder().setResult(new ResponseEntity<>(values, HttpStatus.OK));
+ }
+
+ @Override
+ public void onFailure(PluginContext ctx, Exception e) {
+ log.error("Failed to fetch attributes", e);
+ msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR));
+ }
+ };
}
}
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRuleMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRuleMsgHandler.java
index 85a25da..d9bfba0 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRuleMsgHandler.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRuleMsgHandler.java
@@ -15,6 +15,7 @@
*/
package org.thingsboard.server.extensions.core.plugin.telemetry.handlers;
+import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.RuleId;
@@ -38,6 +39,7 @@ import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionT
import java.util.*;
import java.util.stream.Collectors;
+@Slf4j
public class TelemetryRuleMsgHandler extends DefaultRuleMsgHandler {
private final SubscriptionManager subscriptionManager;
@@ -49,27 +51,36 @@ public class TelemetryRuleMsgHandler extends DefaultRuleMsgHandler {
public void handleGetAttributesRequest(PluginContext ctx, TenantId tenantId, RuleId ruleId, GetAttributesRequestRuleToPluginMsg msg) {
GetAttributesRequest request = msg.getPayload();
- List<AttributeKvEntry> clientAttributes = getAttributeKvEntries(ctx, msg.getDeviceId(), DataConstants.CLIENT_SCOPE, request.getClientAttributeNames());
- List<AttributeKvEntry> sharedAttributes = getAttributeKvEntries(ctx, msg.getDeviceId(), DataConstants.SHARED_SCOPE, request.getSharedAttributeNames());
+ BiPluginCallBack<List<AttributeKvEntry>, List<AttributeKvEntry>> callback = new BiPluginCallBack<List<AttributeKvEntry>, List<AttributeKvEntry>>() {
- BasicGetAttributesResponse response = BasicGetAttributesResponse.onSuccess(request.getMsgType(),
- request.getRequestId(), BasicAttributeKVMsg.from(clientAttributes, sharedAttributes));
+ @Override
+ public void onSuccess(PluginContext ctx, List<AttributeKvEntry> clientAttributes, List<AttributeKvEntry> sharedAttributes) {
+ BasicGetAttributesResponse response = BasicGetAttributesResponse.onSuccess(request.getMsgType(),
+ request.getRequestId(), BasicAttributeKVMsg.from(clientAttributes, sharedAttributes));
+ ctx.reply(new ResponsePluginToRuleMsg(msg.getUid(), tenantId, ruleId, response));
+ }
+
+ @Override
+ public void onFailure(PluginContext ctx, Exception e) {
+ log.error("Failed to process get attributes request", e);
+ ctx.reply(new ResponsePluginToRuleMsg(msg.getUid(), tenantId, ruleId, BasicStatusCodeResponse.onError(request.getMsgType(), request.getRequestId(), e)));
+ }
+ };
- ctx.reply(new ResponsePluginToRuleMsg(msg.getUid(), tenantId, ruleId, response));
+ getAttributeKvEntries(ctx, msg.getDeviceId(), DataConstants.CLIENT_SCOPE, request.getClientAttributeNames(), callback.getV1Callback());
+ getAttributeKvEntries(ctx, msg.getDeviceId(), DataConstants.SHARED_SCOPE, request.getSharedAttributeNames(), callback.getV2Callback());
}
- private List<AttributeKvEntry> getAttributeKvEntries(PluginContext ctx, DeviceId deviceId, String scope, Optional<Set<String>> names) {
- List<AttributeKvEntry> attributes;
+ private void getAttributeKvEntries(PluginContext ctx, DeviceId deviceId, String scope, Optional<Set<String>> names, PluginCallback<List<AttributeKvEntry>> callback) {
if (names.isPresent()) {
if (!names.get().isEmpty()) {
- attributes = ctx.loadAttributes(deviceId, scope, new ArrayList<>(names.get()));
+ ctx.loadAttributes(deviceId, scope, new ArrayList<>(names.get()), callback);
} else {
- attributes = ctx.loadAttributes(deviceId, scope);
+ ctx.loadAttributes(deviceId, scope, callback);
}
} else {
- attributes = Collections.emptyList();
+ callback.onSuccess(ctx, Collections.emptyList());
}
- return attributes;
}
@Override
@@ -100,6 +111,7 @@ public class TelemetryRuleMsgHandler extends DefaultRuleMsgHandler {
@Override
public void onFailure(PluginContext ctx, Exception e) {
+ log.error("Failed to process telemetry upload request", e);
ctx.reply(new ResponsePluginToRuleMsg(msg.getUid(), tenantId, ruleId, BasicStatusCodeResponse.onError(request.getMsgType(), request.getRequestId(), e)));
}
});
@@ -127,6 +139,7 @@ public class TelemetryRuleMsgHandler extends DefaultRuleMsgHandler {
@Override
public void onFailure(PluginContext ctx, Exception e) {
+ log.error("Failed to process attributes update request", e);
ctx.reply(new ResponsePluginToRuleMsg(msg.getUid(), tenantId, ruleId, BasicStatusCodeResponse.onError(request.getMsgType(), request.getRequestId(), e)));
}
});
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java
index 739bedf..dd30f99 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java
@@ -104,37 +104,64 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
SubscriptionState sub;
if (keysOptional.isPresent()) {
List<String> keys = new ArrayList<>(keysOptional.get());
- List<AttributeKvEntry> data = new ArrayList<>();
+
+ PluginCallback<List<AttributeKvEntry>> callback = new PluginCallback<List<AttributeKvEntry>>() {
+ @Override
+ public void onSuccess(PluginContext ctx, List<AttributeKvEntry> data) {
+ List<TsKvEntry> attributesData = data.stream().map(d -> new BasicTsKvEntry(d.getLastUpdateTs(), d)).collect(Collectors.toList());
+ sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), attributesData));
+
+ Map<String, Long> subState = new HashMap<>(keys.size());
+ keys.forEach(key -> subState.put(key, 0L));
+ attributesData.forEach(v -> subState.put(v.getKey(), v.getTs()));
+
+ SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), deviceId, SubscriptionType.ATTRIBUTES, false, subState);
+ subscriptionManager.addLocalWsSubscription(ctx, sessionId, deviceId, sub);
+ }
+
+ @Override
+ public void onFailure(PluginContext ctx, Exception e) {
+ log.error("Failed to fetch attributes!", e);
+ SubscriptionUpdate update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR,
+ "Failed to fetch attributes!");
+ sendWsMsg(ctx, sessionRef, update);
+ }
+ };
+
if (StringUtils.isEmpty(cmd.getScope())) {
- Arrays.stream(DataConstants.ALL_SCOPES).forEach(s -> data.addAll(ctx.loadAttributes(deviceId, s, keys)));
+ ctx.loadAttributes(deviceId, Arrays.asList(DataConstants.ALL_SCOPES), keys, callback);
} else {
- data.addAll(ctx.loadAttributes(deviceId, cmd.getScope(), keys));
+ ctx.loadAttributes(deviceId, cmd.getScope(), keys, callback);
}
+ } else {
+ PluginCallback<List<AttributeKvEntry>> callback = new PluginCallback<List<AttributeKvEntry>>() {
+ @Override
+ public void onSuccess(PluginContext ctx, List<AttributeKvEntry> data) {
+ List<TsKvEntry> attributesData = data.stream().map(d -> new BasicTsKvEntry(d.getLastUpdateTs(), d)).collect(Collectors.toList());
+ sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), attributesData));
- List<TsKvEntry> attributesData = data.stream().map(d -> new BasicTsKvEntry(d.getLastUpdateTs(), d)).collect(Collectors.toList());
- sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), attributesData));
+ Map<String, Long> subState = new HashMap<>(attributesData.size());
+ attributesData.forEach(v -> subState.put(v.getKey(), v.getTs()));
- Map<String, Long> subState = new HashMap<>(keys.size());
- keys.forEach(key -> subState.put(key, 0L));
- attributesData.forEach(v -> subState.put(v.getKey(), v.getTs()));
+ SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), deviceId, SubscriptionType.ATTRIBUTES, true, subState);
+ subscriptionManager.addLocalWsSubscription(ctx, sessionId, deviceId, sub);
+ }
+
+ @Override
+ public void onFailure(PluginContext ctx, Exception e) {
+ log.error("Failed to fetch attributes!", e);
+ SubscriptionUpdate update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR,
+ "Failed to fetch attributes!");
+ sendWsMsg(ctx, sessionRef, update);
+ }
+ };
- sub = new SubscriptionState(sessionId, cmd.getCmdId(), deviceId, SubscriptionType.ATTRIBUTES, false, subState);
- } else {
- List<AttributeKvEntry> data = new ArrayList<>();
if (StringUtils.isEmpty(cmd.getScope())) {
- Arrays.stream(DataConstants.ALL_SCOPES).forEach(s -> data.addAll(ctx.loadAttributes(deviceId, s)));
+ ctx.loadAttributes(deviceId, Arrays.asList(DataConstants.ALL_SCOPES), callback);
} else {
- data.addAll(ctx.loadAttributes(deviceId, cmd.getScope()));
+ ctx.loadAttributes(deviceId, cmd.getScope(), callback);
}
- List<TsKvEntry> attributesData = data.stream().map(d -> new BasicTsKvEntry(d.getLastUpdateTs(), d)).collect(Collectors.toList());
- sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), attributesData));
-
- Map<String, Long> subState = new HashMap<>(attributesData.size());
- attributesData.forEach(v -> subState.put(v.getKey(), v.getTs()));
-
- sub = new SubscriptionState(sessionId, cmd.getCmdId(), deviceId, SubscriptionType.ATTRIBUTES, true, subState);
}
- subscriptionManager.addLocalWsSubscription(ctx, sessionId, deviceId, sub);
}
}
}
@@ -205,6 +232,7 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
@Override
public void onFailure(PluginContext ctx, Exception e) {
+ log.error("Failed to fetch data!", e);
SubscriptionUpdate update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR,
"Failed to fetch data!");
sendWsMsg(ctx, sessionRef, update);
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/SubscriptionManager.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/SubscriptionManager.java
index 0624876..ea7a185 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/SubscriptionManager.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/SubscriptionManager.java
@@ -15,12 +15,14 @@
*/
package org.thingsboard.server.extensions.core.plugin.telemetry;
+import com.sun.javafx.collections.MappingChange;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.kv.*;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.extensions.api.plugins.PluginCallback;
import org.thingsboard.server.extensions.api.plugins.PluginContext;
import org.thingsboard.server.extensions.core.plugin.telemetry.handlers.TelemetryRpcMsgHandler;
import org.thingsboard.server.extensions.core.plugin.telemetry.handlers.TelemetryWebsocketMsgHandler;
@@ -66,28 +68,49 @@ public class SubscriptionManager {
DeviceId deviceId = subscription.getDeviceId();
log.trace("[{}] Registering remote subscription [{}] for device [{}] to [{}]", sessionId, subscription.getSubscriptionId(), deviceId, address);
registerSubscription(sessionId, deviceId, subscription);
- List<TsKvEntry> missedUpdates = new ArrayList<>();
if (subscription.getType() == SubscriptionType.ATTRIBUTES) {
- subscription.getKeyStates().entrySet().forEach(e -> {
- Optional<AttributeKvEntry> latestOpt = ctx.loadAttribute(deviceId, DataConstants.CLIENT_SCOPE, e.getKey());
- if (latestOpt.isPresent()) {
- AttributeKvEntry latestEntry = latestOpt.get();
- if (latestEntry.getLastUpdateTs() > e.getValue()) {
- missedUpdates.add(new BasicTsKvEntry(latestEntry.getLastUpdateTs(), latestEntry));
- }
+ final Map<String, Long> keyStates = subscription.getKeyStates();
+ ctx.loadAttributes(deviceId, DataConstants.CLIENT_SCOPE, keyStates.keySet(), new PluginCallback<List<AttributeKvEntry>>() {
+ @Override
+ public void onSuccess(PluginContext ctx, List<AttributeKvEntry> values) {
+ List<TsKvEntry> missedUpdates = new ArrayList<>();
+ values.forEach(latestEntry -> {
+ if (latestEntry.getLastUpdateTs() > keyStates.get(latestEntry.getKey())) {
+ missedUpdates.add(new BasicTsKvEntry(latestEntry.getLastUpdateTs(), latestEntry));
}
+ });
+ if (!missedUpdates.isEmpty()) {
+ rpcHandler.onSubscriptionUpdate(ctx, address, sessionId, new SubscriptionUpdate(subscription.getSubscriptionId(), missedUpdates));
}
- );
+ }
+
+ @Override
+ public void onFailure(PluginContext ctx, Exception e) {
+ log.error("Failed to fetch missed updates.", e);
+ }
+ });
} else if (subscription.getType() == SubscriptionType.TIMESERIES) {
long curTs = System.currentTimeMillis();
+ List<TsKvQuery> queries = new ArrayList<>();
subscription.getKeyStates().entrySet().forEach(e -> {
- TsKvQuery query = new BaseTsKvQuery(e.getKey(), e.getValue() + 1L, curTs);
- missedUpdates.addAll(ctx.loadTimeseries(deviceId, query));
+ queries.add(new BaseTsKvQuery(e.getKey(), e.getValue() + 1L, curTs));
+ });
+
+ ctx.loadTimeseries(deviceId, queries, new PluginCallback<List<TsKvEntry>>() {
+ @Override
+ public void onSuccess(PluginContext ctx, List<TsKvEntry> missedUpdates) {
+ if (!missedUpdates.isEmpty()) {
+ rpcHandler.onSubscriptionUpdate(ctx, address, sessionId, new SubscriptionUpdate(subscription.getSubscriptionId(), missedUpdates));
+ }
+ }
+
+ @Override
+ public void onFailure(PluginContext ctx, Exception e) {
+ log.error("Failed to fetch missed updates.", e);
+ }
});
}
- if (!missedUpdates.isEmpty()) {
- rpcHandler.onSubscriptionUpdate(ctx, address, sessionId, new SubscriptionUpdate(subscription.getSubscriptionId(), missedUpdates));
- }
+
}
private void registerSubscription(String sessionId, DeviceId deviceId, Subscription subscription) {