diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCopyAttributesToEntityViewNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCopyAttributesToEntityViewNode.java
index 51386dc..5d29a43 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCopyAttributesToEntityViewNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCopyAttributesToEntityViewNode.java
@@ -17,7 +17,9 @@ package org.thingsboard.rule.engine.action;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
+import com.google.gson.JsonPrimitive;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.rule.engine.api.EmptyNodeConfiguration;
import org.thingsboard.rule.engine.api.RuleNode;
@@ -25,7 +27,6 @@ import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
-import org.thingsboard.rule.engine.api.TbRelationTypes;
import org.thingsboard.rule.engine.api.util.DonAsynchron;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.server.common.data.DataConstants;
@@ -37,12 +38,11 @@ import org.thingsboard.server.common.msg.session.SessionMsgType;
import org.thingsboard.server.common.transport.adaptor.JsonConverter;
import javax.annotation.Nullable;
+import java.util.ArrayList;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
-import static org.thingsboard.rule.engine.api.TbRelationTypes.FAILURE;
import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
@Slf4j
@@ -68,66 +68,93 @@ public class TbCopyAttributesToEntityViewNode implements TbNode {
}
@Override
- public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException {
- if (!msg.getMetaData().getData().isEmpty()) {
- long now = System.currentTimeMillis();
- String scope = msg.getType().equals(SessionMsgType.POST_ATTRIBUTES_REQUEST.name()) ?
- DataConstants.CLIENT_SCOPE : msg.getMetaData().getValue("scope");
+ public void onMsg(TbContext ctx, TbMsg msg) {
+ if (DataConstants.ATTRIBUTES_UPDATED.equals(msg.getType()) ||
+ DataConstants.ATTRIBUTES_DELETED.equals(msg.getType()) ||
+ SessionMsgType.POST_ATTRIBUTES_REQUEST.name().equals(msg.getType())) {
+ if (!msg.getMetaData().getData().isEmpty()) {
+ long now = System.currentTimeMillis();
+ String scope = msg.getType().equals(SessionMsgType.POST_ATTRIBUTES_REQUEST.name()) ?
+ DataConstants.CLIENT_SCOPE : msg.getMetaData().getValue("scope");
- ListenableFuture<List<EntityView>> entityViewsFuture =
- ctx.getEntityViewService().findEntityViewsByTenantIdAndEntityIdAsync(ctx.getTenantId(), msg.getOriginator());
+ ListenableFuture<List<EntityView>> entityViewsFuture =
+ ctx.getEntityViewService().findEntityViewsByTenantIdAndEntityIdAsync(ctx.getTenantId(), msg.getOriginator());
- DonAsynchron.withCallback(entityViewsFuture,
- entityViews -> {
- for (EntityView entityView : entityViews) {
- long startTime = entityView.getStartTimeMs();
- long endTime = entityView.getEndTimeMs();
- if ((endTime != 0 && endTime > now && startTime < now) || (endTime == 0 && startTime < now)) {
- Set<AttributeKvEntry> attributes =
- JsonConverter.convertToAttributes(new JsonParser().parse(msg.getData())).getAttributes();
- List<AttributeKvEntry> filteredAttributes =
- attributes.stream()
- .filter(attr -> {
- switch (scope) {
- case DataConstants.CLIENT_SCOPE:
- if (entityView.getKeys().getAttributes().getCs().isEmpty()) {
- return true;
- }
- return entityView.getKeys().getAttributes().getCs().contains(attr.getKey());
- case DataConstants.SERVER_SCOPE:
- if (entityView.getKeys().getAttributes().getSs().isEmpty()) {
- return true;
- }
- return entityView.getKeys().getAttributes().getSs().contains(attr.getKey());
- case DataConstants.SHARED_SCOPE:
- if (entityView.getKeys().getAttributes().getSh().isEmpty()) {
- return true;
- }
- return entityView.getKeys().getAttributes().getSh().contains(attr.getKey());
+ DonAsynchron.withCallback(entityViewsFuture,
+ entityViews -> {
+ for (EntityView entityView : entityViews) {
+ long startTime = entityView.getStartTimeMs();
+ long endTime = entityView.getEndTimeMs();
+ if ((endTime != 0 && endTime > now && startTime < now) || (endTime == 0 && startTime < now)) {
+ if (DataConstants.ATTRIBUTES_UPDATED.equals(msg.getType()) ||
+ SessionMsgType.POST_ATTRIBUTES_REQUEST.name().equals(msg.getType())) {
+ Set<AttributeKvEntry> attributes =
+ JsonConverter.convertToAttributes(new JsonParser().parse(msg.getData())).getAttributes();
+ List<AttributeKvEntry> filteredAttributes =
+ attributes.stream().filter(attr -> attributeContainsInEntityView(scope, attr.getKey(), entityView)).collect(Collectors.toList());
+ ctx.getTelemetryService().saveAndNotify(entityView.getId(), scope, filteredAttributes,
+ new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(@Nullable Void result) {
+ transformAndTellNext(ctx, msg, entityView);
}
- return false;
- }).collect(Collectors.toList());
- ctx.getTelemetryService().saveAndNotify(entityView.getId(), scope, filteredAttributes,
- new FutureCallback<Void>() {
- @Override
- public void onSuccess(@Nullable Void result) {
- TbMsg updMsg = ctx.transformMsg(msg, msg.getType(), entityView.getId(), msg.getMetaData(), msg.getData());
- ctx.tellNext(updMsg, SUCCESS);
- }
-
- @Override
- public void onFailure(Throwable t) {
- ctx.tellFailure(msg, t);
+ @Override
+ public void onFailure(Throwable t) {
+ ctx.tellFailure(msg, t);
+ }
+ });
+ } else if (DataConstants.ATTRIBUTES_DELETED.equals(msg.getType())) {
+ List<String> attributes = new ArrayList<>();
+ for (JsonElement element : new JsonParser().parse(msg.getData()).getAsJsonObject().get("attributes").getAsJsonArray()) {
+ if (element.isJsonPrimitive()) {
+ JsonPrimitive value = element.getAsJsonPrimitive();
+ if (value.isString()) {
+ attributes.add(value.getAsString());
+ }
}
- });
+ List<String> filteredAttributes =
+ attributes.stream().filter(attr -> attributeContainsInEntityView(scope, attr, entityView)).collect(Collectors.toList());
+ ctx.getAttributesService().removeAll(entityView.getId(), scope, filteredAttributes);
+ }
+ transformAndTellNext(ctx, msg, entityView);
+ }
+ }
}
- }
- },
- t -> ctx.tellFailure(msg, t));
+ },
+ t -> ctx.tellFailure(msg, t));
+ } else {
+ ctx.tellFailure(msg, new IllegalArgumentException("Message metadata is empty"));
+ }
} else {
- ctx.tellNext(msg, FAILURE);
+ ctx.tellFailure(msg, new IllegalArgumentException("Unsupported msg type [" + msg.getType() + "]"));
+ }
+ }
+
+ private void transformAndTellNext(TbContext ctx, TbMsg msg, EntityView entityView) {
+ TbMsg updMsg = ctx.transformMsg(msg, msg.getType(), entityView.getId(), msg.getMetaData(), msg.getData());
+ ctx.tellNext(updMsg, SUCCESS);
+ }
+
+ private boolean attributeContainsInEntityView(String scope, String attrKey, EntityView entityView) {
+ switch (scope) {
+ case DataConstants.CLIENT_SCOPE:
+ if (entityView.getKeys().getAttributes().getCs().isEmpty()) {
+ return true;
+ }
+ return entityView.getKeys().getAttributes().getCs().contains(attrKey);
+ case DataConstants.SERVER_SCOPE:
+ if (entityView.getKeys().getAttributes().getSs().isEmpty()) {
+ return true;
+ }
+ return entityView.getKeys().getAttributes().getSs().contains(attrKey);
+ case DataConstants.SHARED_SCOPE:
+ if (entityView.getKeys().getAttributes().getSh().isEmpty()) {
+ return true;
+ }
+ return entityView.getKeys().getAttributes().getSh().contains(attrKey);
}
+ return false;
}
@Override