thingsboard-aplcache

Added DELETE attributes case handle

10/16/2018 9:26:00 AM

Details

diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCopyAttributesToEntityViewNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCopyAttributesToEntityViewNode.java
index 51386dc..5d29a43 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCopyAttributesToEntityViewNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCopyAttributesToEntityViewNode.java
@@ -17,7 +17,9 @@ package org.thingsboard.rule.engine.action;
 
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.gson.JsonElement;
 import com.google.gson.JsonParser;
+import com.google.gson.JsonPrimitive;
 import lombok.extern.slf4j.Slf4j;
 import org.thingsboard.rule.engine.api.EmptyNodeConfiguration;
 import org.thingsboard.rule.engine.api.RuleNode;
@@ -25,7 +27,6 @@ import org.thingsboard.rule.engine.api.TbContext;
 import org.thingsboard.rule.engine.api.TbNode;
 import org.thingsboard.rule.engine.api.TbNodeConfiguration;
 import org.thingsboard.rule.engine.api.TbNodeException;
-import org.thingsboard.rule.engine.api.TbRelationTypes;
 import org.thingsboard.rule.engine.api.util.DonAsynchron;
 import org.thingsboard.rule.engine.api.util.TbNodeUtils;
 import org.thingsboard.server.common.data.DataConstants;
@@ -37,12 +38,11 @@ import org.thingsboard.server.common.msg.session.SessionMsgType;
 import org.thingsboard.server.common.transport.adaptor.JsonConverter;
 
 import javax.annotation.Nullable;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
-import static org.thingsboard.rule.engine.api.TbRelationTypes.FAILURE;
 import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
 
 @Slf4j
@@ -68,66 +68,93 @@ public class TbCopyAttributesToEntityViewNode implements TbNode {
     }
 
     @Override
