thingsboard-aplcache

Details

diff --git a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
index cb097fb..fac5d97 100644
--- a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
+++ b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
@@ -241,6 +241,9 @@ public class DefaultActorService implements ActorService {
             case CLUSTER_RPC_FROM_DEVICE_RESPONSE_MESSAGE:
                 actorContext.getDeviceRpcService().processRemoteResponseFromDevice(serverAddress, msg.getPayload().toByteArray());
                 break;
+            case CLUSTER_DEVICE_STATE_SERVICE_MESSAGE:
+                actorContext.getDeviceStateService().onRemoteMsg(serverAddress, msg.getPayload().toByteArray());
+                break;
         }
     }
 
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java
index 6002b0e..c3ffbab 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java
@@ -36,6 +36,7 @@ import org.springframework.context.annotation.Lazy;
 import org.springframework.stereotype.Service;
 import org.springframework.util.Assert;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.service.state.DeviceStateService;
 import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
 import org.thingsboard.server.utils.MiscUtils;
 
@@ -74,6 +75,10 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
     @Lazy
     private TelemetrySubscriptionService tsSubService;
 
+    @Autowired
+    @Lazy
+    private DeviceStateService deviceStateService;
+
     private final List<DiscoveryServiceListener> listeners = new CopyOnWriteArrayList<>();
 
     private CuratorFramework client;
