thingsboard-memoizeit

Fix for notification to the devices when shared attribute is

8/30/2018 5:42:53 AM

Details

diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
index 263b484..5c85036 100644
--- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
+++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
@@ -24,12 +24,15 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.annotation.Lazy;
 import org.springframework.stereotype.Service;
 import org.springframework.util.StringUtils;
+import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg;
 import org.thingsboard.rule.engine.api.util.DonAsynchron;
+import org.thingsboard.server.actors.service.ActorService;
 import org.thingsboard.server.common.data.DataConstants;
 import org.thingsboard.server.common.data.EntityType;
 import org.thingsboard.server.common.data.id.DeviceId;
 import org.thingsboard.server.common.data.id.EntityId;
 import org.thingsboard.server.common.data.id.EntityIdFactory;
+import org.thingsboard.server.common.data.id.TenantId;
 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
 import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
 import org.thingsboard.server.common.data.kv.BaseTsKvQuery;
@@ -42,6 +45,7 @@ import org.thingsboard.server.common.data.kv.LongDataEntry;
 import org.thingsboard.server.common.data.kv.StringDataEntry;
 import org.thingsboard.server.common.data.kv.TsKvEntry;
 import org.thingsboard.server.common.data.kv.TsKvQuery;
+import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
 import org.thingsboard.server.dao.attributes.AttributesService;
 import org.thingsboard.server.dao.timeseries.TimeseriesService;
@@ -101,6 +105,10 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
     @Lazy
     private DeviceStateService stateService;
 
+    @Autowired
+    @Lazy
+    private ActorService actorService;
+
     private ExecutorService tsCallBackExecutor;
     private ExecutorService wsCallBackExecutor;
 
@@ -204,6 +212,13 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
     }
 
     @Override
+    public void onSharedAttributesUpdate(TenantId tenantId, DeviceId deviceId, Set<AttributeKvEntry> attributes) {
+        DeviceAttributesEventNotificationMsg notificationMsg = DeviceAttributesEventNotificationMsg.onUpdate(tenantId,
+                deviceId, DataConstants.SHARED_SCOPE, new ArrayList<>(attributes));
+        actorService.onMsg(new SendToClusterMsg(deviceId, notificationMsg));
+    }
+
+    @Override
     public void onNewRemoteSubscription(ServerAddress serverAddress, byte[] data) {
         ClusterAPIProtos.SubscriptionProto proto;
         try {
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java
index aa57a02..c70b906 100644
--- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java
@@ -16,11 +16,14 @@
 package org.thingsboard.rule.engine.api;
 
 import com.google.common.util.concurrent.FutureCallback;
+import org.thingsboard.server.common.data.id.DeviceId;
 import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.id.TenantId;
 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
 import org.thingsboard.server.common.data.kv.TsKvEntry;
 
 import java.util.List;
+import java.util.Set;
 
 /**
  * Created by ashvayka on 02.04.18.
@@ -41,4 +44,6 @@ public interface RuleEngineTelemetryService {
 
     void saveAttrAndNotify(EntityId entityId, String scope, String key, boolean value, FutureCallback<Void> callback);
 
+    void onSharedAttributesUpdate(TenantId tenantId, DeviceId deviceId, Set<AttributeKvEntry> attributes);
+
 }
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java
index 4f82884..5cdb04e 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java
@@ -17,12 +17,15 @@ package org.thingsboard.rule.engine.telemetry;
 
 import com.google.gson.JsonParser;
 import lombok.extern.slf4j.Slf4j;
-import org.thingsboard.rule.engine.api.util.TbNodeUtils;
 import org.thingsboard.rule.engine.api.RuleNode;
 import org.thingsboard.rule.engine.api.TbContext;
 import org.thingsboard.rule.engine.api.TbNode;
 import org.thingsboard.rule.engine.api.TbNodeConfiguration;
 import org.thingsboard.rule.engine.api.TbNodeException;
+import org.thingsboard.rule.engine.api.util.TbNodeUtils;
+import org.thingsboard.server.common.data.DataConstants;
+import org.thingsboard.server.common.data.EntityType;
+import org.thingsboard.server.common.data.id.DeviceId;
 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
 import org.thingsboard.server.common.data.plugin.ComponentType;
 import org.thingsboard.server.common.msg.TbMsg;
@@ -62,6 +65,9 @@ public class TbMsgAttributesNode implements TbNode {
         String src = msg.getData();
         Set<AttributeKvEntry> attributes = JsonConverter.convertToAttributes(new JsonParser().parse(src)).getAttributes();
         ctx.getTelemetryService().saveAndNotify(msg.getOriginator(), config.getScope(), new ArrayList<>(attributes), new TelemetryNodeCallback(ctx, msg));
+        if (msg.getOriginator().getEntityType() == EntityType.DEVICE && DataConstants.SHARED_SCOPE.equals(config.getScope())) {
+            ctx.getTelemetryService().onSharedAttributesUpdate(ctx.getTenantId(), new DeviceId(msg.getOriginator().getId()), attributes);
+        }
     }
 
     @Override