thingsboard-aplcache
Changes
application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java 7(+7 -0)
application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java 171(+139 -32)
application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java 2(+1 -1)
application/src/main/proto/cluster.proto 12(+12 -0)
Details
diff --git a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
index cb097fb..fac5d97 100644
--- a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
+++ b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
@@ -241,6 +241,9 @@ public class DefaultActorService implements ActorService {
case CLUSTER_RPC_FROM_DEVICE_RESPONSE_MESSAGE:
actorContext.getDeviceRpcService().processRemoteResponseFromDevice(serverAddress, msg.getPayload().toByteArray());
break;
+ case CLUSTER_DEVICE_STATE_SERVICE_MESSAGE:
+ actorContext.getDeviceStateService().onRemoteMsg(serverAddress, msg.getPayload().toByteArray());
+ break;
}
}
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java
index 6002b0e..c3ffbab 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java
@@ -36,6 +36,7 @@ import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.util.Assert;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.service.state.DeviceStateService;
import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
import org.thingsboard.server.utils.MiscUtils;
@@ -74,6 +75,10 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
@Lazy
private TelemetrySubscriptionService tsSubService;
+ @Autowired
+ @Lazy
+ private DeviceStateService deviceStateService;
+
private final List<DiscoveryServiceListener> listeners = new CopyOnWriteArrayList<>();
private CuratorFramework client;
@@ -203,6 +208,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
switch (pathChildrenCacheEvent.getType()) {
case CHILD_ADDED:
tsSubService.onClusterUpdate();
+ deviceStateService.onClusterUpdate();
listeners.forEach(listener -> listener.onServerAdded(instance));
break;
case CHILD_UPDATED:
@@ -210,6 +216,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
break;
case CHILD_REMOVED:
tsSubService.onClusterUpdate();
+ deviceStateService.onClusterUpdate();
listeners.forEach(listener -> listener.onServerRemoved(instance));
break;
default:
diff --git a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java
index 45b9696..a41fdee 100644
--- a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java
+++ b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java
@@ -23,11 +23,13 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
+import com.google.protobuf.InvalidProtocolBufferException;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
+import org.thingsboard.rule.engine.api.RpcError;
import org.thingsboard.server.actors.service.ActorService;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Device;
@@ -36,13 +38,19 @@ import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.page.TextPageLink;
+import org.thingsboard.server.common.data.plugin.ComponentLifecycleState;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgDataType;
import org.thingsboard.server.common.msg.TbMsgMetaData;
+import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.device.DeviceService;
import org.thingsboard.server.dao.tenant.TenantService;
+import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
+import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
+import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
+import org.thingsboard.server.service.rpc.FromDeviceRpcResponse;
import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
import javax.annotation.Nullable;
@@ -54,6 +62,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
@@ -98,6 +107,12 @@ public class DefaultDeviceStateService implements DeviceStateService {
@Autowired
private TelemetrySubscriptionService tsSubService;
+ @Autowired
+ private ClusterRoutingService routingService;
+
+ @Autowired
+ private ClusterRpcService clusterRpcService;
+
@Value("${state.defaultInactivityTimeoutInSec}")
@Getter
private long defaultInactivityTimeoutInSec;
@@ -172,12 +187,57 @@ public class DefaultDeviceStateService implements DeviceStateService {
}
@Override
- public Optional<DeviceState> getDeviceState(DeviceId deviceId) {
- DeviceStateData state = deviceStates.get(deviceId);
- if (state != null) {
- return Optional.of(state.getState());
+ public void onClusterUpdate() {
+ queueExecutor.submit(this::onClusterUpdateSync);
+ }
+
+ @Override
+ public void onRemoteMsg(ServerAddress serverAddress, byte[] data) {
+ ClusterAPIProtos.DeviceStateServiceMsgProto proto;
+ try {
+ proto = ClusterAPIProtos.DeviceStateServiceMsgProto.parseFrom(data);
+ } catch (InvalidProtocolBufferException e) {
+ throw new RuntimeException(e);
+ }
+ TenantId tenantId = new TenantId(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB()));
+ DeviceId deviceId = new DeviceId(new UUID(proto.getDeviceIdMSB(), proto.getDeviceIdLSB()));
+ if (proto.getDeleted()) {
+ queueExecutor.submit(() -> onDeviceDeleted(tenantId, deviceId));
} else {
- return Optional.empty();
+ Device device = deviceService.findDeviceById(deviceId);
+ if (device != null) {
+ if (proto.getAdded()) {
+ onDeviceAdded(device);
+ } else if (proto.getUpdated()) {
+ onDeviceUpdated(device);
+ }
+ }
+ }
+ }
+
+ private void onClusterUpdateSync() {
+ List<Tenant> tenants = tenantService.findTenants(new TextPageLink(Integer.MAX_VALUE)).getData();
+ for (Tenant tenant : tenants) {
+ List<ListenableFuture<DeviceStateData>> fetchFutures = new ArrayList<>();
+ List<Device> devices = deviceService.findDevicesByTenantId(tenant.getId(), new TextPageLink(Integer.MAX_VALUE)).getData();
+ for (Device device : devices) {
+ if (!routingService.resolveById(device.getId()).isPresent()) {
+ if (!deviceStates.containsKey(device.getId())) {
+ fetchFutures.add(fetchDeviceState(device));
+ }
+ } else {
+ Set<DeviceId> tenantDeviceSet = tenantDevices.get(tenant.getId());
+ if (tenantDeviceSet != null) {
+ tenantDeviceSet.remove(device.getId());
+ }
+ deviceStates.remove(device.getId());
+ }
+ }
+ try {
+ Futures.successfulAsList(fetchFutures).get().forEach(this::addDeviceUsingState);
+ } catch (InterruptedException | ExecutionException e) {
+ log.warn("Failed to init device state service from DB", e);
+ }
}
}
@@ -187,7 +247,9 @@ public class DefaultDeviceStateService implements DeviceStateService {
List<ListenableFuture<DeviceStateData>> fetchFutures = new ArrayList<>();
List<Device> devices = deviceService.findDevicesByTenantId(tenant.getId(), new TextPageLink(Integer.MAX_VALUE)).getData();
for (Device device : devices) {
- fetchFutures.add(fetchDeviceState(device));
+ if (!routingService.resolveById(device.getId()).isPresent()) {
+ fetchFutures.add(fetchDeviceState(device));
+ }
}
try {
Futures.successfulAsList(fetchFutures).get().forEach(this::addDeviceUsingState);
@@ -209,7 +271,7 @@ public class DefaultDeviceStateService implements DeviceStateService {
DeviceStateData stateData = deviceStates.get(deviceId);
DeviceState state = stateData.getState();
state.setActive(ts < state.getLastActivityTime() + state.getInactivityTimeout());
- if (!state.isActive() && state.getLastInactivityAlarmTime() < state.getLastActivityTime()) {
+ if (!state.isActive() && (state.getLastInactivityAlarmTime() == 0L || state.getLastInactivityAlarmTime() < state.getLastActivityTime())) {
state.setLastInactivityAlarmTime(ts);
pushRuleEngineMessage(stateData, INACTIVITY_EVENT);
saveAttribute(deviceId, INACTIVITY_ALARM_TIME, ts);
@@ -219,7 +281,7 @@ public class DefaultDeviceStateService implements DeviceStateService {
}
private void onDeviceConnectSync(DeviceId deviceId) {
- DeviceStateData stateData = deviceStates.get(deviceId);
+ DeviceStateData stateData = getOrFetchDeviceStateData(deviceId);
if (stateData != null) {
long ts = System.currentTimeMillis();
stateData.getState().setLastConnectTime(ts);
@@ -229,7 +291,7 @@ public class DefaultDeviceStateService implements DeviceStateService {
}
private void onDeviceDisconnectSync(DeviceId deviceId) {
- DeviceStateData stateData = deviceStates.get(deviceId);
+ DeviceStateData stateData = getOrFetchDeviceStateData(deviceId);
if (stateData != null) {
long ts = System.currentTimeMillis();
stateData.getState().setLastDisconnectTime(ts);
@@ -239,7 +301,7 @@ public class DefaultDeviceStateService implements DeviceStateService {
}
private void onDeviceActivitySync(DeviceId deviceId) {
- DeviceStateData stateData = deviceStates.get(deviceId);
+ DeviceStateData stateData = getOrFetchDeviceStateData(deviceId);
if (stateData != null) {
DeviceState state = stateData.getState();
long ts = System.currentTimeMillis();
@@ -251,6 +313,23 @@ public class DefaultDeviceStateService implements DeviceStateService {
}
}
+ private DeviceStateData getOrFetchDeviceStateData(DeviceId deviceId) {
+ DeviceStateData deviceStateData = deviceStates.get(deviceId);
+ if (deviceStateData == null) {
+ if (!routingService.resolveById(deviceId).isPresent()) {
+ Device device = deviceService.findDeviceById(deviceId);
+ if (device != null) {
+ try {
+ deviceStateData = fetchDeviceState(device).get();
+ } catch (InterruptedException | ExecutionException e) {
+ log.debug("[{}] Failed to fetch device state!", deviceId, e);
+ }
+ }
+ }
+ }
+ return deviceStateData;
+ }
+
private void onInactivityTimeoutUpdate(DeviceId deviceId, long inactivityTimeout) {
if (inactivityTimeout == 0L) {
return;
@@ -269,37 +348,65 @@ public class DefaultDeviceStateService implements DeviceStateService {
}
private void onDeviceAddedSync(Device device) {
- Futures.addCallback(fetchDeviceState(device), new FutureCallback<DeviceStateData>() {
- @Override
- public void onSuccess(@Nullable DeviceStateData state) {
- addDeviceUsingState(state);
- }
+ Optional<ServerAddress> address = routingService.resolveById(device.getId());
+ if (!address.isPresent()) {
+ Futures.addCallback(fetchDeviceState(device), new FutureCallback<DeviceStateData>() {
+ @Override
+ public void onSuccess(@Nullable DeviceStateData state) {
+ addDeviceUsingState(state);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ log.warn("Failed to register device to the state service", t);
+ }
+ });
+ } else {
+ sendDeviceEvent(device.getTenantId(), device.getId(), address.get(), true, false, false);
+ }
+ }
- @Override
- public void onFailure(Throwable t) {
- log.warn("Failed to register device to the state service", t);
- }
- });
+ private void sendDeviceEvent(TenantId tenantId, DeviceId deviceId, ServerAddress address, boolean added, boolean updated, boolean deleted) {
+ log.trace("[{}][{}] Device is monitored on other server: {}", tenantId, deviceId, address);
+ ClusterAPIProtos.DeviceStateServiceMsgProto.Builder builder = ClusterAPIProtos.DeviceStateServiceMsgProto.newBuilder();
+ builder.setTenantIdMSB(tenantId.getId().getMostSignificantBits());
+ builder.setTenantIdLSB(tenantId.getId().getLeastSignificantBits());
+ builder.setDeviceIdMSB(deviceId.getId().getMostSignificantBits());
+ builder.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits());
+ builder.setAdded(added);
+ builder.setUpdated(updated);
+ builder.setDeleted(deleted);
+ clusterRpcService.tell(address, ClusterAPIProtos.MessageType.CLUSTER_DEVICE_STATE_SERVICE_MESSAGE, builder.build().toByteArray());
}
private void onDeviceUpdatedSync(Device device) {
- DeviceStateData stateData = deviceStates.get(device.getId());
- if (stateData != null) {
- TbMsgMetaData md = new TbMsgMetaData();
- md.putValue("deviceName", device.getName());
- md.putValue("deviceType", device.getType());
- stateData.setMetaData(md);
+ Optional<ServerAddress> address = routingService.resolveById(device.getId());
+ if (!address.isPresent()) {
+ DeviceStateData stateData = getOrFetchDeviceStateData(device.getId());
+ if (stateData != null) {
+ TbMsgMetaData md = new TbMsgMetaData();
+ md.putValue("deviceName", device.getName());
+ md.putValue("deviceType", device.getType());
+ stateData.setMetaData(md);
+ }
+ } else {
+ sendDeviceEvent(device.getTenantId(), device.getId(), address.get(), false, true, false);
}
}
private void onDeviceDeleted(TenantId tenantId, DeviceId deviceId) {
- deviceStates.remove(deviceId);
- Set<DeviceId> deviceIds = tenantDevices.get(tenantId);
- if (deviceIds != null) {
- deviceIds.remove(deviceId);
- if (deviceIds.isEmpty()) {
- tenantDevices.remove(tenantId);
+ Optional<ServerAddress> address = routingService.resolveById(deviceId);
+ if (!address.isPresent()) {
+ deviceStates.remove(deviceId);
+ Set<DeviceId> deviceIds = tenantDevices.get(tenantId);
+ if (deviceIds != null) {
+ deviceIds.remove(deviceId);
+ if (deviceIds.isEmpty()) {
+ tenantDevices.remove(tenantId);
+ }
}
+ } else {
+ sendDeviceEvent(tenantId, deviceId, address.get(), false, false, true);
}
}
diff --git a/application/src/main/java/org/thingsboard/server/service/state/DeviceStateService.java b/application/src/main/java/org/thingsboard/server/service/state/DeviceStateService.java
index 37f785c..dab9e69 100644
--- a/application/src/main/java/org/thingsboard/server/service/state/DeviceStateService.java
+++ b/application/src/main/java/org/thingsboard/server/service/state/DeviceStateService.java
@@ -17,6 +17,7 @@ package org.thingsboard.server.service.state;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.common.msg.cluster.ServerAddress;
import java.util.Optional;
@@ -39,6 +40,7 @@ public interface DeviceStateService {
void onDeviceInactivityTimeoutUpdate(DeviceId deviceId, long inactivityTimeout);
- Optional<DeviceState> getDeviceState(DeviceId deviceId);
+ void onClusterUpdate();
+ void onRemoteMsg(ServerAddress serverAddress, byte[] bytes);
}
diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
index 0942b40..dbd58d5 100644
--- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
+++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
@@ -362,7 +362,7 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
DonAsynchron.withCallback(tsService.findAll(entityId, queries),
missedUpdates -> {
- if (!missedUpdates.isEmpty()) {
+ if (missedUpdates != null && !missedUpdates.isEmpty()) {
tellRemoteSubUpdate(address, sessionId, new SubscriptionUpdate(subscription.getSubscriptionId(), missedUpdates));
}
},
application/src/main/proto/cluster.proto 12(+12 -0)
diff --git a/application/src/main/proto/cluster.proto b/application/src/main/proto/cluster.proto
index ac96010..21c963b 100644
--- a/application/src/main/proto/cluster.proto
+++ b/application/src/main/proto/cluster.proto
@@ -57,6 +57,8 @@ enum MessageType {
CLUSTER_TELEMETRY_ATTR_UPDATE_MESSAGE = 10;
CLUSTER_TELEMETRY_TS_UPDATE_MESSAGE = 11;
CLUSTER_RPC_FROM_DEVICE_RESPONSE_MESSAGE = 12;
+
+ CLUSTER_DEVICE_STATE_SERVICE_MESSAGE = 13;
}
// Messages related to CLUSTER_TELEMETRY_MESSAGE
@@ -128,3 +130,13 @@ message FromDeviceRPCResponseProto {
string response = 3;
int32 error = 4;
}
+
+message DeviceStateServiceMsgProto {
+ int64 tenantIdMSB = 1;
+ int64 tenantIdLSB = 2;
+ int64 deviceIdMSB = 3;
+ int64 deviceIdLSB = 4;
+ bool added = 5;
+ bool updated = 6;
+ bool deleted = 7;
+}
\ No newline at end of file