@@ -203,6 +208,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
         switch (pathChildrenCacheEvent.getType()) {
             case CHILD_ADDED:
                 tsSubService.onClusterUpdate();
+                deviceStateService.onClusterUpdate();
                 listeners.forEach(listener -> listener.onServerAdded(instance));
                 break;
             case CHILD_UPDATED:
@@ -210,6 +216,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
                 break;
             case CHILD_REMOVED:
                 tsSubService.onClusterUpdate();
+                deviceStateService.onClusterUpdate();
                 listeners.forEach(listener -> listener.onServerRemoved(instance));
                 break;
             default:
diff --git a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java
index 45b9696..a41fdee 100644
--- a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java
+++ b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java
@@ -23,11 +23,13 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningScheduledExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
+import com.google.protobuf.InvalidProtocolBufferException;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
+import org.thingsboard.rule.engine.api.RpcError;
 import org.thingsboard.server.actors.service.ActorService;
 import org.thingsboard.server.common.data.DataConstants;
 import org.thingsboard.server.common.data.Device;
@@ -36,13 +38,19 @@ import org.thingsboard.server.common.data.id.DeviceId;
 import org.thingsboard.server.common.data.id.TenantId;
 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
 import org.thingsboard.server.common.data.page.TextPageLink;
+import org.thingsboard.server.common.data.plugin.ComponentLifecycleState;
 import org.thingsboard.server.common.msg.TbMsg;
 import org.thingsboard.server.common.msg.TbMsgDataType;
 import org.thingsboard.server.common.msg.TbMsgMetaData;
+import org.thingsboard.server.common.msg.cluster.ServerAddress;
 import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
 import org.thingsboard.server.dao.attributes.AttributesService;
 import org.thingsboard.server.dao.device.DeviceService;
 import org.thingsboard.server.dao.tenant.TenantService;
+import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
+import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
+import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
+import org.thingsboard.server.service.rpc.FromDeviceRpcResponse;
 import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
 
 import javax.annotation.Nullable;
@@ -54,6 +62,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
@@ -98,6 +107,12 @@ public class DefaultDeviceStateService implements DeviceStateService {
     @Autowired
     private TelemetrySubscriptionService tsSubService;
 
+    @Autowired
+    private ClusterRoutingService routingService;
+
+    @Autowired
+    private ClusterRpcService clusterRpcService;
+
     @Value("${state.defaultInactivityTimeoutInSec}")
     @Getter
     private long defaultInactivityTimeoutInSec;
@@ -172,12 +187,57 @@ public class DefaultDeviceStateService implements DeviceStateService {
     }
 
     @Override
-    public Optional<DeviceState> getDeviceState(DeviceId deviceId) {
-        DeviceStateData state = deviceStates.get(deviceId);
-        if (state != null) {
-            return Optional.of(state.getState());
+    public void onClusterUpdate() {
+        queueExecutor.submit(this::onClusterUpdateSync);
+    }
+
+    @Override
+    public void onRemoteMsg(ServerAddress serverAddress, byte[] data) {
+        ClusterAPIProtos.DeviceStateServiceMsgProto proto;
+        try {
+            proto = ClusterAPIProtos.DeviceStateServiceMsgProto.parseFrom(data);
+        } catch (InvalidProtocolBufferException e) {
+            throw new RuntimeException(e);
+        }
+        TenantId tenantId = new TenantId(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB()));
+        DeviceId deviceId = new DeviceId(new UUID(proto.getDeviceIdMSB(), proto.getDeviceIdLSB()));
+        if (proto.getDeleted()) {
+            queueExecutor.submit(() -> onDeviceDeleted(tenantId, deviceId));
         } else {
-            return Optional.empty();
+            Device device = deviceService.findDeviceById(deviceId);
+            if (device != null) {
+                if (proto.getAdded()) {
+                    onDeviceAdded(device);
+                } else if (proto.getUpdated()) {
+                    onDeviceUpdated(device);
+                }
+            }
+        }
+    }
+
+    private void onClusterUpdateSync() {
+        List<Tenant> tenants = tenantService.findTenants(new TextPageLink(Integer.MAX_VALUE)).getData();
+        for (Tenant tenant : tenants) {
+            List<ListenableFuture<DeviceStateData>> fetchFutures = new ArrayList<>();
+            List<Device> devices = deviceService.findDevicesByTenantId(tenant.getId(), new TextPageLink(Integer.MAX_VALUE)).getData();
+            for (Device device : devices) {
+                if (!routingService.resolveById(device.getId()).isPresent()) {
+                    if (!deviceStates.containsKey(device.getId())) {
+                        fetchFutures.add(fetchDeviceState(device));
+                    }
+                } else {
+                    Set<DeviceId> tenantDeviceSet = tenantDevices.get(tenant.getId());
+                    if (tenantDeviceSet != null) {
+                        tenantDeviceSet.remove(device.getId());
+                    }
+                    deviceStates.remove(device.getId());
+                }
+            }
+            try {
+                Futures.successfulAsList(fetchFutures).get().forEach(this::addDeviceUsingState);
+            } catch (InterruptedException | ExecutionException e) {
+                log.warn("Failed to init device state service from DB", e);
+            }
         }
     }
 
@@ -187,7 +247,9 @@ public class DefaultDeviceStateService implements DeviceStateService {
             List<ListenableFuture<DeviceStateData>> fetchFutures = new ArrayList<>();
             List<Device> devices = deviceService.findDevicesByTenantId(tenant.getId(), new TextPageLink(Integer.MAX_VALUE)).getData();
             for (Device device : devices) {
-                fetchFutures.add(fetchDeviceState(device));
+                if (!routingService.resolveById(device.getId()).isPresent()) {
+                    fetchFutures.add(fetchDeviceState(device));
+                }
             }
             try {
                 Futures.successfulAsList(fetchFutures).get().forEach(this::addDeviceUsingState);
@@ -209,7 +271,7 @@ public class DefaultDeviceStateService implements DeviceStateService {
             DeviceStateData stateData = deviceStates.get(deviceId);
             DeviceState state = stateData.getState();
             state.setActive(ts < state.getLastActivityTime() + state.getInactivityTimeout());
-            if (!state.isActive() && state.getLastInactivityAlarmTime() < state.getLastActivityTime()) {
+            if (!state.isActive() && (state.getLastInactivityAlarmTime() == 0L || state.getLastInactivityAlarmTime() < state.getLastActivityTime())) {
                 state.setLastInactivityAlarmTime(ts);
                 pushRuleEngineMessage(stateData, INACTIVITY_EVENT);
                 saveAttribute(deviceId, INACTIVITY_ALARM_TIME, ts);
@@ -219,7 +281,7 @@ public class DefaultDeviceStateService implements DeviceStateService {
     }
 
     private void onDeviceConnectSync(DeviceId deviceId) {
-        DeviceStateData stateData = deviceStates.get(deviceId);
+        DeviceStateData stateData = getOrFetchDeviceStateData(deviceId);
         if (stateData != null) {
             long ts = System.currentTimeMillis();
             stateData.getState().setLastConnectTime(ts);
@@ -229,7 +291,7 @@ public class DefaultDeviceStateService implements DeviceStateService {
     }
 
     private void onDeviceDisconnectSync(DeviceId deviceId) {
-        DeviceStateData stateData = deviceStates.get(deviceId);
+        DeviceStateData stateData = getOrFetchDeviceStateData(deviceId);
         if (stateData != null) {
             long ts = System.currentTimeMillis();
             stateData.getState().setLastDisconnectTime(ts);
@@ -239,7 +301,7 @@ public class DefaultDeviceStateService implements DeviceStateService {
     }
 
     private void onDeviceActivitySync(DeviceId deviceId) {
-        DeviceStateData stateData = deviceStates.get(deviceId);
+        DeviceStateData stateData = getOrFetchDeviceStateData(deviceId);
         if (stateData != null) {
             DeviceState state = stateData.getState();
             long ts = System.currentTimeMillis();
@@ -251,6 +313,23 @@ public class DefaultDeviceStateService implements DeviceStateService {
         }
     }
 
+    private DeviceStateData getOrFetchDeviceStateData(DeviceId deviceId) {
+        DeviceStateData deviceStateData = deviceStates.get(deviceId);
+        if (deviceStateData == null) {
+            if (!routingService.resolveById(deviceId).isPresent()) {
+                Device device = deviceService.findDeviceById(deviceId);
+                if (device != null) {
+                    try {
+                        deviceStateData = fetchDeviceState(device).get();
+                    } catch (InterruptedException | ExecutionException e) {
+                        log.debug("[{}] Failed to fetch device state!", deviceId, e);
+                    }
+                }
+            }
+        }
+        return deviceStateData;
+    }
+
     private void onInactivityTimeoutUpdate(DeviceId deviceId, long inactivityTimeout) {
         if (inactivityTimeout == 0L) {
             return;
@@ -269,37 +348,65 @@ public class DefaultDeviceStateService implements DeviceStateService {
     }
 
     private void onDeviceAddedSync(Device device) {
-        Futures.addCallback(fetchDeviceState(device), new FutureCallback<DeviceStateData>() {
-            @Override
-            public void onSuccess(@Nullable DeviceStateData state) {
-                addDeviceUsingState(state);
-            }
+        Optional<ServerAddress> address = routingService.resolveById(device.getId());
+        if (!address.isPresent()) {
+            Futures.addCallback(fetchDeviceState(device), new FutureCallback<DeviceStateData>() {
+                @Override
+                public void onSuccess(@Nullable DeviceStateData state) {
+                    addDeviceUsingState(state);
+                }
+
+                @Override
+                public void onFailure(Throwable t) {
+                    log.warn("Failed to register device to the state service", t);
+                }
+            });
+        } else {
+            sendDeviceEvent(device.getTenantId(), device.getId(), address.get(), true, false, false);
+        }
+    }
 
-            @Override
-            public void onFailure(Throwable t) {
-                log.warn("Failed to register device to the state service", t);
-            }
-        });
+    private void sendDeviceEvent(TenantId tenantId, DeviceId deviceId, ServerAddress address, boolean added, boolean updated, boolean deleted) {
+        log.trace("[{}][{}] Device is monitored on other server: {}", tenantId, deviceId, address);
+        ClusterAPIProtos.DeviceStateServiceMsgProto.Builder builder = ClusterAPIProtos.DeviceStateServiceMsgProto.newBuilder();
+        builder.setTenantIdMSB(tenantId.getId().getMostSignificantBits());
+        builder.setTenantIdLSB(tenantId.getId().getLeastSignificantBits());
+        builder.setDeviceIdMSB(deviceId.getId().getMostSignificantBits());
+        builder.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits());
+        builder.setAdded(added);
+        builder.setUpdated(updated);
+        builder.setDeleted(deleted);
+        clusterRpcService.tell(address, ClusterAPIProtos.MessageType.CLUSTER_DEVICE_STATE_SERVICE_MESSAGE, builder.build().toByteArray());
     }
 
     private void onDeviceUpdatedSync(Device device) {
-        DeviceStateData stateData = deviceStates.get(device.getId());
-        if (stateData != null) {
-            TbMsgMetaData md = new TbMsgMetaData();
-            md.putValue("deviceName", device.getName());
-            md.putValue("deviceType", device.getType());
-            stateData.setMetaData(md);
+        Optional<ServerAddress> address = routingService.resolveById(device.getId());
+        if (!address.isPresent()) {
+            DeviceStateData stateData = getOrFetchDeviceStateData(device.getId());
+            if (stateData != null) {
+                TbMsgMetaData md = new TbMsgMetaData();
+                md.putValue("deviceName", device.getName());
+                md.putValue("deviceType", device.getType());
+                stateData.setMetaData(md);
+            }
+        } else {
+            sendDeviceEvent(device.getTenantId(), device.getId(), address.get(), false, true, false);
         }
     }
 
     private void onDeviceDeleted(TenantId tenantId, DeviceId deviceId) {
-        deviceStates.remove(deviceId);
-        Set<DeviceId> deviceIds = tenantDevices.get(tenantId);
-        if (deviceIds != null) {
-            deviceIds.remove(deviceId);
-            if (deviceIds.isEmpty()) {
-                tenantDevices.remove(tenantId);
+        Optional<ServerAddress> address = routingService.resolveById(deviceId);
+        if (!address.isPresent()) {
+            deviceStates.remove(deviceId);
+            Set<DeviceId> deviceIds = tenantDevices.get(tenantId);
+            if (deviceIds != null) {
+                deviceIds.remove(deviceId);
+                if (deviceIds.isEmpty()) {
+                    tenantDevices.remove(tenantId);
+                }
             }
+        } else {
+            sendDeviceEvent(tenantId, deviceId, address.get(), false, false, true);
         }
     }
 
diff --git a/application/src/main/java/org/thingsboard/server/service/state/DeviceStateService.java b/application/src/main/java/org/thingsboard/server/service/state/DeviceStateService.java
index 37f785c..dab9e69 100644
--- a/application/src/main/java/org/thingsboard/server/service/state/DeviceStateService.java
+++ b/application/src/main/java/org/thingsboard/server/service/state/DeviceStateService.java
@@ -17,6 +17,7 @@ package org.thingsboard.server.service.state;
 
 import org.thingsboard.server.common.data.Device;
 import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.common.msg.cluster.ServerAddress;
 
 import java.util.Optional;
 
@@ -39,6 +40,7 @@ public interface DeviceStateService {
 
     void onDeviceInactivityTimeoutUpdate(DeviceId deviceId, long inactivityTimeout);
 
-    Optional<DeviceState> getDeviceState(DeviceId deviceId);
+    void onClusterUpdate();
 
+    void onRemoteMsg(ServerAddress serverAddress, byte[] bytes);
 }
diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
index 0942b40..dbd58d5 100644
--- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
+++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
@@ -362,7 +362,7 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
 
             DonAsynchron.withCallback(tsService.findAll(entityId, queries),
                     missedUpdates -> {
-                        if (!missedUpdates.isEmpty()) {
+                        if (missedUpdates != null && !missedUpdates.isEmpty()) {
                             tellRemoteSubUpdate(address, sessionId, new SubscriptionUpdate(subscription.getSubscriptionId(), missedUpdates));
                         }
                     },
diff --git a/application/src/main/proto/cluster.proto b/application/src/main/proto/cluster.proto
index ac96010..21c963b 100644
--- a/application/src/main/proto/cluster.proto
+++ b/application/src/main/proto/cluster.proto
@@ -57,6 +57,8 @@ enum MessageType {
   CLUSTER_TELEMETRY_ATTR_UPDATE_MESSAGE = 10;
   CLUSTER_TELEMETRY_TS_UPDATE_MESSAGE = 11;
   CLUSTER_RPC_FROM_DEVICE_RESPONSE_MESSAGE = 12;
+
+  CLUSTER_DEVICE_STATE_SERVICE_MESSAGE = 13;
 }
 
 // Messages related to CLUSTER_TELEMETRY_MESSAGE
@@ -128,3 +130,13 @@ message FromDeviceRPCResponseProto {
     string response = 3;
     int32 error = 4;
 }
+
+message DeviceStateServiceMsgProto {
+    int64 tenantIdMSB = 1;
+    int64 tenantIdLSB = 2;
+    int64 deviceIdMSB = 3;
+    int64 deviceIdLSB = 4;
+    bool added = 5;
+    bool updated = 6;
+    bool deleted = 7;
+}
\ No newline at end of file