-    public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException {
-        if (!msg.getMetaData().getData().isEmpty()) {
-            long now = System.currentTimeMillis();
-            String scope = msg.getType().equals(SessionMsgType.POST_ATTRIBUTES_REQUEST.name()) ?
-                    DataConstants.CLIENT_SCOPE : msg.getMetaData().getValue("scope");
+    public void onMsg(TbContext ctx, TbMsg msg) {
+        if (DataConstants.ATTRIBUTES_UPDATED.equals(msg.getType()) ||
+                DataConstants.ATTRIBUTES_DELETED.equals(msg.getType()) ||
+                SessionMsgType.POST_ATTRIBUTES_REQUEST.name().equals(msg.getType())) {
+            if (!msg.getMetaData().getData().isEmpty()) {
+                long now = System.currentTimeMillis();
+                String scope = msg.getType().equals(SessionMsgType.POST_ATTRIBUTES_REQUEST.name()) ?
+                        DataConstants.CLIENT_SCOPE : msg.getMetaData().getValue("scope");
 
-            ListenableFuture<List<EntityView>> entityViewsFuture =
-                    ctx.getEntityViewService().findEntityViewsByTenantIdAndEntityIdAsync(ctx.getTenantId(), msg.getOriginator());
+                ListenableFuture<List<EntityView>> entityViewsFuture =
+                        ctx.getEntityViewService().findEntityViewsByTenantIdAndEntityIdAsync(ctx.getTenantId(), msg.getOriginator());
 
-            DonAsynchron.withCallback(entityViewsFuture,
-                    entityViews -> {
-                        for (EntityView entityView : entityViews) {
-                            long startTime = entityView.getStartTimeMs();
-                            long endTime = entityView.getEndTimeMs();
-                            if ((endTime != 0  && endTime > now && startTime < now) || (endTime == 0 && startTime < now)) {
-                                Set<AttributeKvEntry> attributes =
-                                        JsonConverter.convertToAttributes(new JsonParser().parse(msg.getData())).getAttributes();
-                                List<AttributeKvEntry> filteredAttributes =
-                                        attributes.stream()
-                                                .filter(attr -> {
-                                                    switch (scope) {
-                                                        case DataConstants.CLIENT_SCOPE:
-                                                            if (entityView.getKeys().getAttributes().getCs().isEmpty()) {
-                                                                return true;
-                                                            }
-                                                            return entityView.getKeys().getAttributes().getCs().contains(attr.getKey());
-                                                        case DataConstants.SERVER_SCOPE:
-                                                            if (entityView.getKeys().getAttributes().getSs().isEmpty()) {
-                                                                return true;
-                                                            }
-                                                            return entityView.getKeys().getAttributes().getSs().contains(attr.getKey());
-                                                        case  DataConstants.SHARED_SCOPE:
-                                                            if (entityView.getKeys().getAttributes().getSh().isEmpty()) {
-                                                                return true;
-                                                            }
-                                                            return entityView.getKeys().getAttributes().getSh().contains(attr.getKey());
+                DonAsynchron.withCallback(entityViewsFuture,
+                        entityViews -> {
+                            for (EntityView entityView : entityViews) {
+                                long startTime = entityView.getStartTimeMs();
+                                long endTime = entityView.getEndTimeMs();
+                                if ((endTime != 0 && endTime > now && startTime < now) || (endTime == 0 && startTime < now)) {
+                                    if (DataConstants.ATTRIBUTES_UPDATED.equals(msg.getType()) ||
+                                            SessionMsgType.POST_ATTRIBUTES_REQUEST.name().equals(msg.getType())) {
+                                        Set<AttributeKvEntry> attributes =
+                                                JsonConverter.convertToAttributes(new JsonParser().parse(msg.getData())).getAttributes();
+                                        List<AttributeKvEntry> filteredAttributes =
+                                                attributes.stream().filter(attr -> attributeContainsInEntityView(scope, attr.getKey(), entityView)).collect(Collectors.toList());
+                                        ctx.getTelemetryService().saveAndNotify(entityView.getId(), scope, filteredAttributes,
+                                                new FutureCallback<Void>() {
+                                                    @Override
+                                                    public void onSuccess(@Nullable Void result) {
+                                                        transformAndTellNext(ctx, msg, entityView);
                                                     }
-                                                    return false;
-                                                }).collect(Collectors.toList());
 
-                                ctx.getTelemetryService().saveAndNotify(entityView.getId(), scope, filteredAttributes,
-                                        new FutureCallback<Void>() {
-                                            @Override
-                                            public void onSuccess(@Nullable Void result) {
-                                                TbMsg updMsg = ctx.transformMsg(msg, msg.getType(), entityView.getId(), msg.getMetaData(), msg.getData());
-                                                ctx.tellNext(updMsg, SUCCESS);
-                                            }
-
-                                            @Override
-                                            public void onFailure(Throwable t) {
-                                                ctx.tellFailure(msg, t);
+                                                    @Override
+                                                    public void onFailure(Throwable t) {
+                                                        ctx.tellFailure(msg, t);
+                                                    }
+                                                });
+                                    } else if (DataConstants.ATTRIBUTES_DELETED.equals(msg.getType())) {
+                                        List<String> attributes = new ArrayList<>();
+                                        for (JsonElement element : new JsonParser().parse(msg.getData()).getAsJsonObject().get("attributes").getAsJsonArray()) {
+                                            if (element.isJsonPrimitive()) {
+                                                JsonPrimitive value = element.getAsJsonPrimitive();
+                                                if (value.isString()) {
+                                                    attributes.add(value.getAsString());
+                                                }
                                             }
-                                        });
+                                            List<String> filteredAttributes =
+                                                    attributes.stream().filter(attr -> attributeContainsInEntityView(scope, attr, entityView)).collect(Collectors.toList());
+                                            ctx.getAttributesService().removeAll(entityView.getId(), scope, filteredAttributes);
+                                        }
+                                        transformAndTellNext(ctx, msg, entityView);
+                                    }
+                                }
                             }
-                        }
-                    },
-                    t -> ctx.tellFailure(msg, t));
+                        },
+                        t -> ctx.tellFailure(msg, t));
+            } else {
+                ctx.tellFailure(msg, new IllegalArgumentException("Message metadata is empty"));
+            }
         } else {
-            ctx.tellNext(msg, FAILURE);
+            ctx.tellFailure(msg, new IllegalArgumentException("Unsupported msg type [" + msg.getType() + "]"));
+        }
+    }
+
+    private void transformAndTellNext(TbContext ctx, TbMsg msg, EntityView entityView) {
+        TbMsg updMsg = ctx.transformMsg(msg, msg.getType(), entityView.getId(), msg.getMetaData(), msg.getData());
+        ctx.tellNext(updMsg, SUCCESS);
+    }
+
+    private boolean attributeContainsInEntityView(String scope, String attrKey, EntityView entityView) {
+        switch (scope) {
+            case DataConstants.CLIENT_SCOPE:
+                if (entityView.getKeys().getAttributes().getCs().isEmpty()) {
+                    return true;
+                }
+                return entityView.getKeys().getAttributes().getCs().contains(attrKey);
+            case DataConstants.SERVER_SCOPE:
+                if (entityView.getKeys().getAttributes().getSs().isEmpty()) {
+                    return true;
+                }
+                return entityView.getKeys().getAttributes().getSs().contains(attrKey);
+            case DataConstants.SHARED_SCOPE:
+                if (entityView.getKeys().getAttributes().getSh().isEmpty()) {
+                    return true;
+                }
+                return entityView.getKeys().getAttributes().getSh().contains(attrKey);
         }
+        return false;
     }
 
     @Override